zoukankan      html  css  js  c++  java
  • Java调用MQ队列

    IBM MQ 6.0中设置两个队列,(远程队列、通道之类都不设置)。

    队列管理器是XIR_QM_1502

    队列名称是ESBREQ

    IP地址是10.23.117.134(远程的一台电脑,跟我的电脑不在一个局域网内)

    端口1414

    CCSID 1208


    MQ配置可以参考这个,有配图http://wenku.baidu.com/view/06d108d0360cba1aa811daa3.html

    程序如下,发送线程两个,接收线程一个。接收完毕后就结束。


    1. /* 
    2.  * 创建日期 2012-7-10 
    3.  * 
    4.  * TODO 要更改此生成的文件的模板,请转至 
    5.  * 窗口 - 首选项 - Java - 代码样式 - 代码模板 
    6.  */  
    7. package yerasel;  
    8.   
    9. /** 
    10.  * @author Fenglb E-mail:56553655@163.com 
    11.  * @version 创建时间:2009-4-30 下午04:13:38 类说明 
    12.  */  
    13.   
    14. import java.io.IOException;  
    15. import com.ibm.mq.MQC;  
    16. import com.ibm.mq.MQEnvironment;  
    17. import com.ibm.mq.MQException;  
    18. import com.ibm.mq.MQGetMessageOptions;  
    19. import com.ibm.mq.MQMessage;  
    20. import com.ibm.mq.MQPutMessageOptions;  
    21. import com.ibm.mq.MQQueue;  
    22. import com.ibm.mq.MQQueueManager;  
    23.   
    24. interface SomeConstants {  
    25.     String qManager = "XIR_QM_1502";//"XIR_QM"; //QueueManager name  
    26.     String qName = "ESBREQ";// Queue Name  
    27.     String strIP = "10.23.117.134";//"10.24.28.139";//"10.24.28.102";  
    28.     int iPort = 1502;//1414;  
    29.     String strChl = "SYSTEM.DEF.SVRCONN";// Server-Connection Channel  
    30.     int iCCSID = 1208;  
    31. }  
    32.   
    33. class Sender implements Runnable, SomeConstants {  
    34.     public void run() {  
    35.         sendMessage();  
    36.     }  
    37.   
    38.     public void sendMessage() {  
    39.   
    40.         String name = Thread.currentThread().getName();  
    41.         System.out.println("进入线程" + name);  
    42.   
    43.         MQQueueManager qMgr = null;  
    44.         // configure connection parameters  
    45.   
    46.         MQEnvironment.hostname = strIP;  
    47.         // Server name or IP  
    48.         MQEnvironment.port = iPort;  
    49.         MQEnvironment.channel = strChl;  
    50.         MQEnvironment.CCSID = iCCSID;  
    51.   
    52.         // java程序连接mq的方式有两种,一是客户机方式,一是绑定方式,  
    53.         // 默认是客户机方式,当mq部署在本地的时候,就需要用绑定方式  
    54.         // 本机IP是10.24.28.139连接10.23.117.134的时候不需要下句  
    55.         //MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,  
    56.         //MQC.TRANSPORT_MQSERIES_BINDINGS);  
    57.   
    58.         // Create a connection to the QueueManager  
    59.         System.out.println(name + " Connecting to queue manager: " + qManager);  
    60.         try {  
    61.             qMgr = new MQQueueManager(qManager);  
    62.             // Set up the options on the queue we wish to open  
    63.             int openOptions = MQC.MQMT_REQUEST | MQC.MQPMO_NEW_MSG_ID  
    64.                     | MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING  
    65.                     | MQC.MQOO_INPUT_AS_Q_DEF;  
    66.             // Now specify the queue that we wish to open and the open options  
    67.             System.out.println(name + " Accessing queue: " + qName);  
    68.             MQQueue queue = qMgr.accessQueue(qName, openOptions);  
    69.             // Define a simple WebSphere MQ Message ...  
    70.   
    71.             // Specify the default put message options  
    72.             MQPutMessageOptions pmo = new MQPutMessageOptions();  
    73.   
    74.             // Put the message to the queue  
    75.             System.out.println(name + " Sending a message...");  
    76.   
    77.             MQMessage msg = new MQMessage();  
    78.             msg.messageId = "MSGID".getBytes();  
    79.             msg.messageType = MQC.MQMT_REQUEST;  
    80.             msg.replyToQueueName = "ESBREQ";  
    81.   
    82.             // 在此测试一下 mq 的传输次列  
    83.             for (int j = 1; j < 5; j++) {  
    84.                 msg.messageSequenceNumber = j;  
    85.                 // write some text in UTF8 format  
    86.                 try {  
    87.                     String str = "Salemetsizbe Yerasel";  
    88.                     str = str + " " + j;  
    89.                     msg.writeUTF(str);  
    90.                     queue.put(msg, pmo);  
    91.                     msg.clearMessage();  
    92.                     System.out.println(name + " putting the message... " + j);  
    93.                 } catch (MQException mqe) {  
    94.                     mqe.printStackTrace();  
    95.                     break;  
    96.                 } catch (IOException e1) {  
    97.                     e1.printStackTrace();  
    98.                 }  
    99.             }  
    100.             qMgr.commit();  
    101.             System.out.println(name + " Done!");  
    102.             System.out.println("==========");  
    103.             System.out.println("");  
    104.         } catch (MQException e) {  
    105.             e.printStackTrace();  
    106.         }  
    107.     }  
    108. }  
    109.   
    110. class Receiver implements Runnable, SomeConstants {  
    111.   
    112.     public void run() {  
    113.         recvMessage();  
    114.     }  
    115.   
    116.     public void recvMessage() {  
    117.   
    118.         String name = Thread.currentThread().getName();  
    119.           
    120.         try {  
    121.             Thread.sleep(1000);  
    122.             MQQueueManager qMgr = null;  
    123.   
    124.               
    125.             System.out.println("进入线程" + name);  
    126.   
    127.             System.out.println(name + " Connecting to queue manager: "  
    128.                     + qManager);  
    129.             qMgr = new MQQueueManager(qManager);  
    130.             // 设置将要连接的队列属性  
    131.             // Note. The MQC interface defines all the constants used by the  
    132.             // WebSphere MQ Java programming interface  
    133.             // (except for completion code constants and error code constants).  
    134.             // MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the  
    135.             // queue-defined default.  
    136.             // MQOO_OUTPUT:Open the queue to put messages.  
    137.             int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT  
    138.                     | MQC.MQOO_INQUIRE;  
    139.   
    140.             // Now get the message back again. First define a WebSphere MQ  
    141.             // message to receive the data  
    142.             MQMessage rcvMessage = new MQMessage();  
    143.   
    144.             // Specify default get message options  
    145.             MQGetMessageOptions gmo = new MQGetMessageOptions();  
    146.             gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;// Get messages  
    147.                                                             // under sync point  
    148.                                                             // control(在同步点控制下获取消息)  
    149.             gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages  
    150.                                                         // on the  
    151.                                                         // Queue(如果在队列上没有消息则等待)  
    152.             gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if  
    153.                                                                     // Qeue  
    154.                                                                     // Manager  
    155.                                                                     // Quiescing(如果队列管理器停顿则失败)  
    156.             gmo.waitInterval = 1000; // Sets the time limit for the  
    157.                                         // wait.(设置等待的毫秒时间限制)  
    158.   
    159.             System.out.println(name + " Accessing queue: " + qName);  
    160.             MQQueue queue = qMgr.accessQueue(qName, openOptions);  
    161.             int depth = 0;  
    162.   
    163.             // Get the message off the queue.  
    164.             System.out.println("... " + name + " getting the message back again");  
    165.             for (;;) {  
    166.                 try {  
    167.                     queue.get(rcvMessage, gmo);  
    168.                     System.out.println(" ID: "  
    169.                             + (new String(rcvMessage.messageId)).trim()  
    170.                             + " Num: " + rcvMessage.messageSequenceNumber  
    171.                             + " Type: " + rcvMessage.messageType + " Flag: "  
    172.                             + rcvMessage.messageFlags);  
    173.                     // And display the message text...  
    174.                     String msgText = rcvMessage.readUTF();  
    175.                     System.out.println("The message is: " + msgText);  
    176.                     rcvMessage.clearMessage();  
    177.   
    178.                     // Break if no MSG left in queue  
    179.                     depth = queue.getCurrentDepth();  
    180.                     if (depth == 0)  
    181.                         break;  
    182.   
    183.                 } catch (MQException mqe) {  
    184.                     mqe.printStackTrace();  
    185.                     break;  
    186.                     // null;  
    187.                 } catch (IOException e) {  
    188.                     e.printStackTrace();  
    189.                 }  
    190.             }  
    191.             // Close the queue  
    192.             System.out.println(name + " Closing the queue");  
    193.             queue.close();  
    194.             // Disconnect from the QueueManager  
    195.             System.out.println(name + " Disconnecting from the Queue Manager");  
    196.             qMgr.disconnect();  
    197.             System.out.println(name + " Done!");  
    198.             System.out.println("==========");  
    199.             System.out.println("");  
    200.         } catch (MQException ex) {  
    201.             System.out  
    202.                     .println("A WebSphere MQ Error occured : Completion Code "  
    203.                             + ex.completionCode + " Reason Code "  
    204.                             + ex.reasonCode + ex.getMessage());  
    205.         } catch (InterruptedException e1) {  
    206.             e1.printStackTrace();  
    207.         }  
    208.     }  
    209. }  
    210.   
    211. public class MQTest {  
    212.   
    213.     public static void main(String args[]) {  
    214.   
    215.         /* 
    216.          * MQTest first = new MQTest(); first.sendMessage(); 
    217.          * first.recvMessage(); 
    218.          */  
    219.         Sender sender = new Sender();  
    220.         Thread senderThread = new Thread(sender);  
    221.         senderThread.start();  
    222.         senderThread.setName("Sender");  
    223.           
    224.         Thread senderThread2 = new Thread(sender);  
    225.         senderThread2.start();  
    226.         senderThread2.setName("Sender2");  
    227.           
    228.         Receiver recv = new Receiver();  
    229.         Thread recvThread = new Thread(recv);  
    230.         recvThread.start();  
    231.         recvThread.setName("Receiver");  
    232.   
    233.         // Receiver recv = new Receiver();  
    234.         // new Thread(recv).start();  
    235.   
    236.     }  
    237.   
    238. }  


    运行结果如下:

    进入线程Sender2
    进入线程Sender
    Sender2 Connecting to queue manager: XIR_QM_1502
    Sender Connecting to queue manager: XIR_QM_1502
    Sender2 Accessing queue: ESBREQ
    Sender2 Sending a message...
    Sender Accessing queue: ESBREQ
    Sender Sending a message...
    Sender2 putting the message... 1
    Sender putting the message... 1
    Sender2 putting the message... 2
    Sender putting the message... 2
    Sender2 putting the message... 3
    Sender putting the message... 3
    Sender2 putting the message... 4
    Sender putting the message... 4
    Sender2 Done!
    ==========


    Sender Done!
    ==========


    进入线程Receiver
    Receiver Connecting to queue manager: XIR_QM_1502
    Receiver Accessing queue: ESBREQ
    ... Receiver getting the message back again
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 1
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 1
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 2
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 2
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 3
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 3
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 4
     ID: MSGID Num: 1 Type: 1 Flag: 0
    The message is: Salemetsizbe Yerasel 4
    Receiver Closing the queue
    Receiver Disconnecting from the Queue Manager
    Receiver Done!
    ==========

  • 相关阅读:
    【linux】驱动-13-阻塞与非阻塞
    【linux】驱动-12-并发与竞态
    【linux】驱动-11-gpio子系统
    【linux】驱动-10-pinctrl子系统
    【linux】驱动-9-设备树插件
    手写Java分页模块
    JDBC连接与自定义线程池
    类加载器
    网络编程之TCP
    网络编程之UDP
  • 原文地址:https://www.cnblogs.com/pricks/p/3904129.html
Copyright © 2011-2022 走看看