zoukankan      html  css  js  c++  java
  • MQ_ActiveMQ环境部署+C#推送和接收消息

    一、          ActiveMQ环境部署

    1. Jdk:jdk-8u91-windows-i586.exe
    2. ActiveMQ:apache-activemq-5.15.0,选择win64,启动服务activemq,要求端口号61616不被占用,启动服务效果如图:
     
    1. 如果安装提示Failed to execute start task,解决方法:停止ICS(运行-->services.msc找到Internet Connection Sharing (ICS)服务,改成手动启动或禁用)
    2. ActiveMQ类库:

        (1)Apache.NMS.dll路径:Apache.NMS.ActiveMQ-1.7.2-binlibApache.NMS et-3.5

        (2)Apache.NMS.ActiveMQ.dll路径:Apache.NMS.ActiveMQ-1.7.2-binuild et-3.5debug

    安装完成,访问地址效果如图:

     
    1. ActiveMQ后台管理地址:http://localhost:8161/admin,默认账号:admin,密码:admin
     

    二、          C#ActiveMQ实现推送接收数据

    1. 添加ActiveMQ类库Apache.NMS.dll、Apache.NMS.ActiveMQ.dll
     
    1. 定义传值参数类:ActiveMQModel,命名空间定义:ActiveMQClient。

    namespace ActiveMQClient

    {

        [Serializable]

        public class ActiveMQModel

        {

            /// <summary>

            /// guid

            /// </summary>

            public string guid { get; set; }

            /// <summary>

            /// 方法名

            /// </summary>

            public string method { get; set; }

            /// <summary>

            /// 接口参数(T转json)

            /// </summary>

            public string json { get; set; }

        }

    }

    1. 初始化ActiveMQ,注册推送事件,定义推送方法。

    using ActiveMQClient;

    using Apache.NMS;

    using Apache.NMS.Util;

    using Newtonsoft.Json;

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

     

    namespace ActiveMQ

    {

        public class ActiveMQHelper

        {

            private static IConnectionFactory connFac;

     

            private static IConnection connection;

            private static ISession session;

            private static IDestination destination;

            private static IMessageProducer producer;

            private static IMessageConsumer consumer;

     

            /// <summary>

            /// 初始化ActiveMQ

            /// </summary>

            public static void initAMQ()

            {

                string strsendTopicName = "A";//推送方topic名

                string strreceiveTopicName = "B";//接受方toptic名

                var url = "localhost:61616";//activemq地址

                var userid = "oa";//帐户

                var pwd = "oa";//密码

                try

                {

                    connFac = new NMSConnectionFactory(new Uri("activemq:failover:(tcp://" + url + ")")); //new NMSConnectionFactory(new Uri("activemq:failover:(tcp://localhost:61616)"));

     

                    //新建连接

                    connection = connFac.CreateConnection(userid, pwd);//connFac.CreateConnection("oa", "oa");//设置连接要用的用户名、密码

     

                    //如果你要持久“订阅”,则需要设置ClientId,这样程序运行当中被停止,恢复运行时,能拿到没接收到的消息!               

                    connection.ClientId = "ClientId_" + strsendTopicName;

                    //connection = connFac.CreateConnection();//如果你是缺省方式启动Active MQ服务,则不需填用户名、密码

     

                    //创建Session

                    session = connection.CreateSession();

     

                    //发布/订阅模式,适合一对多的情况

                    destination = SessionUtil.GetDestination(session, "topic://" + strreceiveTopicName);

     

                    //新建生产者对象

                    producer = session.CreateProducer(destination);

                    producer.DeliveryMode = MsgDeliveryMode.Persistent;//ActiveMQ服务器停止工作后,消息不再保留

     

                    //新建消费者对象:普通“订阅”模式

                    //consumer = session.CreateConsumer(destination);//不需要持久“订阅”      

     

                    //新建消费者对象:持久"订阅"模式:

                    //    持久“订阅”后,如果你的程序被停止工作后,恢复运行,

                    //从第一次持久订阅开始,没收到的消息还可以继续收

                    consumer = session.CreateDurableConsumer(

                        session.GetTopic(strsendTopicName)

                        , connection.ClientId, null, false);

     

                    //设置消息接收事件

                    consumer.Listener += new MessageListener(OnMessage);

     

                    //启动来自Active MQ的消息侦听

                    connection.Start();

                }

                catch (Exception e)

                {

                    SysErrorLog.SaveErrorInfo(e, "初始化ActiveMQ失败");

                }

            }

     

            /// <summary>

            /// 推送ActiveMQ

            /// </summary>

            /// <param name="guid"></param>

            /// <param name="t"></param>

            /// <param name="method"></param>

            public static void Send(string guid, object t, string method)

            {

                if (producer == null)

                {

                    initAMQ();

                }

                if (session == null)

                {

                    throw new Exception("请初始化ActiveMQ!");

                }

                if (producer == null)

                {

                    throw new Exception("请初始化ActiveMQ!");

                }

                var model = new ActiveMQModel();

                model.guid = guid;

                model.method = method;

                model.json = JsonConvert.SerializeObject(t);

                var i = session.CreateObjectMessage(model);

                producer.Send(i);

            }

     

            /// <summary>

            /// 接收ActiveMQ消息

            /// </summary>

            /// <param name="receivedMsg"></param>

            protected static void OnMessage(IMessage receivedMsg)

            {

                if (receivedMsg is IObjectMessage)

                {

                    var message = receivedMsg as IObjectMessage;

                    if (message.Body is ActiveMQModel)

                    {

                        SysErrorLog.SaveErrorInfo("ActiveMQModel=" + JsonConvert.SerializeObject(message.Body));

                    }

                }

            }

        }

    }

    三、          C#推送ActiveMQ,以更新机构商品库存为例:

    推送代码:

                    var model = new

                    {

                        ShopId = ShopId,//门店编码

                        proNum = newKuc,//库存

                        skuNo = skuno,//sku

                    };

                    var guid=Guid.NewGuid().ToString();

                    var method = "updatestoreproductkuc";

                    var lst = new List<object>();

                    lst.Add(model);

                    ActiveMQHelper.Send(guid, lst, method);

    接收报文格式如下

     

    1

    ActiveMQ站点账号密码设置

    https://www.cnblogs.com/MIC2016/p/6196789.html

     

     

    2

    ActiveMQ的访问密码

    http://blog.csdn.net/zbw18297786698/article/details/52994612

     

     

  • 相关阅读:
    Flask 服务器设置host=0.0.0.0之后外部仍然无法访问
    HTB::Sauna
    VulnHub::DC-4
    【CTFHub 技能树】RCE
    【CTFHub 技能树】反射型XSS
    VulnHub::DC-3
    HashMap中add()方法的源码学习
    equals和HashCode深入理解(转)
    AQS原理分析
    初步认识线程安全性
  • 原文地址:https://www.cnblogs.com/a735882640/p/7625771.html
Copyright © 2011-2022 走看看