zoukankan      html  css  js  c++  java
  • SpringBoot+ActiveMq实现订阅模式(Topic)消息队列

    上文已经详细介绍了点对点模式(Queue)下的消息队列,今天就来再介绍一下消息队列的另一种模式:订阅模式。

    一、订阅模式的流程
    生产者产生一条消息message放入一个topic中,该topic已经三个消费者订阅了,那么被放入topic中的这条消息,就会同时被这三个消费者取走(当然他们必须都处于在线状态),并进行“消费”。其实就类似现实生活中的手机接收推送。

     

    二、订阅模式的应用场景
    发布订阅模式下,当发布者消息量很大时,显然单个订阅者的处理能力是不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者很容易实现消费能力线性扩展。可以看成是一个topic下有多个Queue,每个Queue是点对点的方式,Queue之间是发布订阅方式。

    三、具体实现
    ActiveMq的配置以及pom导入的jar包可以参考上文;

    1、创建生产者:

    /**
    *
    * @author yuyan
    * @create 2018-08-28 16:09
    **/
    @Service
    public class Topic_Producer {

    public void sendMessage(String msg){
    try {
    //创建连接工厂
    ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(
    ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD,
    "tcp://localhost:61616");
    connFactory.setMaxThreadPoolSize(1);

    //连接到JMS提供者
    Connection conn = connFactory.createConnection();
    // conn.setClientID("producer1");
    conn.start();

    //事务性会话,自动确认消息
    Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
    //消息的目的地
    Destination destination = session.createTopic("topic1");
    //消息生产者
    MessageProducer producer = session.createProducer(destination);
    // producer.setDeliveryMode(DeliveryMode.PERSISTENT); //持久化


    // //文本消息
    // TextMessage textMessage = session.createTextMessage("这是文本消息");
    // producer.send(textMessage);

    //键值对消息
    MapMessage mapMessage = session.createMapMessage();
    mapMessage.setString("reqDesc", msg);
    producer.send(mapMessage);
    //
    // //流消息
    // StreamMessage streamMessage = session.createStreamMessage();
    // streamMessage.writeString("这是流消息");
    // producer.send(streamMessage);
    //
    // //字节消息
    // String s = "BytesMessage字节消息";
    // BytesMessage bytesMessage = session.createBytesMessage();
    // bytesMessage.writeBytes(s.getBytes());
    // producer.send(bytesMessage);
    //
    // //对象消息
    // User user = new User("obj_info", "对象消息"); //User对象必须实现Serializable接口
    // ObjectMessage objectMessage = session.createObjectMessage();
    // objectMessage.setObject(user);
    // producer.send(objectMessage);


    session.commit(); //提交会话,该条消息会进入"queue"队列,生产者也完成了历史使命
    producer.close();
    session.close();
    conn.close();
    //在事务性会话中,只有commit之后,消息才会真正到达目的地

    }catch (Exception e){
    e.printStackTrace();

    }

    }

    }
    2、创建消费者

    package com.springjms.queue_message;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;

    import javax.jms.*;
    import java.util.Date;

    /**
    *
    * @author
    * @create 2018-09-06 9:55
    **/
    @Component
    public class Topic_Consumer implements ApplicationRunner{

    @Override
    public void run(ApplicationArguments args) throws Exception {
    init();
    }

    public void init() throws JMSException {
    ConnectionFactory factory = new ActiveMQConnectionFactory(
    ActiveMQConnectionFactory.DEFAULT_USER,
    ActiveMQConnectionFactory.DEFAULT_PASSWORD,
    "tcp://localhost:61616"
    );

    Connection conn = factory.createConnection();
    // conn.setClientID("consumer1");
    conn.start();

    Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    //与生产者的消息目的地相同
    Destination dest = session.createTopic("topic1");

    MessageConsumer messConsumer = session.createConsumer(dest);

    messConsumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    try {
    MapMessage m = (MapMessage)message;
    System.out.println("consumer1接收到"+m.getString("reqDesc")+"的请求并开始处理,时间是"+new Date());
    System.out.println("这里会停顿5s,模拟系统处理请求,时间是"+new Date());
    Thread.sleep(5000);
    System.out.println("consumer1接收到"+m.getString("reqDesc")+"的请求并处理完毕,时间是"+new Date());
    }catch (Exception e){
    e.printStackTrace();
    }
    }
    });

    }

    }


    现在的生产者和消费者都处于同一个项目中,且是一对一的关系,如果想要验证一个生产者对应多个消费者的情况,可以再新建一个项目,并且创建一个消费者,只要保证topic相同即可。

    3、接口测试

    @RequestMapping(value = "/SendMessageByTopic", method = RequestMethod.GET)
    @ResponseBody
    public void sendTopic(String msg) {
    try {
    System.out.println(msg+"开始发出一次请求,时间是"+new Date());
    topic_producer.sendMessage(msg);
    System.out.println(msg+"请求发送完成,时间是"+new Date());


    }catch (Exception e){
    e.printStackTrace();
    }
    }
    测试结果:

    可以看到两个消费者consumer1、consumer2同时收到了来自topic的请求,并且同时完成了处理;

    观察http://localhost:8161/admin/topics.jsp:

    未发请求时,topic1中有两个消费者,入队列与出队列的消息数都是0:

     

    发出请求后,topic1中有了一条消息,入队列数为1,出队列数为0:

     

    请求处理完毕后,topic1中的出队列数为2,入队列数为1,证明这条消息分别被两个消费者消费了:

     

    这样,消息队列的两种模式就已经介绍完了,文章中介绍的方式都是基于ActiveMq这种传统的消息队列,其实还有诸如rabbitMq、kafka、rocketMq等消息队列,它们的原理和实现方式都不尽相同,以后有时间,还是需要再研究一下!

    ————————————————
    版权声明:本文为CSDN博主「superyu1992」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/superyu1992/java/article/details/82461200

  • 相关阅读:
    BZOJ 3053 The Closest M Points
    Python 语言介绍
    计算机组成与操作系统基础
    Gym 100818I Olympic Parade(位运算)
    Codeforces 602B Approximating a Constant Range(想法题)
    Codeforces 599D Spongebob and Squares(数学)
    Codeforces 599C Day at the Beach(想法题,排序)
    ZOJ 3903 Ant(数学,推公示+乘法逆元)
    ZOJ 3911 Prime Query(线段树)
    UVALive 6910 Cutting Tree(离线逆序并查集)
  • 原文地址:https://www.cnblogs.com/telwanggs/p/13110946.html
Copyright © 2011-2022 走看看