一、MQ介绍
消息队列 Message Queue,是基础数据结构中“先进先出”的一种数据机构。把要传输的数据(消息)放在队列中,队列机制来实现消息传递——生产者生产消息并把消息放入队列,然后由消费者处理;消费者可以到指定队列拉取消息,或者订阅响应的队列,由MQ服务端给其推送消息。
参考:https://www.zhihu.com/question/54152397
消息队列可以简单理解为:把要传输的数据放到队列中
二、消息队列的好处
1. 解耦
2. 异步
3. 限流
三、队列(Queue)和主题(Topic)的区别
4.1 Queue消息递归模型,即点对点(Point-to-Point,简称PTP)
通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。
4.2 Topic消息传递模型,即发布/订阅(publish/subscribe,简称pub/sub)
通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。
四、通过Python发送接收MQ消息
4.1 stomp协议库的安装
链接:https://pypi.org/project/stomp.py/
4.2 创建监听名称
4.3 建立连接
如何查看stomp协议的端口号??
从配置中查看,查看路径 ..AcitveMQconfactivemq.xml
4.4 发送/接收消息
4.4.1 queue队列
4.4.2 topic消息
Note:
destination中通过 queue或topic来区分是队列还是主题
主题模型下,需要提前订阅,所以要先接收主题消息,而且这个接收模式是在整个连接过程中生效的
五、代码
#!/usr/bin/env python # -*- coding: utf-8 -*- import sys import time import stomp class MyListener(object): def on_error(self, headers, message): print('received an error {}'.format(message)) def on_message(self, headers, message): print('received a message {}'.format(message)) conn = stomp.Connection([('30.1.1.57',22030)]) #第一个参数是MQIP,第二个参数是该MQ对应stomp协议的端口号 conn.set_listener('MyListener', MyListener()) #第一个参数为这个监听的名称,可以任意取值 conn.start() conn.connect('admin','admin', wait=True) #用户名和密码 # # 发送消息到testQueue队列,id可以任意填写 # conn.send(body=b'hahah', destination='/queue/testQueue') # # 从testQueue队列中接收消息 # conn.subscribe(destination='/queue/testQueue', id='1') # time.sleep(2) # #断开连接 # conn.disconnect() content = '''"incinf":{ "calltime":"2020-07-16 08:30:00", "telnum":"15622811986", "eventaddress":"朗山路11号源证创业大厦5楼002", "stationhouse":"450312000000", "stationhousename":"漓江分局", "casetype":"治安案、事件", "caseport":"打架斗殴", "casekind":"聚众斗殴", "content":"实验室有人打架斗殴,发生流血事件002", "eventlevel":"二级警情", "operateseatno":"7001", "operateloginname":"漓江分局接警席2"} ''' # 从Topic.ProcessAlarmInfo主题中接收消息 conn.subscribe(destination='/topic/Topic.ProcessAlarmInfo',id='test') # 发送消息到Topic.ProcessAlarmInfo主题 conn.send(body=content, destination='/topic/Topic.ProcessAlarmInfo') # # 发送消息到Topic.ProcessAlarmInfo主题 # conn.send(body=content, destination='/topic/Topic.ProcessAlarmInfo') time.sleep(2) conn.disconnect()
补充:临时队列
临时队列,SMP中全量更新信息保存在MQ的临时队列中,临时队列无法通过MQ页面查看,必须通过jconsole查看,临时队列在断开连接会释放掉。