zoukankan      html  css  js  c++  java
  • JMS 企业开发流程实现

    关于JMS的一些介绍参见【http://blog.csdn.net/aking21alinjuju/article/details/6051421】

    【补充】

    消息的组成

    1. 头(head)

    每条JMS 消息都必须具有消息头。头字段包含用于路由和识别消息的值。可以通过多种方式来设置消息头的值:

    a. 由JMS 提供者在生成或传送消息的过程中自动设置

    b. 由生产者客户机通过在创建消息生产者时指定的设置进行设置

    c. 由生产者客户机逐一对各条消息进行设置

    2. 属性(property)

    消息可以包含称作属性的可选头字段。他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,其中可以包括如下信息:创建数据的进程、数据的创建时间以及每条数据的结构。JMS提供者也可以添加影响消息处理的属性,如是否应压缩消息或如何在消息生命周期结束时废弃消息。

    3. 主体(body)

    包含要发送给接收应用程序的内容。每个消息接口特定于它所支持的内容类型。JMS为不同类型的内容提供了他们各自的消息类型,但是所有消息都派生自Message接口。

    StreamMessage   一种主体中包含Java基元值流的消息。其填充和读取均按顺序进行。

    MapMessage     一种主体中包含一组键--值对的消息。没有定义条目顺序。

    TextMessage       一种主体中包含Java字符串的消息(例如,XML消息)。

    ObjectMessage    一种主体中包含序列化Java对象的消息。

    BytesMessage     一种主体中包含连续字节流的消息。

    例如:MapMessage 消息格式

    【开发流程】

    P2P开发模型

    【生产者】

    1、创建connection

    1 // 根据url,user和password创建一个jms Connection。
    2     ActiveMQConnectionFactory connectionFactory   =   new ActiveMQConnectionFactory (user, password, url);
    3     connection = connectionFactory.createConnection();
    4     connection.start();

    2、创建session

    /**在connection的基础上创建一个session,同时设置是否支持事务ACKNOWLEDGE标识。
    • AUTO_ACKNOWLEDGE:自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。 
    • CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为"Previous"(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。 
    • DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。
    •  SESSION_TRANSACTED**/
    
    Session session = connection.createSession(
        transacted, Session.AUTO_ACKNOWLEDGE);

    3、创建destination对象

      //需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息

    	if (topic) {
    	destination = session.createTopic(subject);
    	} else {
    	destination = session.createQueue(subject);
    	}

    4、创建producer

     1 //根据Destination创建MessageProducer对象,同时设置其持久模式。 
     2 MessageProducer producer = session.createProducer(destination);
     3 if (persistent) {
     4          producer.setDeliveryMode(DeliveryMode.PERSISTENT);
     5 } else {
     6          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
     7 }
     8 if (timeToLive != 0) {
     9           producer.setTimeToLive(timeToLive);
    10 }

    5、发送消息到队列 //封装TextMessage消息,使用MessageProducer的send方法将消息发送出去。

    1 TextMessage message = session.createTextMessage("createMessageText");
    2 producer.send(message);

    【消费者开发流程】

     1、实现MessageListener接口

     2、创建Connection

    1 //根据url,user和password创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId。
    2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    3 Connection connection = connectionFactory.createConnection();
    4 if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
    5       connection.setClientID(clientId);
    6 }
    7 connection.setExceptionListener(this);
    8 connection.start();

     3、创建session和destination

      与生产者类似

     4、创建replayProducer【可选】

      

    1 //可以用来将消息处理结果发送给producer。 
    2 replyProducer = session.createProducer(null);
    3 replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);<span style="white-space: normal;">&nbsp;</span>

     5、创建MessageComsumer

    1 //根据Destination创建MessageConsumer对象。
    2 MessageConsumer consumer = null;
    3 if (durable && topic) {
    4      consumer = session.createDurableSubscriber((Topic)destination, consumerName);
    5 } else {
    6      consumer = session.createConsumer(destination);
    7 }<span style="white-space: normal;">&nbsp;</span>

     6、获取消息

    1 //在onMessage()方法中接收producer发送过来的消息进行处理,并可以通过replyProducer反馈信息给producer 
    2 if (message.getJMSReplyTo() != null) { 
    3     replyProducer.send(message.getJMSReplyTo(), 
    4     session.createTextMessage("Reply: " + message.getJMSMessageID())); 
    5 }  

    =======================================华丽丽的分割线============================================

    Publish/Subscriber(发布/订阅者)消息开发模式

    【订阅者】

        1、 实现MessageListener接口

      在onMessage()方法中监听发布者发出的消息队列,并做相应处理。

        2、 创建Connection

              根据url, user和password创建一个jms connection

        3、 创建Session

      在connection的基础上创建一个session,同时设置是否支持和ACKNOWLEDGE标志。

        4、 创建 Topic

              创建两个Topic,topictest.message用于接收发布者发出的消息,topictest.control用于向发布者发送消息,实现双方的交互。

        5、 创建consumer和producer对象

      根据topictest.message创建consumer,根据topictest.control创建producer

        6、 接收处理消息

      在onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作等等),并且可以使用   producer对象处理结果返回给发布者

    ----------------------------------------------------------------------------小分割---------------------------------------------------------------------------------------

    【发布者】

      1、 实现MessageListener接口 

       在onMessage()方法中接收订阅者的反馈消息。

      2、 创建Connection

       根据url, user和password 创建一个 jms Connection。

      3、 创建session

       在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标志。

      4、 创建Topic

       创建两个Topic,topictest.messages用于向订阅者发布消息,topictest.control用于接收订阅者反馈的消息。这两个Topic与订阅者开发流程中的topic是一一对应的。

      5、 创建consumer和producer对象

       根据topictest.message创建publisher;

       根据topictest.contro创建consumer,同时监听订阅者反馈的消息

     【引用】http://boy00fly.iteye.com/blog/1103586

  • 相关阅读:
    负载均衡(负载平衡)
    JavaScript中绑定事件监听函数的通用方法[ addEvent() ]
    有趣的浏览器检测
    IE6 bug之 href= “javascript:void(0);”
    SVN使用技巧 不要把不必要的文件版本化 *.suo,*.bin,*.obj
    CacheDependency缓存依赖里面的 absoluteExpiration(绝对到期时间),弹性到期时间(slidingExpiration)
    TimeSpan 和 DateTime
    字符串数组 string[] 转换为 字符串(用逗号,作为分隔符),linq Except的用法,linq获取两个字符串数组相同的部分
    List的ToLookup 分组方法
    mysql 返回查询结果,返回out返回值,多表联合查询的分页存储过程
  • 原文地址:https://www.cnblogs.com/plxx/p/4947439.html
Copyright © 2011-2022 走看看