zoukankan      html  css  js  c++  java
  • 2.hello rabbitmq

    作者

    微信:tangy8080
    电子邮箱:914661180@qq.com
    更新时间:2019-07-22 22:49:50 星期一

    欢迎您订阅和分享我的订阅号,订阅号内会不定期分享一些我自己学习过程中的编写的文章
    如您在阅读过程中发现文章错误,可添加我的微信 tangy8080 进行反馈.感谢您的支持。

    文章主题

    从一个最简单的hello rabbitmq开始,逐步了解rabbitmq

    本文源码:http://admin@gitblit.honeysuckle.site/r/public/rabbitmq/Solution2.git

    前置条件

    已经构建好了一个rabbitmq集群

    正文

    认识rabbitmq

    首先,rabbitmq是一个消息中间件.它主要解决了应用耦合、异步处理、流量削锋等问题。
    它具有以下特点:

    • 解耦

    假如现在您需要做一个通知中心,您需要将您的消息通知到 A,B,C.
    经验稍差的同学可能会直接代码引用,通知中心直接引用A,B,C的代码.一旦有新的消息,直接调用A B C的逻辑处理.但是这样带来一个问题,现在如果D也需要收到消息.难道通知中心再引用一次D吗? 这显然是一种糟糕的设计.

    一种比较优雅的实现就是观察者模式.A,B,C观察通知中心的消息.如果有新的消息.则做出相应的逻辑. 如果D现在需要知道通知中心的消息,那么它只需要订阅通知中心的消息就可以了.通知中心的逻辑是不需要动的

    • 异步化
      多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间

    • 削峰
      广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况

    专业术语
    • 生产:生产意味着就是发送。 发送消息的程序是一个生产者
    • 队列:这里的队列是指一个名称,但是名称所代表的队列实体寄存在RabbitMQ服务器端中。 虽然消息流过RabbitMQ和您的应用程序,但它们只能存储在队列中。 队列只受主机的内存和磁盘的限制,它本质上是一个大的消息缓冲区。 许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据
    • 消费:消费具有与接收相似的含义。 消费者是一个主要等待接收消息的程序
    工作原理

    使用 Net/C# 客户端完成hello rabbitmq

    生产者:

    using RabbitMQ.Client;
    using System;
    using System.Text;
    
    namespace Server
    {
        class Program
        {
            static void Main(string[] args)
            {
                var connectionFactory = new ConnectionFactory
                {
                    Port = 5672,
                    VirtualHost = "test",
                    HostName = "192.168.161.160",
                    UserName = "test",
                    Password = "123456",
                    AutomaticRecoveryEnabled = true,
                    TopologyRecoveryEnabled = true
                };
    
    
                using (var connection = connectionFactory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "hello",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);
    
                    string message = "Hello RabbitMQ!";
                    var body = Encoding.UTF8.GetBytes(message);
    
                    channel.BasicPublish(exchange: "",
                                  routingKey: "hello",
                                  basicProperties: null,
                                  body: body);
                    Console.WriteLine(" [x] Sent {0}", message);
                }
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    
    
    1.定义连接参数
    • ConnectionFactory用来创建rabbitmq连接,相比通道来说.连接是高成本的.
    • Port: rabbitmq Server TCP监听端口,由于我们使用nginx做了负载均衡.这里写的是Nginx监听端口
    • VirtualHost: rabbitmq支持虚拟主机,各个虚拟主机之间相互隔离.
    • HostName:主机地址, 由于我们使用nginx做了负载均衡.这里写的是虚拟IP
    • UserName,Password 用户名和密码
    • AutomaticRecoveryEnabled:启用自动连接恢复,当服务端或者客户端异常断开后将尝试自动重连
    • TopologyRecoveryEnabled: 连接恢复交换机,队列,绑定和消费者
    2.创建一个连接

    您可以在创建连接时执行主机列表

           //
            // 摘要:
            //     Create a connection using a list of hostnames using the configured port. By default
            //     each hostname is tried in a random order until a successful connection is found
            //     or the list is exhausted using the DefaultEndpointResolver. The selection behaviour
            //     can be overriden by configuring the EndpointResolverFactory.
            //
            // 参数:
            //   hostnames:
            //     List of hostnames to use for the initial connection and recovery.
            //
            // 返回结果:
            //     Open connection
            //
            // 异常:
            //   T:RabbitMQ.Client.Exceptions.BrokerUnreachableException:
            //     When no hostname was reachable.
            public IConnection CreateConnection(IList<string> hostnames);
    
    3.创建1-N个通道

    通道是建立在连接的基础上的,相对连接来说.通道是低成本.

    4.交换机

    因为rabbitmq已经有了自定义的ampq default exchange,这里暂时不申明交换机

    5.定义队列

    如果队列不存在,则会创建队列.如果两个程序要求创建同一个队列(队列名称相同) 但指定了不同的参数,则后面的定义队列请求会收到异常

    channel.QueueDeclare(queue: "hello",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);
    
    • queue: 队列名称
    • durable: 是否持久化,申明为false.当前连接关闭后队列将被销毁
    • exclusive:是否是独占的,不允许其他连接访问
    • autoDelete :是否自动删除,true,自动删除,自动删除的前提:至少有一个消息者连接到这个队列,之后所有与这个队列连接的消息都断开时,才会自动删除,,
      备注:生产者客户端创建这个队列,或者没有消息者客户端连接这个队列时,不会自动删除这个队列
    6.发送消息
    channel.BasicPublish(exchange: "",
                                  routingKey: "hello",
                                  basicProperties: null,
                                  body: body);
    
    • 服务器发送消息不会直接发送到队列中(Queue),而是直接发送给交换机(Exchange),然后根据确定的规则,RabbitMQ将会决定消息该投递到哪个队列。这些规则称为路由键(routing key),队列通过路由键绑定到交换机上。

    消费者:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    
    class Receive
    {
        public static void Main()
        {
            var connectionFactory = new ConnectionFactory
            {
                Port = 5672,
               
                VirtualHost = "test",
                HostName = "192.168.161.160",
                UserName = "test",
                Password = "123456",
                AutomaticRecoveryEnabled = true,
                TopologyRecoveryEnabled = true
            };
            using (var connection = connectionFactory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "hello",
                              durable: false,
                              exclusive: false,
                              autoDelete: false,
                              arguments: null);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
                };
                channel.BasicConsume(queue: "hello", true, consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
    
    
        }
    }
    
    

    思考两个问题
    1. 我们在发布消息的时候,routingKey为什么是hello 而不是其他的
    2. 我们在进行消费的时候好像也没有建立queue和exchange的绑定,为啥还是可以收到消息

    答案在这里:

    回到原理图,看下这里我们是如何定义各个角色的

    1.Client 我们这里是生产者,即生产者部分的代码
    2.Exchange 没有显式定义交互机,但它是存在的.名称为AMQP default.在创建virtual host时会自动创建
    3.queue 我们这里定义了名称为 hello的队列
    4.Recive 我们这里是消费者部分的代码

    引用链接

    https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
    https://www.rabbitmq.com/dotnet-api-guide.html

    请尽量按照自己期望的生活 email:18980489167@189.cn
  • 相关阅读:
    链式二叉树遍历具体程序
    非线性结构
    函数调用
    递归
    队列的操作
    MFC多线程详细讲解(转)
    PreTranslateMessage()函数捕获键盘按键消息
    vc和halcon数据的相互赋值
    VC6.0加载lib文件的三种方法
    MFC中按钮控件的用法笔记(转)
  • 原文地址:https://www.cnblogs.com/gytangyao/p/11406086.html
Copyright © 2011-2022 走看看