zoukankan      html  css  js  c++  java
  • ActiveMQ入门操作示例

    1. Queue

    1.1 Producer

      生产者:生产消息,发送端。

      把jar包添加到工程中。

      

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    第二步:使用ConnectionFactory对象创建一个Connection对象。
    第三步:开启连接,调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
    第六步:使用Session对象创建一个Producer对象。
    第七步:创建一个Message对象,创建一个TextMessage对象。
    第八步:使用Producer对象发送消息。
    第九步:关闭资源。
     1     /**
     2      * 点到点形式发送消息
     3      */
     4     @Test
     5     public void testQueueProducer() throws Exception{
     6         //1、创建一个连接工厂对象,需要指定服务的ip及端口。
     7         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616");
     8         //2、使用工厂对象创建一个Connection对象。
     9         Connection connection = connectionFactory.createConnection();
    10         //3、开启连接,调用Connection对象的start方法。
    11         connection.start();
    12         //4、创建一个Session对象。
    13         //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
    14         //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
    15         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    16         //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
    17         Queue queue = session.createQueue("test-queue");
    18         //6、使用Session对象创建一个Producer对象。
    19         MessageProducer producer = session.createProducer(queue);
    20         //7、创建一个Message对象,可以使用TextMessage。
    21         /*TextMessage textMessage = new ActiveMQTextMessage();
    22         textMessage.setText("hello Activemq");*/
    23         TextMessage textMessage = session.createTextMessage("hello activemq");
    24         //8、发送消息
    25         producer.send(textMessage);
    26         //9、关闭资源
    27         producer.close();
    28         session.close();
    29         connection.close();
    30     }

      

    1.2. Consumer

      消费者:接收消息。

    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源
     1     @Test
     2     public void testQueueConsumer() throws Exception {
     3         //创建一个ConnectionFactory对象连接MQ服务器
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616");
     5         //创建一个连接对象
     6         Connection connection = connectionFactory.createConnection();
     7         //开启连接
     8         connection.start();
     9         //使用Connection对象创建一个Session对象
    10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11         //创建一个Destination对象。queue对象
    12         Queue queue = session.createQueue("test-queue");
    13         //使用Session对象创建一个消费者对象。
    14         MessageConsumer consumer = session.createConsumer(queue);
    15         //接收消息
    16         consumer.setMessageListener(new MessageListener() {
    17 
    18             @Override
    19             public void onMessage(Message message) {
    20                 //打印结果
    21                 TextMessage textMessage = (TextMessage) message;
    22                 String text;
    23                 try {
    24                     text = textMessage.getText();
    25                     System.out.println(text);
    26                 } catch (JMSException e) {
    27                     e.printStackTrace();
    28                 }
    29 
    30             }
    31         });
    32         //等待接收消息
    33         System.in.read();
    34         //关闭资源
    35         consumer.close();
    36         session.close();
    37         connection.close();
    38     }

      

      

    2. Topic

    2.1.    Producer

      使用步骤:

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    第二步:使用ConnectionFactory对象创建一个Connection对象。
    第三步:开启连接,调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
    第六步:使用Session对象创建一个Producer对象。
    第七步:创建一个Message对象,创建一个TextMessage对象。
    第八步:使用Producer对象发送消息。
    第九步:关闭资源。
     1     @Test
     2     public void testTopicProducer() throws Exception {
     3         //1、创建一个连接工厂对象,需要指定服务的ip及端口。
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616");
     5         //2、使用工厂对象创建一个Connection对象。
     6         Connection connection = connectionFactory.createConnection();
     7         //3、开启连接,调用Connection对象的start方法。
     8         connection.start();
     9         //4、创建一个Session对象。
    10         //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
    11         //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
    12         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    13         //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
    14         Topic topic = session.createTopic("test-topic");
    15         //6、使用Session对象创建一个Producer对象。
    16         MessageProducer producer = session.createProducer(topic);
    17         //7、创建一个Message对象,可以使用TextMessage。
    18         /*TextMessage textMessage = new ActiveMQTextMessage();
    19         textMessage.setText("hello Activemq");*/
    20         TextMessage textMessage = session.createTextMessage("topic message");
    21         //8、发送消息
    22         producer.send(textMessage);
    23         //9、关闭资源
    24         producer.close();
    25         session.close();
    26         connection.close();
    27     }

      

    2.2. Consumer

    消费者:接收消息。

    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源。 

    Topic模式接收消息消费者必须处于启动状态。

     1    @Test
     2     public void testTopicConsumer() throws Exception {
     3         //创建一个ConnectionFactory对象连接MQ服务器
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616");
     5         //创建一个连接对象
     6         Connection connection = connectionFactory.createConnection();
     7         //开启连接
     8         connection.start();
     9         //使用Connection对象创建一个Session对象
    10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11         //创建一个Destination对象。topic对象
    12         Topic topic = session.createTopic("test-topic");
    13         //使用Session对象创建一个消费者对象。
    14         MessageConsumer consumer = session.createConsumer(topic);
    15         //接收消息
    16         consumer.setMessageListener(new MessageListener() {
    17 
    18             @Override
    19             public void onMessage(Message message) {
    20                 //打印结果
    21                 TextMessage textMessage = (TextMessage) message;
    22                 String text;
    23                 try {
    24                     text = textMessage.getText();
    25                     System.out.println(text);
    26                 } catch (JMSException e) {
    27                     e.printStackTrace();
    28                 }
    29 
    30             }
    31         });
    32         System.out.println("topic消费者1启动。。。。");
    33         //等待接收消息
    34         System.in.read();
    35         //关闭资源
    36         consumer.close();
    37         session.close();
    38         connection.close();
    39     }

      

    总结:

      Queue(点对点):若消息没有被接收,服务器会一直保存

      Topic(广播):不会持久化,只负责发送

        若想要持久化保存消息,需要进行订阅。

  • 相关阅读:
    vue-cli element axios sass 安装
    Rem-- ui图与适配手机的px换算
    REM
    Canvas画半圆扇形统计图
    在vue中安装SASS
    在vue中引用.SCSS的公用文件
    Echart--折线图手柄触发事件
    vue 脚手架webpack打包后,解决能看到vue源码的问题
    js 函数 /变量/ 以及函数中形参的预解析的顺序
    【算法日记】4.递归和快速排序
  • 原文地址:https://www.cnblogs.com/116970u/p/11415143.html
Copyright © 2011-2022 走看看