zoukankan      html  css  js  c++  java
  • ActiveMQ发布-订阅消息模式

    一、订阅杂志
    我们很多人都订过杂志,其过程很简单。只要告诉邮局我们所要订的杂志名、投递的地址,付了钱就OK。出版社定期会将出版的杂志交给邮局,邮局会根据订阅的列表,将杂志送达消费者手中。这样我们就可以看到每一期精彩的杂志了。

    仔细思考一下订杂志的过程,我们会发现这样几个特点:
    1、消费者订杂志不需要直接找出版社;
    2、出版社只需要把杂志交给邮局;
    3、邮局将杂志送达消费者。
    邮局在整个过程中扮演了非常重要的中转作用,在出版社和消费者相互不需要知道对方的情况下,邮局完成了杂志的投递。

    二、 发布-订阅消息模式
    刚刚讲了订阅杂志,下面我们会讲传统调用模式演化到发布-订阅消息模式。

    有些网站在注册用户成功后发一封激活邮件,用户收到邮件后点击激活链接后才能使用该网站。一般的做法是在注册用户业务逻辑中调用发送邮件的逻辑。这样用户业务就依赖于邮件业务。如果以后改为短信激活,注册用户业务逻辑就必须修改为调用发送短信的逻辑。如果要注册后给用户加点积分,再加一段逻辑。经过多次修改,我们发现很简单的注册用户业务已经越来越复杂,越来越难以维护。相信很多开发者都会有类似痛苦的经历。

    即使用户业务实现中对其他业务是接口依赖,也避免不了业务变化带来的依赖影响。怎么办?解耦!将注册用户业务逻辑中注册成功后的处理剥离出来。

    再回头看看“订阅杂志”,如果没有邮局,出版社就必须自己将杂志送达所有消费者。这种情形就和现在的注册用户业务一样。我们发现问题了,在用户业务和其他业务之间缺少了邮局所扮角色。

    我们把邮局抽象成一个管理消息的地方,叫“消息管理器”。注册用户成功后发送一个消息给消息管理器,由消息管理器转发该消息给需要处理的业务。现在,用户业务只依赖于消息管理器了,它再也不会为了注册用户成功后的其他处理而烦恼。

    注册用户的改造就是借鉴了“订阅杂志”这样原始的模式。我们再进一步抽象,用户业务就是消息的“生产者”,它将消息发布到消息管理器。邮件业务就是消息的“消费者”,它将收到的消息进行处理。邮局可以订阅很多种杂志,杂志都是通过某种编号来区分;消息管理器也可以管理多种消息,每种消息都会有一个“主题”来区分,消费者都是通过主题来订阅的。

    发布-订阅消息模式已经呈现在我们面前,在这里,对于发布者来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:

    示例:
    1、Publish.java:消息发布者

    package com.ljq.durian.test.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Publish {
        private ConnectionFactory factory;
        private Connection connection;
        private Session session;
        private MessageProducer producer;
    
        public Publish() {
            try {
                factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                producer = session.createProducer(null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void sendMessage() throws Exception {
            Destination destination = session.createTopic("Topic001");
            TextMessage msg = session.createTextMessage("我是消息内容...");
            producer.send(destination, msg);
            
            if(connection != null){
                connection.close();
            }    
        }
    
        public static void main(String[] args) throws Exception {
            Publish publish= new Publish();
            publish.sendMessage();
        }
    }

    2、Subscriber1.java:消息订阅者

    package com.ljq.durian.test.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Subscriber1 {
        private ConnectionFactory factory;
        private Connection connection;
        private Session session;
    
        public Subscriber1() {
            try {
                factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void receive() throws Exception {
            Destination topic = session.createTopic("Topic001") ;
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new Listener());
        }
    
        class Listener implements MessageListener {
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage tm = (TextMessage) message;
                    System.out.println("Subscriber1 Received message: " + tm.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        public static void main(String[] args) throws Exception {
            Subscriber1 subscriber = new Subscriber1();
            subscriber.receive();
        }
    }

    3、Subscriber2.java:消息订阅者

    package com.ljq.durian.test.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Subscriber2 {
        private ConnectionFactory factory;
        private Connection connection;
        private Session session;
    
        public Subscriber2() {
            try {
                factory =
                        new ActiveMQConnectionFactory("ljq", "ljq",
                                "failover:(tcp://192.168.1.101:61616)?Randomize=false");
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void receive() throws Exception {
            Destination topic = session.createTopic("Topic001") ;
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new Listener());
        }
    
        class Listener implements MessageListener {
            public void onMessage(Message message) {
                System.out.println(message);
                try {
                    TextMessage tm = (TextMessage) message;
                    System.out.println("Subscriber2 Received message: " + tm.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        public static void main(String[] args) throws Exception {
            Subscriber2 subscriber = new Subscriber2();
            subscriber.receive();
        }
    }
  • 相关阅读:
    D. Longest Subsequence
    线段树入门HDU_1754
    poj_2503(map映射)
    HDU_4826
    poj_2251
    day 44 单表查询,多表查询
    day43 字段的修改、添加和删除,多表关系(外键),单表详细操作(增删改查)
    day 42 数据库的配置、数据库与表的一些剩余操作、用户操作、数据库表的引擎、数据库的模式、mysql支持的数据类型、约束
    day41 数据库介绍、数据库基本操作
    day 40 线程队列、线程定时器、进程池和线程池、同步与异步、用多线程来写socket服务端与客户端
  • 原文地址:https://www.cnblogs.com/linjiqin/p/6518780.html
Copyright © 2011-2022 走看看