zoukankan      html  css  js  c++  java
  • 使用JMS接口接入WebSphere MQ消息

    在你的应用程序中利用IBM WebSphere MQ消息中间件提供Java消息服务开放接口。

      IBM WebSphere MQ(WMQ)是一套面向消息的中间件(message-oriented middleware,MOM),通过使用消息和队列简化应用程序之间的通信,WMQ支持点到点的和发布/订阅消息,支持多种平台,包括Windows、AIX、HP-UX和Sun Solaris,它特别适合目前的异构计算环境。

      对于Java开发者而言,WMQ为Java应用程序提供了两种接口:

      1.为Java提供MQ基础类 - 一个基于WMQ本地接口的Java接口。

      2.MQ JMS - Java消息服务(Java Messaging Service ,JMS)1.1接口的实现。

      JMS通过开启Java应用程序发送和接收消息扩展了Java的互操作性,JMS应用程序可以使用点到点或发布/订阅模式进行消息交换。

      本文描述的是使用WMQ和JMS进行开发的过程,重心集中在点到点消息,通过下面几步教你如何使用Java类编写JMS接口:

      1.创建WMQ对象

      2.创建JMS管理对象

      3.解释JMS代码

      4.运行一个实例类发送消息和从WMQ接收消息

      WMQ安装

      本文使用的是winxp上的MQ 版本 7(可以从http://www.ibm.com/developerworks/downloads/ws/wmq/learn.html下载试用版),在windows上的安装是非常简单的,一路默认就可以完成安装。

      如果你还没有使用过WMQ也不要担心,你可以使用基于Eclipse的WebSphere MQ管理器,它是一个简单的用于管理WMQ的图形工具。

      创建MQ对象:队列管理器和队列

      队列是用来存储消息的,直到应用程序处理完毕才释放,队列管理器拥有并管理队列,要创建一个队列管理器和队列,按以下步骤启动WebSphere MQ管理器:开始?所有程序? IBM WebSphere MQ ? WebSphere MQ Explorer。图1显示了启动WebSphere MQ管理器时的样子。

      图1 WebSphere MQ 管理器

      在MQ管理器中执行下列操作创建一个队列管理器:

      1.在导航视图下,右击“队列管理器”,选择“新建--队列管理器”,启动创建队列管理器向导。

      2.在第一步中输入队列管理器的名字“TestMQ”,点击“下一步”。如图2所示。

      图2 创建队列管理器,第一步:输入队列管理器名称

      3.在第二步和第三步都点击“下一步”,进入第四步,确定选中了“创建一个TCP/IP监听器”,然后输入一个未使用的端口号,点击“完成”。如图3所示。

      图3 创建队列管理器,第四步:检查队列管理器的监听器端口号。

      接下来创建两个队列:IN.QUEUE 和 OUT.QUEUE。你就可以将消息写入IN.QUEUE,从OUT.QUEUE读取消息。

      1.在TestMQ下,右击“队列”?“新建”?“本地队列”启动“新建本地队列”向导。

      2.在名称区域,输入IN.QUEUE,点击“完成”。如图4所示。

      图4 创建队列向导:输入队列名称,其它属性值保留默认值

      3.重复上面的步骤创建好队列OUT.QUEUE。

    创建JMS管理对象

      JMS定义了一个通用的接口来发送和接收消息,只要与之通信的程序兼容JMS即可,点到点的JMS接口是:

      1. javax.jms.QueueConnection - 这个接口提供一个到JMS提供程序的连接,用于创建会话对象。

      2.javax.jms.QueueSession - 这个接口为产生和消耗消息提供上下文呢,包括创建QueueSender和QueueReceiver的方法。

      3. javax.jms.QueueSender - 这个接口用于向队列发送消息,javax.jms.QueueReceiver用于从队列接收消息。

      为了让你的代码可以在不同的消息提供程序间有良好的移植性,你必须在你的应用程序中使用javax.jms中的标准JMS接口,所有特定厂家的信息都封装在javax.jms.QueueConnectionFactory 和 javax.jms.Queue中,这些管理对象可以使用厂家提供管理工具进行构建,存储在JNDI命名空间中,JMS应用程序可以从命名空间中检索这些对象,这时就不需要知道是哪个厂家提供的了。

      按照下面的步骤使用Websphere MQ管理器创建管理对象,存储在基于文件的目录下:

      1.在JMS-管理对象上点击右键?添加初始上下文。

      2.在屏幕1上:

       为“JNDI存储在哪里”选择“文件系统”

       在关联目录处,输入C:JNDI-Directory(前提是这个目录已经存在)

       注意工厂类和提供程序URL,因为你将会在Java代码中使用到(如图5所示)

      图5 添加初始上下文向导:你将在样例类中使用工厂类和提供程序URL

      3.你的管理器现在应该如图6所示

      图6 添加初始上下文后的MQ管理器:你可以在MQ管理器中轻易地创建JMS管理对象

      在新的初始上下文中,创建一个连接工厂。

      1.在连接工厂上点击右键?新建?连接工厂,在第一个屏幕上,在名称区域输入“TestQM_QCF”,点击“下一步”,在JNDI查找中你就使用TestQM_QCF了。

      2.将类型设置为“队列连接工厂”,点击“下一步”。如图7所示。

      图7 新建连接工厂向导:用于点对点消息的队列连接工厂

      3.保持传送类型为汇集,点击下一步,当WMQ和应用程序在同一机器上时使用汇集传输。

      4.在下一页面点击下一步(无需修改设置)。

      5.在最后一页,选择连接标签,点击“选择”按钮选择TestQM作为“基础队列管理器”。如图8所示。

      图8 新建连接工厂向导:连接工厂被包装为TestMQ

      6.点击“完成”。

      接下来创建目的地,对应WMQ消息的JMS管理对象。

      1.在目的地上点击右键?选择“新建”?“目的地”。

      2.在第一页上,在名字区域输入INInputTestQueue作为名字,确保类型设置为队列了,点击“下一步”。如图9所示。

      图9 新建目的地向导:使用InputTestQueue查找IN.QUEUE

      3.第二页保持默认设置不变,点击“下一步”。

      4.在最后一页:

       在队列管理器区域,点击“选择”按钮选择TestQM。

       在队列区域,点击“选择”按钮选择IN.QUEUE。

      5.点击“完成”。

      重复上述步骤创建另一个目的地:OutputTestQueue,它对应OUT.QUEUE

    理解示例类

      如果你编写过JMS应用程序,就很容易理解JNDIUtil 和 Tester示例类(从http://assets.devx.com/sourcecode/WebSphereMQ_JMSSource&Classes.zip下载Java源文件和编译好的类),你创建的JMS管理对象隐藏了所有厂家专利实现。

      JNDIUtil类

      JNDIUtil包括使用名字通过JNDI查找检索对象的方法,参考清单1,你可以使用这个类中的方法检索你在MQ管理器中定义的JMS对象的引用情况。

      清单1 JNDIUtil.java

    package devx.articles.mqjms;
      
    // JMS 类   import javax.jms.JMSException;
      
    import javax.jms.Queue;
      
    import javax.jms.QueueConnectionFactory;
      
    // JNDI 类   import javax.naming.InitialContext;
      
    import javax.naming.Context;
      
    import javax.naming.NamingException;
      
    // 标准 Java类   import java.util.Hashtable;
      
    /**
      *
      * A wrapper class for JNDI calls
      *
      
    */
      
    publicclass JNDIUtil
      {
      
    private Context context;
      
    public JNDIUtil(String icf, String url) throws JMSException, NamingException
      {
      Hashtable environment
    =new Hashtable();
      environment.put(Context.INITIAL_CONTEXT_FACTORY, icf );
      environment.put(Context.PROVIDER_URL, url);
      context
    =new InitialContext( environment );
      }
      
    /**
      *
      *
    @param ObjName Object Name to be retrieved
      *
    @return Retrieved Object
      *
    @throws NamingException
      
    */
      
    private Object getObjectByName(String ObjName) throws NamingException
      {
      
    return context.lookup( ObjName );
      }
      
    /**
      * A convenience method that returns QueueConnectionFactory objects (no casting required)
      *
    @param factoryName QueueConnectionFactory JNDI name
      *
    @return QueueConnectionFactory object
      *
    @throws NamingException
      
    */
      
    public QueueConnectionFactory getQueueConnectionFactory(String factoryName) throws NamingException
      {
      
    return (QueueConnectionFactory) getObjectByName(factoryName);
      }
      
    /**
      * A convenience method that returns Queue objects (no casting required)
      *
    @param queueName
      *
    @return
      *
    @throws NamingException
      
    */
      
    public Queue getQueue(String queueName) throws NamingException
      {
      
    return (Queue) getObjectByName(queueName);
      }
      }

        Tester类

      Tester类向OUT.QUEUE中写入消息,从IN.QUEUE中读取消息。参考清单2.

      清单2 Tester.java

     package devx.articles.mqjms;
      
    //JMS 类   import javax.jms.Queue;
      
    import javax.jms.QueueSession;
      
    import javax.jms.QueueConnection;
      
    import javax.jms.QueueConnectionFactory;
      
    import javax.jms.JMSException;
      
    import javax.jms.Session;
      
    import javax.jms.QueueSender;
      
    import javax.jms.QueueReceiver;
      
    import javax.jms.TextMessage;
      
    import javax.jms.Message;
      
    //JNDI 类   import javax.naming.NamingException;
      
    //标准 Java 类   /**
      *
      * A class to test JMS with IBM MQ
      *
      
    */
      
    publicclass Tester
      {
      
    publicstatic String icf ="com.sun.jndi.fscontext.RefFSContextFactory";
      
    publicstatic String url ="file:/C:/JNDI-Directory";
      
    publicstaticvoid main(String[] vars) throws JMSException, NamingException
      {
      QueueSession session
    =null;
      QueueConnection connection
    =null;
      QueueConnectionFactory factory
    =null;
      QueueSender queueSender
    =null;
      QueueReceiver queueReceiver
    =null;
      Queue oQueue
    =null; // 消息发送到的队列   Queue iQueue =null; // 接收消息的队列   try
      {
      JNDIUtil jndiUtil
    =new JNDIUtil(icf,url);
      factory
    = jndiUtil.getQueueConnectionFactory("TestQM_QCF");
      connection
    = factory.createQueueConnection();
      
    // 启动(或重新启动)入站消息的连接地址,如果没有这个调用消息不会被接收   connection.start();
      
    //表示一个非相互操作会话   boolean transacted =false;
      session
    = connection.createQueueSession( transacted, Session.AUTO_ACKNOWLEDGE);
      oQueue
    = jndiUtil.getQueue("OutputTestQueue");
      queueSender
    = session.createSender(oQueue);
      TextMessage oMsg
    = session.createTextMessage();
      oMsg.setText(
    "www.devx.com");
      
    // 你还可以设置其他消息属性   queueSender.send(oMsg);
      iQueue
    = jndiUtil.getQueue("InputTestQueue");
      queueReceiver
    = session.createReceiver(iQueue);
      Message iMsg
    = queueReceiver.receive(1000);
      
    if ( iMsg !=null )
      System.out.println( ((TextMessage)iMsg).getText() );
      
    else
      System.out.println(
    "No messages in queue " );
      }
      
    finally
      {
      
    //总是释放资源   if ( queueReceiver!=null )
      queueReceiver.close();
      
    if ( queueSender!=null )
      queueSender.close();
      
    if ( session!=null )
      session.close();
      
    if ( connection!=null )
      {
      connection.close();
      }
      }
      }
      }

       开始点是连接工厂查找,这个工厂用于创建一个连接:

    factory= jndiUtil.getQueueConnectionFactory("TestQM_QCF");
      connection
    = factory.createQueueConnection();

       连接对象用于创建一个会话:

     session = connection.createQueueSession( transacted, Session.AUTO_ACKNOWLEDGE);

      要将消息写入IN.QUEUE queue,查找前面创建的目的地对象OutputTestQueue:

    oQueue= jndiUtil.getQueue("OutputTestQueue");

      最后创建一个QueueSender对象将消息写入队列:

    queueSender = session.createSender(oQueue); TextMessage oMsg = session.createTextMessage(); oMsg.setText("www.devx.com"); queueSender.send(oMsg);

      从OUT.QUEUE读取消息的过程相同,但使用的是QueueReceiver。

    编译运行示例类

      当你安装WMQ时会自动将编译和运行示例类需要的jar文件添加到CLASSPATH环境变量中,需要的jar文件位于C:Program FilesIBMWebSphere MQJavalib,包括JMS和JNDI需要的jar。

      在运行Tester类之前,使用MQ管理器向IN.QUEUE增加一条测试消息:

      1.在 IN.QUEUE上点击右键,选择放入测试消息。

      2.输入任意文本,点击放入消息。

      如果你还没有在IN.QUEUE队列中放入过消息,Tester类会显示“队列中无消息”。

      要运行Tester类,将Tester.class 和 JNDIUtil.class添加到CLASSPATH,然后在命令提示符输入:

     java devx.articles.mqjms.Tester

       应用程序向OUT.QUEUE写入一条消息,并显示从IN.QUEUE检索到的消息,要检查发送到OUT.QUEUE中的消息,进入MQ管理器,在OUT.QUEUE上点击右键?选择“浏览消息”即可。

      企业中的Java和WMQ MOM

      在大型企业环境下,可以充分利用WMQ的性能和稳定性优势,只要你的代码遵循Java标准接口,你就可以受益。

  • 相关阅读:
    代码管理模型概况
    循环链表
    队列

    链表
    java 2020-10-12T11:22:49.000+0800 字符串转换成正常时间格式
    动态数组
    mysql练习
    复杂度与LeetCode
    记一次带逗号的数字类型处理
  • 原文地址:https://www.cnblogs.com/tbyang/p/3489347.html
Copyright © 2011-2022 走看看