1.JMS简介
JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中可以在特定的时候利用生产者生成消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS编程模型
(1) ConnectionFactory
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
(2) Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。
(3) Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
(4) Session
Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
(5) 消息的生产者
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
(6) 消息消费者
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
(7) MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
2.ActiveMQ简介
ActiveMQ是Apache软件基金下的一个开源软件,它遵循JMS规范(Java Message Service),是消息驱动中间件软件(MOM)。它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。ActiveMQ使用Apache许可协议。因此,任何人都可以使用和修改它而不必反馈任何改变。这对于商业上将ActiveMQ用在重要用途的人尤为关键。MOM的工作是在分布式的各应用之间调度事件和消息,使之到达指定的接收者。所以高可用,高性能,高可扩展性尤为关键。
- ActiveMQ特性
⒈支持多种语言客户端,如:Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议有 OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
⒉ 完全支持JMS1.1和J2EE1.4规范,它们包括同步和异步消息传递,一次和只有一次的消息传递,对于预订者的持久消息等。依附于JMS规范意味着,不论JMS消息提供者是谁,同样的基本特性(持久化,XA消息,事务)都是有效的。
⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去。
⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
⒌ ActiveMQ提供各种连接选择,包括HTTP,HTTPS,IP多点传送,SSL,STOMP,TCP,UDP,XMPP等。大量的连接协议支持使之具有更好的灵活性。很多现有的系统使用一种特定协议并且不能改变,所以一个支持多种协议的消息平台降低了使用的门槛。虽然连接很重要,但是和其他容器集成也同样重要。
6.ActiveMQ提供多种持久性方案可供选择,也可以完全按自己需求定制验证和授权。例如,ActiveMQ通过KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence),但也支持标准的JDBC方案。ActiveMQ可以通过配置文件提供简单的验证和授权,也提供标准的JAAS登陆模块。
7.ActiveMQ是为开发者设计的。它并不需要专门的管理工具,因为它提供各种易用且强大的管理特性。有很多方法去监控ActiveMQ的各个方面,可以通过JMX使用JConsole或ActiveMQ web console;可以运行ActiveMQ消息报告;可以用命令行脚本;可以通过日志。
8.代理器集群(Broker clustering)----为了利于扩展,多个ActiveMQ broker能够联合工作。这个方式就是network of brokers并且能支持多种拓扑结构;支持客户端-服务器,点对点。
9.支持Ajax, 支持与Axis的整合
- ActiveMQ优势
1.与OpenJMS、JbossMQ等开源jms provider相比,ActiveMQ有Apache的支持,持续发展的优势明显。
2.消息处理速度很快
3.提高系统资源的利用率,主要是任务的派发不是24小时平均的,而是高峰时期任务量很多,比如1秒1000多个,有的时候很低,比如十几秒钟才来一个。应用服务通过JMS队列一个一个的取任务,做完一个再领一个,使系统资源的运用趋于平均。比如ActiveMQ在赛扬(2.40GHz)机器上能够达到2000/s,消息大小为1-2k。好一些的服务器可以达到2万以上/秒。
3.ActiveMQ安装
ActiveMQ在linux服务上安装操作如下:
1.在官网下载activemq安装文件。地址:http://activemq.apache.org/download.html
2.上传下载的tar.gz安装文件到linux服务器上,并解压到指定目录:如 tar -xf apache-activemq-5.15.2-bin.tar.gz
3.运行activemq,进入到解压的 apache-activemq-5.15.2/bin目录,执行命令:activemq start
4.开放端口8161,61616,保证端口可访问。
运行activemq截图如下:
本机访问启动成功的activemq截图如下:
4.ActiveMQ类别及开发流程
1)、Point-to-Point (点对点)消息模式开发流程
1、生产者(producer)开发流程:
1.1 创建Connection: 根据url,user和password创建一个jms Connection。
1.2 创建Session: 在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
1.3 创建Destination对象: 需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息。
1.4 创建MessageProducer: 根据Destination创建MessageProducer对象,同时设置其持久模式。
1.5 发送消息到队列(Queue): 封装Message消息,使用MessageProducer的send方法将消息发送出去。
2、消费者(consumer)开发流程:
2.1 实现MessageListener接口: 消费者类必须实现MessageListener接口,然后在onMessage()方法中监听消息的到达并处理。
2.2 创建Connection: 根据url,user和password创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId。
2.3 创建Session和Destination: 与ProducerTool.java中的流程类似,不再赘述。
2.4 创建replyProducer【可选】:可以用来将消息处理结果发送给producer。
2.5 创建MessageConsumer: 根据Destination创建MessageConsumer对象。
2.6 消费message: 在onMessage()方法中接收producer发送过来的消息进行处理,并可以通过replyProducer反馈信息给producer
样例代码:
在消息生产者中定义一个队列,destination_request,提供消息,同时定义一个监听消息的队列拥有接受消费者回复的消息,destination_response。
1 package com.tiantian.springintejms.test; 2 3 import com.tiantian.springintejms.entity.Email; 4 import com.tiantian.springintejms.entity.TestMqBean; 5 import com.tiantian.springintejms.service.ProducerService; 6 import org.apache.activemq.ActiveMQConnectionFactory; 7 import org.junit.Test; 8 import org.junit.runner.RunWith; 9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.beans.factory.annotation.Qualifier; 11 import org.springframework.test.context.ContextConfiguration; 12 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 13 14 import javax.jms.*; 15 16 @RunWith(SpringJUnit4ClassRunner.class) 17 public class ProducerSendTest { 18 19 @Test 20 public static void main(String[] args) { 21 ConnectionFactory connectionFactory; 22 Connection connection; 23 Session session; 24 Destination destination_request,destination_response; 25 MessageProducer producer; 26 MessageConsumer consumer; 27 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.210.128:61616"); 28 try { 29 connection = connectionFactory.createConnection(); 30 connection.start(); 31 //第一个参数是是否是事务型消息,设置为true,第二个参数无效 32 //第二个参数是 33 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的 34 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的 35 //时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者) 36 //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。 37 //待测试 38 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 39 destination_request = session.createQueue("request-queue"); 40 destination_response = session.createQueue("response-queue"); 41 producer = session.createProducer(destination_request); 42 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 43 44 consumer = session.createConsumer(destination_response); 45 //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢 46 TestMqBean bean = new TestMqBean(); 47 bean.setAge(13); 48 for (int i = 0; i < 10; i++) { 49 bean.setName("send to data -" + i); 50 producer.send(session.createObjectMessage(bean)); 51 } 52 producer.close(); 53 System.out.println("消息发送成功..."); 54 55 consumer.setMessageListener(new MessageListener() { 56 @Override 57 public void onMessage(Message message) { 58 try { 59 if (null != message) { 60 TextMessage textMsg = (TextMessage) message; 61 System.out.println("收到回馈消息" +textMsg.getText()); 62 } 63 } catch (Exception e) { 64 // TODO: handle exception 65 } 66 } 67 }); 68 69 } catch (JMSException e) { 70 e.printStackTrace(); 71 } 72 } 73 }
在消息消费者中定义一个队列,destination_request,用于接受消息,同时定义一个回复收到消息的队列回复生产者已经收到消息,destination_response。
1 package com.tiantian.springintejms.test; 2 3 import com.tiantian.springintejms.entity.TestMqBean; 4 import org.apache.activemq.ActiveMQConnectionFactory; 5 import org.junit.Test; 6 import org.junit.runner.RunWith; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 import javax.jms.*; 10 import java.util.Date; 11 12 @RunWith(SpringJUnit4ClassRunner.class) 13 public class ConsumerReceiveTest { 14 15 @Test 16 public static void main(String[] args) { 17 ConnectionFactory connectionFactory; 18 // Connection :JMS 客户端到JMS Provider 的连接 19 Connection connection = null; 20 // Session: 一个发送或接收消息的线程 21 final Session session; 22 // Destination :消息的目的地;消息发送给谁. 23 Destination destination_request,destination_response; 24 // 消费者,消息接收者 25 MessageConsumer consumer; 26 //回复接收到的消息 27 final MessageProducer producer; 28 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.210.128:61616"); 29 try { 30 // 构造从工厂得到连接对象 31 connection = connectionFactory.createConnection(); 32 // 启动 33 connection.start(); 34 // 获取操作连接 35 //这个最好还是有事务 36 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 37 // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 38 destination_request = session.createQueue("request-queue"); 39 destination_response = session.createQueue("response-queue"); 40 consumer = session.createConsumer(destination_request); 41 42 producer= session.createProducer(destination_response); 43 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 44 45 consumer.setMessageListener(new MessageListener() { 46 @Override 47 public void onMessage(Message message) { 48 try { 49 TestMqBean bean = (TestMqBean) ((ObjectMessage) message).getObject(); 50 System.out.println(bean); 51 if (null != message) { 52 System.out.println("收到消息" + bean.getName()); 53 Message textMessage = session.createTextMessage("已经成功收到消息,现在开始回复"+new Date().toString()); 54 producer.send(textMessage); 55 } 56 } catch (Exception e) { 57 // TODO: handle exception 58 } 59 } 60 }); 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } 64 } 65 }
消息消费者收到消息,并打印出来,同时发送回复消息。截图如下:
消息生产者生产消息,同时接受到消费者回复的消息并打印出来。截图如下:
2)、Publisher/Subscriber(发布/订阅者)消息模式开发流程
1、订阅者(Subscriber)开发流程:
1.1 实现MessageListener接口: 在onMessage()方法中监听发布者发出的消息队列,并做相应处理。
1.2 创建Connection: 根据url,user和password创建一个jms Connection。
1.3 创建Session: 在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
1.4 创建Topic: 创建2个Topic, topictest.messages用于接收发布者发出的消息,topictest.control 用于向发布者发送消息,实现双方的交互。
1.5 创建consumer和producer对象:根据topictest.messages创建consumer,根据topictest.control创建 producer。
1.6 接收处理消息:在onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作等等),并且可以使用producer对象将处理结果返回给发布者。
2、发布者(Publisher)开发流程:
2.1 实现MessageListener接口:在onMessage()方法中接收订阅者的反馈消息。
2.2 创建Connection: 根据url,user和password创建一个jms Connection。
2.3 创建Session: 在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
2.4 创建Topic: 创建2个Topic,topictest.messages用于向订阅者发布消息,topictest.control用于接 收订阅者反馈的消息。这2个topic与订阅者开发流程中的topic是一一对应的。
2.5 创建consumer和producer对象: 根据topictest.messages创建publisher; 根据topictest.control 创建consumer,同时监听订阅者反馈的消息。
2.6 给所有订阅者发送消息,并接收反馈消息。 注:可同时运行多个订阅者测试查看此模式效果 。
样例代码:
消息发布者发布消息,定义一个主题example.A
1 package com.tiantian.springintejms.test; 2 3 import com.tiantian.springintejms.entity.TestMqBean; 4 import org.apache.activemq.ActiveMQConnectionFactory; 5 import org.junit.Test; 6 import org.junit.runner.RunWith; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 import javax.jms.*; 10 11 @RunWith(SpringJUnit4ClassRunner.class) 12 public class TopicProducerSendTest { 13 14 private static String user = "admin"; 15 private static String password = "admin"; 16 private static String url = "tcp://192.168.210.128:61616"; 17 18 public static void main(String[] args)throws Exception { 19 // ConnectionFactory :连接工厂,JMS 用它创建连接 20 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); 21 // Connection :JMS 客户端到JMS Provider 的连接 22 Connection connection = connectionFactory.createConnection(); 23 // Connection 启动 24 connection.start(); 25 System.out.println("Connection is start..."); 26 // Session: 一个发送或接收消息的线程 27 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); 28 // Topicr :消息的目的地;消息发送给谁. 29 Topic destination = session.createTopic("example.A"); 30 // MessageProducer:消息发送者 31 MessageProducer producer = session.createProducer(destination); 32 // 设置不持久化,此处学习,实际根据项目决定 33 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 34 // 构造消息,此处写死,项目就是参数,或者方法获取 35 sendMessage(session, producer); 36 session.commit(); 37 38 connection.close(); 39 System.out.println("send text ok."); 40 } 41 42 public static void sendMessage(Session session, MessageProducer producer) 43 throws Exception { 44 for (int i = 1; i <= 10; i++) {//有限制 45 TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); 46 // 发送消息到目的地方 47 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); 48 producer.send(message); 49 } 50 } 51 52 }
消息订阅者接收消息,定义一个与发布者相对应的主题example.A。
1 package com.tiantian.springintejms.test; 2 3 import org.apache.activemq.ActiveMQConnectionFactory; 4 import org.junit.Test; 5 import org.junit.runner.RunWith; 6 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 7 8 import javax.jms.*; 9 10 @RunWith(SpringJUnit4ClassRunner.class) 11 public class TopicSubscriberTest { 12 private static String user = "admin"; 13 private static String password = "admin"; 14 private static String url = "tcp://192.168.210.128:61616"; 15 public static void main(String[] args) throws Exception{ 16 // ConnectionFactory :连接工厂,JMS 用它创建连接 17 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); 18 // Connection :JMS 客户端到JMS Provider 的连接 19 Connection connection = connectionFactory.createConnection(); 20 connection.start(); 21 // Session: 一个发送或接收消息的线程 22 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 23 // Destination :消息的目的地;消息发送给谁. 24 Topic destination=session.createTopic("example.A"); 25 // 消费者,消息接收者 26 MessageConsumer consumer = session.createConsumer(destination); 27 consumer.setMessageListener(new MessageListener(){//有事务限制 28 @Override 29 public void onMessage(Message message) { 30 try { 31 TextMessage textMessage=(TextMessage)message; 32 System.out.println("接收到消息:"+textMessage.getText()); 33 } catch (JMSException e1) { 34 e1.printStackTrace(); 35 } 36 try { 37 session.commit(); 38 } catch (JMSException e) { 39 e.printStackTrace(); 40 } 41 } 42 }); 43 } 44 }
消息发布者发布消息,并打印截图如下:
消息订阅者接受消息并打印截图如下:(消息订阅者需在发布者之前启动,可保证能取到订阅的消息)
3)、延迟消息发送
有时候不希望消息马上被broker投递出去,而是想要消息停留一段时间以后发给消费者,或者想让消息每隔一定时间投递一次,一共投递指定的次数。类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。开发者使用时只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照定义的行为去处理消息。ActiveMQ定义调度消息参数为:
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | String | Cron表达式 |
//延迟60秒发送消息 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long time = 60 * 1000; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); producer.send(message);
//基于CRON表达式定时投递消息 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);