需求:使用 python 程序向 activemq
的主题推送数据,默认推送的数据类型是 BytesMessage,java 程序那边接收较为麻烦,改为推送 TextMessage 类型的数据
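解决方法:想要推送 TextMessage,需要在创建连接时指定 auto_content_length=False。
原因:stomp.py 默认会为每条消息自动添加 content-length 头,而 ActiveMQ 会把带有 content-length 头的 STOMP 消息当作 BytesMessage,不带该头的消息才会被当作 TextMessage。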
示例代码如下:
# coding=utf-8
import stomp
def send_to_topic(msg):
    """Publish ``msg`` to the ActiveMQ topic over STOMP.

    The connection is created with ``auto_content_length=False`` so that
    stomp.py does not add a ``content-length`` header to the frame; ActiveMQ
    treats STOMP messages without that header as TextMessage instead of
    BytesMessage.

    Args:
        msg: Message body to publish.

    Returns:
        1 if the message was sent and the connection closed cleanly,
        0 if any step failed. Exceptions are logged, never propagated.
    """
    import logging

    conn = None
    try:
        conn = stomp.Connection10([("10.10.19.200", 61613)], auto_content_length=False)
        conn.start()
        conn.connect()
        conn.send('/topic/HIATMP.HISENSE.ILLEGAL.AIREVIEW', msg)
        conn.disconnect()
        conn = None  # closed cleanly; nothing left for the cleanup below
        return 1
    except Exception as e:
        logging.error("send message with activemq failed, error is:%s", e)
        return 0
    finally:
        # Reached with a live ``conn`` only when a step above failed:
        # release the connection instead of leaking it.
        if conn is not None:
            try:
                conn.disconnect()
            except Exception:
                # Best-effort cleanup; the original failure is already logged.
                pass
if __name__ == "__main__":
    # Send one sample comma-separated record to the topic.
    result = send_to_topic("ILLEGAL01,2.1,0001,5b9171c2815342c5bce90f601f14d182,1,02,鲁BJ0A92,2019-04-26 15:51:45,12080,601078111050,宁夏路与福州南路路口,370202000000,1,1,370202000000011125,3,01,,0,http://10.10.19.250/1.png,http://10.10.19.250/2.png,http://10.10.19.250/3.png,,717/1846/136/36/1,2,2019-04-26 15:51:53,,,,,,,,,1,不按道行驶,,,,,,,,,1,10")
    # Call form of print works on both Python 2 and Python 3
    # (``print result`` is a SyntaxError on Python 3).
    print(result)
另外,附上 activemq 关于主题的生产者,消费者代码:
1)python 版本(stomp协议)
生产者:
# coding=utf-8
import logging

import stomp


def send_to_topic(msg):
    """Publish ``msg`` to the ActiveMQ topic over STOMP.

    The connection is created with ``auto_content_length=False`` so that
    stomp.py does not add a ``content-length`` header to the frame; ActiveMQ
    treats STOMP messages without that header as TextMessage instead of
    BytesMessage.

    Args:
        msg: Message body to publish.

    Returns:
        1 if the message was sent and the connection closed cleanly,
        0 if any step failed. Exceptions are logged, never propagated.
    """
    conn = None
    try:
        conn = stomp.Connection10([("10.10.19.200", 61613)], auto_content_length=False)
        conn.start()
        conn.connect()
        conn.send('/topic/HIATMP.HISENSE.ILLEGAL.AIREVIEW', msg)
        conn.disconnect()
        conn = None  # closed cleanly; nothing left for the cleanup below
        return 1
    except Exception as e:
        logging.error("send message with activemq failed, error is:%s", e)
        return 0
    finally:
        # Reached with a live ``conn`` only when a step above failed:
        # release the connection instead of leaking it.
        if conn is not None:
            try:
                conn.disconnect()
            except Exception:
                # Best-effort cleanup; the original failure is already logged.
                pass


if __name__ == "__main__":
    # Send one sample comma-separated record to the topic.
    result = send_to_topic("ILLEGAL01,2.1,0001,5b9171c2815342c5bce90f601f14d182,1,02,鲁BJ0A92,2019-04-26 15:51:45,12080,601078111050,宁夏路与福州南路路口,370202000000,1,1,370202000000011125,3,01,,0,http://10.10.19.250/1.png,http://10.10.19.250/2.png,http://10.10.19.250/3.png,,717/1846/136/36/1,2,2019-04-26 15:51:53,,,,,,,,,1,不按道行驶,,,,,,,,,1,10")
    # Call form of print works on both Python 2 and Python 3.
    print(result)
消费者:
# coding=utf-8
import time

import stomp


class SampleListener(object):
    """Listener that prints every message delivered to the subscription."""

    def on_message(self, headers, message):
        """Print the ``destination`` header and the body of one message.

        Args:
            headers: Mapping of frame headers; must contain ``destination``.
            message: Message body.
        """
        print('headers: %s' % headers['destination'])
        print('message: %s ' % message)


def receive_from_topic():
    """Subscribe to the topic and print messages until interrupted.

    Registers a ``SampleListener`` on the connection, subscribes, and then
    keeps the calling thread alive. Press Ctrl-C to stop; the connection is
    disconnected on the way out.
    """
    conn = stomp.Connection10([("10.10.19.200", 61613)])
    conn.set_listener("", SampleListener())
    conn.start()
    conn.connect()
    conn.subscribe("/topic/HIATMP.HISENSE.ILLEGAL.AIREVIEW")
    try:
        # Sleep instead of spinning on ``pass`` so that waiting for messages
        # does not burn a full CPU core.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the consumer.
        pass
    finally:
        conn.disconnect()


if __name__ == '__main__':
    receive_from_topic()
2)java 版本(tcp协议)
生产者:
package ActiveMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Random; import java.util.UUID; public class TopicProducer { public static void main(String[] args) { //连接信息设置 String username = "admin"; String password = "admin"; String brokerURL = "failover://tcp://10.10.19.200:61616"; //连接工厂 ConnectionFactory connectionFactory = null; //连接 Connection connection = null; //会话 接受或者发送消息的线程 Session session = null; //消息的主题 Topic topic = null; //消息生产者 MessageProducer messageProducer = null; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //创建名为TopicTest的主题 // topic = session.createTopic("HIATMP.HISENSE.ILLEGAL"); topic = session.createTopic("HIATMP.HISENSE.ILLEGAL.AIREVIEW"); //创建主题生产者 messageProducer = session.createProducer(topic); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//不将数据持久化 //发送主题 TextMessage message = null; for (int i = 0; i < 1; i--) { //创建要发送的文本信息 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");//设置日期格式 String dateTime = df.format(new Date());// new Date()为获取当前系统时间 // message = session.createTextMessage("illegal" + dateTime); String uuid = UUID.randomUUID().toString().replaceAll("-",""); String message_fmt = String.format("ILLEGAL01,2.1,0001,%s,1,02,鲁BJ0A92,2019-04-26 15:51:45,12080,601078111050,宁夏路与福州南路路口,370202000000,1,1,370202000000011125,3,01,,0,http://10.10.19.250/1.png,http://10.10.19.250/2.png,http://10.10.19.250/3.png,,717/1846/136/36/1,2,2019-04-26 
15:51:53,,,,,,,,,1,不按道行驶,,,,,,,,,1,10", uuid); message = session.createTextMessage(message_fmt); //通过主题生产者发出消息 messageProducer.send(message); System.out.println("发送成功:" + message.getText()); session.commit(); // 提交到mq Thread.sleep( 200 * 1 ); } session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (null != connection) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消费者:
package ActiveMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.BytesMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.transport.stomp.Stomp; public class TopicConsumer { public static void main(String[] args) { // Stomp.Headers.Send.PERSISTENT; //连接信息设置 String username = "admin"; String password = "admin"; String brokerURL = "failover://tcp://10.10.19.200:61616"; // String brokerURL = "failover://stomp://0.0.0.0:61613"; //连接工厂 ConnectionFactory connectionFactory = null; //连接 Connection connection = null; //会话 接受或者发送消息的线程 Session session = null; //主题的目的地 Topic topic = null; //主题消费者 MessageConsumer messageConsumer = null; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建一个连接TopicTest的主题 topic = session.createTopic("HIATMP.HISENSE.ILLEGAL.AIREVIEW"); //创建主题消费者 messageConsumer = session.createConsumer(topic); messageConsumer.setMessageListener(new MyMessageListener()); } catch (JMSException e) { e.printStackTrace(); } } } class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(message); // 接收 BytesMessage // BytesMessage bytesMessage = (BytesMessage) message; TextMessage bytesMessage = (TextMessage) message; try { // 接收 BytesMessage // byte []bt = new byte[(int) bytesMessage.getBodyLength()]; // bytesMessage.readBytes(bt); // String str = new String(bt); // System.out.println("接收订阅主题:" + str); System.out.println("接收订阅主题:" + bytesMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
end~