zoukankan      html  css  js  c++  java
  • ACTIVEMQ 发布与订阅

    发布消息:

    代码
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.command.ActiveMQTopic;

    public class TopicTest {

    public static void main(String[] args) throws Exception {
            ActiveMQConnectionFactory factory 
    = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection 
    = factory.createConnection();
            connection.start();
            
            
            
            
    //创建一个Topic
            Topic topic= new ActiveMQTopic("testTopic");
            Session session 
    = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            

            
    //创建一个生产者,然后发送多个消息。
            MessageProducer producer = session.createProducer(topic);
            
    for(int i=0; i<10; i++){
                producer.send(session.createTextMessage(
    "zyg:" + i));
            }
            
        }
    }

    订阅消息

    代码
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.command.ActiveMQTopic;


    public class TestComsumer {
        
    public static void main(String[] args) throws Exception {
            ActiveMQConnectionFactory factory 
    = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection 
    = factory.createConnection();
            connection.start();
           
            
            
            
    //创建一个Topic
            Topic topic= new ActiveMQTopic("testTopic");
            Session session 
    = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            
    //注册消费者1
            MessageConsumer comsumer1 = session.createConsumer(topic);
            comsumer1.setMessageListener(
    new MessageListener(){
                
    public void onMessage(Message m) {
                    
    try {
                        System.out.println(
    "Consumer1 get " + ((TextMessage)m).getText());
                    } 
    catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
    //注册消费者2
            MessageConsumer comsumer2 = session.createConsumer(topic);
            comsumer2.setMessageListener(
    new MessageListener(){
                
    public void onMessage(Message m) {
                    
    try {
                        System.out.println(
    "Consumer2 get " + ((TextMessage)m).getText());
                    } 
    catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
    //创建一个生产者,然后发送多个消息。
           
        }

    }
  • 相关阅读:
    Spark Mllib里的向量标签概念、构成(图文详解)
    Spark Mllib里的本地向量集(密集型数据集和稀疏型数据集概念、构成)(图文详解)
    Spark Mllib里的Mllib基本数据类型(图文详解)
    [转]Debugging into .NET Core源代码的两种方式
    [转]在ASP.NET Core使用Middleware模拟Custom Error Page功能
    [转]Asp.net MVC中的ViewData与ViewBag
    [转]细说 ASP.NET Cache 及其高级用法
    [转]分布式中使用Redis实现Session共享(二)
    [转]Asp.net Core 使用Redis存储Session
    [转]JS跨域解决方式 window.name
  • 原文地址:https://www.cnblogs.com/yg_zhang/p/1847531.html
Copyright © 2011-2022 走看看