zoukankan      html  css  js  c++  java
  • ActiveMQ学习总结------原生实战操作(下)03

    本篇将继续延续上一篇的内容,作为知识补充篇,为接下来我们学习spring整合ActiveMQ打好基础

    本篇主要学习内容:

      1.ActiveMQ 队列服务监听

      2.ActiveMQ Topic模型


    回顾下上一篇ActiveMQ学习总结我们学习到了:

      1.ActiveMQ术语及API介绍

      2.ActiveMQ 文本消息处理

      3.ActiveMQ 对象消息处理

    相信大现在对ActiveMQ的一些简单操作已经很轻松掌握了

    上一篇文章地址:https://www.cnblogs.com/arebirth/p/activemq02.html


    一 ActiveMQ实现队列服务监听

    在我们上一篇的练习中,所有的消费者都是接收一次消息即断开连接,这样是不是很不方便。

    试想一下,如果我们的provider在consumer接收完第一条消息后又继续发送了一条消息,那么consumer已经断开连接了,是不是就不能连接不间断的实时获取消息?

    解决方案:

      很容易,用我们的队列服务监听即可

    *:根据上一章的学习,大家对环境搭建使用配置,肯定都已经相当清楚了,这里就不过多阐述,直接进行代码实战

    1 消息生产者

    相比之下,我么你的生产者照之前是没有任何变化的,主要的变化还是在cosumer身上

    package cn.arebirth.mq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class ActiveMQQueueListenerProducer {
        public static void sendTextActiveMq(String txt) {
            //定义链接工厂
            ConnectionFactory connectionFactory = null;
    
            //定义链接对象
            Connection connection = null;
    
            //定义会话
            Session session = null;
    
            //目的地
            Destination destination = null;
    
            //定义消息的发送者
            MessageProducer producer = null;
    
            //定义消息
            Message message = null;
    
            try {
                //创建链接工厂
                connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");
    
                //创建链接诶对象
                connection = connectionFactory.createConnection();
    
                //启动链接
                connection.start();
    
                //创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                //创建目的地
                destination = session.createQueue("queue-listener");
    
                //创建消息生产者
                producer = session.createProducer(destination);
    
                //创建消息对象
                message = session.createTextMessage(txt);
    
                //发送消息
                producer.send(message);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                //回收资源
                if (producer != null) {
                    try {
                        producer.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    2 消息消费者

    package cn.arebirth.mq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class ActiveMQQueueListenerConsumer {
        public static void receiveTextActiveMq() {
            // 定义链接工厂
            ConnectionFactory connectionFactory = null;
            // 定义链接对象
            Connection connection = null;
            // 定义会话
            Session session = null;
            // 目的地
            Destination destination = null;
            // 定义消息的发送者
            MessageConsumer consumer = null;
            // 定义消息
            Message message = null;
    
            try {
                //创建链接工厂
                connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");
    
                //创建链接对象
                connection = connectionFactory.createConnection();
    
                //启动链接
                connection.start();
    
                //创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                //创建目的地
                destination = session.createQueue("queue-listener");
    
                //创建消息消费者
                consumer = session.createConsumer(destination);
    
                //队列服务监听
                consumer.setMessageListener(new MessageListener() {
                    //ActiveMQ回调方法。通过该方法将消息传递到consumer
                    @Override
                    public void onMessage(Message message) {
                        //处理消息
                        String msg = null;
                        try {
                            msg = ((TextMessage) message).getText();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Producer say:" + msg);
                    }
                });
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    3 测试

    3.1 provider测试

    package cn.arebirth.mq;
    
    public class ProducerTest {
        public static void main(String[] args) {
            ActiveMQQueueListenerProducer.sendTextActiveMq("Hello,consumer!");
        }
    }

    观察我们的控制台可以发现已经成功发布到队列

    3.2 consumer测试

    package cn.arebirth.mq;
    
    public class ConsumerTest {
        public static void main(String[] args) {
            ActiveMQQueueListenerConsumer.receiveTextActiveMq();
        }
    }

    我们运行后可以发现,它接收到了消息,但是它的进程并没有关闭,

    我们用provider继续发布一条消息,看看consumer能不能接收到

    可以看到,consumer持续在后台监听我们发布的消息,

    通过上面代码,不难发现,provider没有任何改动,只是consumer修改了一部分

    通过调用匿名内部类的方法来实现持续监听

     consumer.setMessageListener(new MessageListener() {
        @Override
                    public void onMessage(Message message) {
    
            }
        
    }

    注意:因为涉及到队列持续监听,所以我们不能在finally处给资源回收,否则还在监听状态,资源都回收没了,也就无从监听啦。


    二 Topic模型

    在本系列文章第一篇也有介绍过一些Topic模型的概念,那么这里我们将以原理+实战的方式来带领大家掌握

    1 Publish/Subscribe处理模式(Topic)

    消息生产者(发布)消息到topic中,同时有多个消息消费者(订阅)消费该消息。

    和点对点方式不同,发布到Topic的消息会被所有的订阅者消费,而点对点的只能是指定的消费者去消费

    当生产者发布消息,不管是否有消费者,都不会保存消息,也就是说它是发完就啥也不管了那种,

    所以要注意:一定要有消费者,然后在有生产者,否则生产者不发完消息什么也不管了,你消费者在生产者之后才有,那么你是接收不到消息的。

    接下来我们就以实战的方式鼓捣下。

    2 创建生产者

    package cn.arebirth.mq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class ActiveMQTopicProducer {
        public static void sendTextActiveMQ(String txt){
            //定义链接工厂
            ConnectionFactory connectionFactory = null;
    
            //定义链接对象
            Connection connection = null;
    
            //定义会话
            Session session = null;
    
            //目的地
            Destination destination = null;
    
            //定义消息的发送者
            MessageProducer producer = null;
    
            //定义消息
            Message message = null;
    
            try {
                //创建链接工厂
                connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");
    
                //创建链接诶对象
                connection = connectionFactory.createConnection();
    
                //启动链接
                connection.start();
    
                //创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                //创建目的地
                destination = session.createTopic("topic-test");
    
                //创建消息生产者
                producer = session.createProducer(destination);
    
                //创建消息对象
                message = session.createTextMessage(txt);
    
                //发送消息
                producer.send(message);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                //回收资源
                if (producer != null) {
                    try {
                        producer.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    我们可以发现,在创建目的地destination的时候代码有了变动

    destination = session.createTopic("topic-test");

    变成了createTopic,对这就是topic模式了。

    3 创建消费者

    package cn.arebirth.mq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class ActiveMQTopicConsumer implements Runnable {
    
    
        public static void receiveTextActiveMQ(String threadName) {
            // 定义链接工厂
            ConnectionFactory connectionFactory = null;
            // 定义链接对象
            Connection connection = null;
            // 定义会话
            Session session = null;
            // 目的地
            Destination destination = null;
            // 定义消息的发送者
            MessageConsumer consumer = null;
            // 定义消息
            Message message = null;
    
            try {
                //创建链接工厂
                connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");
    
                //创建链接对象
                connection = connectionFactory.createConnection();
    
                //启动链接
                connection.start();
    
                //创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                //创建目的地
                destination = session.createTopic("topic-test");
    
                //创建消息的消费者
                consumer = session.createConsumer(destination);
    
                //服务监听
                consumer.setMessageListener(new MessageListener() {
                    //ActiveMQ回调方法。通过该方法将消息传递到consumer
                    @Override
                    public void onMessage(Message message) {
                        //处理消息
                        String msg = null;
                        try {
                            msg = ((TextMessage) message).getText();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        System.out.println(threadName + "--Producer say:" + msg);
                    }
                });
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            receiveTextActiveMQ(Thread.currentThread().getName());
        }
    }

    我们可以发现,在创建目的地destination的时候代码有了变动

    destination = session.createTopic("topic-test");

    还有实现了Runnable这个是为了一会测试的时候,多线程启动,看效果,是否多个都会接受到,(如果看着糊涂的话,你也可以去掉线程的部分,单独复制多个对象,并启动,效果也是一样的)

    4 测试(要先启动消费者,否则消费者是接收不到消息的!当然,你自己可以试一下

    4.1 测试消费者

    package cn.arebirth.mq;
    
    public class ConsumerTest {
        public static void main(String[] args) {
            ActiveMQTopicConsumer a1 = new ActiveMQTopicConsumer();
            Thread t1 = new Thread(a1,"a1");
    
            ActiveMQTopicConsumer a2 = new ActiveMQTopicConsumer();
            Thread t2 = new Thread(a2,"a2");
    
            ActiveMQTopicConsumer a3 = new ActiveMQTopicConsumer();
            Thread t3 = new Thread(a3,"a3");
    
            t1.start();
            t2.start();
            t3.start();
        }
    }

    可以看到,我们的消费者已经启动了,三个线程。并以监听服务的方式启动

    4.2 测试生产者

    package cn.arebirth.mq;
    
    public class ProducerTest {
        public static void main(String[] args) {
            ActiveMQTopicProducer.sendTextActiveMQ("hello,topic");
        }
    }

    可以看到,在topics下面,我们发布的内容已经有记录了

    然后我们在看下,我们的consumer

    可以发现,三个consumer都已经接收到了

     

    ps:

      如果你对ActiveMQ原理性的东西感到困惑,可以看下我们前面的文章:https://www.cnblogs.com/arebirth/p/activemq01.html

  • 相关阅读:
    写在我第一个虚幻程序之前
    C++学习笔记(十二):重载函数
    JavaScript兼容问题汇总[实时更新]
    Activity跳转时传递Bitmap对象的实现
    Http报头Accept与Content-Type的差别
    第0讲: 准备篇
    Android Design Support Library(二)用NavigationView实现抽屉菜单界面
    Android Design Support Library(一)用TabLayout实现类似网易选项卡动态滑动效果
    Android Support Library 23.2用法简析
    Android Design Support Library初探,NavigationView实践
  • 原文地址:https://www.cnblogs.com/arebirth/p/activemq03.html
Copyright © 2011-2022 走看看