zoukankan      html  css  js  c++  java
  • ActiveMQ入门代码案例

    ActiveMQ入门代码案例:

    pom依赖:

    <!--  activemq 所需要的jar 包-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.9</version>
    </dependency>
    <!--  activemq 和 spring 整合的基础包 -->
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
    </dependency>
    

    JMS编码总体规范:

    Destination(目的地)简介:

    Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。

     

    Destination分为两种:队列和主题。下图介绍:

    Destination之队列(Queue)

    消息队列生产者

    案例代码:

    public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616";
    public static final String ACTIVE_NAME = "active1";
    public static void main(String[] args) throws Exception {
        /*1、创建连接工厂*/
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        /*2、打开连接*/
        final Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        /*3、创建Session会话:参数1:事务,参数2:签收*/
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        /*4、创建目的地:具体是队列还是主题topic*/
        Queue queue = session.createQueue(ACTIVE_NAME);
        /*5、创建消息的消费者*/
        final MessageProducer producer = session.createProducer(queue);
        for (int i = 1; i <= 60; i++) {
            final TextMessage textMessage = session.createTextMessage("生产者生产消息:Message=>" + i);
            producer.send(textMessage);
        }
        /*6、关闭资源*/
        producer.close();
        session.close();
        connection.close();
        System.out.println("*****生产者生产完成*****");
    }
    

    控制台:

    Number Of Pending Messages:

      等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。

    Number Of Consumers:

      消费者数量,消费者端的消费者数量。

    Messages Enqueued:

      进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。

    Messages Dequeued:

      出队消息数,可以理解为是消费者消费掉的数量。

    总结:

      当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。

      当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

      当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

    消息队列消费者

    案例代码:

    public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616";
    public static final String ACTIVE_NAME = "active1";
    public static void main(String[] args) throws Exception {
        /*1、创建连接工厂*/
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        /*2、打开连接*/
        final Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        /*3、创建Session会话:参数1:事务,参数2:签收*/
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        /*4、创建目的地:具体是队列还是主题topic*/
        Queue queue = session.createQueue(ACTIVE_NAME);
        final MessageConsumer consumer = session.createConsumer(queue);
        /*5、消费者消费方式一:订阅(同步阻塞)*/
        while (true) {
            final TextMessage receive = (TextMessage) consumer.receive(30000);
            if (receive != null) {
                System.out.println("*******消费者收到消息*******" + receive.getText());
            }else{
                break;
            }
        }
        consumer.close();
        session.close();
        connection.close();
        System.out.println("****消费者结束消费*****");
    }
    

    控制台:

     异步监听式消费者(MessageListener)

     1 public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616";
     2 public static final String ACTIVE_NAME = "active1";
     3 public static void main(String[] args) throws Exception {
     4     /*1、创建连接工厂*/
     5     ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
     6     /*2、打开连接*/
     7     final Connection connection = activeMQConnectionFactory.createConnection();
     8     connection.start();
     9     /*3、创建Session会话:参数1:事务,参数2:签收*/
    10     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11     /*4、创建目的地:具体是队列还是主题topic*/
    12     Queue queue = session.createQueue(ACTIVE_NAME);
    13     final MessageConsumer consumer = session.createConsumer(queue);
    14     /*6、消费者消费方式二:监听器(异步非阻塞)*/
    15     consumer.setMessageListener(new MessageListener() {
    16         @Override
    17         public void onMessage(Message message) {
    18             if (message instanceof TextMessage) {
    19                 final TextMessage textMessage = (TextMessage) message;
    20                 try {
    21                     System.out.println("*******消费者收到消息:" + textMessage.getText());
    22                 } catch (JMSException e) {
    23                     e.printStackTrace();
    24                 }
    25             }
    26         }
    27     });
    28     System.in.read();
    29     consumer.close();
    30     session.close();
    31     connection.close();
    32     System.out.println("****消费者结束消费*****");
    33 }

    队列消息(Queue)总结

    两种消费方式:

    同步阻塞方式(receive())

      订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

    异步非阻塞方式(监听器onMessage())

      订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

     队列的特点:

     

     消息消费情况

    情况1:只启动消费者1。

      结果:消费者1会消费所有的数据。

    情况2:先启动消费者1,再启动消费者2。

      结果:消费者1消费所有的数据。消费者2不会消费到消息。

    情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。

      结果:消费者1和消费者2平摊了消息。各自消费3条消息。

    疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。

     

  • 相关阅读:
    内存泄露的小问题(转载)
    脚本错误:"return 语句不能在函数之外" (转载)
    转载:asp.net网页防刷新重复提交、防后退解决办法集!
    转载:一行代码搞定你的QueryString
    转载 PowerDesigner Name/Code自动调整
    无刷新仿google波形扭曲彩色Asp.net验证码
    转载:认识Web.config文件
    转载:将数据库从SQL2000迁移到SQL2005时,无法查看关系图的解决办法
    [转]WTL的windows mobile环境的配置(vs2008)[最终版,验证通过]
    [转].NET中Cache用法分析
  • 原文地址:https://www.cnblogs.com/zhangzhixi/p/15514557.html
Copyright © 2011-2022 走看看