zoukankan      html  css  js  c++  java
  • ActiveMQ入门实例Demo

     前面我们已经搭建和配置好了ActiveMQ,下面来看一个Demo,体验一下MQ。

    JMS 消息模型

      JMS消息服务应用程序结构支持两种模型:点对点模型,发布者/订阅者模型。  

      (1)点对点模型(Queue)

        一个生产者向一个特定的队列发布消息,一个消费者从这个队列中依次读取消息。

        模型特点:只有一个消费者获得消息。

      (2)发布者/订阅者模型(Topic)

        0个或多个订阅者可以接受特定主题的消息。

        模型特点:多个消费者可获得消息。

        Topic和Queue的最大区别在于Topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而Queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

    JMS消息格式

    • MapMessage -- key-value键值对
    • TextMessage -- 字符串对象
    • ObjcetMessage -- 一个序列化的Java对象
    • ByteMessage -- 一个未解释字节的数据流
    • StreamMessage -- Java原始值的数据流

     

    点对点模型Demo

    public class Constants {
    
        public static final String MQ_NAME = "parry";
        
        public static final String MQ_PASSWORD = "parry123";
        
        public static final String MQ_BROKETURL = "tcp://192.168.56.129:61616";
    }
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.parry.demo.constant.Constants;
    
    /**
     * 
     * <p>
     * MSProduct 点对点模型-消息生产者
     * <p>
     */
    public class MSProduct {
    
        public static void main(String[] args) {
            // 连接工厂
            ConnectionFactory factory;
            // 连接实例
            Connection connection = null;
            // 收发的线程实例
            Session session;
            // 消息发送目标地址
            Destination destination;
            // 消息创建者
            MessageProducer messageProducer;
            try {
                factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD,
                        Constants.MQ_BROKETURL);
                // 获取连接实例
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息)
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建队列(返回一个消息目的地)
                destination = session.createQueue("parryQuene");
                // 创建消息生产者
                messageProducer = session.createProducer(destination);
                // 创建TextMessage消息实体
                TextMessage message = session.createTextMessage("我是parry,这是我的第一个消息!");
                messageProducer.send(message);
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.parry.demo.constant.Constants;
    /**
     * <p>
     * MQConsumer 点对点--消息消费者
     * <p>
     */
    public class MQConsumer {
    
        public static void main(String[] args) {
            // 连接工厂
            ConnectionFactory connectionFactory;
            // 连接实例
            Connection connection = null;
            // 收发的线程实例
            Session session;
            // 消息发送目标地址
            Destination destination;
            try {
                // 实例化连接工厂
                connectionFactory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
                // 获取连接实例
                connection = connectionFactory.createConnection();
                // 启动连接
                connection.start();
                // 创建接收或发送的线程实例(消费者就不需要开启事务了)
                session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                // 创建队列(返回一个消息目的地)
                destination = session.createQueue("parryQuene");
                // 创建消息消费者
                MessageConsumer consumer = session.createConsumer(destination);
                //注册消息监听
                consumer.setMessageListener(new MQListerner());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    /**
     * <p>
     * MQListerner 生产者监听器
     * <p>
     */
    public class MQListerner implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println(((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    发布者/订阅者模型Demo

      (1)发布者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.parry.demo.constant.Constants;
    
    /**
     * <p>
     * MQProducer 订阅消息的发送者
     * <p>
     */
    public class MQProducer {
    
        public static void main(String[] args) {
            // 连接工厂
            ConnectionFactory factory;
            // 连接实例
            Connection connection = null;
            // 收发的线程实例
            Session session;
            // 消息发送目标地址
            Destination destination;
    
            try {
                // 实例化连接工厂
                factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
                // 获取连接实例
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息)
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 创建队列(返回一个消息目的地)
                destination = session.createTopic("parryTopic");
                // 创建消息发布者
                MessageProducer producer = session.createProducer(destination);
                // 创建TextMessage消息
                TextMessage message = session.createTextMessage("你好,这是我发布的第一条消息!");
                // 发布消息
                producer.send(message);
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    (2)订阅者01

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.parry.demo.constant.Constants;
    
    /**
     * <p>
     * MQCousumer01 订阅-发布模式 订阅者01
     * <p>
     */
    public class MQCousumer01 {
    
        public static void main(String[] args) {
            // 连接工厂
            ConnectionFactory factory;
            // 连接实例
            Connection connection = null;
            // 收发的线程实例
            Session session;
            // 消息发送目标地址
            Destination destination;
            try {
                // 实例化连接工厂
                factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
                // 获取连接实例
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建接收或发送的线程实例
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                // 创建队列(返回一个消息目的地)
                destination = session.createTopic("parryTopic");
                // 创建消息订阅者
                MessageConsumer consumer = session.createConsumer(destination);
                // 消息发布者添加监听器
                consumer.setMessageListener(new Listerner01());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    /**
     * <p>
     * Listerner01 订阅者01的监听器
     * <p>
     */
    public class Listerner01 implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println("订阅者01接收到消息:" + ((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    (3)订阅者02

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.parry.demo.constant.Constants;
    
    /**
     * <p>
     * MQCousumer02 订阅-发布模式 订阅者02
     * <p>
     */
    public class MQCousumer02 {
    
        public static void main(String[] args) {
            // 连接工厂
            ConnectionFactory factory;
            // 连接实例
            Connection connection = null;
            // 收发的线程实例
            Session session;
            // 消息发送目标地址
            Destination destination;
            try {
                // 实例化连接工厂
                factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
                // 获取连接实例
                connection = factory.createConnection();
                // 启动连接
                connection.start();
                // 创建接收或发送的线程实例
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                // 创建队列(返回一个消息目的地)
                destination = session.createTopic("parryTopic");
                // 创建消息订阅者
                MessageConsumer consumer = session.createConsumer(destination);
                // 消息发布者添加监听器
                consumer.setMessageListener(new Listerner02());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    /**
     * <p>
     * Listerner02 订阅者02的监听器
     * <p>
     */
    public class Listerner02 implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println("订阅者02接收到消息:"+((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    转自:https://www.cnblogs.com/parryyang/p/6062160.html

  • 相关阅读:
    图论模型--dijstra算法和floyd算法
    灰色预测模型
    多属性决策
    层次分析法
    一元多项式
    9.8一些错误的原因
    http协议笔记(不全)
    计网笔记1.18(不全)
    数据库基本操作
    flask-数据库
  • 原文地址:https://www.cnblogs.com/panchanggui/p/10335060.html
Copyright © 2011-2022 走看看