JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
整合activeMQ到springmvc项目也很简单
只需要增加如下的maven依赖即可,初学者请直接添加all这个jar,否则如果jar包冲突会影响信心的
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
开发步骤:
① 添加好maven依赖后,请到 http://activemq.apache.org/ 下载最新版的activeMQ, 我这里下载的是5.13.2,下载解压后执行bin中的activemq.bat进行启动
② 理解JMS工作原理
1.首先配置链接信息
和操作数据库一样,我们要根据数据库地址和链接信息,来配置datasource一样,activemq类同,底层首先需要由activemq厂商提供的驱动,根据具体地址,封装一个ConnectionFactory, 这是最基本的配置, 同时由于是由Spring进行统一管理, 所以需要将ConnectionFactory注入到sping。jms中的ConnectionFactory。这样链接就配置好了。
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
同理 spring.jms 提供了 SingleConnectionFactory和CachingConnectionFactory,这里我们使用的SingleConnectionFactory,根据意思也知道得八九不离十了,
SingleConnectionFactory就是每次请求都返回同一个链接,从启动开始就一直打开,不会关闭。 而CachingConnectionFactory就和数据库的链接池一样,可以缓存很多信息,在用户很多的情况下,同等情况下会增加效率。
同时activeMQ也提供了PooledConnectionFactory,这样也可以缓存很多信息,减少资源的使用,配置如下
<!-- 使用pool进行链接 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> -->
<!-- <property name="brokerURL" value="tcp://localhost:61616"/> -->
<!-- </bean> -->
<!-- <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> -->
<!-- <property name="connectionFactory" ref="targetConnectionFactory"/> -->
<!-- <property name="maxConnections" value="10"/> -->
<!-- </bean> -->
<!-- <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> -->
<!-- <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> -->
<!-- </bean> -->
2。配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的,所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象。
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
在发送消息的时候,除了知道activemq的地址外,还需要让发送者知道具体发给谁?
activemq提供了两种模式,一个是点对点的 ,一个是发送给多个订阅者这种订阅模式
<!--这个是队列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"/>
</bean>
这样发送者相关配置就已经完成了。
③ 发送者发送消息的服务类: 很简单,调用上面配置好的jmsTemplate。send方法就好了
package xiaochangwei.zicp.net.jms;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@Service
public class ProducerServiceImpl implements ProducerService {
@Resource
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination, final String message) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
至此发送者相关的都已经全部完成了,下面配置接受者
④ 接收者配置,接收者和发送者的链接信息都是一样的,不然收不到信息是不, 除了链接信息,要知道什么时候有消息发送过来,接收者这边就要实现一个消息监听器,
当监听到消息后,进行相应的业务处理,每个目的地都有一个MessageListenerContainer,配置MessageListenerContainer需要链接信息,目的地信息,和接受者的消息监听器
<!-- 消息监听器 -->
<bean id="consumerMessageListener"
class="xiaochangwei.zicp.net.jms.ConsumerMessageListener" />
<!-- 消息监听容器 -->
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
</bean>
这里我们进行一个业务模拟
A系统通过activemq发送命令串给B系统, B系统收到命令后,根据约定的规则进行解析,并进行业务处理
A系统调用ProducerServiceImpl中的 sendMessage 进行消息发送,为减少传递量,均采用json发送,代码如下:
package xiaochangwei.zicp.net.web.controller;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.jms.Destination;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import xiaochangwei.zicp.net.common.tools.JsonUtil;
import xiaochangwei.zicp.net.entity.TestEntity;
import xiaochangwei.zicp.net.jms.ProducerService;
@Controller
@RequestMapping("jms")
public class JmsTestController {
@Autowired
private ProducerService producerService;
@Autowired
@Qualifier("queueDestination")
private Destination destination;
@RequestMapping("test")
public @ResponseBody String testSend() throws Exception {
//系统业务需要, 需要更新用户表中信息,根据id更新name
List<TestEntity> list = new LinkedList<TestEntity>();
TestEntity en = new TestEntity();
en.setId(100);
en.setName("name1");
list.add(en);
TestEntity en2 = new TestEntity();
en2.setId(1002);
en2.setName("name2");
list.add(en2);
Map<String,Object> mapEntity = new HashMap<String, Object>();
mapEntity.put("user", list);
Map<String,Object> map = new HashMap<String, Object>();
map.put("update", mapEntity);
System.out.println("发送方发送内容为:" + JsonUtil.object2String(map));
//发送更新数据请求
producerService.sendMessage(destination, JsonUtil.object2String(map));
return "jms exute complete";
}
}
package xiaochangwei.zicp.net.jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMsg = (TextMessage) message;
try {
System.out.println("接收者受到消息:" + textMsg.getText());
System.out.println("开始进行解析并调用service执行....");
} catch (JMSException e) {
e.printStackTrace();
}
}
}