zoukankan      html  css  js  c++  java
  • 消息处理代码

    消息处理封装的代码:

    /// <summary>
    /// Publishes to and subscribes from a single ActiveMQ topic over an NMS connection.
    /// When the connection reports an exception, the transceiver releases its resources
    /// and keeps trying to reconnect on a background thread until it succeeds or
    /// <see cref="Close"/> is called.
    /// </summary>
    public class Transceiver
    {
        /// <summary>Pause between two reconnection attempts, so an unreachable broker is not hammered in a tight loop.</summary>
        private const Int32 ReconnectDelayMilliseconds = 1000;

        public IPAddress MQServerIP { get; set; }
        public UInt16 MQServerPort { get; set; }
        public String MQUserName { get; set; }
        public String MQPassword { get; set; }
        public String MQTopic { get; set; }

        /// <summary>True while the connection is believed to be usable.</summary>
        public Boolean IsOnline { get { return Online; } }

        /// <summary>
        /// Message time-to-live, in seconds.
        /// </summary>
        public Int32 TTL { get; set; }

        /// <summary>
        /// The most recent exception recorded by a failed connection attempt or reported by the
        /// connection, or null when the last attempt succeeded. Lets callers find out why
        /// <see cref="Open"/> returned false.
        /// </summary>
        public Exception LastError { get { return lastError; } }

        private IConnectionFactory factory;
        private IConnection connection;
        private ISession session;
        private IMessageProducer producer;
        private IMessageConsumer consumer;

        // These fields are written by listener callbacks and by the reconnect thread while
        // being read by callers, hence volatile.
        private volatile Boolean Online;
        private volatile Boolean closeRequested;
        private volatile Exception lastError;

        // 0 = no reconnect thread running, 1 = one is running (guarded with Interlocked).
        private Int32 reconnecting;

        public delegate void MessageReceiveHandle(MQMessage massage);
        public event MessageReceiveHandle MessageReceived;
        public delegate void ConnectionEventHandle(Boolean Online);
        public event ConnectionEventHandle ConnectionEvent;
        public delegate void ConnectionErrorHandle(Exception error);
        public event ConnectionErrorHandle ConnectionError;

        /// <summary>
        /// Stores the connection settings. No network activity happens until <see cref="Open"/> is called.
        /// </summary>
        /// <param name="ServerIP">Broker address.</param>
        /// <param name="ServerPort">Broker TCP port.</param>
        /// <param name="UserName">User name; null or empty connects without credentials.</param>
        /// <param name="Password">Password used together with <paramref name="UserName"/>.</param>
        /// <param name="Topic">Name of the topic that is both published to and consumed from.</param>
        /// <param name="ttl">Message time-to-live in seconds.</param>
        public Transceiver(IPAddress ServerIP, UInt16 ServerPort, String UserName, String Password, String Topic, Int32 ttl)
        {
            MQServerIP = ServerIP;
            MQServerPort = ServerPort;
            MQUserName = UserName;
            MQPassword = Password;
            MQTopic = Topic;
            TTL = ttl;

            producer = null;
            consumer = null;
            factory = null;
            connection = null;
            session = null;
            Online = false;
        }

        ~Transceiver()
        {
            // Close() never throws: every teardown step swallows its own failure.
            Close();
        }

        /// <summary>
        /// Connects to the broker and starts consuming from the topic.
        /// </summary>
        /// <returns>
        /// True when the transceiver is online (including when it already was);
        /// false when connecting failed, in which case <see cref="LastError"/> holds the cause.
        /// </returns>
        public Boolean Open()
        {
            if (IsOnline) { return true; }

            // An explicit Open() cancels the effect of an earlier Close().
            closeRequested = false;
            try
            {
                Connect();
                lastError = null;
                Online = true;
            }
            catch (Exception ex)
            {
                // Connect() has already released whatever it managed to create.
                lastError = ex;
                Online = false;
            }
            return Online;
        }

        /// <summary>
        /// Builds the connection, session, producer and consumer. Any stale resources are
        /// released first; on failure everything created so far is released and the
        /// exception is rethrown.
        /// </summary>
        private void Connect()
        {
            // Avoid leaking a previous (e.g. interrupted) connection when reconnecting over it.
            ReleaseResources();
            try
            {
                factory = new ConnectionFactory(BuildBrokerUri());
                connection = String.IsNullOrEmpty(MQUserName)
                    ? factory.CreateConnection()
                    : factory.CreateConnection(MQUserName, MQPassword);

                connection.ConnectionInterruptedListener += connection_ConnectionInterruptedListener;
                connection.ConnectionResumedListener += connection_ConnectionResumedListener;
                connection.ExceptionListener += connection_ExceptionListener;
                connection.Start();

                session = connection.CreateSession();
                Apache.NMS.ActiveMQ.Commands.ActiveMQTopic topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(MQTopic);
                producer = session.CreateProducer(topic);
                consumer = session.CreateConsumer(topic);
                consumer.Listener += consumer_Listener;
            }
            catch
            {
                ReleaseResources();
                throw;
            }
        }

        /// <summary>
        /// Formats the broker URI as tcp://host:port, wrapping IPv6 addresses in brackets
        /// so that the port separator stays unambiguous.
        /// </summary>
        private String BuildBrokerUri()
        {
            Object host = MQServerIP;
            if (MQServerIP != null && MQServerIP.AddressFamily == System.Net.Sockets.AddressFamily.InterNetworkV6)
            {
                host = "[" + MQServerIP + "]";
            }
            return String.Format("tcp://{0}:{1}", host, MQServerPort);
        }

        /// <summary>
        /// Called by the connection when it reports an exception: notifies subscribers,
        /// tears the connection down and starts the background reconnect loop.
        /// </summary>
        void connection_ExceptionListener(Exception exception)
        {
            lastError = exception;
            RaiseConnectionError(exception);
            ReleaseResources();
            Online = false;

            // Do not reconnect behind the back of a caller who explicitly closed the transceiver.
            if (closeRequested) { return; }

            // Only one reconnect thread at a time, even if several exceptions arrive in a row.
            if (Interlocked.CompareExchange(ref reconnecting, 1, 0) != 0) { return; }

            Thread thread = new Thread(new ThreadStart(ResumConncet));
            thread.IsBackground = true;
            thread.Start();
        }

        /// <summary>
        /// Background reconnect loop: retries <see cref="Connect"/> with a pause between
        /// attempts until the transceiver is online or <see cref="Close"/> has been called.
        /// </summary>
        private void ResumConncet()
        {
            try
            {
                while (!IsOnline && !closeRequested)
                {
                    try
                    {
                        Connect();
                        if (closeRequested)
                        {
                            // Close() raced with the connection attempt; honour it.
                            ReleaseResources();
                            break;
                        }
                        lastError = null;
                        // Report "online" only once session, producer and consumer all exist.
                        Online = true;
                        RaiseConnectionEvent(true);
                    }
                    catch (Exception ex)
                    {
                        lastError = ex;
                        if (!Online)
                        {
                            Thread.Sleep(ReconnectDelayMilliseconds);
                        }
                    }
                }
            }
            finally
            {
                Interlocked.Exchange(ref reconnecting, 0);
            }
        }

        void connection_ConnectionResumedListener()
        {
            Online = true;
            RaiseConnectionEvent(true);
        }

        void connection_ConnectionInterruptedListener()
        {
            Online = false;
            RaiseConnectionEvent(false);
        }

        /// <summary>
        /// Forwards received object messages whose body is an <c>MQMessage</c> to
        /// <see cref="MessageReceived"/>; any other message is ignored instead of
        /// throwing an InvalidCastException on the dispatch thread.
        /// </summary>
        void consumer_Listener(IMessage message)
        {
            IObjectMessage objectMessage = message as IObjectMessage;
            if (objectMessage == null) { return; }

            Object body = objectMessage.Body;
            if (!(body is MQMessage)) { return; }

            MessageReceiveHandle handler = MessageReceived;
            if (handler != null)
            {
                handler((MQMessage)body);
            }
        }

        /// <summary>
        /// Closes producer, consumer, session and connection and stops any reconnect loop.
        /// Safe to call repeatedly; never throws.
        /// </summary>
        public void Close()
        {
            closeRequested = true;
            ReleaseResources();
            Online = false;
        }

        /// <summary>
        /// Best-effort teardown of all NMS objects. Each field is cleared before its object
        /// is closed so that a disposed object is never left reachable through the fields.
        /// </summary>
        private void ReleaseResources()
        {
            IMessageProducer oldProducer = producer;
            IMessageConsumer oldConsumer = consumer;
            ISession oldSession = session;
            IConnection oldConnection = connection;
            producer = null;
            consumer = null;
            session = null;
            connection = null;

            if (oldProducer != null)
            {
                try
                {
                    oldProducer.Close();
                    oldProducer.Dispose();
                }
                catch { /* best effort: the transport may already be dead */ }
            }
            if (oldConsumer != null)
            {
                try
                {
                    oldConsumer.Listener -= consumer_Listener;
                    oldConsumer.Close();
                    oldConsumer.Dispose();
                }
                catch { /* best effort: the transport may already be dead */ }
            }
            if (oldSession != null)
            {
                try
                {
                    oldSession.Close();
                    oldSession.Dispose();
                }
                catch { /* best effort: the transport may already be dead */ }
            }
            if (oldConnection != null)
            {
                try
                {
                    // Detach first so a dying connection cannot call back into this instance.
                    oldConnection.ConnectionInterruptedListener -= connection_ConnectionInterruptedListener;
                    oldConnection.ConnectionResumedListener -= connection_ConnectionResumedListener;
                    oldConnection.ExceptionListener -= connection_ExceptionListener;
                    oldConnection.Stop();
                    oldConnection.Close();
                    oldConnection.Dispose();
                }
                catch { /* best effort: the transport may already be dead */ }
            }
        }

        /// <summary>
        /// Publishes <paramref name="massage"/> to the topic as a persistent, normal-priority
        /// object message that expires after <see cref="TTL"/> seconds.
        /// </summary>
        /// <exception cref="InvalidOperationException">The transceiver has no open producer.</exception>
        public void SendMessage(MQMessage massage)
        {
            // Capture once: the reconnect thread or Close() may clear the field concurrently.
            IMessageProducer activeProducer = producer;
            if (activeProducer == null)
            {
                throw new InvalidOperationException("The transceiver is not connected; call Open() before SendMessage().");
            }

            IObjectMessage msg = activeProducer.CreateObjectMessage(massage);
            activeProducer.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, Apache.NMS.MsgPriority.Normal, TimeSpan.FromSeconds(TTL));
        }

        /// <summary>Raises <see cref="ConnectionEvent"/> if anyone is subscribed.</summary>
        private void RaiseConnectionEvent(Boolean isOnline)
        {
            ConnectionEventHandle handler = ConnectionEvent;
            if (handler != null)
            {
                handler(isOnline);
            }
        }

        /// <summary>Raises <see cref="ConnectionError"/> if anyone is subscribed.</summary>
        private void RaiseConnectionError(Exception error)
        {
            ConnectionErrorHandle handler = ConnectionError;
            if (handler != null)
            {
                handler(error);
            }
        }
    }

  • 相关阅读:
    HDU4628+状态压缩DP
    Javascript 去掉字符串前后空格的五种方法
    Javascript 数组之判断取值和数组取值
    ASP.NET MVC 出现错误 “The view 'XXX' or its master was not found or no view engine support”
    ASP.NET MVC 页面调整并传递参数
    ASP.NET MVC3 部署网站 报 "Could not load file or assembly 'System.Web.Helpers'" 错的解决方法
    ASP.NET MVC 控制器向View传值的三种方法
    CSharp 如何通过拼接XML调用存储过程来查询数据
    SQLServer : EXEC和sp_executesql的区别
    关于SQLServer2005的学习笔记—异常捕获及处理
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/3419044.html
Copyright © 2011-2022 走看看