zoukankan      html  css  js  c++  java
  • 消息总线

    一、MQ介绍

    消息队列 Message Queue,是基础数据结构中“先进先出”的一种数据机构。把要传输的数据(消息)放在队列中,队列机制来实现消息传递——生产者生产消息并把消息放入队列,然后由消费者处理;消费者可以到指定队列拉取消息,或者订阅响应的队列,由MQ服务端给其推送消息。

    参考:https://www.zhihu.com/question/54152397

    消息队列可以简单理解为:把要传输的数据放到队列中

    二、消息队列的好处

    1.     解耦

    2.     异步

     3.     限流

    三、队列(Queue)和主题(Topic)的区别

    4.1 Queue消息递归模型,即点对点(Point-to-Point,简称PTP)

    通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。

    4.2 Topic消息传递模型,即发布/订阅(publish/subscribe,简称pub/sub)

           通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。 

    四、通过Python发送接收MQ消息

    4.1 stomp协议库的安装

    链接:https://pypi.org/project/stomp.py/

    4.2 创建监听名称

     

     4.3 建立连接

     

     如何查看stomp协议的端口号??

    从配置中查看,查看路径 ..AcitveMQconfactivemq.xml

     

    4.4 发送/接收消息

    4.4.1 queue队列

     

    4.4.2 topic消息

     

    Note:

      destination中通过 queue或topic来区分是队列还是主题

      主题模型下,需要提前订阅,所以要先接收主题消息,而且这个接收模式是在整个连接过程中生效的

    五、代码

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import sys
    import time
    import stomp
    
    class MyListener(object):
        def on_error(self, headers, message):
            print('received an error {}'.format(message))
        def on_message(self, headers, message):
            print('received a message {}'.format(message))
    
    conn = stomp.Connection([('30.1.1.57',22030)])   #第一个参数是MQIP,第二个参数是该MQ对应stomp协议的端口号
    conn.set_listener('MyListener', MyListener())   #第一个参数为这个监听的名称,可以任意取值
    conn.start()
    conn.connect('admin','admin', wait=True)    #用户名和密码
    
    # # 发送消息到testQueue队列,id可以任意填写
    # conn.send(body=b'hahah', destination='/queue/testQueue')
    # # 从testQueue队列中接收消息
    # conn.subscribe(destination='/queue/testQueue', id='1')
    # time.sleep(2)
    # #断开连接
    # conn.disconnect()
    
    content = '''"incinf":{
            "calltime":"2020-07-16 08:30:00",
            "telnum":"15622811986",
            "eventaddress":"朗山路11号源证创业大厦5楼002",
            "stationhouse":"450312000000",
            "stationhousename":"漓江分局",
            "casetype":"治安案、事件",
            "caseport":"打架斗殴",
            "casekind":"聚众斗殴",
            "content":"实验室有人打架斗殴,发生流血事件002",
            "eventlevel":"二级警情",
            "operateseatno":"7001",
            "operateloginname":"漓江分局接警席2"} '''
    # 从Topic.ProcessAlarmInfo主题中接收消息
    conn.subscribe(destination='/topic/Topic.ProcessAlarmInfo',id='test')
    # 发送消息到Topic.ProcessAlarmInfo主题
    conn.send(body=content, destination='/topic/Topic.ProcessAlarmInfo')
    # # 发送消息到Topic.ProcessAlarmInfo主题
    # conn.send(body=content, destination='/topic/Topic.ProcessAlarmInfo')
    time.sleep(2)
    conn.disconnect()

     补充:临时队列

    临时队列,SMP中全量更新信息保存在MQ的临时队列中,临时队列无法通过MQ页面查看,必须通过jconsole查看,临时队列在断开连接会释放掉。

  • 相关阅读:
    正则表达式
    爬虫原理和网页构造
    简单的博客系统之二
    配置编辑器geany
    linux删除多文件
    eNSP交换路由基础
    NTP centOS6.5
    shell脚本之lftp上传
    进度条
    maketrans与translate函数
  • 原文地址:https://www.cnblogs.com/like1824/p/13321027.html
Copyright © 2011-2022 走看看