zoukankan      html  css  js  c++  java
  • ActiveMQ——activemq的使用java代码实例(精选)

    ActiveMQ 在java中的使用,通过单例模式、工厂实现

    Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表():

      Topic Queue
    概要 Publish  Subscribe messaging 发布订阅消息 Point-to-Point  点对点
    有无状态 topic数据默认不落地,是无状态的。

    Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOMEdatakr-storedata下面。也可以配置成DB存储。

    完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。
    消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
    消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

    一、导jar包

     activemq的依赖包

    <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>4.12</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>jul-to-slf4j</artifactId>
                <version>1.6.1</version>
            </dependency>
    
    <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.13.3</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.3.1.RELEASE</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-messaging</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-context</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-beans</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-aop</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-tx</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>

    二、java代码

     创建一下四个java文件,成为mq的公共数据连接池

    1、连接工厂 配置

    package com.broadsense.iov.base.jms;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.jms.connection.CachingConnectionFactory;
    /**
     * 连接工厂 配置
     * 
     * @author flm
     * 2017年10月13日
     */
    public class ConnectionFactory
    {
      private static final String URL = "tcp://10.10.1.1:61616";
      private static final String USERNAME = "hkadmin";
      private static final String PASSWORD = "hk667";
      private static final int SESSIONCACHESIZE = 20;
      private javax.jms.ConnectionFactory factory;
    
      public static synchronized javax.jms.ConnectionFactory getInstance()
      {
        if (SingletonHolder.INSTANCE.factory == null) {
          SingletonHolder.INSTANCE.build();
        }
        return SingletonHolder.INSTANCE.factory;
      }
    
      private void build()
      {
        AMQConfigBean bean = loadConfigure();
        this.factory = buildConnectionFactory(bean);
      }
    
      private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) {
        javax.jms.ConnectionFactory targetFactory = new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL());
    
        CachingConnectionFactory connectoryFacotry = new CachingConnectionFactory();
        connectoryFacotry.setTargetConnectionFactory(targetFactory);
        connectoryFacotry.setSessionCacheSize(bean.getSessionCacheSize());
    
        return connectoryFacotry;
      }
    
      private AMQConfigBean loadConfigure() {
        if ("tcp://10.10.1.1:61616" != null) {
          try {
            return new AMQConfigBean("tcp://10.10.1.1:61616", "hkadmin", "hk667", 20);
          } catch (Exception e) {
            throw new IllegalStateException("load amq config error!");
          }
        }
        throw new IllegalStateException("load amq config error!");
      }
    
      private static class AMQConfigBean
      {
        private String brokerURL;
        private String userName;
        private String password;
        private int sessionCacheSize;
    
        public AMQConfigBean() {
        }
    
        public AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) {
          this.brokerURL = brokerURL;
          this.userName = userName;
          this.password = password;
          this.sessionCacheSize = sessionCacheSize;
        }
    
        public String getBrokerURL() {
          return this.brokerURL;
        }
    
        public void setBrokerURL(String brokerURL) {
          this.brokerURL = brokerURL;
        }
    
        public String getUserName() {
          return this.userName;
        }
    
        public void setUserName(String userName) {
          this.userName = userName;
        }
    
        public String getPassword() {
          return this.password;
        }
    
        public void setPassword(String password) {
          this.password = password;
        }
    
        public int getSessionCacheSize() {
          return this.sessionCacheSize;
        }
    
        public void setSessionCacheSize(int sessionCacheSize) {
          this.sessionCacheSize = sessionCacheSize;
        }
      }
    
      private static class SingletonHolder
      {
        static ConnectionFactory INSTANCE = new ConnectionFactory(null);
      }
    }

    2、模版

    package com.broadsense.iov.base.jms;
    
    import org.springframework.jms.core.JmsTemplate;

    /**
    * 模板厂
    *
    * @author flm
    * 2017年10月13日
    */

    public class JmsTemplateFactory
    {
      private final javax.jms.ConnectionFactory factory;
      private JmsTemplate topicJmsTemplate;
      private JmsTemplate queueJmsTemplate;
    
      public static JmsTemplateFactory getInstance()
      {
        return SingletonHolder.INSTANCE;
      }
    
      private JmsTemplateFactory()
      {
        this.factory = ConnectionFactory.getInstance();
      }
    
      public synchronized JmsTemplate getTopicJmsTemplate() {
        if (this.topicJmsTemplate == null) {
          this.topicJmsTemplate = createTemplate(this.factory, true);
        }
        return this.topicJmsTemplate;
      }
    
      public synchronized JmsTemplate getQueueJmsTemplate() {
        if (this.queueJmsTemplate == null) {
          this.queueJmsTemplate = createTemplate(this.factory, false);
        }
        return this.queueJmsTemplate;
      }
    
      private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) {
        JmsTemplate template = new JmsTemplate(factory);
        template.setPubSubDomain(pubSubDomain);
        return template;
      }
    
      public static class SingletonHolder
      {
        static JmsTemplateFactory INSTANCE = new JmsTemplateFactory(null);
      }
    }

    3、消费者 模版

    package com.broadsense.iov.base.jms;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import javax.jms.Destination;
    import javax.jms.MessageListener;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.listener.SimpleMessageListenerContainer;
    /**
     * JMS监听器  创建消费者
     * 
     * @author flm
     * 2017年10月13日
     */
    public class JMSListener
    {
      private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class);
      private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap();
    

      

      /**
      * 开启一个 点对点的 消息队列监听 的消费者
      *
      * @param queueName 队列名称
      * @param subName 订阅者的名字
      * @param listener 监听
      */

       public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)

      {
        startJmsQueueListener(queueName, null, listener);
      }
      public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) {
        Destination dst = (Destination)MQDESTS.get("QUEUE_" + queueName);
        if (dst == null) {
          ActiveMQQueue mq = new ActiveMQQueue(queueName);
          startJmsListener(mq, subName, listener);
          MQDESTS.put("QUEUE_" + queueName, mq);
        } else {
          LOGGER.warn(queueName + " already started");
        }
      }
    

      /**
      * 开启 一对多 主题的 消息监听的消费者
      *
      * @param topicName 主题消息名称
      * @param subName 订阅者的名字
      * @param listener 监听
      */

    public static synchronized void startJmsTopicListener(String topicName, MessageListener listener)
      {
        startJmsTopicListener(topicName, null, listener);
      }
    
      public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) {
        ActiveMQTopic mq = new ActiveMQTopic(topicName);
        startJmsListener(mq, subName, listener);
        MQDESTS.put("QUEUE_" + topicName, mq);
      }
    
      

      /**
      * 开始 消息监听器 消费者
      *
      * @param dest 目的地
      * @param subName 持久订阅的名字
      * @param msgListener 消息监听器
      */

    private static void startJmsListener(Destination dest, String subName, MessageListener msgListener)
      {
        javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance();
    
        SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();
        listener.setConnectionFactory(factory);
        listener.setDestination(dest);
        listener.setMessageListener(msgListener);
        if ((subName != null) && (subName != "")) {
          listener.setDurableSubscriptionName(subName);
        }
        listener.start();
      }
    }

    4、生产者 模版

    package com.broadsense.iov.base.jms;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    
    /**
     * 创建 jms生产者
     * 
     * @author flm
     * 2017年10月13日
     */
    public class JMSPublisher
    {

      


      /**
      * 发送消息
      * Topic 生产者
      *
      * @param dest 目的地
      * @param msg 消息内容
      */

    public static void sendTopicMessage(String dest, String msg)
      {
        JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg)
        {
          public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(this.val$msg);
          }
        });
      }
    

      /**
      * 发送消息
      * Queue 生产者
      *
      * @param dest 目的地
      * @param msg 消息内容
      */

    public static void sendQueueMessage(String dest, String msg)
      {
        JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg)
        {
          public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(this.val$msg);
          }
        });
      }
    }

    三、activemq的使用

    1、创建一个junit测试,@Test 发布、接受、即可看到消息,mq管理后台也可以看到

    package com.broadsense.iov.base.jms;
    
    import com.broadsense.iov.base.jms.JMSListener;
    import com.broadsense.iov.base.jms.JMSPublisher;
    import java.io.IOException;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    import org.junit.Test;
    
    /**
     *
     * @author flm
     */
    public class JMSPublisherTest {
    
        public JMSPublisherTest() {
        }
        
        /**
         * 生产者 发布消息
         * @throws 
         */
        @Test
        public void testSendMessage() throws InterruptedException {
            for (int idx = 1; idx < 3; idx++) {
                
                /*
                 * 生产者 发布 消息到 queue/queue_b 的队列中
                 */
                JMSPublisher.sendQueueMessage("queue/queue_b", String.valueOf(idx * 1111));
                
                
                /*
                 * 生产者 发布消息 到  topic/send 的Topic 主题中 
                 */
                //JMSPublisher.sendTopicMessage("topic/send", String.valueOf(idx * 1111));
            }
        }
    
        
        /**
         * 消费者 订阅接受消息
         */
        @Test
        public void receiver() {
            /*
             * 消费者 订阅主题  topic/send 是否有消息发布,有侧打印出来  (通过 onMessage 监听)
             */
            /*JMSListener.startJmsTopicListener("topic/send", new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage msg = (TextMessage) message;
                            System.out.println("== 收到一个JMS消息..." + msg.getText());
                        } 
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });*/
            
            /*
             * 消费者 订阅队列  queue/queue_b 是否有消息发布,有侧打印出来  (通过 onMessage 监听)
             */
           JMSListener.startJmsQueueListener("queue/queue_b" ,new MessageListener() {
                @Override
                 public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage msg = (TextMessage) message;
                            System.out.println("== 收到一个JMS消息..." + msg.getText());
                        } 
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            try {
                System.in.read();
            } catch (IOException ex) {
                Logger.getLogger(JMSPublisherTest.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
       
    }
    View Code

     2、真正的项目实现

    在项目的中具体实现,是加载一个类来实现订阅消息

    加载启动一个订阅的主题,给一个类MQ()处理

    package com.ifengSearch.track.dao;
    
    import org.springframework.stereotype.Repository;
    
    import com.broadsense.iov.base.jms.JMSListener;
    
    /**
     * 项目启动即 开启
     * 通过 spring 依赖加载 Lister 订阅topic/send
     * @author flm
     * @2017年10月16日
     */
    @Repository
    public class Lister {
        public Lister(){
            try {
                JMSListener.startJmsTopicListener("topic/send",new QM());// QM() 订阅 主题  topic/send
            } catch (Exception e) {
            }
        }
    }
    View Code

    MQ()订阅消息的处理类,通过实现

    package com.ifengSearch.track.dao;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 通过 实现 MessageListener 的 onMessage 来监听消息 
     * 接受、处理消息
     * @author flm
     * @2017年10月16日
     */
    public class MQ implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    TextMessage msg = (TextMessage) message;
                    System.out.println("== 收到一个JMS消息..." + msg.getText());
                } 
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    View Code
  • 相关阅读:
    INT 3 中断调试处理流程
    调试事件的处理结束
    [IDA] 自动下载符号
    06 内存断点
    05 软件断点
    C#2.0新增功能02 泛型
    C#2.0新增功能01 分部类与分部方法
    xBIM 实战04 在WinForm窗体中实现IFC模型的加载与浏览
    xBIM 实战03 使用WPF技术实现IFC模型的加载与浏览
    xBIM 实战02 在浏览器中加载IFC模型文件并设置特效
  • 原文地址:https://www.cnblogs.com/lemon-flm/p/7668076.html
Copyright © 2011-2022 走看看