zoukankan      html  css  js  c++  java
  • ActiveMQ基于JMS的pub/sub传播机制

    1. 原文地址:[ActiveMQ实战]基于JMS的pub/sub传播机制

    发布订阅模型

    就像订阅报纸,我们可以选择一份或者多份报纸。比如:北京日报、人民日报。这些报纸就相当于发布订阅模型中的topic。如果有很多人订阅了相同的报纸,那我们就在同一个topic中注册,对于报纸发行方,它就和所有的订阅者形成了一对多的关系。并且,当你开始订阅报纸之后,正常情况下,你订阅之前的报纸你是拿不到的,除非有人给你留存了;

    pom.xml文件内容

    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.5.1</version>
    </dependency>
    

    消息生产者

    
    /**
     * 基于发布模式的 消息生产者
     */
    public class JMSPubProducer {
    
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认的连接地址
        private static final String BROKEURL = "tcp://xx.xx.xx.xx:61616";
        //发送的消息数量
        private static final int SENNUM = 10;
    
        public static void main(String[] args){
            ConnectionFactory factory ; //连接工厂
            Connection connection = null ; //连接
            Session session ; //会话,接收或者发送消息的线程
            Destination destination; //消息的目的地
            MessageProducer messageProducer; //消息生产者
            //实例化连接工厂
            factory = new ActiveMQConnectionFactory(JMSPubProducer.USERNAME, JMSPubProducer.PASSWORD, JMSPubProducer.BROKEURL);
            //通过连接工厂获取connection
            try {
                connection = factory.createConnection();
                connection.start(); //启动连接
                //创建session
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                //创建主题
                destination = session.createTopic("topic1");
                //创建消息发布者
                messageProducer = session.createProducer(destination);
                //发送消息
                sendMessage(session, messageProducer);
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }finally{
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 发送消息
         * @param session
         * @param mp
         * @throws JMSException
         */
        public static void sendMessage(Session session, MessageProducer mp) throws JMSException{
            for(int i = 0 ; i < JMSPubProducer.SENNUM;i++){
                TextMessage message = session.createTextMessage("ActiveMq 发布的消息 --->  " + i);
                System.out.println("发布消息:" + "ActiveMq 发布的消息" + i);
                mp.send(message);
            }
        }
    
    }
    

    消息消费者(可以有多个)

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by boothsun on 2017/9/6.
     */
    public class JMSSubConsumer {
    
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认的连接地址
        private static final String BROKEURL = "tcp://119.29.182.145:61616";
    
        public static void main(String[] args) {
            ConnectionFactory factory ; //连接工厂
            Connection connection = null ; //连接
            Session session ; //会话,接收或者发送消息的线程
            Destination destination; //消息的目的地
            MessageConsumer messageConsumer; //消息消费者
            //实例化连接工厂
            factory = new ActiveMQConnectionFactory(JMSSubConsumer.USERNAME, JMSSubConsumer.PASSWORD, JMSSubConsumer.BROKEURL);
            //通过连接工厂获取connection
            try {
                connection = factory.createConnection();
                connection.start(); //启动连接
                //创建session
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                //创建连接消息队列,消息到达的目的地
                destination = session.createTopic("topic1");
                //创建消费者
                messageConsumer = session.createConsumer(destination);
                //注册消息监听
                messageConsumer.setMessageListener(new Listener1());
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 订阅者1的监听
     * 消息监听类
     * @author xx
     */
    class Listener1 implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println("订阅者一收到的消息:" + ((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    测试类

    对于订阅发布模型,要先启动订阅者,订阅者先订阅topic,再发布消息;否则先发布的消息将无法被后启动的订阅者消费。就像是上面说的订阅报纸,订阅之前的报纸,正常情况下,订阅者是拿不到的。

  • 相关阅读:
    Linux Shell编程 sort、wc命令
    Linux Shell编程 sed命令
    Linux Shell编程 awk命令
    Linux Shell编程 cut、print命令
    Linux Shell基础 环境变量配置文件
    Linux Shell基础 read命令
    Linux Shell基础 位置参数变量、预定义变量
    MS DOS 命令大全
    sublime 快捷键
    滚动到页面底部继续加载页面其他内容
  • 原文地址:https://www.cnblogs.com/boothsun/p/7486845.html
Copyright © 2011-2022 走看看