亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python38 RabbitMQ 消息隊列

發布時間:2020-07-30 04:51:29 來源:網絡 閱讀:912 作者:代碼老兵 欄目:編程語言

title: Python38 RabbitMQ
tags: Python學習
grammar_cjkRuby: true


RabbitMQ 消息隊列介紹

RabbitMQ是一種消息隊列,與線程queue和進程QUEUE作用是一樣的。
RabbitMQ是一個中間程序,可以實現不同進程之間的通信(比如python和Java之間,QQ和Word之間等);
普通情況下A進程與B進程之間通信,兩者之間需要建立很多連接和單獨寫一些代碼,但是使用RabbitMQ的話就可以實現幫助不同進程之間的數據通信。
A進程交給RabbitMQ,RabbitMQ在交給B,同樣B交給RabbitMQ,RabbitMQ在交給A,RabbitMQ可以實現A與B進程之間的連接和信息轉換。
使用RabbitMQ可以實現很多個獨立進程之間的交互,所有其他獨立進程都可以用RabbitMQ作為中間程序。

py 消息隊列:
線程 queue(同一進程下線程之間進行交互)
進程 Queue(父子進程進行交互 或者 同屬于同一進程下的多個子進程進行交互)

如果是兩個完全獨立的python程序,也是不能用上面兩個queue進行交互的,或者和其他語言交互有哪些實現方式呢。
【Disk、Socket、其他中間件】這里中間件不僅可以支持兩個程序之間交互,可以支持多個程序,可以維護好多個程序的隊列。
雖然可以通過硬盤的方式實現多個獨立進程交互,但是硬盤速度比較慢,而RabbitMQ則能夠很好的、快速的幫助兩個獨立進程實現交互。

像這種公共的中間件有好多成熟的產品:
RabbitMQ
ZeroMQ
ActiveMQ
……

RabbitMQ:erlang語言 開發的。
Python中連接RabbitMQ的模塊:pika 、Celery(分布式任務隊列) 、haigha
可以維護很多的隊列
其中pika是RabbitMQ常用的模塊

RabbitMQ 教程官網:http://www.rabbitmq.com/getstarted.html

幾個概念說明:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務

Python38 RabbitMQ 消息隊列
RabbitMQ不像之前學的python Queue都在一個隊列里實現交互,RabbitMQ有多個隊列(圖中紅色部分代表隊列),每個隊列都可以將消息發給多個接收端(C是接收端,P是生產消息端)

RabbitMQ基本示例.

1、Rabbitmq 安裝

Windos系統

pip install pika

ubuntu系統

install  rabbitmq-server  # 直接搞定

以下centos系統
1)Install Erlang

# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm

yum install erlang

2)Install RabbitMQ Server

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm

3)use RabbitMQ Server

chkconfig rabbitmq-server on
service rabbitmq-server stop/start
或者
rabbitmq-server start

Python38 RabbitMQ 消息隊列

rabbitmq已經開啟,等待傳輸

2、基本示例

發送端 producer


import pika

# 建立一個實例;相當于建立一個socket。
#通過ctrl+ConnectionParameters可以看到能傳很多參數,如果遠程還可以傳用戶名密碼。
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  # 默認端口5672,可不寫
    )
# 聲明一個管道,在管道里發消息
channel = connection.channel()
# 在管道里聲明一個叫hello的queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',  # queue名字,將消息發給hello這個queue
                      body='Hello World!')  # 消息內容
print(" [x] Sent 'Hello World!'")
connection.close()  # 發完消息后關閉隊列 

執行結果:

[x] Sent 'Hello World!'

注意一定要開啟rabbitmq,否則會報錯

接收端 consumer

import pika
import time

# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 聲明管道
channel = connection.channel()

# 為什么又聲明了一個‘hello’隊列?
# 如果這個queue確定已經聲明了,可以不聲明。但是你不知道是生產者還是消費者先運行,所以要聲明兩次。如果消費者沒聲明,且消費者先運行的話,就會報錯。
# 生產者先聲明queue,消費者不聲明,但是消費者后運行就不會報錯。
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  # 四個參數為標準格式
    print(ch, method, properties)  # 打印看一下是什么
    # ch是管道內存對象地址;method是內容相關信息  properties后面講  body消息內容
    print(" [x] Received %r" % body)
    #time.sleep(15)
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  # 消費消息
        'hello',  # 你要從哪個隊列里收消息 
        callback,  # 如果收到消息,就調用callback函數來處理消息  # 注意調用的函數(callback)以前在basic_consume模塊是放在形參第一個位置的,后面修改到第二個位置了,如果放錯位置會報錯
        # auto_ack=True  # 寫的話,如果接收消息,機器宕機消息就丟了
        # 一般不寫。宕機則生產者檢測到發給其他消費者
        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 開始消費消息(開始接收消息,一直收,如果沒消息就卡主在這里了)

執行結果:
 [*] Waiting for messages. To exit press CTRL+C
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f715d76f128> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b728277178e746118699d5b4302a0314', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])> <BasicProperties>
 [x] Received b'Hello World!'

收到了bytes格式的 Hello World!

Python38 RabbitMQ 消息隊列

消費者(接收端)這邊看到已經卡主了

Python38 RabbitMQ 消息隊列

如果此時單獨在運行一下生產者(發送端),直接可以從消費者看到新收到的消息


rabbitmq 消息分發輪詢

Python38 RabbitMQ 消息隊列
重新開啟rabbitmq

Python38 RabbitMQ 消息隊列
運行三個接收者(消費者)

Python38 RabbitMQ 消息隊列
運行發送者,可以看到被第一個接收者給收到信息了

Python38 RabbitMQ 消息隊列
第二次運行發送者,第二個接收者收到信息了

Python38 RabbitMQ 消息隊列
第三次運行發送者,第三個接收者收到信息了

上面幾次運行說明了,依次的將信息發送每一個接收者


接收端 consumer

import pika
import time

# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 聲明管道
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # 正常回調函數(callback)執行完成就表示信息接收完成,如果在還沒執行完成時就出現異常就表示信息沒有正常接收,比如斷網、斷電等,會導致信息不能正常接收。
    # 下面sleep 60秒,在60秒之前就將該模塊終止執行來模擬異常情況。
    time.sleep(60)  
    #ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True 表示不管消息是否接收(處理)完成,都不會回復確認消息
        # 如果producer不關心 comsumer是否處理完,可以使用該參數
        # 但是一般都不會使用它

        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

Python38 RabbitMQ 消息隊列
在centos中重新執行rabbitmq-server start來清空隊列里的消息
然后在pycharm開啟三個comsumer,在去運行等待接收消息
再去執行producer來發送消息,執行producer后,立即關閉第一個comsumer,這樣消息就會因為第一個comsumer沒接收成功跑到第二個comsumer去,以此類推。

Python38 RabbitMQ 消息隊列
關閉第二個comsumer,第三個comsumer收到信息

Python38 RabbitMQ 消息隊列
這張圖是將三個comsumer同時都關閉了,這樣三個comsumer都收不到消息,說明producer的消息沒有被接收,此時再去開啟第一個comsumer,這時第一個comsumer會將消息給接收過來。

我們將sleep注釋掉,也是這種現象,這是因為comsumer并沒有發送確認消息給producer


import pika
import time

# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
# 聲明管道
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):  
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    time.sleep(30)  
    ch.basic_ack(delivery_tag = method.delivery_tag)   # 告訴生成者,消息處理完成

channel.basic_consume(  
        'hello',  
        callback, 
        # auto_ack=True  

        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

此時的代碼:當其中一個comsumer執行完成,并發送確認消息后再去中斷,下一個comsumer就不會收到消息;反之,如果還沒發送確認消息就中斷了,那么消息就會被下一個comsumer接收到。

rabbitmq 消息持久化

如果producer端宕機,那么隊列的數據也會消失;這樣就需要讓隊列消息持久化

# durable=True 該代碼只是將生成的隊列持久化(不是消息),如果producer宕機,隊列會存在,單消息會丟
# 要注意需要在producer端和 comsumer端都要 寫durable=True
channel.queue_declare(queue='hello',durable=True) 
在centos重新開啟 rabbitmq-server start

在producer端

將producer代碼執行三次,將三個消息放入隊列

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      # 下面的代碼是讓消息持久化
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  

將producer代碼執行三次,將三個消息放入隊列

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)
def callback(ch, method, properties, body): 
    print(ch, method, properties) 

    print(" [x] Received %r" % body)
    # time.sleep(30) #注釋掉
    ch.basic_ack(delivery_tag = method.delivery_tag)  

channel.basic_consume( 
        'hello', 
        callback
        )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 

Python38 RabbitMQ 消息隊列
可以看到因為producer執行了三次,所以運行comsumer端收到了三條消息


  • 協商處理

producer端沒有改變

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost',5672)  
    )
channel = connection.channel()
channel.queue_declare(queue='hello',durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',  
                      body='Hello World!',
                      properties = pika.BasicProperties(delivery_mode=2)
                      )  
print(" [x] Sent 'Hello World!'")
connection.close()  

comsumer 1(消費者:1)

import pika 
import time 

connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  

    print(" [x] Received %r" % body) 
    # time.sleep(30) #注釋掉 
    ch.basic_ack(delivery_tag = method.delivery_tag)   

# channel.basic_qos可以使其消費者最多同時多少個消息;如果其中一個消費者處理慢(如:CPU處理性能低下),達到了最多處理的限制的話 生產者就不會再發送給該消費者。

channel.basic_qos(prefetch_count=1)  #這里限制最多同時只處理1個消息

channel.basic_consume(  
        'hello',  
        callback 
        ) 

print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  

Python38 RabbitMQ 消息隊列

此時有兩個comsumer模塊,comsumer2比comsumer1多用了sleep 30秒來模擬性能處理慢的情況


comsumer 2(消費者:2)
復制一個comsumer模塊為comsumer2

import pika 
import time 

connection = pika.BlockingConnection(pika.ConnectionParameters( 
               'localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello',durable=True) 
def callback(ch, method, properties, body):  
    print(ch, method, properties)  

    print(" [x] Received %r" % body) 
    time.sleep(30) #comsumer2這里sleep30秒
    ch.basic_ack(delivery_tag = method.delivery_tag)   

channel.basic_qos(prefetch_count=1)  

channel.basic_consume(  
        'hello',  
        callback 
        ) 

print(' [*] Waiting for messages. To exit press CTRL+C') 
channel.start_consuming()  #  

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

我們運行兩個comsumer后,一直去運行producer。 可以看到comsumer 1接收到了3條信息,而comsumer 2只接收到了1條信息,這是因為comsumer 2 sleep了30秒來模擬信息處理慢的情況;
comsumer 1 和 comsumer 2都指定了同時只能處理1條信息,producer會與comsumer 2協商,因為comsumer2一直沒有處理完限制的1條信息,所以信息都被comsumer 1處理了。


Rabbitmq fanout廣播模式

Python38 RabbitMQ 消息隊列

新建fanout_publiser模塊,用于發送廣播的producer

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# 定義一個轉發器叫logs,屬于一個中間人的角色,用于將producer的消息轉發給消費者(comsumer)
# 定義廣播類型使用fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# message = ''.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)   # routing_key為空即可,因為是廣播沒有定義隊列,所以也不需要指定隊列,但這里必須要定義為空

print(" [x] Send %r " % message)

connection.close()

新建fanout_consumer模塊,用于接收廣播的消費者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費斷開后,自動將queue刪除
# 就是這里會隨機生成一個隨機的唯一queue,用完之后會將生成的queue刪除
# 這里要寫queue='',如果不指定隊列名字,但也要寫一個空的字符串,不然會報錯缺少參數
result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue  # 拿到隨機生成的queue名字

# producer綁定了logs轉發器
# 消費者將隨機生成的隊列也綁定了logs轉發器
# 這樣producer將消息交給logs轉發器,logs轉發器將消息交給對應綁定的隨機隊列,消費者從隊列里在拿消息
channel.queue_bind(exchange='logs',queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] %r" % body)

channel.basic_consume(

    queue=queue_name, on_message_callback=callback

    # auto_ack=True  # 寫的話,如果接收消息,機器宕機消息就丟了
                        )

channel.start_consuming()

Python38 RabbitMQ 消息隊列
因為是廣播,所以兩個consumer都收到了發送者發送的消息。
不過有一點要注意!!!!!!!!!
要先運行consumer(接收者),在運行發送者。就好比收音機一樣,只有你先打開收音機,發送者才能將信息發給你。 如果發送者先發送,你卻沒有接收,之前發送的信息,你就不會再接收到了。


Rabbitmq direct廣播模式

Python38 RabbitMQ 消息隊列
direct 可以區分廣播,將指定的消息發送給指定的接收者;
圖中顯示了將error級別消息發送給C1,將info、error、warning級別消息發送給C2。

producer


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 定義消息級別
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ''.join(sys.argv[2:])  or "direct info: Hello World!"
# message = "direct info: Hello World!"

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)   # routing_key為空即可,因為是廣播沒有定義隊列,所以也不需要指定隊列,但這里必須要定義為空

print(" [x] Send %r " % message)

connection.close()

consumer


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue

# 獲取參數列表
log_levels = sys.argv[1:]

if not log_levels: # 如果沒有參數,就報錯,提示要指定消息級別
    sys.stderr.write("Usage: %s [info] [warning] [error] \n" % sys.argv[0])
    sys.exit(1) # 沒有參數就退出程序

# print(log_levels)

for severity in log_levels:  # 循環參數列表并綁定
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    ) #所有發送到severity的參數,都接收

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

下面在centos中運行代碼

Python38 RabbitMQ 消息隊列
運行C1,只接收error的消息

Python38 RabbitMQ 消息隊列
運行C2,接收 info、warning、error的消息

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列
producer運行,指定發送消息給error,可以看到兩個consumer都接收到了error的消息

Python38 RabbitMQ 消息隊列
只有C2接收到了warning的消息


RabbitMQ topic細致的消息過濾廣播模式

producer


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 定義消息級別
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'  # 發送*.info的信息

message = ''.join(sys.argv[2:])  or "topic info: Hello World!"
# message = "direct info: Hello World!"

channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)   

print(" [x] Send %r:%r " % (routing_key,message))

connection.close()

consumer


import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost') )

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='',exclusive=True)

queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

print('[*] Waiting for logs. To exit press CTRL+c')

def callback(ch, method, properties, body):
    print('[x %r:%r' % (method.routing_key, body))

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

Python38 RabbitMQ 消息隊列
圖中顯示過濾中間有".orange."的數據,過濾以rabbit為結尾的數據,過濾以lazy開頭的數據。

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

運行了兩個consumer。C1接收.info為結尾的數據,C2接收.error為結尾和mysql為開頭的數據。

Python38 RabbitMQ 消息隊列
在運行publisher(已經定義了發送anonymous.info,相當于以.info為結尾的信息)

Python38 RabbitMQ 消息隊列
C1接收到了信息

Python38 RabbitMQ 消息隊列
執行publisher代碼時 后面加上 test.error,然后此時在去看C2

Python38 RabbitMQ 消息隊列
C2 看到test.error相關信息

Python38 RabbitMQ 消息隊列
執行publisher代碼 加上 mysql.info,這樣C1和C2都可以收到消息了

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

Python38 RabbitMQ 消息隊列

運行C3,代碼后面加一個 '#' 符號,表示C3可以接收所有信息(注意#號要被引號括起來)

Python38 RabbitMQ 消息隊列
在publisher隨意發送信息,C3都能收到

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

怀集县| 永修县| 宣武区| 福鼎市| 博罗县| 博爱县| 新建县| 都安| 铜川市| 栖霞市| 宁南县| 绥棱县| 石嘴山市| 高青县| 广南县| 北宁市| 安宁市| 贵南县| 开原市| 泰顺县| 潮安县| 沙雅县| 文山县| 东乌| 靖边县| 马龙县| 浪卡子县| 景泰县| 阳城县| 裕民县| 江津市| 林州市| 苗栗市| 京山县| 车致| 肥城市| 西盟| 峡江县| 宁乡县| 尖扎县| 阳原县|