您好,登錄后才能下訂單哦!
ActiveMQ是一個非常流行的消息隊列服務中間件,實現JMS規范,基于STOMP協議(端口為61613)支持Python訪問。
JMS:Java Message Service
STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,簡單(流)文本定向消息協議
JMS規范定義了2類消息發送接收模型:點對點queue,發布訂閱topic,區別是能夠重復消費和是否保存。
1,點對點queue:不可重復消費,消息被消費前一直保存。
生產者發送消息到queue,一個消費者取出并消費消息。
消息被消費后,queue中不再保存,所有只有一個消費者能夠取到消息。
queue支持多個消費者存在,但是一個消息只有一個消費者可以消費。
當前沒有消費者時,消息一直保存,直到被消費者消費。
2,發布訂閱topic:可重復消費,發布給所有訂閱者。
生產者發布消息到topic中,多個訂閱者收到并消費消息。
和queue不同,發布到topic中的消息會被所有訂閱者消費。
當生產者發布消息時,不管是否有訂閱者,都不保存消息。
JMS規范定義的2類消息傳輸模型queue和topic比較:
Queue | Topic | |
模型 | 點對點Point-to-Point | 發布訂閱publish/subscribe |
有無狀態 | queue消息在消費前被一直保存在mq服務器上的文件或者配置DB | topic數據默認不保存,是無狀態的。 |
完整性保障 | queue保證每條消息都被消費者接收到 | topic不保證生產者發布的每條消息都被訂閱者接收到 |
消息是否會丟失 | 生產者發送消息到queue,消費者接收到消息。如果沒有消費者,將一直保存,不會丟失。 | 生產者發布消息到topic時,當前的訂閱者都能夠接收到消息。如果當前沒有訂閱者,該消息就丟失。 |
消息發布接收策略 | 一對一的消息發布接收策略,一個生產者發送的消息只被一個消費者接收。mq服務器收到回復后,將這個消息刪除。 | 一對多的消息發布接收策略,同一個topic的多個訂閱者都能收到生產者發布的消息。 |
Python集成ActiveMQ使用stomp.py,只需簡單配置,本文在Django框架下進一步封裝服務mq_service.py。典型系統架構示意圖和消息隊列:
時序圖如下:
示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_activemq
├── settings.py
├── mq
│ └── mq_service.py
│ └── mq_listener.py
├── tests
│ └── test_mq_service.py
├── management
│ └── commands
│ └── mq.py
一,Python集成ActiveMQ
代碼文件 | 功能要點 | |
Python集成ActiveMQ | requirements.txt | 安裝stomp.py: stomp.py >= 5.0.1 |
封裝服務 | mq_serivce.py | 封裝ActiveMQ的消息發送和處理功能。在Django框架下,將地址等配置在settings.py中集中管理,注意端口為61613 |
接收處理消息 | mq_listener.py | 增加消息接收處理類,繼承stomp.ConnectionListener |
啟動消息監聽服務 | mq.py | 在Django框架下,將啟動服務代碼封裝成command,方便調用和維護。 |
單元測試 | test_mq_serivce.py | 測試封裝的功能函數 |
功能調用 | views.py | 增加REST接口/chk/mq,調用mq_service發送消息 |
1. 新建Django項目,運行:django-admin startproject hello_activemq
2. 進到目錄hello_activemq,增加應用:python manage.py startapp app
項目的目錄文件結構如下:
3. 安裝stomp.py,pip install stomp.py >= 5.0.1
二,封裝服務mq_service.py,調用ActiveMQ發送消息
1. 增加mq_service.py:
import json
import logging
import stomp
from django.conf import settings
log = logging.getLogger(__name__)
def send_msg(msg_dict, queue_or_topic=settings.MQ_QUEUE):
conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)
msg_str = json.dumps(msg_dict)
log.info('Send msg: %s, %s, %s' % (type(msg_dict), type(msg_str), msg_str))
conn.send(queue_or_topic, msg_str)
conn.disconnect()
2. 打開settings.py,配置ActiveMQ信息:
MQ_URL = '127.0.0.1'
MQ_PORT = 61613
MQ_USER = 'admin'
MQ_PASSWORD = 'admin'
MQ_QUEUE = '/queue/SampleQueue'
MQ_TOPIC = '/topic/SampleTopic'
3. 為了增加代碼的兼容和容錯能力,封裝get_conn(), close_conn()等輔助函數,詳見代碼文件mq_service.py。
三,接收處理消息mq_listener.py
1. 增加mq_listener.py,聲明消息處理類,繼承stomp.ConnectionListener:
import json
import logging
import stomp
log = logging.getLogger(__name__)
class MqListener(stomp.ConnectionListener):
def on_message(self, headers, msg_str):
log.info('Receive msg: %s, %s, %s' % (type(msg_str), msg_str, headers))
msg_dict = None
try:
msg_dict = json.loads(msg_str)
except Exception as e:
log.warning('Exception when parse msg: %s' % str(e))
log.info('Parsed msg: {}, {}'.format(type(msg_dict), msg_dict))
def on_error(self, headers, msg_str):
log.info('Error msg: %s, %s, %s' % (type(msg_str), msg_str, headers))
2. 在on_message()函數中,將消息字符串解析為json,方便業務處理。
3. 聲明on_error()函數處理錯誤信息。
四,啟動消息監聽服務mq.py
1. 將循環接收消息代碼封裝成函數consume_msg(),增加在服務中mq_serivce.py:
import logging
import time
import stomp
from django.conf import settings
log = logging.getLogger(__name__)
def consume_msg(listener, queue=settings.MQ_QUEUE, topic=settings.MQ_TOPIC):
conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)
conn.set_listener('', listener)
conn.subscribe(queue)
conn.subscribe(topic)
while 1:
time.sleep(1000) # secs
conn.disconnect()
2. 調用set_listener()設置消息接收類實例,使用之前創建的MqListener
3. 調用subscribe()訂閱消息,啟動循環監聽。
4. 我們將啟動服務代碼封裝成command,在目錄management/commands中增加mq.py
import logging
from django.core.management.base import BaseCommand
from hello_activemq.mq import mq_service as mq
from hello_activemq.mq.mq_listener import MqListener
log = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'mq starts listener'
def handle(self, *args, **options):
log.info("mq starts")
return mq.consume_msg(MqListener())
5. 運行命令python manage.py mq,看到消息提示,啟動監聽服務成功。
五,單元測試test_mq_service.py
增加測試函數,發送消息:
import logging
from django.test import TestCase
from hello_activemq.mq import mq_service as mq
log = logging.getLogger(__name__)
class MQServiceTest(TestCase):
def test_send_msg(self):
msg_dict = {'content': 'test msg dict', 'msg': 'msg from python'}
mq.send_msg_to_queue(msg_dict)
mq.send_msg_to_topic({'msg': "test msg from python"})
運行python manage.py test,同時看到監聽服務收到并處理消息:
六,發送消息功能調用
1. 在views.py中發送消息,調用mq_servcie.py
import json
from django.http import HttpResponse
from hello_activemq.mq import mq_service as mq
def chk_mq(req):
msg_dict = {
'url': req.get_raw_uri(),
'path': req.get_full_path(),
'host': req.get_host(),
}
mq.send_msg_to_queue(msg_dict)
mq.send_msg_to_topic(msg_dict)
return HttpResponse(json.dumps(msg_dict))
2. 在urls.py中配置路由
from django.urls import path
from app.views import chk_mq
urlpatterns = [
path('', chk_mq, name='chk'),
]
3. 運行命令啟動服務:python manage.py runserver 0.0.0.0:8001
4. REST接口發送消息
七,常見問題和解決方法
1. 啟動服務錯誤:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613
解決:檢查ActiveMQ是否正常啟動,特別注意是否開啟STOMP協議端口61613
原因:Python連接ActiveMQ使用STOMP協議,端口默認61613
2. 發送消息時錯誤:TypeError: message should be a string or bytes, found <class 'dict'>
解決:將消息內容序列化為JSON,發送時調用json.dumps(),接收時調用json.loads()
原因:Python連接ActiveMQ使用的是STOMP協議,消息格式為簡單文本。
注:JMS規范定義的5類消息:
字符串TextMessage,
鍵值對MapMessage,
序列化對象ObjectMessage
字節流BytesMessage
數據流StreamMessage
ActiveMQ支持5類JMS消息,增加了二進制大文件消息BlobMessage:
3. 跨系統對接時接收到的消息類型不是TextMessage
Python開發的業務處理服務 -> Java開發的API服務,接收到的消息類型為BytesMessage,Python發送時設置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的類型TextMessage
解決:stomp建立連接時配置參數conn = stomp.Connection10([("localhost", 61613)], auto_content_length=False)
原因:Python連接ActiveMQ使用STOMP協議,消息格式為簡單文本,不攜帶類型信息,只通過header中的content-length來判斷TextMessage和BytesMessage,所以發送消息時不在header中添加content-length就可以了。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。