zoukankan      html  css  js  c++  java
  • JMS连接WMQ及收发消息

    因为对JMS的了解也只算入门级,有些概念也很模糊,不过,卤煮会尽可能去介绍的。另外,sample code都调试过可以跑。

    1.神马是JMS?

    jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。——摘自百度百科

    2.JMS组成要素

    JMS提供者,JMS客户,JMS生产者,JMS消费者,JMS队列和JMS主题。具体啥意思,还是参考百度百科吧,不过,概念讲起来也木有啥意思,还会把人绕晕。多跑几次sample,回来再看概念可能更清楚些哈。另,博文中的JMS提供者以IBM 的WebSphere MQ为例。

    3.JMS模型

    JMS模型定义一组可供 Java™ 应用程序用于执行消息传递操作的接口。

    以下列表概括了主要的JMS接口:

    Destination:Destination 对象是应用程序将消息发往的位置和/或应用程序从其接收消息的源。

    ConnectionFactory:ConnectionFactory 对象包括连接的一组配置属性。应用程序使用连接工厂来创建连接。

    Connection:Connection 对象包括应用程序与消息传递服务器的活动连接。应用程序使用连接来创建会话。

    Session:Session 对象是用于发送和接收消息的单个线程上下文。应用程序使用会话来创建消息、消息生产者和消息使用者。会话是事务性或非事务性会话。

    Message:Message 对象包括应用程序发送或接收的消息。

    MessageProducer:应用程序使用消息生产者将消息发送到目标。

    MessageConsumer:应用程序使用消息使用者来接收已发送到目标的消息。

    下图是这些对象之间的关系(摘自IBM Info Center)其实,IBM info center中资料多多的呀。

    Destination、ConnectionFactory 或 Connection 对象可供多线程应用程序的不同线程并发使用,但是 Session、MessageProducer 或 MessageConsumer 对象不能供不同线程并发使用。确保不并发使用 Session、MessageProducer 或 MessageConsumer 对象的最简单方法是为每个线程创建单独的 Session 对象。

    JMS支持两种消息传递样式:

    • 点到点消息传递
    • 发布/预订消息传递

    这两类消息传递也被称为消息传递域,且您可以将两类消息传递都组合在一个应用程序中。在点到点域中,目标是队列,而在发布/预订域中,目标是主题。

     通过JMS1.1 以前的JMS版本,对点到点域的程序设计使用一组接口和方法,而对发布/预订域的程序设计使用另一组接口和方法。两组接口和方法是相似的,但却各自独立。通过JMS1.1,您可以使用一组公共的支持两类消息传递域的接口和方法。公共接口提供了独立于域的每个消息传递域的视图。下表列出了独立于JMS 域的接口及其相应的特定于域的接口。

    特别清楚有木有。

    4.开发JMS客户端用于连接WMQ及收发消息

    下面的code能跑起来的前提是,本地已安装MQ,且创建好队列管理器和相应的QUEUE,TOPIC。

    连接方式一:使用的是IBM对于JMS的实现(要导入包com.ibm.mqjms.jar):

      1 package com.demo;
      2 
      3 import java.io.UnsupportedEncodingException;
      4 
      5 import javax.jms.BytesMessage;
      6 import javax.jms.Connection;
      7 import javax.jms.DeliveryMode;
      8 import javax.jms.ExceptionListener;
      9 import javax.jms.JMSException;
     10 import javax.jms.Message;
     11 import javax.jms.MessageConsumer;
     12 import javax.jms.MessageListener;
     13 import javax.jms.MessageProducer;
     14 import javax.jms.Queue;
     15 import javax.jms.QueueConnection;
     16 import javax.jms.QueueConnectionFactory;
     17 import javax.jms.Session;
     18 import javax.jms.TextMessage;
     19 import javax.naming.NamingException;
     20 
     21 import com.ibm.mq.jms.MQQueueConnectionFactory;
     22 
     23 public class JmsQueueDemo {
     24     private static Connection conn = null;
     25     private static Session session = null;
     26     private static MessageProducer producer = null;
     27     private static MessageConsumer consumer = null;
     28     private static QueueConnection qConn = null;
     29 
     30     public static void init() {
     31         // 连接工厂,用com.ibm.mq.jms中的类实现javax.jms中的接口
     32         QueueConnectionFactory qcf = new MQQueueConnectionFactory();
     33 
     34         // 设置连接工厂属性
     35         try {
     36             //设置WMQ所在机器的IP
     37             ((MQQueueConnectionFactory) qcf).setHostName("localhost");
     38             //设置WMQ上队列管理器名
     39             ((MQQueueConnectionFactory) qcf).setQueueManager("TestQM");
     40             //设置WMQ上的通道名
     41             ((MQQueueConnectionFactory) qcf).setChannel("SYSTEM.DEF.SVRCONN");
     42             //设置WMQ上的监听端口
     43             ((MQQueueConnectionFactory) qcf).setPort(1414);
     44             
     45             //由连接工厂创建连接
     46             qConn = qcf.createQueueConnection();
     47 
     48             //建立异常监听器用于监听连接过程中发生的异常
     49              ExceptionListener exceptionListener = new ExceptionListener(){
     50             
     51              //此处可放入更多逻辑,由自己定义
     52              public void onException(JMSException e) {
     53              System.out.println("mq exception");
     54              e.printStackTrace();
     55              System.exit(0);
     56              }
     57             
     58              };
     59              //在连接上面注册监听器
     60              qConn.setExceptionListener(exceptionListener);
     61         } catch (JMSException e) {
     62 
     63             e.printStackTrace();
     64             return;
     65         }
     66     }
     67 
     68     public static void main(String[] args) throws NamingException,
     69             JMSException, UnsupportedEncodingException {
     70 
     71         init();
     72          sendMessage();
     73         // receiveMessage();
     74         receiveWithListener();
     75         destroy();
     76 
     77     }
     78 
     79     public static void sendMessage() throws JMSException {
     80         boolean transacted = false;
     81 
     82         // 非事务处理(分别接收或发送消息)[事务处理(全部发送或者全部接收作为一个单元的一组消息)]
     83         session = qConn
     84                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
     85         // 由session创建要发送到的队列
     86         Queue inputQ = session.createQueue("TestQ");
     87         
     88         //由session创建消息发送者
     89         MessageProducer sender = session.createProducer(inputQ);
     90         
     91         //启动连接
     92         qConn.start();
     93         
     94         // 消息由会话创建
     95         TextMessage message = session.createTextMessage();
     96         //设置消息内容
     97         message.setText("this is input message from queue sender");
     98         //这句可有可无的哦,主要用于设置消息属性;方便后面取消息时,取特定类型的消息,如"company='systems'"
     99         message.setStringProperty("company", "systems");
    100 
    101         // 发送消息,后面的参数依次为消息的持久性设置,消息的优先级,消息在队列的存活时间,设置为0,表示永不失效
    102         //DeliveryMode为PERSISTENT表示,队列管理器或者WMQ重启后,消息仍在queue中;NON_PERSISTENT意思相反
    103         sender.send(message, DeliveryMode.NON_PERSISTENT, 7, 0);
    104     }
    105 
    106     //使用listener的方式从queue中取消息,可一次取多条消息出来
    107     public static void receiveWithListener() throws JMSException {
    108 
    109         boolean transacted = false;
    110 
    111         // 非事务处理(分别接收或发送消息)[事务处理(全部发送或者全部接收作为一个单元的一组消息)]
    112         session = qConn
    113                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
    114         //从同一个queue中取消息,此处为使用session创建queue
    115         Queue outputQ = session.createQueue("TestQ");
    116         //使用session创建消息消费者,注意了,后面的那个参数就是消息选择器,用于接收特定类型的消息
    117         consumer = session.createConsumer(outputQ, "company='t-systems'");
    118         //创建消息监听器
    119         MessageListener listener = new MessageListener() {
    120             public void onMessage(Message message) {
    121                 try {
    122                     if (message instanceof TextMessage) {
    123                         System.out.println("Listener 接收消息:"
    124                                 + ((TextMessage) message).getText());
    125                     }
    126                 } catch (JMSException e) {
    127                     e.printStackTrace();
    128                 }
    129             }
    130         };
    131         //注册消息监听器
    132         consumer.setMessageListener(listener);
    133         //启动连接
    134         qConn.start();
    135         try {
    136             Thread.sleep(10 * 1000);
    137         } catch (InterruptedException e) {
    138             e.printStackTrace();
    139         }
    140     }
    141     //取消息的另一种方式,手动从queue中取消息,一次只能接收一条消息
    142     public static void receiveMessage() throws JMSException,
    143             UnsupportedEncodingException {
    144 
    145         boolean transacted = false;
    146 
    147         // 非事务处理(分别接收或发送消息)[事务处理(全部发送或者全部接收作为一个单元的一组消息)]
    148         session = qConn
    149                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
    150         // 对队列管理器上队列的映射
    151         Queue outputQ = session.createQueue("TestQ");
    152         consumer = session.createConsumer(outputQ);
    153         qConn.start();
    154         //此时,若send的message中,设置了message的属性如,"company='systems'",下面的方法是取不到消息的哈
    155         //要取到消息可屏蔽到send中设置message属性的语句,或使用consumer.receive("company='systems'")
    156         Message msg = consumer.receiveNoWait();
    157         //转换消息格式
    158         if (msg instanceof TextMessage) {
    159             TextMessage message = (TextMessage) msg;
    160             System.out.println("received message from queue is:"
    161                     + message.getText());
    162         } else if (msg instanceof BytesMessage) {
    163             BytesMessage message = (BytesMessage) msg;
    164             byte buff[] = null;
    165             long length = message.getBodyLength();
    166             buff = new byte[(int) length];
    167             message.readBytes(buff);
    168             String textmessage = new String(buff, "UTF-8");
    169             System.out.println("received message from queue is:" + textmessage);
    170         }
    171     }
    172     //销毁资源
    173     public static void destroy() throws JMSException {
    174         if (consumer != null) {
    175             consumer.close();
    176         }
    177         if (producer != null) {
    178             producer.close();
    179         }
    180         if (session != null) {
    181             session.close();
    182         }
    183         if (conn != null) {
    184             conn.close();
    185         }
    186 
    187     }
    188 
    189 }
    View Code

    上面的代码是点对点消息发送模式的实现,其实,发布/预定模式的实现也差不多了。

      1 package com.demo;
      2 
      3 import java.io.UnsupportedEncodingException;
      4 
      5 import javax.jms.BytesMessage;
      6 import javax.jms.Connection;
      7 import javax.jms.DeliveryMode;
      8 import javax.jms.ExceptionListener;
      9 import javax.jms.JMSException;
     10 import javax.jms.Message;
     11 import javax.jms.MessageConsumer;
     12 import javax.jms.MessageListener;
     13 import javax.jms.MessageProducer;
     14 import javax.jms.Session;
     15 import javax.jms.TextMessage;
     16 import javax.jms.Topic;
     17 import javax.jms.TopicConnection;
     18 import javax.jms.TopicConnectionFactory;
     19 import javax.naming.NamingException;
     20 
     21 import com.ibm.mq.jms.MQTopicConnectionFactory;
     22 
     23 public class JmsTopicDemo {
     24     private static Connection conn;
     25     private static Session session;
     26     private static MessageProducer producer;
     27     private static MessageConsumer consumer;
     28     private static TopicConnection tConn = null;
     29 
     30     public static void init() {
     31         // 连接工厂,用com.ibm.mq.jms中的类实现javax.jms中的接口
     32         TopicConnectionFactory tcf = new MQTopicConnectionFactory();
     33 
     34         // 设置连接工厂属性
     35         try {
     36             ((MQTopicConnectionFactory) tcf).setHostName("localhost");
     37             ((MQTopicConnectionFactory) tcf).setQueueManager("TestQM");
     38             ((MQTopicConnectionFactory) tcf).setCCSID(1381);
     39             ((MQTopicConnectionFactory) tcf).setChannel("SYSTEM.DEF.SVRCONN");
     40             ((MQTopicConnectionFactory) tcf).setPort(1414);
     41             tConn = tcf.createTopicConnection();
     42 
     43             ExceptionListener exceptionListener = new ExceptionListener() {
     44 
     45                 // 此处可放入更多逻辑
     46                 public void onException(JMSException e) {
     47                     System.out.println("mq exception");
     48                     e.printStackTrace();
     49                     System.exit(0);
     50                 }
     51 
     52             };
     53             tConn.setExceptionListener(exceptionListener);
     54         } catch (JMSException e) {
     55 
     56             e.printStackTrace();
     57             return;
     58         }
     59     }
     60 
     61     public static void main(String[] args) throws NamingException,
     62             JMSException, UnsupportedEncodingException, InterruptedException {
     63 
     64         init();
     65         sendMessage();
     66         receiveMessage();
     67         receiveWithListener();
     68         destroy();
     69 
     70     }
     71 
     72     public static void sendMessage() throws JMSException {
     73         boolean transacted = false;
     74 
     75         session = tConn
     76                 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
     77 
     78         Topic inputTopic = session.createTopic("TestT");
     79         MessageProducer sender = session.createProducer(inputTopic);
     80         tConn.start();
     81 
     82         TextMessage message = session.createTextMessage();
     83         message.setText("this is input message from topic sender");
     84 
     85         sender.send(message, DeliveryMode.PERSISTENT, 7, 0);
     86 
     87     }
     88 
     89     public static void receiveWithListener() throws JMSException {
     90         // receive message from mq
     91 
     92         boolean transacted = false;
     93 
     94         session = tConn
     95                 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
     96         Topic outputT = session.createTopic("TestT");
     97 
     98         consumer = session.createConsumer(outputT);
     99 
    100         MessageListener listener = new MessageListener() {
    101             public void onMessage(Message message) {
    102                 try {
    103                     if (message instanceof TextMessage) {
    104                         System.out.println("Listener 接收消息:"
    105                                 + ((TextMessage) message).getText());
    106                     }
    107                 } catch (JMSException e) {
    108                     e.printStackTrace();
    109                 }
    110             }
    111         };
    112         consumer.setMessageListener(listener);
    113         tConn.start();
    114         try {
    115             Thread.sleep(10 * 1000);
    116         } catch (InterruptedException e) {
    117             e.printStackTrace();
    118         }
    119     }
    120 
    121     public static void receiveMessage() throws JMSException,
    122             UnsupportedEncodingException {
    123 
    124         boolean transacted = false;
    125 
    126         session = tConn
    127                 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
    128 
    129         Topic outputT = session.createTopic("TestT");
    130         consumer = session.createConsumer(outputT);
    131 
    132         tConn.start();
    133         Message msg = consumer.receive(5000);
    134         if (msg instanceof TextMessage) {
    135             TextMessage message = (TextMessage) msg;
    136             System.out.println("received message from topic is:"
    137                     + message.getText());
    138         } else if (msg instanceof BytesMessage) {
    139             BytesMessage message = (BytesMessage) msg;
    140             byte buff[] = null;
    141             long length = message.getBodyLength();
    142             buff = new byte[(int) length];
    143             message.readBytes(buff);
    144             String textmessage = new String(buff, "UTF-8");
    145             System.out.println("received message from topic is:" + textmessage);
    146         }
    147     }
    148 
    149     public static void destroy() throws JMSException {
    150         if (consumer != null) {
    151             consumer.close();
    152         }
    153         if (producer != null) {
    154             producer.close();
    155         }
    156         if (session != null) {
    157             session.close();
    158         }
    159         if (conn != null) {
    160             conn.close();
    161         }
    162 
    163     }
    164 
    165 }
    View Code

    不同点的地方,可能就是从topic中取消息的时候,因为消息不能在topic中驻留,所以,一定要是取的动作已经发生了或准备好,再做发送消息的操作,这时候,才能够从topic中收到发送的消息。怎么理解呢?比如说,queue是个篮子,消息发送出去,就会在篮子里。有时,发送的动作也许早就已经完成,但消息会一直在篮子里。取消息的时候,从篮子中取走即可。而topic则是一个篮球框的那种篮网,消息发送到篮网,如果下面没有接收的东西,就会直接从篮网溜走,你这时再去取消息,就只能是竹篮打水一场空啦。所以,从topic中收消息,一定要再发消息之前先把接的动作准备好的哈。

    连接方式二:使用javax.jms中的标准JMS接口(要导入包jms.jar)

    使用javax.jms中的标准JMS接口,可以隐藏特定厂商的实现,使代码具有更好的可移植性。开发时,只需要关注javax.jms.QueueConnectionFactory 和 javax.jms.Queue这两个对象即可。通常JMS提供厂商会有自己的工具去构建这两个对象,并存储在JNDI命名空间中。可以从命名空间中检索这两个对象,而不用care是哪个厂商实现滴。

    对于WMQ,这两个对象是这样构建滴哈。

    1)打开WMQ,找到“JMS受管对象”,右击“添加初始上下文”,如下图选择,记得绑定目录是先创建好的哈。

    点击“完成”。

    2)在刚建好的“初始上下文”下面,找到”连接工厂“,右击新建“连接工厂”,如下图,

    点击”下一步“,到最后一页,在连接里面做如下设置,

    选择”基本队列管理器“(应先在WMQ上创建好一个队列管理器,此处选上;另还需先创建好;一个queue,下面要用到滴),完成。

    此时我们就有javax.jms.QueueConnectionFactory这个对象了。

    3)在新建的”初始上下文“下,找到”目标“,新建”目标“,点”下一步“,到最后一页,做下面的设置,

     选择”队列管理器“和”队列“,应先在WMQ上创建好。这时,javax.jms.Queue这个对象也构建好了呢。

    OK,完事具备了,下面就是配合JNDI方式实现的标准JMS接口了。

     1 package com.demo;
     2 
     3 import java.util.Hashtable;
     4 
     5 import javax.jms.*;
     6 import javax.naming.*;
     7 
     8 public class Tester {
     9     public static void main(String[] args) {
    10         send();
    11     }
    12 
    13     public static void send() {
    14         try {
    15         
    16             Hashtable<String, String> environment = new Hashtable<String, String>();
    17             //刚才建的初始上下文哈
    18             environment.put(Context.INITIAL_CONTEXT_FACTORY,
    19                     "com.sun.jndi.fscontext.RefFSContextFactory");
    20             environment.put(Context.PROVIDER_URL, "file:/D:/JNDI-Directory");
    21             InitialContext initContext = new InitialContext(environment);
    22             //创建的连接工厂哈
    23             ConnectionFactory factory = (ConnectionFactory) initContext
    24                     .lookup("TestQM_Q");
    25             //创建的目标哈
    26             Destination destination = (Destination) initContext
    27                     .lookup("TestQueue");
    28             initContext.close();
    29             
    30             Connection connection = factory.createConnection();
    31             Session session = connection.createSession(false,
    32                     Session.AUTO_ACKNOWLEDGE);
    33             
    34             MessageProducer sender = session.createProducer(destination);
    35             // Send messages
    36             TextMessage message = session
    
    37                     .createTextMessage("hello, this is my first jms sample code~~~");
    38             
    39             //记住,message并不是send到“TestQueue”里面去了,而是send到WMQ上与“TestQueue”关联的那个queue里面去了
    40             sender.send(message);
    41             session.close();
    42             connection.close();
    43         } catch (Exception e) {
    44             e.printStackTrace();
    45         }
    46     }
    47 }
    View Code

    好了,就介绍到这里了。Happy ending~~~

  • 相关阅读:
    Web前端框架与类库的思考【转】
    mouseover事件mouseenter事件
    11_Eclipse中演示Git版本号的创建,历史版本号的改动,创建分支,合并历史版本号和当前版本号
    实现了私聊和群聊功能的聊天工具
    有预处理命令#define声明一个常数,用以表明1年中有多少秒
    解决Office软件冲突问题
    pig载入两个不同字段个数的文件?load file with different items(f1有42列,f2有43列读到一个对象中)
    游戏公司通用屏蔽字列表
    Android统计图表MPAndroidChart
    HTTP状态码解析
  • 原文地址:https://www.cnblogs.com/alvwood/p/3184538.html
Copyright © 2011-2022 走看看