踩过的坑:
坑1
ActiveMq断网之后不会自动重连,需要将连接字符串修改为:failover:(tcp://192.168.0.47:61616)?randomize=false
如果有多个地址可以使用 逗号分隔,例如:failover:(tcp://192.168.0.47:61616,tcp://192.168.0.48:61616)?randomize=false
坑2
ActiveMq 分为 持久化、非持久化
持久化: 程序关闭之后服务器发送消息会存在在消息队列中,再次上线后会将之前消息全部再次接收,
非持久化:程序关闭后,再次上线后这段时间不会接收到历史消息,等于是从新开始
开撸
private void CreateMq() { ActiveMqManager mq = new ActiveMqManager(); mq.Listener += MqListener; mq.CreateTopic("termNotifyTopic"); } private void MqListener(string message) { Console.WriteLine(message); }
需要引用DLL
https://files.cnblogs.com/files/zisai/ActiveMq.rar
ActiveMqManager源码
public class ActiveMqManager { public delegate void ActiveHandle(string message); #region 属性 IConnection _connection; /// <summary> /// 监听接收事件 /// </summary> public event ActiveHandle Listener; /// <summary> /// MQ 连接地址 /// </summary> public string BrokerUri { get; set; } /// <summary> /// 客户端名称标识 /// </summary> public string ClientId { get; set; } /// <summary> /// 消费者 名称 /// </summary> public string Name { get; set; } /// <summary> /// 是否为 持久订阅者 /// </summary> public bool Lasting { get; set; } #endregion public ActiveMqManager() { BrokerUri = AppConfig.MqUrl; ClientId = AppConfig.TermCode; Name = "Akeem"; } public ActiveMqManager(string brokerUri, string clientId, string name) { BrokerUri = brokerUri; ClientId = clientId; Name = name; } /// <summary> /// 创建1对多消费模式 /// </summary> /// <param name="topicName">广播ID</param> public void CreateTopic(string topicName) { if (_connection != null && _connection.IsStarted) { throw new Exception("已经存在一个连接"); } //获取一个connection _connection = GetConnection(); if (_connection == null) { return; } //启动连接,监听的话要主动启动连接 _connection.Start(); //通过连接创建一个会话 ISession session = _connection.CreateSession(); if (Lasting) { //通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置 IMessageConsumer consumer = session.CreateDurableConsumer(new ActiveMQTopic(topicName), Name, null, false); //注册监听事件 consumer.Listener += new MessageListener(consumer_Listener); } else { //创建topic ActiveMQTopic destination = new ActiveMQTopic(topicName); //普通消费者,即非持久订阅者 var consumer = session.CreateConsumer(destination); consumer.Listener += new MessageListener(consumer_Listener); } } /// <summary> /// 创建1对1 消费模式 /// </summary> public void CreateQueue(string queueName, string filter) { if (_connection != null && _connection.IsStarted) { throw new Exception("已经存在一个连接"); } _connection = GetConnection(); if (_connection == null) { return; } //启动连接,监听的话要主动启动连接 _connection.Start(); //通过连接创建一个会话 ISession session = _connection.CreateSession(); //通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置 例如:string.Format("filter='{0}'", this.txtID.Text) IMessageConsumer consumer = session.CreateConsumer(new ActiveMQQueue(queueName), filter); //注册监听事件 consumer.Listener += new MessageListener(consumer_Listener); } private void Connection_ExceptionListener(Exception exception) { } private void Connection_ConnectionResumedListener() { } /// <summary> /// 连接中断监听器 /// </summary> private void Connection_ConnectionInterruptedListener() { } private void consumer_Listener(IMessage message) { ITextMessage msg = (ITextMessage)message; Listener?.Invoke(msg.Text); } /// <summary> /// 创建连接器 /// </summary> /// <returns></returns> private IConnection GetConnection() { try { //创建连接工厂 IConnectionFactory factory = new ConnectionFactory(BrokerUri); //通过工厂构建连接 var connection = factory.CreateConnection(); //这个是连接的客户端名称标识 connection.ClientId = ClientId; //监听连接中断事件 connection.ConnectionInterruptedListener += Connection_ConnectionInterruptedListener; //监听连接恢复事件 connection.ConnectionResumedListener += Connection_ConnectionResumedListener; //异常监听 connection.ExceptionListener += Connection_ExceptionListener; return connection; } catch (Exception ex) { LogHelper.Error("MQ:GetConnection", ex); return null; } } }