zoukankan      html  css  js  c++  java
  • python发送消息到activeMQ后java接收到BinaryMessage的坑

    和另一个系统进行对接,使用activemq进行消息通信。对方使用java客户端监听一个topic,我们需要发送TextMessage消息,对方接收后进行处理。而我们因为系统架构的原因只能使用python进行推送,也就只能通过stomp协议发送消息。然后就遇到了问题,发送的消息在java消费者端只能解析成BinaryMessage,而发送的时候根本没有办法指定消息类型。网上搜了很久没有找到相同的情况。
    根据官方通过python往ActiveMQ发送message的demo编写如下代码。

    # -*-coding:utf-8-*-
    import stomp
    import time
    
    queue_name = '/queue/SampleQueue'
    topic_name = '/topic/SampleTopic'
    listener_name = 'SampleListener'
    test_name = "springBootMqQueue"
    springBootMqQueue = '/queue/springBootMqQueue'
    
    
    class SampleListener(object):
        def on_message(self, headers, message):
            print('headers: %s' % headers)
            print('message: %s' % message)
    
    
    # 推送到队列queue
    def send_to_queue(msg):
        conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
        conn.connect('admin', 'admin', wait=True)
        conn.send(springBootMqQueue, msg)
        conn.disconnect()
    
    
    ##从队列接收消息
    def receive_from_queue():
        conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
        conn.set_listener(listener_name, SampleListener())
        conn.connect('admin', 'admin', wait=True)
        conn.subscribe(springBootMqQueue)
        time.sleep(1)  # secs
        conn.disconnect()
    
    
    if __name__ == '__main__':
        send_to_queue('{"content":{"flow":{"network":"5","times":"1-1","url":"http://www.baidu.com","way":"5"},"sms":{"direction":"0","text":"短信内容详情"},"voice":{"connect":"5","key":"挂断"}},"form":"13901295021","formPort":"com4","interval":"2-2","network":"5","taskId":"1dsf3641212434g","times":"1-3","to":"18611010269","type":"1"}')
        receive_from_queue()

    Stomp是一个很简单的协议,协议中不携带TextMessage和BytesMessage相关的信息,而是通过content-length header判断消息类型的。header中有content-length则说明是BytesMessage,否则是TextMessage。
    接下来的问题就简单了,发送的时候不在header中携带content-length就可以了,查看send方法的源码发现

    def __init__(self, transport, auto_content_length=True):
        self.transport = transport
        self.auto_content_length = auto_content_length
        transport.set_listener('protocol-listener', self)
        self.version = '1.0'
    
    def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
        """
        Send a message to a destination.
    
        :param str destination: the destination of the message (e.g. queue or topic name)
        :param body: the content of the message
        :param str content_type: the content type of the message
        :param dict headers: a map of any additional headers the broker requires
        :param keyword_headers: any additional headers the broker requires
        """
        assert destination is not None, "'destination' is required"
        assert body is not None, "'body' is required"
        headers = utils.merge_headers([headers, keyword_headers])
        headers[HDR_DESTINATION] = destination
        if content_type:
            headers[HDR_CONTENT_TYPE] = content_type
        body = encode(body)
        if self.auto_content_length and body and HDR_CONTENT_LENGTH not in headers:
            headers[HDR_CONTENT_LENGTH] = len(body)
        self.send_frame(CMD_SEND, headers, body)

    三个条件都为true则会填充content-length,而auto_content_length是在__init__方法中传入的,默认值为True,所以只需要在创建对象的时候将该值设置为False即可。

    # 推送到队列queue
    def send_to_queue(msg):
        conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)
        conn.connect('admin', 'admin', wait=True)
        conn.send(springBootMqQueue, msg)
        conn.disconnect()
    人生就是要不断折腾
  • 相关阅读:
    POJ 2778 DNA Sequence(AC自动机+矩阵)
    Challenge & Growth —— 从这里开始
    京东云
    [Done] Codeforces Round #562 (Div. 2) 题解
    HDU 3486 Interviewe
    Codeforces Round #529 (Div. 3) 题解
    Wannafly 挑战赛 19 参考题解
    第十六届上海大学程序设计联赛春季赛暨上海高校金马五校赛 题解
    2018年长沙理工大学第十三届程序设计竞赛 题解
    POJ 3017 Cut the Sequence
  • 原文地址:https://www.cnblogs.com/xiangxiaolin/p/12697610.html
Copyright © 2011-2022 走看看