zoukankan      html  css  js  c++  java
  • ActiveMQ Pub/Sub版的HelloWorld

    1. pom.xml

    这个和上一篇是一样的:

     
     
    1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    2.     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
    3.     <modelVersion>4.0.0</modelVersion>  
    4.   
    5.     <groupId>org.ygy</groupId>  
    6.     <artifactId>activemq</artifactId>  
    7.     <version>0.0.1-SNAPSHOT</version>  
    8.     <packaging>jar</packaging>  
    9.   
    10.     <name>activemq</name>  
    11.     <url>http://maven.apache.org</url>  
    12.   
    13.     <properties>  
    14.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
    15.     </properties>  
    16.   
    17.     <dependencies>  
    18.         <dependency>  
    19.             <groupId>junit</groupId>  
    20.             <artifactId>junit</artifactId>  
    21.             <version>4.10</version>  
    22.             <scope>test</scope>  
    23.         </dependency>  
    24.   
    25.         <!-- activemq,学习中 -->  
    26.         <dependency>  
    27.             <groupId>org.apache.activemq</groupId>  
    28.             <artifactId>activemq-core</artifactId>  
    29.             <version>5.7.0</version>  
    30.         </dependency>  
    31.   
    32.         <dependency>  
    33.             <groupId>org.slf4j</groupId>  
    34.             <artifactId>slf4j-api</artifactId>  
    35.             <version>1.5.6</version>  
    36.         </dependency>  
    37.   
    38.         <dependency>  
    39.             <groupId>org.slf4j</groupId>  
    40.             <artifactId>slf4j-log4j12</artifactId>  
    41.             <version>1.5.6</version>  
    42.         </dependency>  
    43.   
    44.   
    45.     </dependencies>  
    46. </project>  

    2. Pub/Sub版的HelloWorld

    生产者:

     
     
    1. package org.ygy.mq.lesson01;  
    2.   
    3. import javax.jms.Connection;  
    4. import javax.jms.ConnectionFactory;  
    5. import javax.jms.DeliveryMode;  
    6. import javax.jms.Destination;  
    7. import javax.jms.JMSException;  
    8. import javax.jms.MessageProducer;  
    9. import javax.jms.Session;  
    10. import javax.jms.TextMessage;  
    11.   
    12. import org.apache.activemq.ActiveMQConnectionFactory;  
    13. import org.ygy.mq.constants.MQConstants;  
    14.   
    15. public class HelloTopicProducer {  
    16.   
    17.     public void send(String msg) {  
    18.         // 生产者的主要流程  
    19.         Connection connection = null;  
    20.   
    21.         try {  
    22.             // 1.初始化connection工厂,使用默认的URL  
    23.             // failover://tcp://localhost:61616  
    24.             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();  
    25.   
    26.             // 2.创建Connection  
    27.             connection = connectionFactory.createConnection();  
    28.   
    29.             // 3.打开连接  
    30.             connection.start();  
    31.   
    32.             // 4.创建Session,(是否支持事务)  
    33.             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    34.   
    35.             // 5.创建消息目标  
    36.             Destination destination_send = session.createTopic(MQConstants.DESTINATION_SEND);  
    37.   
    38.             // 6.创建生产者  
    39.             MessageProducer producer = session.createProducer(destination_send);  
    40.   
    41.             // 7.配置消息是否持久化  
    42.             /* 
    43.              * DeliverMode有2种方式: 
    44.              *  
    45.              * public interface DeliveryMode { static final int NON_PERSISTENT = 
    46.              * 1;//不持久化:服务器重启之后,消息销毁 
    47.              *  
    48.              * static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在 } 
    49.              */  
    50.             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
    51.   
    52.             // 8.初始化要发送的消息  
    53.             TextMessage message = session.createTextMessage(msg);  
    54.   
    55.             // 9.发送消息  
    56.             producer.send(message);  
    57.               
    58.             connection.close();  
    59.   
    60.         } catch (JMSException e) {  
    61.             e.printStackTrace();  
    62.         }  
    63.     }  
    64.   
    65.     public static void main(String[] args) {  
    66.         new HelloTopicProducer().send("我来试一试发布/订阅...");  
    67.     }  
    68.   
    69. }  


    消费者:

     
     
    1. package org.ygy.mq.lesson01;  
    2.   
    3. import javax.jms.Connection;  
    4. import javax.jms.ConnectionFactory;  
    5. import javax.jms.Destination;  
    6. import javax.jms.JMSException;  
    7. import javax.jms.Message;  
    8. import javax.jms.MessageConsumer;  
    9. import javax.jms.MessageListener;  
    10. import javax.jms.Session;  
    11. import javax.jms.TextMessage;  
    12.   
    13. import org.apache.activemq.ActiveMQConnectionFactory;  
    14. import org.ygy.mq.constants.MQConstants;  
    15.   
    16. public class HelloTopicConsumer implements MessageListener {  
    17.   
    18.     @Override  
    19.     public void onMessage(Message message) {  
    20.         if (message instanceof TextMessage) {  
    21.             TextMessage txtMsg = (TextMessage) message;  
    22.   
    23.             try {  
    24.                 System.out.println("哈,我接收到了消息:" + txtMsg.getText());  
    25.             } catch (JMSException e) {  
    26.                 e.printStackTrace();  
    27.             }  
    28.   
    29.         }  
    30.     }  
    31.   
    32.     public void receive() {  
    33.         // 消费者的主要流程  
    34.         Connection connection = null;  
    35.   
    36.         try {  
    37.             // 1.初始化connection工厂  
    38.             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();  
    39.   
    40.             // 2.创建Connection  
    41.             connection = connectionFactory.createConnection();  
    42.   
    43.             // 3.打开连接  
    44.             connection.start();  
    45.   
    46.             // 4.创建session  
    47.             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    48.   
    49.             // 5.创建消息目标  
    50.             Destination destination = session.createTopic(MQConstants.DESTINATION_SEND);  
    51.   
    52.             // 6.创建消费者  
    53.             MessageConsumer consumer = session.createConsumer(destination);  
    54.   
    55.             // 7.配置监听  
    56.             consumer.setMessageListener(new HelloTopicConsumer());  
    57.   
    58.         } catch (JMSException e) {  
    59.             e.printStackTrace();  
    60.         }  
    61.     }  
    62.   
    63.     public static void main(String[] args) {  
    64.         new HelloTopicConsumer().receive();  
    65.     }  
    66.   
    67. }  

    3.测试

    访问网页:http://localhost:8161/admin/topics.jsp

    单击那个Topics连接。

    这里显示的是服务器上的主题,这些显示的都没有用,可以都删掉。

    Name:主题的名称

    Number Of Consumers:正在运行的消费者

    Message Enqueued:进入消息队列的

    Message Dequeued:出消息队列的

    Operations:操作

    下面就可以开始运行程序了,

    注意顺序:先运行消费者:

    这里会产生好几个主题,我们只看我们自己用的那个,(其实,其他几个是干嘛的,暂时还不清楚,以后再研究吧.....)

    我们的消费者一直在运行

    接下来,运行生产者:

    控制台会输出:

    再一次,刷新界面:

    消费者还在运行,只生产了一条消息,而且已经被消费了。

  • 相关阅读:
    [array] leetcode
    [array] leetcode
    [array] leetcode
    无法将“Scaffold-DbContext”项识别为 cmdlet、函数、脚本文件或可运行程序的名称...
    远程桌面报错解决:No Remote Desktop License Servers Available
    linux设置开机自启动
    阿里云ECS服务器环境搭建 ubuntu 16.04 图形界面的安装
    VS C#程序打包覆盖安装不能更新的解决方法
    MySql EF6 DBFirst 向导无法生成 edmx 解决方法(同:您的项目引用了最新实体框架;但是,找不到数据链接所需的与版本兼容的实体框架数据库提供程序)
    "docker build" requires exactly 1 argument(s).
  • 原文地址:https://www.cnblogs.com/wangshouchang/p/8474076.html
Copyright © 2011-2022 走看看