zoukankan      html  css  js  c++  java
  • activeMQ "HelloWorld"实现

    本文主要介绍activeMQ在应用程序中是如何使用的,同个两个实例进行说明,这两个实例分别针对P2P模式和Pub/Sub模式。

    开发环境

    • 操作系统:Ubuntu 16.10
    • 开发平台:Eclipse Neon Release (4.6.0)
    • ActiveMQ版本:apache-activemq-5.14.3

      具体的环境下载与配置这里就不在详细描述啦

    项目建立与实现

      先为大家展示以下项目最后的结构图:

    操作步骤

    1. 在Eclipse中新建一个最基本的Java Project,本项目命名为“activeMQHelloWorld”
    2. 在项目根目录下建立文件夹libs,并将activemq-all-5.14.3.jar依赖包复制到文件夹中
    3. 通过 JavaBuildPath 中的libraries将依赖包引入项目中

      到目前位置项目框架搭建完毕(简单容易吧)

      分别实现P2P消息模型和Pub/Sub消息模型,首先实现P2P消息模型:

      P2P消息模型

        编写生产者代码QueueProducer.java如下:

     1 package com.unionpay.activemq;
     2 
     3 import javax.jms.Connection;
     4 import javax.jms.ConnectionFactory;
     5 import javax.jms.Destination;
     6 import javax.jms.MessageProducer;
     7 import javax.jms.Session;
     8 import javax.jms.TextMessage;
     9 
    10 import org.apache.activemq.ActiveMQConnection;
    11 import org.apache.activemq.ActiveMQConnectionFactory;
    12 
    13 public class QueueProducer {
    14     
    15     //默认连接用户
    16     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    17     //默认连接密码
    18     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    19     //默认连接地址
    20     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    21     
    22     private static final int SENDNUM = 10;
    23     
    24     public static void main(String[] args){
    25         //连接工厂
    26         ConnectionFactory connectionFactory;
    27         //连接
    28         Connection connection = null;
    29         //会话,接受或者发送消息的线程
    30         Session session;
    31         //消息的目的地
    32         Destination destination;
    33         //消息生产者
    34         MessageProducer messageProducer;
    35         
    36         //实例化连接工厂
    37         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
    38         
    39         try{
    40             //通过连接工厂获取连接
    41             connection = connectionFactory.createConnection();
    42             //启动连接
    43             connection.start();
    44             //创建session,第一个参数true表示支持事物,false表示不支持事物,Session.AUTO_ACKKNOWLEDGE
    45             //表示自动确认,客户端发送和接受消息不需要做额外的工作
    46             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    47             //创建一个名为HelloWorld的消息队列
    48             destination = session.createQueue("QueueTest");
    49             //创建消息生产者
    50             messageProducer = session.createProducer(destination);
    51             //发送消息
    52             sendMessage(session,messageProducer);
    53             //提交消息
    54             session.commit();
    55             session.close();
    56         }catch(Exception e){
    57             e.printStackTrace();
    58         }finally{
    59             if(connection != null){
    60                 try{
    61                     connection.close();
    62                 }catch(Exception e){
    63                     e.printStackTrace();
    64                 }
    65             }
    66         }
    67     }
    68     
    69     /**
    70      * 发送消息
    71      * @param session
    72      * @param messageProducer
    73      * @throws Exception
    74      */
    75     public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
    76         for(int i=0;i<SENDNUM;i++){
    77             //创建一条文笔消息
    78             TextMessage message = session.createTextMessage("ActiveMQ 发送消息"+ i);
    79             System.out.println("发送消息:Activemq发送消息" + i);
    80             
    81             messageProducer.send(message);
    82         }
    83     }
    84 }

      编写消费者代码QueueConsumer.java代码如下:

     1 package com.unionpay.activemq;
     2 
     3 import javax.jms.Connection;
     4 import javax.jms.ConnectionFactory;
     5 import javax.jms.Destination;
     6 import javax.jms.JMSException;
     7 import javax.jms.MessageConsumer;
     8 import javax.jms.Session;
     9 import javax.jms.TextMessage;
    10 
    11 import org.apache.activemq.ActiveMQConnection;
    12 import org.apache.activemq.ActiveMQConnectionFactory;
    13 
    14 /**
    15  * @author jxwch
    16  *
    17  */
    18 public class QueueConsummer {
    19 
    20     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    21 
    22     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    23 
    24     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    25 
    26     public static void main(String[] args) {
    27 
    28         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
    29         try {
    30             Connection connection = connectionFactory.createConnection();
    31 
    32             connection.start();
    33 
    34             Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    35 
    36             Destination destination = session.createQueue("QueueTest");
    37 
    38             MessageConsumer messageConsumer = session.createConsumer(destination);
    39 
    40             while (true) {
    41                 //100000代表100000毫秒
    42                 TextMessage message = (TextMessage) messageConsumer.receive(100000);
    43                 if (message != null) {
    44                     System.out.println("收到消息:" + message.getText());
    45                 } else {
    46                     break;
    47                 }
    48             }
    49         } catch (JMSException e) {
    50             e.printStackTrace();
    51         }
    52     }
    53 
    54 }

      到此,P2P模型代码已经全部编写完成,可以测试喽:

      当然,我们要测试activeMQ,那么首先一定要启动服务器:

    1 cd apache-activemq-5.14.3/bin
    2 bash activemq start

      通过访问自带监控应用查看服务器是否启动正常:http://127.0.0.1:8161/admin/

      若服务器运行正常,首先在Eclipse中运行QueueProducer.java,终端打印出如下信息:

      此时查看监控程序页面,点击“Queue”出现如下信息:

      从截图中我们可以看到,在Queue消息中,有一个Name为QueueTest的消息队列,其中“Number Of Pending Message”表示队列中存在10条消息,“Message Enqueued” 表示有10条消息正在排队。通过点击Views中的Browser可以查看队列中的消息:

      并且可以通过Delete对这些消息进行删除操作。

      下面我们继续运行QueueConsumer.java,终端打印如下信息:

      Pub/Sub 模型

      首先编写Publisher端文件TopicProducer.java:

     1 package com.unionpay.activemq;
     2 
     3 import javax.jms.Connection;
     4 import javax.jms.ConnectionFactory;
     5 import javax.jms.DeliveryMode;
     6 import javax.jms.Destination;
     7 import javax.jms.JMSException;
     8 import javax.jms.MapMessage;
     9 import javax.jms.MessageProducer;
    10 import javax.jms.Session;
    11 import javax.jms.Topic;
    12 
    13 import org.apache.activemq.ActiveMQConnection;
    14 import org.apache.activemq.ActiveMQConnectionFactory;
    15 
    16 public class TopicProducer {
    17 
    18     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    19     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    20     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    21     private static final int SENDNUM = 10;
    22 
    23     public static void main(String[] args) {
    24 
    25         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
    26         Connection connection = null;
    27         Session session = null;
    28         Topic  topic = null;
    29         MessageProducer messageProducer = null;
    30         try {
    31             connection = connectionFactory.createConnection();
    32 
    33             connection.start();
    34 
    35             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    36 
    37             topic = session.createTopic("NEWS");
    38 
    39             messageProducer = session.createProducer(topic);
    40             
    41             messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    42 
    43             for (int i = 0; i < SENDNUM; i++) {
    44                 MapMessage mapMessage = session.createMapMessage();
    45 
    46                 mapMessage.setLong("count", i);
    47 
    48                 messageProducer.send(mapMessage);
    49 
    50                 System.out.println("发布者发布消息:" + i);
    51 
    52                 session.commit();
    53             }
    54         } catch (JMSException e) {
    55             // TODO Auto-generated catch block
    56             e.printStackTrace();
    57         } finally {
    58             if (session != null) {
    59                 try {
    60                     session.close();
    61                 } catch (JMSException e) {
    62                     e.printStackTrace();
    63                 }
    64             }
    65 
    66             if (connection != null) {
    67                 try {
    68                     connection.close();
    69                 } catch (JMSException e) {
    70                     e.printStackTrace();
    71                 }
    72             }
    73         }
    74     }
    75 }

      然后编辑Subscriber端文件TopicConsumer.java:

    package com.unionpay.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TopicConsumer {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
        
        public static void main(String[] args){
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
            
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                
                connection.start();
                
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                
                Topic topic  = session.createTopic("NEWS");
                
                MessageConsumer messageConsumer = session.createConsumer(topic);
                
                while(true){
                    
                    MessageListener messageListener = new MessageListener(){
    
                        @Override
                        public void onMessage(Message message) {
                            // TODO Auto-generated method stub
                            MapMessage mapMessage = null;
                            try{
                                mapMessage = (MapMessage)message;
                                
                                System.out.println("Receiver Message:" + mapMessage.getLong("count"));
                            }catch(JMSException e){
                                e.printStackTrace();
                            }                    
                        }    
                    };
                    messageConsumer.setMessageListener(messageListener);
                }
                
                
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally{
                if(connection != null){
                    try{
                        connection.close();
                    }catch(JMSException e){
                        e.printStackTrace();
                    }
                }
            }
        }
    
    }

      到此,Pub/Sub模型代码编写完毕,下面运行TopicProducer.java文件,终端打印出如下信息:

      然后查看监控程序,点击“Topics”:

      从截图中我们可以看出Name中多了一个NEWS主题,并且Messages Enqueued为10。

      然后运行客户端TopicConsumer.java文件,终端显示如下:

      从截图中我们并没有发现客户端消费了消息,这是为啥呢?

      因为在Pub/Sub模型中,发布者和订阅者有时间上的依赖性,针对某个主题,必须先创建订阅者,然后才能发布消息,这样才能保证订阅者可以收到消息。

      重新运行TopicConsumer.java文件,就可以看见消费信息了:

      至此,两种模式已经全部介绍完毕。

      源码:activeMQHelloWorld.zip

    参考文献

    1. ubuntu下简单使用activemq
  • 相关阅读:
    android 扇形菜单
    Gradle 教程:第一部分,安装【翻译】
    Android Studio 创建虚拟机失败 Failed to load 解决办法
    【翻译】Best Practices for User interface android 适配不同屏幕、不同分辨率
    [转]QT 4.8 静态库编译方法
    使用windows资源管理器打开jar
    一步步理解typedef
    模拟Chrome皮肤
    FAT12格式的引导区实现
    Android-获取Html元素
  • 原文地址:https://www.cnblogs.com/jxwch/p/6495079.html
Copyright © 2011-2022 走看看