一、什么是JMS
JMS是java message service的缩写即java消息服务,是java定义的消息中间件(MOM)的技术规范(类似玉JDBC)。用于程序之间的异步通信,如果两个应用程序需要通信,则可以通过JMS来进行转发,达到解耦的目的。
二、JMS的消息模型
JMS有两种消息模型:
1、点对点或队列模型(Point-to-Point Messaging Domain):消息的生产者将生产出的消息加入到一个队列中,消息的接受者从队列中获取消息。队列保留着消息,直到他们被消费或者超时。
特点:①每个消息都只有一个消费者,如果队列中的消息被某个消费者消费,该消息会移出队列。
②消息的生产者和消费者之间没有时间的约束,生产者可以生产消息,无论消费者是否在运行。同样只要消息队列中有消息,消费者就可以消费和生产者的状态无关。
③消费者接受完消息之后要向队列反馈成功的信息。
2、发布订阅模型(Publish/Subscribe Messaging Domain ):发布者发布消息,所有的订阅者都会接受到消息。
特点:①发布一个消息会有多个订阅者接受到消息
②消息发布者和订阅者之间有时间上的相关性。订阅一个主题的订阅者只能接收到自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许订阅者接受它在未处于激活状态时发送的消息。
三、JMS的对象模型
1、连接工厂(ConnectionFactory):用于创建JMS连接
2、JMS连接(Connection):表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
3、JMS会话(Session):表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
4、JMS目的地(Destination):生产者发送消息的地方。点对点模式中的目的地是queue,发布订阅模式中的目的地就是topic。
5、生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
四、ActiveMQ
上面说到JMS是java消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。
ActiveMQ就是Apache组织的一个开源的消息中间件,它实现了JMS技术规范。
1、ActiveMQ点对点模式
下面的代码都是测试代码,消费者如果关闭连接,则只能消费一次消息,所以没有关闭。
public class MsgSender { //ActiveMq 的默认用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMq 的默认登录密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的链接地址 private static final String BROKEN_URL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { //创建一个链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); Connection connection = null; Session session = null; MessageProducer messageProducer = null; try { //从工厂中创建一个链接 connection= connectionFactory.createConnection(); //开启链接 connection.start(); //创建一个事务(这里通过参数可以设置事务的级别) session = connection.createSession(true,Session.SESSION_TRANSACTED); //创建一个消息队列 Queue queue = session.createQueue("myqueue"); //消息生产者 messageProducer = session.createProducer(queue); int count = 0; while(true){ Thread.sleep(1000); //创建一条消息 TextMessage msg = session.createTextMessage("生产消息"+count); System.out.println("生产消息"+count++); //发送消息 messageProducer.send(msg); //提交事务 session.commit(); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ try { if(connection != null){ connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
public class MsgReciver { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("myqueue"); MessageConsumer consumer = null; consumer = session.createConsumer(queue); MsgListener listener=new MsgListener(); consumer.setMessageListener(listener); } catch (JMSException e) { e.printStackTrace(); } } }
public class MsgListener implements MessageListener { @Override public void onMessage(Message msg) { TextMessage tmsg = (TextMessage)msg; try { System.out.println("接收到的数据:"+tmsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
2、发布订阅模式
public class MsgSender { //ActiveMq 的默认用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMq 的默认登录密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的链接地址 private static final String BROKEN_URL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { //创建一个链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); Connection connection = null; Session session = null; MessageProducer messageProducer = null; try { //从工厂中创建一个链接 connection= connectionFactory.createConnection(); //开启链接 connection.start(); //创建一个事务(这里通过参数可以设置事务的级别) session = connection.createSession(true,Session.SESSION_TRANSACTED); //创建一个Topic Topic myTopic = session.createTopic("mytopic"); //消息生产者 messageProducer = session.createProducer(myTopic); int count = 0; while(true){ Thread.sleep(10000); //创建一条消息 TextMessage msg = session.createTextMessage("发布消息"+count); System.out.println("发布消息"+count++); //发送消息 messageProducer.send(msg); //提交事务 session.commit(); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally{ try { if(connection != null){ connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
public class MsgReciver { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = "tcp://111.230.239.152:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Topic myTopic = session.createTopic("mytopic"); MessageConsumer consumer = null; consumer = session.createConsumer(myTopic); MsgListener listener=new MsgListener(); consumer.setMessageListener(listener); } catch (JMSException e) { e.printStackTrace(); } } }
public class MsgListener implements MessageListener { @Override public void onMessage(Message msg) { TextMessage tmsg = (TextMessage)msg; try { System.out.println("接收到的数据:"+tmsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
3、spring集成ActiveMQ
引入jar包
<!-- 整合activemq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.4</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.10.RELEASE</version> </dependency>
activemq.brokerURL=tcp://127.0.0.1:61616 activemq.queue=myqueue activemq.topic=mytopic
<!-- 集成ActiveMQ --> <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 --> <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <constructor-arg name="brokerURL" value="${activemq.brokerURL}"/> </bean> <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="tagertConnectionFactory"/> </bean> <!-- 配置生产者 --> <!-- Spring使用JMS工具类,可以用来发送和接收消息 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这里是配置的spring用来管理connectionfactory的connectionfactory --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 配置destination --> <!-- 队列目的地 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="${activemq.queue}"/> </bean> <!-- 话题目的地 --> <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="${activemq.topic}"/> </bean> <!-- 配置监听器 --> <bean id="myQueueListener" class="com.myproject.listener.MqQueueListener"/> <bean id="mqTopicListener" class="com.myproject.listener.MqTopicListener"/> <!-- 系统监听器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="myQueueListener"/> </bean> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="itemAddTopic"/> <property name="messageListener" ref="mqTopicListener"/> </bean>
public class MqQueueListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage testMessage = (TextMessage)message; try { System.out.println("收到信息:"+testMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
public class MqTopicListener implements MessageListener { @Override public void onMessage(Message message) { TextMessage testMessage = (TextMessage)message; try { System.out.println("收到信息:"+testMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
@Service public class UserServiceImpl implements UserService { @Autowired private UserMapper userMapper; @Autowired private RedisCacheUtil redisCacheUtil; @Autowired private JmsTemplate jmsTemplate; @Resource(name="itemAddTopic") private Destination destination; @Override public User getById(Map<String, Object> params) { Integer userId = Integer.parseInt(params.get("id").toString()); User user = (User)redisCacheUtil.get(RedisKey.USER_BASIC_INFO+userId); if(user == null ){ user = userMapper.selectByPrimaryKey(userId); if(user != null ){ redisCacheUtil.set(RedisKey.USER_BASIC_INFO+userId, user); } } ObjectMapper mapper = new ObjectMapper(); try { final String userInfo = mapper.writeValueAsString(user); //获取到用户后发送一个消息给其他模块 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage msg = session.createTextMessage(userInfo); return msg; } }); } catch (JsonProcessingException e) { e.printStackTrace(); } return user; } }