zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记

    AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,所以安装RabbitMq服务器端前需要先安装Erlang。以下记录windows 7环境下的学习笔记。

    一、RabbitMQ下载安装

    Erlang下载地址:https://www.erlang.org/downloads

    RabbitMQ服务端下载地址:https://www.rabbitmq.com/install-windows.html

    安装好Erlang和RabbitMQ之后都需要新建系统变量,计算机-->属性-->高级系统设置-->环境变量-->系统变量新建

    1、新建ErlangRabbitMQ系统变量

     

     

    建系统变量时值为安装目录,我的安装目录:

    Erlang:C:Program Fileserl10.4

    RabbitMQ:C:Program FilesRabbitMQ Server abbitmq_server-3.7.15

    新建系统变量后,还需要在已有系统变量PATH中增加Erlang和RabbitMQ的配置(注:是追加,PATH原有的值不要删除),以分号分隔。

    Erlang:%ERLANG_HOME%in

    RabbitMQ:%RABBITMQ_SERVER%sbin

     

    2、启动RabbitMQ的管理界面

    cmd中运行如下命令:

    "C:Program FilesRabbitMQ Server
    abbitmq_server-3.7.15sbin
    abbitmq-plugins.bat" enable rabbitmq_management

    默认会生成一个账号guest,密码也是guest。

    查看已有用户及用户的角色命令

    rabbitmqctl.bat list_users

     

    RabbitMQ安装后是默认启动服务的,若要手动启动、停止服务,执行如下命令:

    启动服务:

    net start rabbitmq

    停止服务:

    net stop rabbitmq

    3、登录RabbitMQ的管理界面

    默认地址是本地端口15672,地址:http://127.0.0.1:15672guest账号登录。

    二、Rabbit介绍

    消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

    交换机和交换机类型

    Default exchange默认交换机):一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

    Direct exchange(直连交换机)根据消息携带的路由键(routing key)将消息投递给对应队列的。

    Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

    Topic exchange(主题交换机)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。

    routing_key格式是以点号“."分割的字符表对于routing_key,有两个特殊字符(在正则表达式里叫元字符):

    *代表任意一个单词

    #代表0个或者多个单词

    由于有"*""#"Topic exchange 非常强大并且可以转化为其他的exchange:

    如果binding_key是"#"它会接收所有的Message,不管routing_key是什么,就像是Fanout exchange

    如果"*"和"#"都没有被使用,那么topic exchange就变成了Direct exchange

    Headers exchange(头交换机)有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

    三、Hello Word

    创建2个项目,1个发送端Send,1个接收端Receive,通过Nuget安装RabbitMQ.Client。

    发送端代码:

                var message ="Hello Word ";
    
                var factory = new ConnectionFactory() { HostName = "localhost" };
    
                using (var connection = factory.CreateConnection())
    
                using (var channel = connection.CreateModel())
    
                {
    
                    channel.QueueDeclare(queue: "hello",
    
                        durable: false,
    
                        exclusive: false,
    
                        autoDelete: false,
    
                        arguments: null);
    
                    for (int i = 0; i < 10; i++)
    
                    {
    
                        var body = Encoding.UTF8.GetBytes(message + i);
    
                        channel.BasicPublish(exchange: "",//使用默认交换机
    
                            routingKey: "hello",
    
                            basicProperties: null, //new BasicProperties() { DeliveryMode = 2 },
    
                            body: body);
    
                    }
    
                }

    接收端代码:

                var factory = new ConnectionFactory() { HostName = "localhost" };
    
                var connection = factory.CreateConnection();
    
                var channel = connection.CreateModel();
    
                channel.QueueDeclare(queue: "hello",
    
                    durable: false,
    
                    exclusive: false,
    
                    autoDelete: false,
    
                    arguments: null);
    
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//限制流量,向消费者一条条发送消息
    
                var consumer = new EventingBasicConsumer(channel);
    
                consumer.Received += (model, ea) =>
    
                {
    
                    var body = ea.Body;
    
                    var message = Encoding.UTF8.GetString(body);
    
                    var directory = AppContext.BaseDirectory + "/Log";
    
                    if (!Directory.Exists(directory))
    
                    {
    
                        Directory.CreateDirectory(directory);
    
                    }
    
                    using (FileStream fs = System.IO.File.Open(directory + $"/{DateTime.Now:yyyyMMdd}{ea.ConsumerTag}.txt", FileMode.Append, FileAccess.Write, FileShare.ReadWrite))
    
                    {
    
                        var sr = new StreamWriter(fs);
    
                        sr.WriteLine($"[{DateTime.Now:yyyyMMddHHmmssfff}]");
    
                        sr.WriteLine(ea.ConsumerTag + ":" + message);
    
                        sr.Flush();
    
                        sr.Close();
    
                        Thread.Sleep(100);//可复制一份代码,将等待的时间设置成200,测试多消费者
    
                    }
    
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认
    
                };
    
                channel.BasicConsume(queue: "hello",
    
                    autoAck: false,//消息确认设置成手动
    
                    consumer: consumer);

    更多的内容可以看以下文章:

    https://blog.csdn.net/anzhsoft2008/column/info/rabbitmq

    http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html

  • 相关阅读:
    数据同步
    闭包的内存泄漏解决办法
    No module named 'MySQLdb'
    pqi 更换pip 国内源
    BZOJ 1934 [Shoi2007]Vote 善意的投票
    BZOJ 2038 [2009国家集训队]小Z的袜子(hose)
    BZOJ 1002 [FJOI2007]轮状病毒
    BZOJ 3442 学习小组
    BZOJ 3261 最大异或和
    BZOJ 4029 [HEOI2015]定价
  • 原文地址:https://www.cnblogs.com/hsybs/p/11165627.html
Copyright © 2011-2022 走看看