zoukankan      html  css  js  c++  java
  • IBM.WMQ订阅消息

    网上关于IBM这个消息队列中间件的资料相对比较少,C#相关的资料就更少了,最近因为要对接这个队列中间件,花了不少功夫去搜索、整理各种资料,遇到很多问题,因此记录下来。

    1、基于 amqmdnet.dll 进行开发,这个是官方提供的DLL,安装了IBM WebSphere MQ后在安装目录可以找到(C:Program FilesIBMWebSphere MQin)

    2、基于 MmqiNetLite.dll 开发,这是一个开源组件,地址:https://github.com/fglaeser/mmqinet,这个项目代码有部分未完善,原作者也是好几年没更新,但是基础功能可以使用,本文代码主要基于此编写

    源码如下:

    public class IBMWMQConfig
        {
            /// <summary>
            /// MQ主机地址
            /// </summary>
            private const string CONNECTION_HOST = "";
            /// <summary>
            /// 通讯端口
            /// </summary>
            private const int CONNECTION_PORT = 4421;
            /// <summary>
            /// CLIENT_ID
            /// </summary>
            private const string CLIENT_ID = "";
            /// <summary>
            /// 通道名称
            /// </summary>
            public const string CHANNEL = "SYSTEM.ADMIN.SVRCONN";
            /// <summary>
            /// 队列管理器名称
            /// </summary>
            public const string QUEUE_MGR_NAME = "PHIBHUBGW1";
            /// <summary>
            /// 订阅主题持久化标识,{0}标识具体业务
            /// </summary>
            public static readonly string SUBSCRIPTION_TEMPLATE = "JMS:" + QUEUE_MGR_NAME + ":" + CLIENT_ID + "_{0}.REQ:{0}.REQ";
            /// <summary>
            /// 主题名称模板,{0}标识具体业务
            /// </summary>
            public static readonly string TOPIC_TEMPLATE = "{0}.REQ";
            /// <summary>
            /// IBM.WMQ连接字符串
            /// </summary>
            public static readonly string CONNECTION_INFO = string.Format("{0}({1})", CONNECTION_HOST, CONNECTION_PORT);
        }

    订阅消息:

    /// <summary>
            /// 订阅主题
            /// </summary>
            /// <param name="business"></param>
            /// <returns></returns>
            private string SubTopic(string business)
            {
                string message = string.Empty;
                try
                {
                    using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
                    {
                        MQSubscription subs = new MQSubscription(mqmgr);
                        if (mqmgr.IsConnected)
                        {
                            int option = MQC.MQSO_CREATE + MQC.MQSO_RESUME + MQC.MQSO_NON_DURABLE + MQC.MQSO_MANAGED + MQC.MQSO_FAIL_IF_QUIESCING;
                            string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business);
                            string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business);
    
                            subs.Subscribe(subName, option, topicName);
    
                            MQMessage incoming = new MQMessage();
                            MQGetMessageOptions gmo = new MQGetMessageOptions();
                            gmo.WaitInterval = 10 * 1000;//MQC.MQWI_UNLIMITED;
                            gmo.Options |= MQC.MQGMO_WAIT;
                            gmo.Options |= MQC.MQGMO_SYNCPOINT;
    
                            subs.Get(incoming, gmo);
                            message = incoming.ReadAll();
    
                            //subs.Close(MQC.MQCO_REMOVE_SUB, closeSubQueue: true, closeSubQueueOptions: MQC.MQCO_NONE);
                        }
                    }
                }
                catch (MQException e)
                {
                    message = e.Reason.ToString();
                }
                return message;
            }

    向队列推送一条消息:

    /// <summary>
            /// 向消息队列推送一条消息
            /// </summary>
            /// <param name="msg">消息内容</param>
            /// <param name="queueName">队列名称</param>
            public void PushMsgToQueue(string msg, string queueName)
            {
                using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
                using (var q = new MQQueue(mqmgr, queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_OUTPUT + MQC.MQOO_FAIL_IF_QUIESCING))
                {
                    var outgoing = new MQMessage()
                    {
                        CharacterSet = MQC.CODESET_UTF,
                        Encoding = MQC.MQENC_NORMAL
                    };
                    outgoing.WriteString(msg);
                    q.Put(outgoing, new MQPutMessageOptions());
                }
            }
    
            /// <summary>
            /// 向消息队列推送一条消息
            /// </summary>
            /// <param name="msg">消息内容</param>
            /// <param name="queueName">队列名称</param>
            public void PushMsgToQueue1(string msg, string queueName)
            {
                using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
                {
                    if (mqmgr.IsConnected)
                    {
                        var outgoing = new MQMessage()
                        {
                            CharacterSet = MQC.CODESET_UTF,
                            Encoding = MQC.MQENC_NORMAL
                        };
                        outgoing.WriteString(msg);
    
                        var od = new MQObjectDescriptor
                        {
                            ObjectType = MQC.MQOT_Q,
                            ObjectName = queueName
                        };
                        mqmgr.Put1(od, outgoing, new MQPutMessageOptions());
                    }
                }
            }
  • 相关阅读:
    Block & 代理
    堆&栈, 内存分配
    ASI 的 使用
    iOS开发-清理缓存功能的实现
    iOS8是如何跳转系统设置页面
    键盘弹出获取高度
    http://www.jianshu.com/collection/9a22b04a9357
    IOS 字符串中去除特殊符号 stringByTrimmingCharactersInSet
    iOS 判断输入是否全是空格
    iOS AFN 请求封装方法
  • 原文地址:https://www.cnblogs.com/zhao-yi/p/10563745.html
Copyright © 2011-2022 走看看