基础拾遗
前言
消息队列,在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说双十一很多人进行下单,购买物品这是对于数据的操作是非常之大的,不管是是insert还是update是不是都有及时操作数据库,那么就有可能造成数据库思索移除什么堆积阻塞。那么我们这时是不是加入异步,nosql是不是能减轻其压力,那么这中间剑气的桥梁就是mq了,当然她的使用场景有很多,我们接下来把社么是消息队列了解清楚它是怎么一回事之后,希望大家能在自己的项目中灵活应用即可。
消息队列(MQ)
我们先从图文上说一下它的使用场景,异步处理,应用解耦,流量削锋和消息通讯四个场景。因为以前开发过商城所以就以下载订单来叙述一下,他的适应场景吧。
异步处理
比如我们下载订单后发送邮件与短信给使用者(简单举例一般不会哈)。那么我们在写程序一般怎么处理呢?(1)把下单信息存入数据库中,调用邮件,短信接口,发送(并行发送或者一个一个发送),返回界面。但是我们计算一下如果每个操作时间为30ms那么最少也需要60ms,多的情况是90ms,
那么如果我们加入消息队列将是一个怎样的情况呢,我们先把下单信息存入数据库,同时把信息放到消息队列。然后就不用管它了。这样的话所用时间就是30ms+1ms(存消息队列)。其实放消息队列中还是要管的的,但那是消费者的事和下单这个生产这无关。
应用解耦
还是商城下载订单的问题,当我们商城下载订单,然后公司内部erp中库存管理相应库存进行同步。一般我们怎么处理,下载完订单,调用erp系统,然后处理erp数据,接着把erp数据库中的信息进行同步到商城,这个时候处理上面提到的效率,还有一个问题,需要解决:如果两个系统不能同时访问,你会怎么做。那么我们就要对两个系统进行解耦了对不对。这个时候消息队列就有了用武之地。如下图:其实消息队列在这个功能下,我们的erp系统也有写入的时候,在这不再累述业务,大家了解消息队列的用途即可。
流量削锋
做过商城的应该都会遇到这个问题,当举行活动是拥挤大量的用户,可能会是系统崩溃,这时候流量控制,和异常处理是一件特别重要的工作。当然请不要说在这其他方法,我们不对其进行讨论,我们尽对消息队列的使用做简单介绍。
消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
以上我们大致说了一下他的使用场景,那么不知道大家有没有了解到它到底是个什么东西?
其实吧消息队列就是一个生产者,把相应消息(对象)放到消息队列(中间件中),然后它就什么都不用管了,接下来消费者(或者叫订阅者)去消息队列中间件中去获取订阅的信息,它自己再去处理。能解决的问题咱们从上面的场景应该已经了解到了,解耦,提高效率。那么重点来了消息队列中间件又是什么呢?它都有哪些,又是怎么实现的呢?下面我们就来了解其中的一个中间件RabbitMq。
RabbitMq
大家大致知道什么是消息队列了,那么它的实现是什么样的呢?现在基本上也知道它实现重要的一环是消息对立中间件,rabbitmq,就是其中之一,其中还包括:Active MQ,Rocket Mq,Kafka,Zero MQ甚至也有人用redis来实现。
从我的角度来说我去了解了两个AcctionMQ与RabbitMq这两种最终选择了它,也简单做了相应的封装,来我先来介绍一下RabbitMq.
RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。它提供的内部机制包括持久性机制、投递确认、发布者证实和高可用性机制,多协议,集群,联合我们可以在实现的过程中针对于性能与可靠性进行相应权衡。
看一下:rabbitmq可视化工具如下(此可视化web的操作请大家自行查询):
其实消息队列的协议是AMQP,有很多对此的介绍在这不再累述。结果上面的了解我们大致知道它是个什么东西,不过我们也要在此提一下,几个概念。消息、队列、路由(包括点对点和发布/订阅),生产者,消费者,具体解释我觉得不需要了,就是你理解的字面意思。
其中队列我们一般用P来表示,消费者一般用C,队列(存消息的集合)用q。路由是R.多个消费者可以访问多个q。接下来开始我们的实现了。
RabbitMq的代码实现
RabbitMq连接
首先看一下配置文件信息:
<appSettings> <!--rabbitMQ--> <add key="serveraddress" value="amqp://192.168.0.76:5672/"/> <add key="virtualhost" value="erpadminvirtualhost"/> <add key="username" value="tx_junpin"/> <add key="password" value="abc.1234%"/>
以上分别是访问服务地址,虚拟地址(可在可视化上手动添加,记得要加一条数据进去,然后删除,好比初始haunted一样),用户,密码。其中web访问地址一般为端口后改为“15672”.
连接关键数据准备好之后就是c# 中代码的实现了
private RabbitConsumerConfig RBGetinfo; private ConnectionFactory cf = new ConnectionFactory(); private IConnection conn; //建立联接 /// <summary> /// 初始化Rabbit连接 /// </summary> /// <param name="rbinfo"></param> public RabbitConsumer(RabbitConsumerConfig rbinfo) { RBGetinfo = rbinfo; cf = new ConnectionFactory() { UserName = RBGetinfo.UserName, Password = RBGetinfo.Password, VirtualHost = RBGetinfo.VirtualHost, RequestedHeartbeat = 0, Uri = RBGetinfo.ServerAddress }; conn = cf.CreateConnection(); }
以上ConnectionFactory 内部为中间件提供的连接工厂。方便与AMQP代理相关联的Connection。用兴趣的小伙伴请F12去看代码吧。
调用代码封装
/// <summary> /// 队列出列的方法,传入处理队列中body的方法,并传入队列名称 /// </summary> /// <param name="messageProcessAction">要执行的方法(委托)</param> /// <param name="queuename">队列名称</param> /// <param name="count">获取数据条数</param> public void ConsumeMessage(Action<string> messageProcessAction, string queuename, ushort count) { if (string.IsNullOrEmpty(queuename)) { throw new ArgumentNullException("queuename"); } CheckConn(); using (IModel ch = conn.CreateModel()) { //第二种取法QueueingBasicConsumer基于订阅模式 QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch); ch.BasicQos(0, count, true); ch.BasicConsume(queuename, false, consumer); while (true) { string message = ""; try { BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); IBasicProperties props = e.BasicProperties; byte[] body = e.Body; message = System.Text.Encoding.UTF8.GetString(body); messageProcessAction.Invoke(System.Text.Encoding.UTF8.GetString(body).Replace("