任何一个系统从整体上来看,其实质就是由无数个小的服务或事件(我们可以称之为事务单元)有机地组合起来的。对于系统中任何一个比较复杂的功能,都是通过调用各个独立的事务单元以实现统一的协调运作而实现的。现在我们的问题是,如果有两个完全独立的服务(比如说两个不同系统间的服务)需要相互交换数据,我们该如何实现?
第一种方法,估计也是用的最多的,就是RPC模式。这种方法就是在自己的代码中远程调用其它程序中的代码以达到交换数据的目的。但是这种方法很显然地存在了一个问题:就是一定要等到获取了数据之后才能继续下面的操作。当然,如果一些逻辑是需要这些数据才能操作,那这就是我们需要的。
第二种方法就是Hessian,我个人觉得Hessian的实现在本质上与rpc模式的一样,只是它采用了配置,简化了代码。
上面这两个方法,基本上能解决所有的远程调用的问题了。但是美中不足的是,如果我在A系统中有一个操作是需要让B系统做一个响应的,但我又不需要等它响应完才做下面的操作,这该怎么办?于是新的解决方案就需要被提出来,而SUN公司的设计师们也考虑到了,在JAVA中这就被体现为JMS(java message service)。
一、JMS消息
JAVA 消息服务(JMS)定义了Java 中访问消息中间件的接口。JMS 只是接口,并没有给予实现,实现JMS 接口的消息中间件称为JMS Provider,例如ActiveMQ。
消息分为:消息生产者,消息服务器和消息消费者。
生产者与消费者之间是透明的,生产者在产生消息之后,把消息发送到消息服务器,再由消息服务器发给消费者,因此它们构成了JMS的3点结构;
用JMS 的应用程序被称为JMS 客户端,处理消息路由与传递的消息系统被称为JMS Provider,而JMS 应用则是由多个JMS 客户端和一个JMS Provider 构成的业务系统。发送消息的JMS 客户端被称为生产者(producer),而接收消息的JMS 客户端则被称为消费者(consumer)。同一JMS 客户端既可以是生产者也可以是消费者。
JMS 的编程过程很简单,概括为:应用程序A 发送一条消息到消息服务器(也就是JMS Provider)的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A 和应用程序B 没有直接的代码关连,所以两者实现了解偶。如下图:
二、消息的传递模型
JMS支持两种消息传递模型:点对点(point-to-point,简称PTP)和发布/订阅(publish/subscribe,简称pub/sub)。这两种消息传递模型非常相似,但有以下区别:
A、PTP消息传递模型规定了一条消息只能够传递费一个接收方;
B、Pub/Sub消息传递模型允许一条消息传递给多个接收方;
每个模型都通过扩展公用基类来实现。例如:javax.jms.Queue和Javax.jms.Topic都扩展自javax.jms.Destination类。
1. 点对点消息传递
通过点对点的消息传递模型,一个应用程序可以向另外一个应用程序发送消息。在此传递模型中,目标类型时队列。消息首先被传送至队列目标,然后从队列中将消息传送至对此队列进行监听的某个消费者,如下图:
一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方。如果多个接收方正在监听队列上的消息,JMS Provider将根据“先来者优先”的原则确定由哪个接收方接受下一条消息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息传递模型是传统意义上的拉模型或轮询模型。在此列模型中,消息不时自动推动给客户端的,而是要由客户端从队列中请求获得.
2. 发布/订阅消息传递
通过发布/订阅消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的或消费者。如下图:
主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,Pub/Sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。Pub/Sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方法来获得新的消息。
上面两种消息传递模型里,我们都需要定义消息生产者和消费者,生产者把消息发送到JMS Provider的某个目标地址(Destination),消息从该目标地址传送至消费者。消费者可以同步或异步接收消息,一般而言,异步消息消费者的执行和伸缩性都优于同步消息接收者,体现在:
1. 异步消息接收者创建的网络流量比较小。单向对东消息,并使之通过管道进入消息监听器。管道操作支持将多条消息聚合为一个网络调用。
2. 异步消息接收者使用线程比较少。异步消息接收者在不活动期间不使用线程。同步消息接收者在接收调用期间内使用线程,结果线程可能会长时间保持空闲,尤其是如果该调用中指定了阻塞超时。
3.对于服务器上运行的应用程序代码,使用异步消息接收者几乎总是最佳选择,尤其是通过消息驱动Bean。使用异步消息接收者可以防止应用程序代码在服务器上执行阻塞操作。而阻塞操作会是服务器端线程空闲,甚至会导致死锁。阻塞操作使用所有线程时则发生死锁。如果没有空余的线程可以处理阻塞操作自身解锁所需的操作,这该操作永远无法停止阻塞。
三、 JMS Provider(ActiveMQ)
ActiveMQ的特性及优势如下:
1. 实现JMS1.1规范,支持J2EE1.4以上。
2. 可运行与任何JVM和大部分web容器(ActiveMQ works great in any JVM)
3. 支持多种语言客户端(java, C, C++, Ajax, ActionScript等等)
4. 支持多种协议(stomp, openwire, REST)
5. 良好的Spring支持(ActiveMQ has great Spring Support)
6. 速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ)
7. 与OpenJMS、JBossMQ等开源jms provider相比,ActiveMQ有apache的支持,持续发展的优势明显
四、消息生产者与消息消费者主要步骤
消息生产者与消费者实现的主要步骤,两种模型的步骤差不多,只是一个是创建队列(Queue),一个是创建主题(Topic)。
消息生产者实现主要步骤:
//1. 由ConnectionFactory 创建连接,一般 ConnectionFactory 从 JNDI 中获得
Connection connection = connectionFactory.createConnection();
//2. 创建 Session,
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//3. P2P 中创建 Destination, 这里创建了一个 Queue,队列名为 “Hello.Unmi”
Destination destination = session.createQueue(“Hello.Unmi”); //实际应用中队列是从 JNDI 中获得
//3. Sub/Pub 模型时,创建 Destination, 创建了一个 Topic,主题为 “Unmi.Learn.ActiveMQ”
Destination destination = session.createTopic(“Unmi.Learn.ActiveMQ”);
//4. 创建 Producer,
MessageProducer producer = session.createProducer(destination);
//5. 创建 Message,这里创建的是一个文本消息,可创建多种类型的消息
Message message = session.createTextMessage(“Hello JMS Sended.”);
//6. 发送消息
producer.send(message);
消息消费者实现主要步骤:
//1. 创建 Connection,//ActiveMQConnection 实现了 QueueConnection, TopicConnection
Connection connection = connectionFactory.createConnection();
//2. 创建 Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//3. 创建 Destination, 一个 Queue,队列名与上同,这样就能接收到前面生产者发来的消息
Destination destination = session.createQueue(“Hello.Unmi”);//实际应用中队列是从 JNDI 中获得
//3. Sub/Pub 模型时,创建 Destination, 一个 Topic,主题名同上,可接上前面发布的消息
Destination destination = session.createTopic(“Unmi.Learn.ActiveMQ”); //实际应用中队列是从 JNDI 中获得
//4. 创建 Consumer
MessageConsumer consumer = session.createConsumer(destination);
//5. 注册消息监听器,当消息到达时被触发并处理消息,也可阻塞式监听 consumer.receive()
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
//Do something with the message.
}
});
参见:http://unmi.cc/jms-introduction-message-model/