zoukankan      html  css  js  c++  java
  • 2.RABBITMQ 入门

    关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置

    公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务。那么到后期的话,登录服务器之后,全是

    一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端

    该案例的接收端,源自网上的代码片段 片内容,做了部分修改之后使用

     

    日志中心的 功能要求使用注入解耦,所以,这里我也解耦了,如果日至那边使用的是 autofac,我只里使用的MEF实现注入 所以定义了相关的接口对象

    IMQContextFactory:

    using Ecostar.MQLogger.Core.Infrastructure;
    using System;
    
    namespace Ecostar.MQConsumer.Core.Infrastructure
    {
        /// <summary>
        ///     仅仅只有sender使用到
        /// </summary>
        public interface IMQContextFactory
        {
            MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog);
        }
    }
    View Code

    对应的实现类:MQContextFactory

    using Ecostar.MQLogger.Core.Infrastructure;
    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel.Composition;
    using System.Security.Cryptography;
    
    namespace Ecostar.MQConsumer.Core.Infrastructure
    {
        /// <summary>
        ///     仅仅只有sender使用到
        /// </summary>
        [Export(typeof(IMQContextFactory))]
        public class MQContextFactory : IMQContextFactory
        {
            /// <summary>
            /// 上下文字典
            /// </summary>
            private static readonly Dictionary<string, MQContext> Contexts = new Dictionary<string, MQContext>();
    
            /// <summary>
            /// 上下文操作锁字典,只创建一次
            /// </summary>
            public static readonly Dictionary<string, object> contextLockers = new Dictionary<string, object>();
    
            /// <summary>
            /// 更新上下文操作锁字典时的锁,只创建一次
            /// </summary>
            private static readonly object contextLockersLocker = new object();
    
            /// <summary>
            /// 获取指定的上下文
            /// </summary>
            /// <param name="mqUri">mq地址</param>
            /// <param name="toLog">日志记录</param>
            /// <returns>上下文对象</returns>
            public MQContext CreateContext(string mqUri, Action<string, LogLevel> toLog)
            {
                var key = MD5Encrypt(mqUri);
                var locker = GetFactoryLocker(key);
    
                lock (locker)
                {
                    MQContext context;
                    if (!Contexts.TryGetValue(key, out context))
                    {
                        Guid contextId = Guid.NewGuid();
                        string logHeader = string.Format("[{0}]", contextId.ToString());
    
                        context = new MQContext()
                        {
                            ReceiveQueueName = "Logs",
                            Id = contextId
                        };
                        Console.WriteLine(logHeader + "   初始化发送上下文完毕");
    
                        // 获取连接
                        context.SendConnection = CreateConnection(mqUri);
                        context.SendConnection.AutoClose = false;
                        context.SendConnection.ConnectionShutdown += (o, e) => Console.WriteLine("   RabbitMQ错误,连接被关闭了:" + e.ReplyText);
                        Console.WriteLine(logHeader + "   创建连接完毕", LogLevel.Trace);
    
                        // 获取通道
                        context.SendChannel = CreateChannel(context.SendConnection);
                        Console.WriteLine(logHeader + "   创建通道完毕", LogLevel.Trace);
    
                        Contexts.Add(key, context);
    
                    }
    
                    return context;
                }
            }
    
            #region 私有方法
            /// 创建连接
            /// </summary>
            /// <param name="mqUrl"></param>
            /// <returns></returns>
            private static IConnection CreateConnection(string mqUrl)
            {
                const ushort heartbeta = 120;
    
                var factory = new ConnectionFactory()
                {
                    Uri = mqUrl,
                    RequestedHeartbeat = heartbeta,
                    AutomaticRecoveryEnabled = true
                };
    
                return factory.CreateConnection();
            }
    
            /// <summary>
            /// 创建通道
            /// </summary>
            /// <param name="connection"></param>
            /// <returns></returns>
            private static IModel CreateChannel(IConnection connection)
            {
                if (connection != null)
                    return connection.CreateModel();
                return null;
            }
    
    
            /// <summary>
            /// 获取上下文操作锁
            /// </summary>
            /// <param name="contextKey">上下文工厂key</param>
            /// <returns></returns>
            private static object GetFactoryLocker(string contextKey)
            {
                lock (contextLockersLocker)
                {
                    object locker;
                    if (!contextLockers.TryGetValue(contextKey, out locker))
                    {
                        locker = new object();
                        contextLockers.Add(contextKey, locker);
                    }
    
                    return locker;
                }
            }
    
            /// <summary>
            /// 获取字符的MD5值
            /// </summary>
            /// <param name="str"></param>
            /// <returns></returns>
            private static string MD5Encrypt(string str)
            {
                MD5 md5 = new MD5CryptoServiceProvider();
                byte[] result = md5.ComputeHash(System.Text.Encoding.Default.GetBytes(str));
                return System.Text.Encoding.Default.GetString(result);
            }
            #endregion
    
    
        }
    }
    View Code

    注视我写的很明白,这部分的使用 是  生产者使用的类,也就是  发送消息


    下面是消费者:

    IReceiver.cs:

    namespace Ecostar.MQConsumer.Core
    {
        public interface IReceiver
        {
            /// <summary>
            ///     初始化接收程序
            /// </summary>
            /// <param name="mqUrls"></param>
            void InitialReceive(MQReceiverParam receiverParams);
    
    
        }
    }
    View Code

    对应的实现类:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel.Composition;
    using System.Threading;
    
    namespace Ecostar.MQConsumer.Core
    {
        [Export(typeof(IReceiver))]
        public class Receiver : IReceiver
        {
            private MQContext _context;
            private const ushort Heartbeta = 60;
            private string _queueName;
            private bool _isAutoAck;
            private List<string> _mqUrls;
            private Func<byte[], bool> _processFunction;
            private Action<string> _mqActionLogFunc;
            private MQConnectionFactory _ConnectionFactoryParams;
    
       
            public void InitialReceive(MQReceiverParam receiverParams)
            {
                _queueName       = receiverParams._queueName;
                _isAutoAck       = receiverParams._isAutoAck;
                _mqUrls          = receiverParams._mqUrls;
                _processFunction = receiverParams._processFunction;
                _mqActionLogFunc = receiverParams._mqActionLogFunc;
                _ConnectionFactoryParams = receiverParams.ConnectionFactoryParam;
                receiverParams._mqUrls.ForEach(url => InitReceive(_queueName, _isAutoAck, url));
    
            }
    
            /// <summary>
            /// 初始化某个节点的接收
            /// </summary>
            private void InitReceive(string queueName, bool isAutoAck, string mqUrl)
            {
                Guid contextId = Guid.NewGuid();
                string logHeader = string.Format("[{0}, {1}]", queueName, contextId.ToString());
                try
                {
                    _context = new MQContext()
                    {
                        Id = contextId,
                        ReceiveQueueName = queueName,
                        IsAutoAck = isAutoAck,
                        ReceiveConnection = new ConnectionFactory()
                        {
                            HostName = _ConnectionFactoryParams.HostName,
                            UserName = _ConnectionFactoryParams.UserName,
                            Password = _ConnectionFactoryParams.Password,
                            VirtualHost = _ConnectionFactoryParams.VirtualHost
                        }.CreateConnection()
                    };
    
                    // 监听Shutdown事件,记录下LOG便于排查和监管服务的稳定性
                    _context.ReceiveConnection.ConnectionShutdown += (o, e) =>
                    {
                        _mqActionLogFunc("   RabbitMQ错误,连接被关闭了:" + e.ReplyText);
                    };
                    // 获取通道
                    _context.ReceiveChannel = _context.ReceiveConnection?.CreateModel();
    
                    // 创建事件驱动的消费者
                    var consumer = new EventingBasicConsumer(_context.ReceiveChannel);
                    consumer.Received += (o, e) =>
                    {
                        try
                        {
                            // 接受数据处理逻辑
                            // e.Body
                            var result = _processFunction(e.Body);
    
                            if (!isAutoAck)
                            {
                                if (!result)
                                {
                                    Thread.Sleep(300);
    
                                    // 未能处理完成的话,将消息重新放入队列头
                                    _context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
                                    _mqActionLogFunc("   消息未处理成功,将消息重新放入队列头");
                                }
                                else if (!_context.ReceiveChannel.IsClosed)
                                {
                                    // 处理成功并且通道未关闭时ack回去,删除队列中的消息
                                    _context.ReceiveChannel.BasicAck(e.DeliveryTag, false);
                                    _mqActionLogFunc("   消息处理成功,发送Ack完毕");
                                }
                            }
                        }
                        catch (Exception ex)
                        {
                            Thread.Sleep(300);
                            if (!isAutoAck)
                            {
                                // 将消息重新放入队列头
                                _context.ReceiveChannel.BasicReject(e.DeliveryTag, true);
                            }
                            _mqActionLogFunc("   处理数据发生异常:" + ex.Message + ex.StackTrace);
                        }
                    };
    
                    // 一次只获取一条消息
                    _context.ReceiveChannel.BasicQos(0, 1, false);
                    _context.ReceiveChannel.BasicConsume(_context.ReceiveQueueName, _context.IsAutoAck, consumer);
    
                    _mqActionLogFunc("   初始化队列完毕");
                }
                catch (Exception ex)
                {
                    _mqActionLogFunc("   初始化RabbitMQ出错:" + ex.Message + ex.StackTrace);
                }
            }
    
        }
    }
    View Code

    使用到的参数 MQReceiverParam:

    using System;
    using System.Collections.Generic;
    
    namespace Ecostar.MQConsumer.Core
    {
        /// <summary>
        ///     消费者入参
        /// </summary>
        public class MQReceiverParam
        {
            public  string _queueName { get; set; }
            public  bool _isAutoAck { get; set; }
            public  List<string> _mqUrls { get; set; }
            public  Func<byte[], bool> _processFunction { get; set; }
            public  Action<string> _mqActionLogFunc { get; set; }
            public MQConnectionFactory ConnectionFactoryParam { get; set; }
    
        }
    
        /// <summary>
        ///     服务配置
        /// </summary>
        public class MQConnectionFactory
        {
            public string HostName     {get;set;}
            public string UserName     {get;set;}
            public string Password     {get;set;}
            public string VirtualHost  {get;set;}
        }
    
    }
    View Code

    重力要说明一下:

    Func<byte[], bool> _processFunction { get; set; }
    Action<string> _mqActionLogFunc { get; set; }

    参数的对象中,有这么两个委托,原因是,如果你在学习 rabbitmq得这块内容的时候,你会发现,网上很多案例,以及官方提供的案例,写法都比较简单,而且,都是讲业务逻辑和   rabbitmq的消费的这跨功能 耦合到了一起

    如果其他地方使用的时候,还是重复,创建  connection   创建queue,绑定,,,,,等相关动作,代码不仅不美观,而且显得繁琐,啰嗦,所以,这两个委托类型的参数,起到了接偶的作用,似的 具体的业务逻辑和 rabbitmq的消费逻辑 分离

    使用如下:

    (我是在窗体上直接放置了一个  richTextBox的控件,讲接收的信息打印出来,)

    using Ecostar.MQConsumer.Core;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel.Composition;
    using System.ComponentModel.Composition.Hosting;
    using System.IO;
    using System.Reflection;
    using System.Text;
    using System.Windows.Forms;
    
    namespace Ecostar.MQConsumer.UI
    {
        [Export]
        public partial class MQMainForm : Form
        {
    
            #region Fields
            private static CompositionContainer _container;//MEF 部件组合 管理
            [Import]
            public IReceiver Receiver { get; set; }
    
            #endregion
    
            public MQMainForm()
            {
                InitializeComponent();
            }
    
            private void MQMainForm_Load(object sender, EventArgs e)
            {
                InitForm();
                InitialListener();
            }
    
    
            public void InitForm()
            {
                AggregateCatalog catalog = new AggregateCatalog();
                catalog.Catalogs.Add(new DirectoryCatalog(Directory.GetCurrentDirectory()));
                catalog.Catalogs.Add(new AssemblyCatalog(Assembly.GetExecutingAssembly()));
                _container = new CompositionContainer(catalog);
            }
    
            /// <summary>
            ///     初始化监听程序
            /// </summary>
            void InitialListener()
            {
                MQMainForm form;
                try
                {
                    form = _container.GetExportedValue<MQMainForm>();
                }
                catch (Exception ex)
                {
    
                    throw;
                }
                form.Receiver.InitialReceive(new MQReceiverParam()
                {
                    _queueName = "testQueueName",
                    _isAutoAck = false,
                    _mqUrls = new List<string>() { "amqp://127.0.0.1:5672/" },
                    _processFunction = (buffer) =>
                    {
                        string receiveMsg = Encoding.UTF8.GetString(buffer);
                        this.rtb_receive.Invoke(new Action(() => { { this.rtb_receive.Text += receiveMsg + "
    "; } }));
                        return true;
    
                    },
                    _mqActionLogFunc = (msg) =>
                    {
                        this.rtb_receive.Invoke(new Action(() =>
                        {
                            this.rtb_receive.Text += "====MQ Action====" + msg + "
    ";
                        }));
                    },
                    ConnectionFactoryParam = new MQConnectionFactory()
                    {
                        HostName = "127.0.0.1",
                        UserName = "CC",
                        Password = "123qwe",
                        VirtualHost = "/"
                    }
                });
            }
    
    
        }
    }
    View Code

     其中的 testQueueName,是客户端发送的 消息列队名称,也就是queue的名称,你也可以(如果是测试),在mq服务器上 人为的添加这个queue名称之后再测试。

    这样一来,_processFunction 这个用于消费的方法,可以,写任意的处理方式,比如打印到控制台,输出到床体 控件显示,写入到日志,写入到数据库等等。

    而且中的 _mqActionLogFunc,适用于记录mq的消费过程的日志,比如 mq消费操作执行过程中发生异常 ,那么直接找mq的问题即可。


    截图中还一个:MQContext类,这是一个部分类,为了方便区分,我把消费者,生产者  公共部分分别放置到了三个部分类中:

    MQContext.Consumer.cs

    using RabbitMQ.Client;
    
    namespace Ecostar.MQConsumer.Core
    {
        /// <summary>
        ///        MQ 消费者
        /// </summary>
        public partial class MQContext
        {
            // <summary>
            /// 用户监听的Connection
            /// </summary>
            public IConnection ReceiveConnection { get; set; }
    
            /// <summary>
            /// 用于监听的Channel
            /// </summary>
            public IModel ReceiveChannel { get; set; }
    
            /// <summary>
            /// 监听队列名
            /// </summary>
            public string ReceiveQueueName { get; set; }
        }
    }
    View Code

    MQContext.cs

    namespace Ecostar.MQConsumer.Core
    {
        /// <summary>
        ///     MQ 生产者消费者公共部分
        /// </summary>
        public partial class MQContext
        {
            /// <summary>
            /// mq地址
            /// </summary>
            public string MQUrl { get; set; }
    
        }
    }
    View Code

    MQContext.Producer.cs

    using RabbitMQ.Client;
    using System;
    
    namespace Ecostar.MQConsumer.Core
    {
        /// <summary>
        ///        MQ 生产者
        /// </summary>
        public partial class MQContext
        {
            /// <summary>
            /// 用于发送消息的Connection
            /// </summary>
            public IConnection SendConnection { get; set; }
    
            /// <summary>
            /// 用于发送消息到Channel
            /// </summary>
            public IModel SendChannel { get; set; }
    
            /// <summary>
            /// 发送的Exchange
            /// </summary>
            public string Exchange { get; set; }
    
            /// <summary>
            /// 是否启用自动删除
            /// </summary>
            public bool IsAutoAck { get; set; }
            /// <summary>
            /// 上下文ID
            /// </summary>
            public Guid Id { get; set; }
    
            /// <summary>
            /// 路由
            /// </summary>
            public string RouteKey { get; set; }
            /// <summary>
            /// 是否正在运行,默认false
            /// </summary>
            public bool IsRunning { get; set; }
    
            /// <summary>
            /// 回收此上下文
            /// </summary>
            public void Recovery()
            {
                IsRunning = false;
            }
        }
    }
    View Code

    到此,这个简单的消费案例就完成了。


    下面的是 生产者,(发送消息的案例),为了造数据,所以写的随意些:(一个控制台程序),nuget引入  rabbitmq.client ,指令: install-package rabbitmq.Client

    using RabbitMQ.Client;
    using System;
    using System.Text;
    
    namespace RubbitMQClient
    {
        /// <summary>
        ///     1.Routing (按路线发送接收)
        /// </summary>
        public class RoutingType
        {
            public static void RoutingProducer(string[] arguments)
            {
                arguments = new string[] { "0","" };
    
    
                string serverAddress = "127.0.0.1";
                string account = "CC";
                string password = "123qwe";
    
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = serverAddress,
                    UserName = account,
                    Password = password,
                    VirtualHost = "/"
                };
                IConnection conn = factory.CreateConnection();
                for (int i = 0; i < 1000; i++)
                {
                    arguments[1] = i.ToString();
                    string queueName = "testQueueName";
    
    
                    using (var channel = conn.CreateModel())
                    {
                        //---1.声明durable Exchange 和 Queue--------------------------------------------------------------------------------------------------------------
                        channel.ExchangeDeclare(Consts.EXCHANGE_NAME_DIRECT, "direct", durable: true, autoDelete: false, arguments: null);
                        //arguments = new[] { "12321", "32432" };
                        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                        channel.QueueBind(queueName, Consts.EXCHANGE_NAME_DIRECT, routingKey: "");//queueName
    
                        //----------------------------------------------------------------------------------------------------------------------
    
                        //---2.发布持久化消息到队列 ---------------------------------------------------------------------------------------------------
                        var props = channel.CreateBasicProperties();
                        //props.Priority = 3;//控制优先级
                        props.DeliveryMode = 2;//将信息也持久化
                        props.Persistent = true;///SetPersistent方式提示已经过时,建议使用当前方式
                        string severity = getSeverity(arguments);
                        string message = getMessage(arguments);
                        byte[] buffer = Encoding.UTF8.GetBytes(message);
    
                        channel.BasicPublish(Consts.EXCHANGE_NAME_DIRECT, routingKey: "", basicProperties: props, body: buffer);
    
                        ////---消费消息
                        //BasicGetResult msgResponse = channel.BasicGet(queueName, noAck: true);
    
                        //var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                        //Console.WriteLine(msgBody);
    
    
                        //3.1(发布方式还有一种 基于推送的事件订阅 )第二种方式(使用内置的 QueueingBasicConsumer 提供简化的编程模型,通过允许您在共享队列上阻塞,直到收到一条消息)
                        //var consumer = new QueueingBasicConsumer(channel);
                        //channel.BasicConsume(queueName, noAck: true, consumer: consumer);
                        //var msgResponse = consumer.Queue.Dequeue(); //blocking
                        //var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
    
                    }
    
                }
                conn.Close();
                Console.ReadKey();
            }
    
    
    
    
            private static String getSeverity(String[] strings)
            {
                if (strings.Length < 1)
                    return "routing(direct) type info";
                return strings[0];
            }
    
            private static String getMessage(String[] strings)
            {
                if (strings.Length < 2)
                    return "routing(direct) --> Hello World!";
                return joinStrings(strings, " ", 1);
            }
    
            private static String joinStrings(String[] strings, String delimiter, int startIndex)
            {
                return strings[1].ToString();
            }
    
    
    
    
    
        }
    }
    View Code

    抽时间将上面涉及到的   mq一些相关  属性(常用的API的),在总结下,主要是零散,其实东西很简单,如何更好的,更灵活的组合到一起,是这个插件使用的 最主要一点。

  • 相关阅读:
    基于thinkphp3.2.3开发的CMS内容管理系统(二)- Rbac用户权限
    phpstrom 快捷键
    基于thinkphp3.2.3开发的CMS内容管理系统
    html中的字幕滚动marquee属性
    学会这些网站优化技巧,秒变seo专家
    服务器设置防火墙规则,实现远程桌面连接的ip限制
    IIS7.5中神秘的ApplicationPoolIdentity
    mysql 安装成功后如何设置密码?
    网站优化提高加载速度的14个技巧
    解决帝国cms系统后台管理员登录密码输入五次密码错误后需等候60分钟的方法
  • 原文地址:https://www.cnblogs.com/Tmc-Blog/p/5433253.html
Copyright © 2011-2022 走看看