ActiveMQ 在java中的使用,通过单例模式、工厂实现
Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表():
Topic | Queue | |
概要 | Publish Subscribe messaging 发布订阅消息 | Point-to-Point 点对点 |
有无状态 | topic数据默认不落地,是无状态的。 |
Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOMEdatakr-storedata下面。也可以配置成DB存储。 |
完整性保障 | 并不保证publisher发布的每条数据,Subscriber都能接受到。 | Queue保证每条数据都能被receiver接收。 |
消息是否会丢失 | 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 | Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。 |
消息发布接收策略 | 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 | 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 |
一、导jar包
activemq的依赖包
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jul-to-slf4j</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.1.RELEASE</version> <exclusions> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> </exclusion> </exclusions> </dependency>
二、java代码
创建一下四个java文件,成为mq的公共数据连接池
1、连接工厂 配置
package com.broadsense.iov.base.jms; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.jms.connection.CachingConnectionFactory; /** * 连接工厂 配置 * * @author flm * 2017年10月13日 */ public class ConnectionFactory { private static final String URL = "tcp://10.10.1.1:61616"; private static final String USERNAME = "hkadmin"; private static final String PASSWORD = "hk667"; private static final int SESSIONCACHESIZE = 20; private javax.jms.ConnectionFactory factory; public static synchronized javax.jms.ConnectionFactory getInstance() { if (SingletonHolder.INSTANCE.factory == null) { SingletonHolder.INSTANCE.build(); } return SingletonHolder.INSTANCE.factory; } private void build() { AMQConfigBean bean = loadConfigure(); this.factory = buildConnectionFactory(bean); } private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) { javax.jms.ConnectionFactory targetFactory = new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL()); CachingConnectionFactory connectoryFacotry = new CachingConnectionFactory(); connectoryFacotry.setTargetConnectionFactory(targetFactory); connectoryFacotry.setSessionCacheSize(bean.getSessionCacheSize()); return connectoryFacotry; } private AMQConfigBean loadConfigure() { if ("tcp://10.10.1.1:61616" != null) { try { return new AMQConfigBean("tcp://10.10.1.1:61616", "hkadmin", "hk667", 20); } catch (Exception e) { throw new IllegalStateException("load amq config error!"); } } throw new IllegalStateException("load amq config error!"); } private static class AMQConfigBean { private String brokerURL; private String userName; private String password; private int sessionCacheSize; public AMQConfigBean() { } public AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) { this.brokerURL = brokerURL; this.userName = userName; this.password = password; this.sessionCacheSize = sessionCacheSize; } public String getBrokerURL() { return this.brokerURL; } public void setBrokerURL(String brokerURL) { this.brokerURL = brokerURL; } public String getUserName() { return this.userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return this.password; } public void setPassword(String password) { this.password = password; } public int getSessionCacheSize() { return this.sessionCacheSize; } public void setSessionCacheSize(int sessionCacheSize) { this.sessionCacheSize = sessionCacheSize; } } private static class SingletonHolder { static ConnectionFactory INSTANCE = new ConnectionFactory(null); } }
2、模版
package com.broadsense.iov.base.jms; import org.springframework.jms.core.JmsTemplate;
/**
* 模板厂
*
* @author flm
* 2017年10月13日
*/
public class JmsTemplateFactory { private final javax.jms.ConnectionFactory factory; private JmsTemplate topicJmsTemplate; private JmsTemplate queueJmsTemplate; public static JmsTemplateFactory getInstance() { return SingletonHolder.INSTANCE; } private JmsTemplateFactory() { this.factory = ConnectionFactory.getInstance(); } public synchronized JmsTemplate getTopicJmsTemplate() { if (this.topicJmsTemplate == null) { this.topicJmsTemplate = createTemplate(this.factory, true); } return this.topicJmsTemplate; } public synchronized JmsTemplate getQueueJmsTemplate() { if (this.queueJmsTemplate == null) { this.queueJmsTemplate = createTemplate(this.factory, false); } return this.queueJmsTemplate; } private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) { JmsTemplate template = new JmsTemplate(factory); template.setPubSubDomain(pubSubDomain); return template; } public static class SingletonHolder { static JmsTemplateFactory INSTANCE = new JmsTemplateFactory(null); } }
3、消费者 模版
package com.broadsense.iov.base.jms; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.jms.Destination; import javax.jms.MessageListener; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.listener.SimpleMessageListenerContainer; /** * JMS监听器 创建消费者 * * @author flm * 2017年10月13日 */ public class JMSListener { private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class); private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap();
/**
* 开启一个 点对点的 消息队列监听 的消费者
*
* @param queueName 队列名称
* @param subName 订阅者的名字
* @param listener 监听
*/
public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)
{ startJmsQueueListener(queueName, null, listener); } public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) { Destination dst = (Destination)MQDESTS.get("QUEUE_" + queueName); if (dst == null) { ActiveMQQueue mq = new ActiveMQQueue(queueName); startJmsListener(mq, subName, listener); MQDESTS.put("QUEUE_" + queueName, mq); } else { LOGGER.warn(queueName + " already started"); } }
/**
* 开启 一对多 主题的 消息监听的消费者
*
* @param topicName 主题消息名称
* @param subName 订阅者的名字
* @param listener 监听
*/
public static synchronized void startJmsTopicListener(String topicName, MessageListener listener) { startJmsTopicListener(topicName, null, listener); } public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) { ActiveMQTopic mq = new ActiveMQTopic(topicName); startJmsListener(mq, subName, listener); MQDESTS.put("QUEUE_" + topicName, mq); }
/**
* 开始 消息监听器 消费者
*
* @param dest 目的地
* @param subName 持久订阅的名字
* @param msgListener 消息监听器
*/
private static void startJmsListener(Destination dest, String subName, MessageListener msgListener) { javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance(); SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer(); listener.setConnectionFactory(factory); listener.setDestination(dest); listener.setMessageListener(msgListener); if ((subName != null) && (subName != "")) { listener.setDurableSubscriptionName(subName); } listener.start(); } }
4、生产者 模版
package com.broadsense.iov.base.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * 创建 jms生产者 * * @author flm * 2017年10月13日 */ public class JMSPublisher {
/**
* 发送消息
* Topic 生产者
*
* @param dest 目的地
* @param msg 消息内容
*/
public static void sendTopicMessage(String dest, String msg) { JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg) { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(this.val$msg); } }); }
/**
* 发送消息
* Queue 生产者
*
* @param dest 目的地
* @param msg 消息内容
*/
public static void sendQueueMessage(String dest, String msg) { JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg) { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(this.val$msg); } }); } }
三、activemq的使用
1、创建一个junit测试,@Test 发布、接受、即可看到消息,mq管理后台也可以看到
package com.broadsense.iov.base.jms; import com.broadsense.iov.base.jms.JMSListener; import com.broadsense.iov.base.jms.JMSPublisher; import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.junit.Test; /** * * @author flm */ public class JMSPublisherTest { public JMSPublisherTest() { } /** * 生产者 发布消息 * @throws */ @Test public void testSendMessage() throws InterruptedException { for (int idx = 1; idx < 3; idx++) { /* * 生产者 发布 消息到 queue/queue_b 的队列中 */ JMSPublisher.sendQueueMessage("queue/queue_b", String.valueOf(idx * 1111)); /* * 生产者 发布消息 到 topic/send 的Topic 主题中 */ //JMSPublisher.sendTopicMessage("topic/send", String.valueOf(idx * 1111)); } } /** * 消费者 订阅接受消息 */ @Test public void receiver() { /* * 消费者 订阅主题 topic/send 是否有消息发布,有侧打印出来 (通过 onMessage 监听) */ /*JMSListener.startJmsTopicListener("topic/send", new MessageListener() { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println("== 收到一个JMS消息..." + msg.getText()); } } catch (JMSException e) { e.printStackTrace(); } } });*/ /* * 消费者 订阅队列 queue/queue_b 是否有消息发布,有侧打印出来 (通过 onMessage 监听) */ JMSListener.startJmsQueueListener("queue/queue_b" ,new MessageListener() { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println("== 收到一个JMS消息..." + msg.getText()); } } catch (JMSException e) { e.printStackTrace(); } } }); try { System.in.read(); } catch (IOException ex) { Logger.getLogger(JMSPublisherTest.class.getName()).log(Level.SEVERE, null, ex); } } }
2、真正的项目实现
在项目的中具体实现,是加载一个类来实现订阅消息
加载启动一个订阅的主题,给一个类MQ()处理
package com.ifengSearch.track.dao; import org.springframework.stereotype.Repository; import com.broadsense.iov.base.jms.JMSListener; /** * 项目启动即 开启 * 通过 spring 依赖加载 Lister 订阅topic/send * @author flm * @2017年10月16日 */ @Repository public class Lister { public Lister(){ try { JMSListener.startJmsTopicListener("topic/send",new QM());// QM() 订阅 主题 topic/send } catch (Exception e) { } } }
MQ()订阅消息的处理类,通过实现
package com.ifengSearch.track.dao; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 通过 实现 MessageListener 的 onMessage 来监听消息 * 接受、处理消息 * @author flm * @2017年10月16日 */ public class MQ implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println("== 收到一个JMS消息..." + msg.getText()); } } catch (JMSException e) { e.printStackTrace(); } } }