点对点(p2p)消息传送模型
在p2p模型中,生产者成为发送者,而消费者成为接收者。点对点最重要的特性如下:
- 消息通过成为队列的一个虚拟通道来进行交换。
- 每条消息仅会传送给一个接收者。可能会有多个接收者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接收者所消费。
- 消息存在先后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者。当消息已被消费时,就会从队列中将它们删除。(除非使用了消息优先级外)
- 生产者和消费者之间没有耦合。接收者和发送者可以在运行时动态添加,这使得系统的复杂度随着时间而增长或降低。
供p2p队列使用的消息,既可以是持久性的,也可以使非持久性的。消息还可以有一个优先级和一个有效期。
点对点消息传送模型有两种类型:异步即发即弃(fire-and-forget)处理和异步请求/应答处理。
- 异步即发即弃,消息生产者向某个队列发送一条消息,而且它并不会期望接收到一个相应(至少是不是立刻接收到相应)
- 异步请求/应答处理,消息生产者向队列发送一条消息,然后阻塞等待应答队列,该应答队列正在等待来自接收者的相应。
使用点对点消息传送模型的场景:
- 每条消息都很重要
- 接受者对某个指定的消息进行一次而且仅仅一次处理。它所提供的QueueBrowser允许JMS客户端对队列进行快照(Snapshot),以查看正在等待被消费的消息。
- 需要再组件之间进行同步消息,而那些组件确实不同的编程语言编写的,或者在不同的技术平台上实现的。
- 使用基于消息的负载均衡,可以让服务器端的组件进行更大的吞吐量,特别是对于同构组件(也就是,Java到Java)更是如此。引入p2p模型,可以在体系结构中添加某种程度的并发处理,而不需要处理线程或者Java并发问题。
点对点消息模型的使用:
JMS初始化:
1、在类示例中,所有JMS初始化逻辑都在构造函数中处理。构造函数要做的第一件事就是:通过创建一个InitialContext,建立一个到JMS提供者的连接。
Context ctx = new InitalContext();
类路径中的jndi.properties文件给出了连接JMS提供者所需的连接信息。一旦我们有了一个JNDIcontext,就可以使用传入构造函数参数的JNDI连接工厂名称,得到
QueueConnectionFactory。随后就可以使用QueueConnectionFactory上的工厂方法创建QueueConnection:
QueueConnectionFactory qFactory = (QueueConnectionFactory)ctx.lookup(queuecf);
qConnect = qFactory.createQueueConnection();
到目前为止,我们已经创建了一个与JMS提供者的连接。当创建QueueConnection时,该连接最初是停止(stopped)模式的。这就意味着虽然您可以将消息发送给队列,但
是没有消息消费者(包括QBorrow类,它也是一个消息消费者)能够从这个连接接收到消息,直到它被启动为止。
QueueConnection对象用于创建一个JMSsession对象(特别是创建一个QueueSession),该对象时JMS中的工作线程和事务性工作单元。通常来说,应用程序会在应用
程序启动时创建一个单独的JMSConnection,并维护一个Session对象池,供生产者或消费消息者使用。
QueueSession对象通过QueueConnection对象上的工厂对象来创建。在我们的例子中,QueueConnection变量在构造函数的外部声明,以便可以用QBorrow类的exit方
法来关闭连接。不再使用该连接资源之后,将它关闭是非常重要的。关闭Connection对象还将关闭所有打开的、和该连接有关的Session对象。构造函数中用来创建
QueueSession的语句如下:
qSession = qConnect.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
请注意:createQueueSession方法使用两个参数。第一个参数表示QueueSession是否为事务性的。true表示会话是事务性的,这意味着,在QueueSession预期使用期
内发送到队列的消息,将不会传送给接收者,直到QueueSession上调用了commit方法为止。同样,在QueueSession上调用rollback方法,会山粗事务性会话期间发送的所
有消息。第二个参数表示确认模式。它由3个可选值,分别为Session.AUTO_ACKNOWLEDGE、Session.CLIENT_AKNOWLEGE和Session.DUPS_ACKNOWLEDGE。
如果该会话是事务性的,就会忽略确认模式。
构造函数中的下面两行代码将执行对JMS提供者的JNDI查找,以获得受管的目的地。我们在例子中将JMS destination造型(cast)为一个Queue。提供给每个lookup方法
的参数是一个String值,它包含了类中使用的队列的JNDI名称:
requestQ = (Queue)ctx.lookup(requestQueue);
responseQ = (Queue)ctx.lookup(responseQueue);
最后一行代码启动了这个连接,自此允许在该连接上接收消息。通常来说,子啊启动该连接之前执行所有的初始化逻辑,是一个明智之举:
qconnection.start();
相当有趣的是,如果只是发送消息,就无须启动连接。然而,如果有机会共享该连接或将请求/应答处理添加到发送者类中,通常建议您启动该连接,因为这样就可以做到防患
于未然。
可以从JMS connection获得的另一个有用之物就是关于连接的元数据。调用Connection对象上的getMetaData方法可以获得ConnectionMetaData对象,该对象提供了许
多有用信息,比如JMS版本、JMS提供者名称、JMS提供者版本和JMS提供者支持的JMSX属性名称扩展等。
发送消息和接收响应
一旦QBorrow类被初始化,就可以通过命令行输入工资额和贷款额。这时会从main方法中调用sendLoanRequest方法,向队列发送贷款申请,并等待来自QLender类的响
应。在这种方法一开始,我们选择创建一个MapMessage,但也可以使用5种JMS消息类型中的任何一种:
MapMessage msg = qSession.createMapMessage(); msg.setDouble("Salary",salary); msg.setDouble("LoanAmount",loanAmt); msg.setJMSReplyTo(resonseQ);
请注意:JMS消息是通过和消息类型相匹配的一个工厂方法,是从session对象中创建的。使用关键字new来实例化一个新的JMS消息对象将不会奏效;它必须从session对象
中创建。在创建并加载信息对象之后,我们还为响应队列设置了JMSReplyTo消息头属性,这会进一步解耦生产者和消费者之间的耦合。在使用请求/应答模型时,在消息生产者
中设置JMSReplyTo消息头属性,而不是消息消费者中指定应答队列,这事一种通行的标准做法(standard practice)。