zoukankan      html  css  js  c++  java
  • RabbitMQ初次使用

    背景

    之前的工程一直使用的MSMQ,其实也没有用的很深。
    不过想往容器化部署发展,MSMQ依赖于windows操作系统,基本是不太可能的,因为windows对docker的支持就很差。

    切换的话,选择尝试比较出名的RabbitMQ作为替代。

    概念

    消息队列的概念很简单,但是到了RabbitMQ上,会有其他一系列复杂概念,不单是写入和读取这样简单了。
    这篇文章对我的帮助很大。

    信息流

    RabbitMQ其实是一种AMQP的实现,似乎实现在术语上称为Broker
    它的标准信息流向如下图:

    MessageFlow

    这里的作用直接摘抄了原文

    1. The producer publishes a message to the exchange.
    2. The exchange receives the message and is now responsible for the routing of the message.
    3. A binding has to be set up between the queue and the exchange. In this case, we have bindings to two different queues from the exchange. The exchange routes the message in to the queues.
    4. The messages stay in the queue until they are handled by a consumer
    5. The consumer handles the message.

    Exchange类型

    Queue可以绑定不同的Exchange类型,不同的Exchange有不同的消息转发机制。

    Direct Exchange

    绑定在该Exchange上的队列,消息按照Routing Key全字匹配Binding key转发到指定队列。

    协议规定默认存在的Exchange""是Direct Exchange,可以直接使用。

    Topic Exchange

    该类型的Binding Key支持通配符,通过*#,匹配Routing Key

    默认的Topic Exchange名称是amq.topic

    Fanout Exchange

    消息不做处理,直接转发给所有绑定的队列。

    默认的Fanout Exchange是amq.fanout

    Headers Exchange

    不再通过Binding keyRouting key,而是通过绑定时定义的Header(键值对),和发送消息时添加的Header(同样是键值对)进行匹配。

    匹配模式有:1. All消息中的Header必须全部满足。 2. Any仅满足一个即可接收。

    默认的Headers Exhangeamq.headers

    安装

    安装RabbitMQ需要先安装Erlang运行环境。

    安装RabbitMQ时,安装时默认会把各种日志和配置写入AppData目录,很不友好。

    找到一个方案,即安装时,先不安装RabbitMQ服务。
    然后配置环境变量:

    set RABBITMQ_BASE=D:RabbitMQData
    

    再通过RabbitMQ安装脚本注册服务

    rabbitmq-service.bat install
    

    使用

    RabbitMQ配置

    环境变量

    可以讲rabbitmq安装目录的sbin目录加入path,这样方便执行指令

    初始化

    默认的Guest账号不能远程登陆,需要添加其他用户。
    同时新添加的用户不能访问VHOST,需要添加用户权限。

    rabbitmqctl.bat  add_user  rabbitmquser  123456
    rabbitmqctl.bat  set_permissions -p / rabbitmquser '.*' '.*' '.*'
    

    管理界面

    通过rabbitmq-plugins.bat enable rabbitmq_management使能管理界面,可以实时监控rabbitmq使用情况。
    默认的web地址是15672端口。

    代码访问

    这里粘贴上官网的示例代码:

    发送方:

    using System;
    using RabbitMQ.Client;
    using System.Text;
    
    class Send
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
    
                channel.BasicPublish(exchange: "",
                                     routingKey: "hello",
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
    

    接收方:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    
    class Receive
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
                };
                channel.BasicConsume(queue: "hello",
                                     autoAck: true,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    

    有几点需要注意:

    • 发送方和接收方最好都声明exchange和queue,这样利于解耦
    • connection尽量复用,不要每次使用都创建
    • channel建议同一线程复用,但是不要跨线程复用
  • 相关阅读:
    Linux_DNS服务器
    Linux_DNS服务器
    Linux_FTP服务器
    Linux_FTP服务器
    Linux_DHCP&DHCP Relay
    分布式内存存储式元数据服务的构建
    如何提高分布式系统的可观察性:Insight Tool的引入
    论分布式系统中Metric框架的设计
    分布式存储系统关于GDPR条例中的数据清除原则
    Pipeline并行处理模型
  • 原文地址:https://www.cnblogs.com/mosakashaka/p/12608757.html
Copyright © 2011-2022 走看看