zoukankan      html  css  js  c++  java
  • IBM.WMQ订阅主题,连续获取消息解决办法

    去队列里面一直获取消息,一开始想到了两种解决方案:

    第一:订阅一次获取一次消息,正常的话每次都能获取到,但是要及时去清理订阅并且时间粒度不好控制

    第二:只订阅一次,在获取消息的地方加一个死循环。MQ 的 Get 调用本身已经带有等待超时,所以不需要自己控制线程等待;获取到消息以后,直接通过自定义事件的机制及时处理消息

    从最终实验结果来看,第二种是最优的做法,既可以做到随时获取到消息,又不占用资源。接下来我把最终的实现代码分享出来,希望对大家有所帮助;如有不对的地方,请及时联系我。

    订阅主题:

            // Handler that the SubTopic* methods hand to the event listener; it receives
            // both new-message and error notifications.
            private IIBMWMQMsgHandler _msgHandler;

            /// <summary>
            /// Creates a helper that reports through <see cref="DefaultIBMWMQMsgHandler"/>.
            /// </summary>
            public IBMWMQHelper()
            {
                _msgHandler = new DefaultIBMWMQMsgHandler();
            }

            /// <summary>
            /// Creates a helper that reports through the supplied handler.
            /// </summary>
            /// <param name="msgHandler">
            /// Handler to notify. When null, <see cref="DefaultIBMWMQMsgHandler"/> is used instead.
            /// </param>
            public IBMWMQHelper(IIBMWMQMsgHandler msgHandler)
            {
                // A null handler would otherwise be stored silently and only fail later,
                // when the event listener tries to bind the handler's methods.
                _msgHandler = msgHandler ?? new DefaultIBMWMQMsgHandler();
            }
    /// <summary>
            /// Subscribes once to the durable topic for <paramref name="business"/> and then
            /// keeps polling that subscription for messages on the calling thread.
            /// </summary>
            /// <remarks>
            /// Every outcome is reported through the configured message handler: received payloads
            /// via the new-message event, MQ reason codes and progress text via the error event.
            /// The loop ends when the connection to the queue manager is lost. The durable
            /// subscription itself is not removed when the method returns.
            /// </remarks>
            /// <param name="business">Business code substituted into the subscription and topic name templates.</param>
            /// <param name="isGetMsg">When false, the method only creates or resumes the subscription and returns without polling.</param>
            public void SubTopic1(string business, bool isGetMsg = true)
            {
                IBMWMQEventSource eventSource = new IBMWMQEventSource();
                IBMWMQMsgEventListener msgEventListener = new IBMWMQMsgEventListener(_msgHandler);

                try
                {
                    // Route message and error events to the configured handler.
                    msgEventListener.Subscribe(eventSource);

                    using (var mqmgr = MQQueueManager.Connect(IBMWMQConfig.QUEUE_MGR_NAME, MQC.MQCO_NONE, IBMWMQConfig.CHANNEL, IBMWMQConfig.CONNECTION_INFO))
                    {
                        MQSubscription subs = new MQSubscription(mqmgr);
                        if (!mqmgr.IsConnected)
                        {
                            // Report the failure instead of returning silently (same message as SubTopic2).
                            eventSource.RaiseErroeMsgEvent(business, "连接队列管理器失败!");
                            return;
                        }

                        // NOTE(review): TryAdd is defined outside this listing; presumably it
                        // registers the subscription per business code - confirm.
                        this.TryAdd(business, subs);

                        // Create the durable subscription, or resume it if it already exists.
                        int option = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
                        string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business);
                        string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business);

                        try
                        {
                            subs.Subscribe(subName, option, topicName);
                        }
                        catch (MQException ex)
                        {
                            eventSource.RaiseErroeMsgEvent(business, ex.Reason.ToString());
                            // Without a subscription every Get below would fail, so stop here
                            // rather than spin in the polling loop.
                            return;
                        }

                        while (isGetMsg)
                        {
                            // Progress text travels over the error channel; the handler tells it
                            // apart from reason codes.
                            eventSource.RaiseErroeMsgEvent(business, string.Format("开始尝试获取 {0} 消息...", business));
                            try
                            {
                                MQMessage incoming = new MQMessage()
                                {
                                    CharacterSet = MQC.CODESET_UTF,
                                    Encoding = MQC.MQENC_NORMAL
                                };
                                MQGetMessageOptions gmo = new MQGetMessageOptions();
                                // Wait up to 10 seconds per Get; an expired wait surfaces as an MQException.
                                gmo.WaitInterval = 10 * 1000;
                                gmo.Options |= MQC.MQGMO_WAIT;
                                gmo.Options |= MQC.MQGMO_SYNCPOINT;

                                subs.Get(incoming, gmo);
                                string message = incoming.ReadAll();

                                if (!string.IsNullOrEmpty(message))
                                {
                                    eventSource.RaiseNewMsgEvent(business, message);
                                }

                                // The Get ran under MQGMO_SYNCPOINT: commit once the handler has
                                // returned so the unit of work does not keep growing with every message.
                                mqmgr.Commit();
                            }
                            catch (MQException ex)
                            {
                                eventSource.RaiseErroeMsgEvent(business, ex.Reason.ToString());

                                // A lost connection cannot recover on this handle; leaving the loop
                                // avoids re-raising the same error endlessly.
                                if (ex.Reason == MQC.MQRC_CONNECTION_BROKEN || !mqmgr.IsConnected)
                                {
                                    break;
                                }
                            }
                        }
                    }
                }
                catch (MQException e)
                {
                    // Connection-level failures (connect, disconnect) end up here.
                    eventSource.RaiseErroeMsgEvent(business, e.Reason.ToString());
                }
            }

    或者使用原生方式:

    /// <summary>
            /// Subscribes to the topic for <paramref name="business"/> through
            /// MQQueueManager.AccessTopic and then keeps polling it for messages on the calling thread.
            /// </summary>
            /// <remarks>
            /// Every outcome is reported through the configured message handler: received payloads
            /// via the new-message event, MQ reason codes and progress text via the error event.
            /// The connection is opened with the MQCNO_RECONNECT_Q_MGR option; the loop ends only
            /// when the connection to the queue manager is lost.
            /// </remarks>
            /// <param name="business">Business code substituted into the subscription and topic name templates.</param>
            /// <param name="transportMode">
            /// "managed" selects TRANSPORT_MQSERIES_MANAGED, "unmanaged" selects TRANSPORT_MQSERIES_CLIENT;
            /// any other value leaves the transport property unset.
            /// </param>
            /// <param name="durability">
            /// "durable" creates or resumes a named durable subscription; any other value opens a
            /// non-durable subscription.
            /// </param>
            public void SubTopic2(string business, string transportMode = "managed", string durability = "durable")
            {
                IBMWMQEventSource eventSource = new IBMWMQEventSource();
                IBMWMQMsgEventListener msgEventListener = new IBMWMQMsgEventListener(_msgHandler);

                try
                {
                    // Route message and error events to the configured handler.
                    msgEventListener.Subscribe(eventSource);

                    Hashtable properties = new Hashtable();
                    // Ask the client to reconnect to the queue manager automatically.
                    properties.Add(MQC.CONNECT_OPTIONS_PROPERTY, MQC.MQCNO_RECONNECT_Q_MGR);
                    properties.Add(MQC.HOST_NAME_PROPERTY, IBMWMQConfig.CONNECTION_HOST);
                    properties.Add(MQC.PORT_PROPERTY, IBMWMQConfig.CONNECTION_PORT);
                    properties.Add(MQC.CHANNEL_PROPERTY, IBMWMQConfig.CHANNEL);

                    if (transportMode == "managed")
                    {
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);
                    }
                    else if (transportMode == "unmanaged")
                    {
                        properties.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_CLIENT);
                    }

                    using (var queueManager = new MQQueueManager(IBMWMQConfig.QUEUE_MGR_NAME, properties))
                    {
                        if (!queueManager.IsConnected)
                        {
                            eventSource.RaiseErroeMsgEvent(business, "连接队列管理器失败!");
                            return;
                        }

                        // Durable: create the named subscription, or resume it if it already exists.
                        int option4Durable = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
                        // Non-durable: plain create.
                        int option4NotDurable = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING;

                        string subName = string.Format(IBMWMQConfig.SUBSCRIPTION_TEMPLATE, business);
                        string topicName = string.Format(IBMWMQConfig.TOPIC_TEMPLATE, business);

                        try
                        {
                            MQTopic topic;
                            if (durability == "durable")
                            {
                                topic = queueManager.AccessTopic(topicName, null, option4Durable, null, subName);
                            }
                            else
                            {
                                topic = queueManager.AccessTopic(topicName, null, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, option4NotDurable);
                            }

                            while (true)
                            {
                                // Progress text travels over the error channel; the handler tells it
                                // apart from reason codes.
                                eventSource.RaiseErroeMsgEvent(business, string.Format("开始尝试获取 {0} 消息...", business));
                                try
                                {
                                    MQMessage incoming = new MQMessage()
                                    {
                                        CharacterSet = MQC.CODESET_UTF,
                                        Encoding = MQC.MQENC_NORMAL
                                    };
                                    MQGetMessageOptions gmo = new MQGetMessageOptions();
                                    // Wait up to 10 seconds per Get; an expired wait surfaces as an MQException.
                                    gmo.WaitInterval = 10 * 1000;
                                    gmo.Options |= MQC.MQGMO_WAIT;
                                    gmo.Options |= MQC.MQGMO_SYNCPOINT;

                                    topic.Get(incoming, gmo);
                                    string message = incoming.ReadString(incoming.MessageLength);

                                    if (!string.IsNullOrEmpty(message))
                                    {
                                        eventSource.RaiseNewMsgEvent(business, message);
                                    }

                                    // The Get ran under MQGMO_SYNCPOINT: commit once the handler has
                                    // returned so the unit of work does not keep growing with every message.
                                    queueManager.Commit();
                                }
                                catch (MQException ex)
                                {
                                    eventSource.RaiseErroeMsgEvent(business, ex.Reason.ToString());

                                    // A lost connection cannot recover on this handle; leaving the loop
                                    // avoids re-raising the same error endlessly.
                                    if (ex.Reason == MQC.MQRC_CONNECTION_BROKEN || !queueManager.IsConnected)
                                    {
                                        break;
                                    }
                                }
                            }
                        }
                        catch (MQException ex)
                        {
                            // Failure to open the topic/subscription.
                            eventSource.RaiseErroeMsgEvent(business, ex.Reason.ToString());
                        }
                    }
                }
                catch (MQException e)
                {
                    // Connection-level failures (connect, disconnect) end up here.
                    eventSource.RaiseErroeMsgEvent(business, e.Reason.ToString());
                }
            }

    接下来开始自定义事件:

    定义事件中心:

    public class IBMWMQEventSource
        {
            /// <summary>
            /// Signature of a callback that receives a newly arrived message.
            /// </summary>
            /// <param name="business">Business code the message belongs to.</param>
            /// <param name="msg">Message text.</param>
            public delegate void NewMsgHandler(string business, string msg);

            /// <summary>
            /// Raised by <see cref="RaiseNewMsgEvent"/>.
            /// </summary>
            public event NewMsgHandler NewMsgEventHandler;

            /// <summary>
            /// Signature of a callback that receives an error notification.
            /// </summary>
            /// <param name="business">Business code the notification belongs to.</param>
            /// <param name="errorCode">Error code or text describing the problem.</param>
            public delegate void ErrorMsgHandler(string business, string errorCode);

            /// <summary>
            /// Raised by <see cref="RaiseErroeMsgEvent"/>.
            /// </summary>
            public event ErrorMsgHandler ErrorMsgEventHandler;

            /// <summary>
            /// Notifies every subscriber of <see cref="NewMsgEventHandler"/>; does nothing when
            /// nobody is subscribed.
            /// </summary>
            /// <param name="business">Business code the message belongs to.</param>
            /// <param name="msg">Message text.</param>
            public void RaiseNewMsgEvent(string business, string msg)
            {
                NewMsgHandler subscribers = NewMsgEventHandler;
                if (subscribers == null)
                {
                    return;
                }

                subscribers(business, msg);
            }

            /// <summary>
            /// Notifies every subscriber of <see cref="ErrorMsgEventHandler"/>; does nothing when
            /// nobody is subscribed.
            /// </summary>
            /// <param name="business">Business code the notification belongs to.</param>
            /// <param name="errorCode">Error code or text describing the problem.</param>
            public void RaiseErroeMsgEvent(string business, string errorCode)
            {
                ErrorMsgHandler subscribers = ErrorMsgEventHandler;
                if (subscribers == null)
                {
                    return;
                }

                subscribers(business, errorCode);
            }
        }

    定义事件监听器:

    public class IBMWMQMsgEventListener
        {
            // Handler whose methods are attached to / detached from an event source.
            private readonly IIBMWMQMsgHandler _msgHandler;

            /// <summary>
            /// Creates a listener that forwards events to <paramref name="msgHandler"/>.
            /// </summary>
            /// <param name="msgHandler">Handler that will receive the events.</param>
            public IBMWMQMsgEventListener(IIBMWMQMsgHandler msgHandler)
            {
                _msgHandler = msgHandler;
            }

            /// <summary>
            /// Attaches the handler's methods to both events of <paramref name="eventSource"/>.
            /// </summary>
            /// <param name="eventSource">Event source to listen to.</param>
            public void Subscribe(IBMWMQEventSource eventSource)
            {
                IBMWMQEventSource.NewMsgHandler onMessage = _msgHandler.OnNewMsgHandler;
                IBMWMQEventSource.ErrorMsgHandler onError = _msgHandler.OnErrorMsgHandler;

                eventSource.NewMsgEventHandler += onMessage;
                eventSource.ErrorMsgEventHandler += onError;
            }

            /// <summary>
            /// Detaches the handler's methods from both events of <paramref name="eventSource"/>.
            /// </summary>
            /// <param name="eventSource">Event source to stop listening to.</param>
            public void UnSubscribe(IBMWMQEventSource eventSource)
            {
                IBMWMQEventSource.NewMsgHandler onMessage = _msgHandler.OnNewMsgHandler;
                IBMWMQEventSource.ErrorMsgHandler onError = _msgHandler.OnErrorMsgHandler;

                eventSource.NewMsgEventHandler -= onMessage;
                eventSource.ErrorMsgEventHandler -= onError;
            }
        }

    定义消息处理接口:

    public interface IIBMWMQMsgHandler
        {
            /// <summary>
            /// Handles a newly received message.
            /// </summary>
            /// <param name="business">Business code</param>
            /// <param name="msg">Message payload</param>
            void OnNewMsgHandler(string business, string msg);
    
            /// <summary>
            /// Handles an error notification.
            /// </summary>
            /// <param name="business">Business code</param>
            /// <param name="errorCode">Error code</param>
            void OnErrorMsgHandler(string business, string errorCode);
        }

    默认消息处理机制:

    public class DefaultIBMWMQMsgHandler : IIBMWMQMsgHandler
        {
            /// <summary>
            /// Writes the received message to the trace listeners.
            /// </summary>
            /// <param name="business">Business code (not used by this handler).</param>
            /// <param name="msg">Message payload.</param>
            public void OnNewMsgHandler(string business, string msg)
            {
                string line = string.Format("新消息到达,数据包:{0}", msg);
                Trace.WriteLine(line);
            }

            /// <summary>
            /// Writes the error code to the trace listeners.
            /// </summary>
            /// <param name="business">Business code (not used by this handler).</param>
            /// <param name="errorCode">Error code.</param>
            public void OnErrorMsgHandler(string business, string errorCode)
            {
                string line = string.Format("处理错误消息,错误码:{0}", errorCode);
                Trace.WriteLine(line);
            }
        }

    定义消息处理方法:

    public class CustomIBMWMQMsgHandler : BaseJobManager, IIBMWMQMsgHandler
        {
            /// <summary>
            /// Business-specific message processing.
            /// </summary>
            private static CustomBusinessHandler customBusinessHandler = new CustomBusinessHandler();

            /// <summary>
            /// Strips the header from the message, dispatches it to the processing routine that
            /// matches <paramref name="business"/>, then logs the message.
            /// </summary>
            /// <param name="business">Business code; codes without a matching case are only logged.</param>
            /// <param name="msg">Message payload.</param>
            public void OnNewMsgHandler(string business, string msg)
            {
                string businessName = ResolveBusinessName(business);

                switch (business)
                {
                    case IBMWMQConfig.BUSINESS_NAME_ZDFZ09:
                        msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZDF_Z09);
                        customBusinessHandler.DO_ZDF_Z09(business, msg);
                        break;
                    case IBMWMQConfig.BUSINESS_NAME_ZAPZ10:
                        msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_ZAP_Z10);
                        customBusinessHandler.DO_ZAP_Z10(business, msg);
                        break;
                    case IBMWMQConfig.BUSINESS_NAME_OULR24:
                        msg = customBusinessHandler.RemoveMsgHeader(msg, IBMWMQConfig.BUSINESS_NAME_OUL_R24);
                        customBusinessHandler.DO_OUL_R24(business, msg);
                        break;
                }

                this.WriteInfo(this.GetType(), string.Format("收到来自{0}的消息,数据包:{1}", businessName, msg));
            }

            /// <summary>
            /// Logs an error notification. Known MQ reason codes get a descriptive message;
            /// other four-character values are logged as unknown reason codes, and anything
            /// else is logged verbatim next to the business name.
            /// </summary>
            /// <param name="business">Business code.</param>
            /// <param name="errorCode">Reason code or free text; null or empty values are ignored.</param>
            public void OnErrorMsgHandler(string business, string errorCode)
            {
                if (string.IsNullOrEmpty(errorCode))
                {
                    return;
                }

                string businessName = ResolveBusinessName(business);
                string line;

                //TODO validate other message contents
                switch (errorCode)
                {
                    case "2033":
                        line = string.Format("MQRC_NO_MSG_AVAILABLE.{0} 无消息({1})。", businessName, errorCode);
                        break;
                    case "2085":
                        line = string.Format("MQRC_UNKNOWN_OBJECT_NAME.{0} 主题不存在({1})。", businessName, errorCode);
                        break;
                    case "2429":
                        line = string.Format("MQRC_SUBSCRIPTION_IN_USE.{0} 主题已被订阅({1})。", businessName, errorCode);
                        break;
                    case "2537":
                        line = string.Format("MQRC_CHANNEL_NOT_AVAILABLE.频道不可用({0})。", errorCode);
                        break;
                    case "2538":
                        line = string.Format("MQRC_HOST_NOT_AVAILABLE.无法连接消息队列主机({0})。", errorCode);
                        break;
                    case "2539":
                        line = string.Format("MQRC_CHANNEL_CONFIG_ERROR.频道配置错误({0})。", errorCode);
                        break;
                    case "2540":
                        line = string.Format("MQRC_UNKNOWN_CHANNEL_NAME.未知频道称({0})。", errorCode);
                        break;
                    default:
                        // Four characters is treated as an MQ reason code that has no dedicated
                        // message; anything else is free text such as progress notifications.
                        line = errorCode.Length == 4
                            ? string.Format("未知错误消息,错误原因编码:{0}。", errorCode)
                            : string.Format("{0} {1}", businessName, errorCode);
                        break;
                }

                this.WriteInfo(this.GetType(), line);
            }

            /// <summary>
            /// Looks up the display name of the job whose name (underscores removed) contains
            /// <paramref name="business"/>. Falls back to the business code itself when no job
            /// matches, so a missing configuration entry cannot abort message or error handling.
            /// </summary>
            private string ResolveBusinessName(string business)
            {
                List<JobConfigEntity> configs = InitJobConfig();
                if (configs == null || business == null)
                {
                    return business;
                }

                string name = configs
                    .Where(c => c.JobName != null && c.JobName.Replace("_", "").Contains(business))
                    .Select(c => c.Name)
                    .FirstOrDefault();

                return name ?? business;
            }
        }

    去掉消息头:

    /// <summary>
            /// Removes everything that precedes the first occurrence of the opening tag
            /// "&lt;" + <paramref name="name"/> and trims surrounding whitespace.
            /// </summary>
            /// <param name="msg">Raw message text; null or empty input is returned unchanged.</param>
            /// <param name="name">Tag name that marks the start of the payload.</param>
            /// <returns>
            /// The text from the opening tag onwards, or the trimmed input when the tag is
            /// missing or already at the start.
            /// </returns>
            public string RemoveMsgHeader(string msg, string name)
            {
                // Nothing to strip; also avoids a NullReferenceException on Trim().
                if (string.IsNullOrEmpty(msg))
                {
                    return msg;
                }

                string trimmed = msg.Trim();

                // Ordinal search: the tag is matched character by character, not linguistically.
                int start = trimmed.IndexOf("<" + name, StringComparison.Ordinal);

                return start > 0 ? trimmed.Substring(start).Trim() : trimmed;
            }

    配置信息:

    public class IBMWMQConfig
        {
            // NOTE(review): other code in this file refers to IBMWMQConfig.BUSINESS_NAME_* members
            // (for example BUSINESS_NAME_ZAPZ10 and BUSINESS_NAME_ZAP_Z10) that are not declared in
            // this listing. They are used as switch case labels, so they must be declared as constants.

            /// <summary>
            /// MQ host address, read from the "IBM_WMQ_CONNECTION_HOST" configuration value.
            /// Public because IBMWMQHelper.SubTopic2 reads it to build its connection properties.
            /// </summary>
            public static readonly string CONNECTION_HOST = ConfigHelper.GetValue("IBM_WMQ_CONNECTION_HOST");
            /// <summary>
            /// MQ listener port.
            /// Public because IBMWMQHelper.SubTopic2 reads it to build its connection properties.
            /// </summary>
            public const int CONNECTION_PORT = 4421;
            /// <summary>
            /// Client id embedded in the subscription name.
            /// </summary>
            private const string CLIENT_ID = "";
            /// <summary>
            /// Channel name.
            /// </summary>
            public const string CHANNEL = "";
            /// <summary>
            /// Queue manager name.
            /// </summary>
            public const string QUEUE_MGR_NAME = "";
            /// <summary>
            /// Subscription name template; {0} is replaced with the business code.
            /// </summary>
            public static readonly string SUBSCRIPTION_TEMPLATE = "JMS:" + QUEUE_MGR_NAME + ":" + CLIENT_ID + "_{0}.REQ:{0}.REQ";
            /// <summary>
            /// Topic name template; {0} is replaced with the business code.
            /// </summary>
            public static readonly string TOPIC_TEMPLATE = "{0}.REQ";
            /// <summary>
            /// Connection string in "host(port)" form.
            /// </summary>
            public static readonly string CONNECTION_INFO = string.Format("{0}({1})", CONNECTION_HOST, CONNECTION_PORT);

        }

    调用:

    // SubTopic1 is synchronous: with the default isGetMsg = true it keeps polling on the calling thread.
    IIBMWMQMsgHandler handler = new CustomIBMWMQMsgHandler();
    IBMWMQHelper helper = new IBMWMQHelper(handler);
    helper.SubTopic1(IBMWMQConfig.BUSINESS_NAME_ZAPZ10);
  • 相关阅读:
    HDU1496 Equations 卡时间第二题
    HDU3833 YY's new problem 卡时间第一题
    hiho1601最大分数 DP
    密码脱落
    hihoCoder
    王坚十年前的坚持,才有了今天世界顶级大数据计算平台MaxCompute
    SaaS加速器 I 商业中心:提供商业助力 共享商业成功
    发布SaaS加速器:我们不做SaaS,我们只做SaaS生态的推进者和守护者
    MaxCompute SQL 使用正则表达式选列
    MaxCompute如何对SQL查询结果实现分页获取
  • 原文地址:https://www.cnblogs.com/zhao-yi/p/10616580.html
Copyright © 2011-2022 走看看