您好,登錄后才能下訂單哦!
前言
工作中經常用到rabbitmq,而用的語言主要是python,所以也就經常會用到python中的pika模塊,但是這個模塊的使用,也給我帶了很多問題,這里整理一下關于這個模塊我在使用過程的改變歷程已經中間碰到一些問題的解決方法
關于MQ:
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用于諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
剛開寫代碼的小菜鳥
在最開始使用這個rabbitmq的時候,因為本身業務需求,我的程序既需要從rabbitmq消費消息,也需要給rabbitmq發布消息,代碼的邏輯圖為如下:
下面是我的模擬代碼:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import time import threading import os import json import datetime from multiprocessing import Process # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self, recv_serverid, send_serverid): # self.serverid = MQ_CONFIG.get("serverid") self.exchange = MQ_CONFIG.get("exchange") self.channel = None self.connection = None self.recv_serverid = recv_serverid self.send_serverid = send_serverid def reconnect(self): if self.connection and not self.connection.is_closed(): self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) def consumer_callback(self, channel, method, properties, body): """ 消費消息 :param channel: :param method: :param properties: :param body: :return: """ channel.basic_ack(delivery_tag=method.delivery_tag) process_id = os.getpid() print("current process id is {0} body is {1}".format(process_id, body)) def publish_message(self, to_serverid, message): """ 發布消息 :param to_serverid: :param message: :return: """ message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) def run(self): while True: self.channel.start_consuming() @classmethod def get_instance(cls, *args, **kwargs): """ 單例模式 :return: """ if not hasattr(cls, "_instance"): with cls._instance_lock: if not hasattr(cls, "_instance"): cls._instance = cls(*args, **kwargs) return cls._instance def process1(recv_serverid, send_serverid): """ 用于測試同時訂閱和發布消息 :return: """ # 線程1 用于去 從rabbitmq消費消息 rabbitmq_server = RabbitMQServer.get_instance(recv_serverid, send_serverid) rabbitmq_server.reconnect() recv_threading = threading.Thread(target=rabbitmq_server.run) recv_threading.start() i = 1 while True: # 主線程去發布消息 message = {"value": i} rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) i += 1 time.sleep(0.01) class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 進程1 用于模擬模擬程序1 p = Process(target=process1, args=(recv_serverid, send_serverid, )) p.start() # 主進程用于模擬程序2 process1(send_serverid, recv_serverid)
上面是我的將我的實際代碼更改的測試模塊,其實就是模擬實際業務中,我的rabbitmq模塊既有訂閱消息,又有發布消息的時候,同時,訂閱消息和發布消息用的同一個rabbitmq連接的同一個channel
但是這段代碼運行之后基本沒有運行多久就會看到如下錯誤信息:
Traceback (most recent call last): File "/app/python3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/app/python3/lib/python3.6/multiprocessing/process.py", line 93, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Traceback (most recent call last): File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 144, in <module> process1(send_serverid, recv_serverid) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 109, in process1 rabbitmq_server.publish_message(rabbitmq_server.send_serverid,message) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 76, in publish_message self.channel.basic_publish(exchange=self.exchange, routing_key=to_serverid, body=message) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish mandatory, immediate) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish immediate=immediate) File "/app/python3/lib/python3.6/site-packages/pika/channel.py", line 415, in basic_publish raise exceptions.ChannelClosed() pika.exceptions.ChannelClosed Exception in thread Thread-1: Traceback (most recent call last): File "/app/python3/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/app/python3/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "/app/py_code/\udce5\udc85\udcb3\udce4\udcba\udc8erabbitmq\udce9\udc97\udcae\udce9\udca2\udc98/low_rabbitmq.py", line 80, in run self.channel.start_consuming() File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming self.connection.process_data_events(time_limit=None) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events self._flush_output(common_terminator) File "/app/python3/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output result.reason_text) pika.exceptions.ConnectionClosed: (505, 'UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead')
而這個時候你查看rabbitmq服務的日志信息,你會看到兩種情況的錯誤日志如下:
情況一:
=INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::18:32:37 === accepting AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:32:38 === AMQP connection <0.19446.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:32:38 === closing AMQP connection <0.19446.2> (192.168.90.11:42946 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::18:33:59 === AMQP connection <0.19439.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content header for class 60, got non content header frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::18:33:59 === closing AMQP connection <0.19439.2> (192.168.90.11:42942 -> 192.168.90.11:5672)
情況二:
=INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =INFO REPORT==== 12-Oct-2018::17:41:28 === accepting AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:41:29 === AMQP connection <0.19045.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected content body, got non content body frame instead", 'basic.publish'} =INFO REPORT==== 12-Oct-2018::17:41:29 === closing AMQP connection <0.19045.2> (192.168.90.11:33004 -> 192.168.90.11:5672) =ERROR REPORT==== 12-Oct-2018::17:42:23 === AMQP connection <0.19052.2> (running), channel 1 - error: {amqp_error,unexpected_frame, "expected method frame, got non method frame instead",none} =INFO REPORT==== 12-Oct-2018::17:42:23 === closing AMQP connection <0.19052.2> (192.168.90.11:33008 -> 192.168.90.11:5672)
對于這種情況我查詢了很多資料和文檔,都沒有找到一個很好的答案,查到關于這個問題的連接有:
https://stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame
http://rabbitmq.1065348.n5.nabble.com/UNEXPECTED-FRAME-expected-content-header-for-class-60-got-non-content-header-frame-instead-td34981.html
這個問題其他人碰到的也不少,不過查了最后的解決辦法基本都是創建兩個rabbitmq連接,一個連接用于訂閱消息,一個連接用于發布消息,這種情況的時候,就不會出現上述的問題
在這個解決方法之前,我測試了用同一個連接,不同的channel,讓訂閱消息用一個channel, 發布消息用另外一個channel,但是在測試過程依然會出現上述的錯誤。
有點寫代碼能力了
最后我也是選擇了用兩個連接的方法解決出現上述的問題,現在是一個測試代碼例子:
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import os from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: self.reconnect() self.channel.start_consuming() @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 這里分別用兩個線程去連接和發送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 這里也是用兩個連接去連接和發送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)
上面代碼中我分別用了兩個連接去訂閱和發布消息,同時另外一對訂閱發布也是用的兩個連接來執行訂閱和發布,這樣當再次運行程序之后,就不會在出現之前的問題
關于斷開重連
上面的代碼雖然不會在出現之前的錯誤,但是這個程序非常脆弱,當rabbitmq服務重啟或者斷開之后,程序并不會有重連接的機制,所以我們需要為代碼添加重連機制,這樣即使rabbitmq服務重啟了或者
rabbitmq出現異常我們的程序也能進行重連機制
#! /usr/bin/env python3 # .-*- coding:utf-8 .-*- import pika import threading import json import datetime import time from pika.exceptions import ChannelClosed from pika.exceptions import ConnectionClosed # rabbitmq 配置信息 MQ_CONFIG = { "host": "192.168.90.11", "port": 5672, "vhost": "/", "user": "guest", "passwd": "guest", "exchange": "ex_change", "serverid": "eslservice", "serverid2": "airservice" } class RabbitMQServer(object): _instance_lock = threading.Lock() def __init__(self): self.recv_serverid = "" self.send_serverid = "" self.exchange = MQ_CONFIG.get("exchange") self.connection = None self.channel = None def reconnect(self): try: if self.connection and not self.connection.is_closed: self.connection.close() credentials = pika.PlainCredentials(MQ_CONFIG.get("user"), MQ_CONFIG.get("passwd")) parameters = pika.ConnectionParameters(MQ_CONFIG.get("host"), MQ_CONFIG.get("port"), MQ_CONFIG.get("vhost"), credentials) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.exchange, exchange_type="direct") if isinstance(self, RabbitComsumer): result = self.channel.queue_declare(queue="queue_{0}".format(self.recv_serverid), exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=self.exchange, queue=queue_name, routing_key=self.recv_serverid) self.channel.basic_consume(self.consumer_callback, queue=queue_name, no_ack=False) except Exception as e: print(e) class RabbitComsumer(RabbitMQServer): def __init__(self): super(RabbitComsumer, self).__init__() def consumer_callback(self, ch, method, properties, body): """ :param ch: :param method: :param properties: :param body: :return: """ ch.basic_ack(delivery_tag=method.delivery_tag) process_id = threading.current_thread() print("current process id is {0} body is {1}".format(process_id, body)) def start_consumer(self): while True: try: self.reconnect() self.channel.start_consuming() except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, recv_serverid): consumer = cls() consumer.recv_serverid = recv_serverid consumer.start_consumer() class RabbitPublisher(RabbitMQServer): def __init__(self): super(RabbitPublisher, self).__init__() def start_publish(self): self.reconnect() i = 1 while True: message = {"value": i} message = dict_to_json(message) try: self.channel.basic_publish(exchange=self.exchange, routing_key=self.send_serverid, body=message) i += 1 except ConnectionClosed as e: self.reconnect() time.sleep(2) except ChannelClosed as e: self.reconnect() time.sleep(2) except Exception as e: self.reconnect() time.sleep(2) @classmethod def run(cls, send_serverid): publish = cls() publish.send_serverid = send_serverid publish.start_publish() class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime.datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, datetime.date): return obj.strftime("%Y-%m-%d") else: return json.JSONEncoder.default(self, obj) def dict_to_json(po): jsonstr = json.dumps(po, ensure_ascii=False, cls=CJsonEncoder) return jsonstr def json_to_dict(jsonstr): if isinstance(jsonstr, bytes): jsonstr = jsonstr.decode("utf-8") d = json.loads(jsonstr) return d if __name__ == '__main__': recv_serverid = MQ_CONFIG.get("serverid") send_serverid = MQ_CONFIG.get("serverid2") # 這里分別用兩個線程去連接和發送 threading.Thread(target=RabbitComsumer.run, args=(recv_serverid,)).start() threading.Thread(target=RabbitPublisher.run, args=(send_serverid,)).start() # 這里也是用兩個連接去連接和發送, threading.Thread(target=RabbitComsumer.run, args=(send_serverid,)).start() RabbitPublisher.run(recv_serverid)
上面的代碼運行運行之后即使rabbitmq的服務出問題了,但是當rabbitmq的服務好了之后,我們的程序依然可以重新進行連接,但是上述這種實現方式運行了一段時間之后,因為實際的發布消息的地方的消息是從其他線程或進程中獲取的數據,這個時候你可能通過queue隊列的方式實現,這個時候你的queue中如果長時間沒有數據,在一定時間之后來了數據需要發布出去,這個時候你發現,你的程序會提示連接被rabbitmq 服務端給斷開了,但是畢竟你設置了重連機制,當然也可以重連,但是這里想想為啥會出現這種情況,這個時候查看rabbitmq的日志你會發現出現了如下錯誤:
=ERROR REPORT==== 8-Oct-2018::15:34:19 ===
closing AMQP connection <0.30112.1> (192.168.90.11:54960 -> 192.168.90.11:5672):
{heartbeat_timeout,running}
這是我之前測試環境的日志截取的,可以看到是因為這個錯誤導致的,后來查看pika連接rabbitmq的連接參數中有這么一個參數
這個參數默認沒有設置,那么這個heatbeat的心跳時間,默認是不設置的,如果不設置的話,就是根絕服務端設置的,因為這個心跳時間是和服務端進行協商的結果
當這個參數設置為0的時候則表示不發送心跳,服務端永遠不會斷開這個連接,所以這里我為了方便我給發布消息的線程的心跳設置為0,并且我這里,我整理通過抓包,看一下服務端和客戶端的協商過程
從抓包分析中可以看出服務端和客戶端首先協商的是580秒,而客戶端回復的是:
這樣這個連接就永遠不會斷了,但是如果我們不設置heartbeat這個值,再次抓包我們會看到如下
從上圖我們可以刪除最后服務端和客戶端協商的結果就是580,這樣當時間到了之后,如果沒有數據往來,那么就會出現連接被服務端斷開的情況了
特別注意
需要特別注意的是,經過我實際測試python的pika==0.11.2 版本及以下版本設置heartbeat的不生效的,只有0.12.0及以上版本設置才能生效
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。