zoukankan      html  css  js  c++  java
  • JMS-ActiveMq-订阅发布模式

    发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。

    question:只有订阅了的才能收到发布的消息,否则收不到消息,也就是消费者要在生产者发布之前订阅才能监听到消息.

    有两种消费形式,我们此次只介绍listener模式(代码冗长,后期的util需自己封装)

     destination=session.createQueue("FirstQueue1"); // 创建消息队列 两种模式:1 点对点 
     destination=session.createTopic("FirstTopic1"); // 创建消息队列 两种模式:2 发布订阅
    • 实时监听,消费者监听生产者的生产状态
    • 定时刷新 recive模式(不实用,不推荐

    版本:apache-activemq-5.11.1 官网下载

    主页:http://activemq.apache.org/
    目前最新版本:5.11.1
    开发包及源码下载地址:http://activemq.apache.org/activemq-5111-release.html
    ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码admin/admin

    1.生产者JMSProducer

    /**
     * 消息生产者-消息发布者
     * @author Administrator
     *
     */
    public class JMSProducer {
    
        private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
        private static final int SENDNUM=10; // 发送的消息数量
        
        public static void main(String[] args) {
            
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageProducer messageProducer; // 消息生产者
            
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
            
            try {
                connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                // destination=session.createQueue("FirstQueue1"); // 创建消息队列
                destination=session.createTopic("FirstTopic1");
                messageProducer=session.createProducer(destination); // 创建消息生产者
                sendMessage(session, messageProducer); // 发送消息
                session.commit();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }
        
        /**
         * 发送消息
         * @param session
         * @param messageProducer
         * @throws Exception
         */
        public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
            for(int i=0;i<JMSProducer.SENDNUM;i++){
                TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
                System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
                messageProducer.send(message);
            }
        }
    }

    2.消息监听-订阅者1  listener   

    /**
     * 消息监听-订阅者一
     * @author Administrator
     *
     */
    public class Listener implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            // TODO Auto-generated method stub
            try {
                System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    3.消息监听-订阅者2  listener2

    /**
     * 消息监听-订阅者二
     * @author Administrator
     *
     */
    public class Listener2 implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            // TODO Auto-generated method stub
            try {
                System.out.println("订阅者二收到的消息:"+((TextMessage)message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    4.消息者Consumer1

    /**
     * 消息消费者-消息订阅者一
     * @author Administrator
     *
     */
    public class JMSConsumer {
    
        private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
        
        public static void main(String[] args) {
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer messageConsumer; // 消息的消费者
            
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
                    
            try {
                connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                // destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
                destination=session.createTopic("FirstTopic1");
                messageConsumer=session.createConsumer(destination); // 创建消息消费者
                messageConsumer.setMessageListener(new Listener()); // 注册消息监听
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } 
        }
    }

    5.消息者Consumer2

    /**
     * 消息消费者-消息订阅者二
     * @author Administrator
     *
     */
    public class JMSConsumer2 {
    
        private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
        
        public static void main(String[] args) {
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer messageConsumer; // 消息的消费者
            
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
                    
            try {
                connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                // destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
                destination=session.createTopic("FirstTopic1");
                messageConsumer=session.createConsumer(destination); // 创建消息消费者
                messageConsumer.setMessageListener(new Listener2()); // 注册消息监听
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } 
        }
    }

    6.测试页面

  • 相关阅读:
    WPF编程学习——样式
    WPF编程学习——布局
    AngularJs学习笔记--concepts(概念)
    AngularJs学习笔记--html compiler
    AngularJs学习笔记--bootstrap
    Linq教程
    SQL通用分页存储过程
    JS时间倒计时
    canvas时钟可随着画布变大而比例变大
    ie下placeholder解决办法
  • 原文地址:https://www.cnblogs.com/cbpm-wuhq/p/11943171.html
Copyright © 2011-2022 走看看