zoukankan      html  css  js  c++  java
  • python发送消息到activeMQ后java接收到BinaryMessage的坑

    和另一个系统进行对接,使用activemq进行消息通信。对方使用java客户端监听一个topic,我们需要发送TextMessage消息,对方接收后进行处理。而我们因为系统架构的原因只能使用python进行推送,也就只能通过stomp协议发送消息。然后就遇到了问题,发送的消息在java消费者端只能解析成BinaryMessage,而发送的时候根本没有办法指定消息类型。网上搜了很久没有找到相同的情况。
    根据官方通过python往ActiveMQ发送message的demo编写如下代码。

    # -*-coding:utf-8-*-
    import stomp
    import time
    
    queue_name = '/queue/SampleQueue'
    topic_name = '/topic/SampleTopic'
    listener_name = 'SampleListener'
    test_name = "springBootMqQueue"
    springBootMqQueue = '/queue/springBootMqQueue'
    
    
    class SampleListener(object):
        def on_message(self, headers, message):
            print('headers: %s' % headers)
            print('message: %s' % message)
    
    
    # 推送到队列queue
    def send_to_queue(msg):
        conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
        conn.connect('admin', 'admin', wait=True)
        conn.send(springBootMqQueue, msg)
        conn.disconnect()
    
    
    ##从队列接收消息
    def receive_from_queue():
        conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
        conn.set_listener(listener_name, SampleListener())
        conn.connect('admin', 'admin', wait=True)
        conn.subscribe(springBootMqQueue)
        time.sleep(1)  # secs
        conn.disconnect()
    
    
    if __name__ == '__main__':
        send_to_queue('{"content":{"flow":{"network":"5","times":"1-1","url":"http://www.baidu.com","way":"5"},"sms":{"direction":"0","text":"短信内容详情"},"voice":{"connect":"5","key":"挂断"}},"form":"13901295021","formPort":"com4","interval":"2-2","network":"5","taskId":"1dsf3641212434g","times":"1-3","to":"18611010269","type":"1"}')
        receive_from_queue()

    Stomp是一个很简单的协议,协议中不携带TextMessage和BytesMessage相关的信息,而是通过content-length header判断消息类型的。header中有content-length则说明是BytesMessage,否则是TextMessage。
    接下来的问题就简单了,发送的时候不在header中携带content-length就可以了,查看send方法的源码发现

    def __init__(self, transport, auto_content_length=True):
        self.transport = transport
        self.auto_content_length = auto_content_length
        transport.set_listener('protocol-listener', self)
        self.version = '1.0'
    
    def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
        """
        Send a message to a destination.
    
        :param str destination: the destination of the message (e.g. queue or topic name)
        :param body: the content of the message
        :param str content_type: the content type of the message
        :param dict headers: a map of any additional headers the broker requires
        :param keyword_headers: any additional headers the broker requires
        """
        assert destination is not None, "'destination' is required"
        assert body is not None, "'body' is required"
        headers = utils.merge_headers([headers, keyword_headers])
        headers[HDR_DESTINATION] = destination
        if content_type:
            headers[HDR_CONTENT_TYPE] = content_type
        body = encode(body)
        if self.auto_content_length and body and HDR_CONTENT_LENGTH not in headers:
            headers[HDR_CONTENT_LENGTH] = len(body)
        self.send_frame(CMD_SEND, headers, body)

    三个条件都为true则会填充content-length,而auto_content_length是在__init__方法中传入的,默认值为True,所以只需要在创建对象的时候将该值设置为False即可。

    # 推送到队列queue
    def send_to_queue(msg):
        conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
        conn.connect('admin', 'admin', wait=True)
        conn.send(springBootMqQueue, msg)
        conn.disconnect()
    人生就是要不断折腾
  • 相关阅读:
    如何使用SAP Intelligent Robotic Process Automation自动操作Excel
    OpenSAML 使用引导 IV: 安全特性
    Spring Cloud Zuul 网关使用与 OAuth2.0 认证授权服务
    微服务架构集大成者—Spring Cloud (转载)
    Spring Cloud Eureka 服务注册列表显示 IP 配置问题
    使用 Notification API 开启浏览器桌面提醒
    SignalR 中使用 MessagePack 序列化提高 WebSocket 通信性能
    配置 Nginx 的目录浏览功能
    关于 Nginx 配置 WebSocket 400 问题
    Migrate from ASP.NET Core 2.0 to 2.1
  • 原文地址:https://www.cnblogs.com/xiangxiaolin/p/12697610.html
Copyright © 2011-2022 走看看