zoukankan      html  css  js  c++  java
  • jms在jboss上的简单应用

    核心概念:

    Ø连接工厂(ConnectionFactory)客户端用来创建连接的管理对象。
    Ø连接(Connection)代表一个与JMS提供者的活动连接。
    Ø目的(Destination)标识消息接收方式。
    Ø会话(Session)接收和发送消息的会话线程。
    Ø消息生产者(MessageProducer)会话使用它把消息发送到目的地。
    Ø消息消费者(MessageConsumer)会话使用它从目的地接收消息生产者发送的消息
     
    1.发送消息的客户端使用JMS的过程

    (1)使用JNDI查询管理对象ConnectionFactory和Destination

    (2)使用管理对象ConnectionFactory建立连接Connection

    (3)使用连接Connection建立会话Session

    (4)使用会话Session和管理对象Destination创建消息生产者MessagerProducer

    (5)使用消息生产者MessagerPriducer发送消息

    2.接收消息的客户端使用JMS的过程

    (1)使用JNDI查询管理对象ConnectionFactory和Destination

    (2)使用管理对象ConnectionFactory建立连接Connection

    (3)使用连接Connection建立会话Session

    (4)使用会话Session和管理对象Destination创建消息生产者MessagerProducer

    (5)使用消息生产者MessagerConsumer接收消息

    3. JMS中支持两种事务方式:事务性会话和JTA事务

      a)创建事务性会话的代码:

      qsession=qcon.createQueueSession(true; //在PTP方式下创建事务性会话

      Session.AUTO_ACKNOWLEDGE

      );

      tsession=tcon.createTopicSession(

      true; //在Pub/Sub方式下创建事务性会话

      Session.AUTO_ACKNOWLEDGE

      );

    b)JTA事务:JTA支持跨数据源的事务,步骤如下:

    (1)创建非事务性会话

    QueuesSession session=connection.createQueueSession(

                             false,Session.AUTO_ACKNOWLEGE);

    (2)使用JNDI查询JTA事务引用

    Context ctx = new InitialContext();

    UserTansaction ux=(UserTansaction)ctx.lookup

    (“javax.transaction.UserTansaction”);

    (3)开始事务

    ux.begin();

    (4)执行业务操作

    (5)提交或回滚事务

    提交事务:ux.commit();

    回滚事务:ux.rollback();

    服务区端代码

    package test.jms;

    import javax.jms.*;
    import javax.naming.*;

    public class Server {
      private static Server instance = new Server();
      private  TopicSession tsession = null;
      private  Topic topic = null;
      private  TopicPublisher tpub = null;

      private TopicConnectionFactory tcf = null;
      private TopicConnection tconn = null;
      private Context ctx = null;

      private Server() {
        init();
      }

      public static Server getInstance(){
        return instance;
      }


      public void sendMessage(MessageInfo msgInfo){
        int tryTimes = 0;
          while(true){
            try {
              if(msgInfo == null){
                break;
              }
              Message msg = tsession.createMessage();
              msg.setStringProperty("xxxx", msgInfo.getxxxx());
              msg.setStringProperty("xxxx", msgInfo.getxxxx());
              msg.setStringProperty("xxxx", msgInfo.getxxxx());
              msg.setStringProperty("xxxx", msgInfo.getxxxx());
              msg.setStringProperty("xxxx", msgInfo.getxxxx());
              msg.setIntProperty("xxxx", msgInfo.getxxxx());

              tpub.publish(msg);
              break;
            }
            catch (Exception e) {
              if (!init()) {
                if (tryTimes < 20) { //默认15分钟可以恢复数据库连接,这里冗余一部分
                  tryTimes++;
                  try{
                    Thread.sleep(60000); //一分种后重试
                  }
                  catch(Exception ex){
                    Logger.log(Logger.DEBUG_TYPE,ex);
                  }
                  continue;
                }
                else{
                  System.out.println("系统消息机制异常,系统将自动退出!请进行系统恢复!");
                  SysTool.exit(0);
                  return;
                }
              }
            }
          }


      }

      private boolean init() {
        try {
          try{
            tpub.close();
          }
          catch(Exception ex){ }
          try{
            tsession.close();
          }
          catch(Exception ex){  }
          try{
            tconn.close();
          }
          catch(Exception ex){  }
          try{
            ctx.close();
          }
          catch(Exception ex){ }

          ctx = new InitialContext();
          tcf = (TopicConnectionFactory)ctx.lookup(
            "ConnectionFactory");
          tconn = tcf.createTopicConnection();
          tsession = tconn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE);
          topic = (Topic)ctx.lookup("topic/xxxxTopic");
          tpub = tsession.createPublisher(topic);
          tconn.start();
          return true;
        }
        catch (Exception e) {
          Logger.log(Logger.DEBUG_TYPE, e);
          return false;
        }
      }

    }

    客户端代码

    package test;
    import java.util.Hashtable;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;
    import javax.naming.Context;
    import javax.naming.InitialContext;

    public class JMSClinet {
      public static void main(String[] args) {

            while(true){
            try {
                TopicSession tsession = null;

                TopicSubscriber tsub = null;
                Hashtable ht = new Hashtable();
                ht.put(Context.INITIAL_CONTEXT_FACTORY,
                        "org.jnp.interfaces.NamingContextFactory");
                ht.put(Context.PROVIDER_URL, "ip地址:1099");
                ht.put("java.naming.rmi.security.manager", "yes");
                ht.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");

                Context ctx = new InitialContext(ht);

                TopicConnectionFactory factory = (TopicConnectionFactory) ctx
                        .lookup("ConnectionFactory");

                TopicConnection connection = factory.createTopicConnection();


                TopicSession session = connection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);

                Topic topic = (Topic) ctx.lookup("topic/logInAndOutTopic");

                tsub = session.createSubscriber(topic);

                connection.start();
                Message msg = tsub.receive();
                String xxxx= msg.getStringProperty("xxxx");
                String xxxx= msg.getStringProperty("xxxx");
                String xxxx= msg.getStringProperty("xxxx");
                String xxxx= msg.getStringProperty("xxxx");
                String xxxx= msg.getStringProperty("xxxx");
                int xxxx= msg.getIntProperty("xxxx");     
                connection.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            }
        }

    }

    配置文件

     <mbean code="org.jboss.mq.server.jmx.Topic"
      name="jboss.mq.destination:service=Topic,name=xxxxTopic">
        <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
        <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends>
        <attribute name="SecurityConf">
           <security>
             <role name="xxxx" read="true" write="true"/>
             <role name="xxxx" read="true" write="true" create="false"/>
             <role name="xxxx" read="true" write="true" create="true"/>
          </security>
        </attribute>
     </mbean>

  • 相关阅读:
    leetcode Remove Linked List Elements
    leetcode Word Pattern
    leetcode Isomorphic Strings
    leetcode Valid Parentheses
    leetcode Remove Nth Node From End of List
    leetcode Contains Duplicate II
    leetcode Rectangle Area
    leetcode Length of Last Word
    leetcode Valid Sudoku
    leetcode Reverse Bits
  • 原文地址:https://www.cnblogs.com/davidwang456/p/2871924.html
Copyright © 2011-2022 走看看