zoukankan      html  css  js  c++  java
  • RabbitMQ 简单的消息发送与接收

    RabbitMQ是建立在AMQP(Advanced Message Queuing Protocol,高级消息队列协议)基础上的,而AMQP是建立在TCP协议之上的。

    因此,RabbitMQ是需要建立TCP连接的。其建立连接的方法如下:

    //首选创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
    
    }

    TCP连接的频繁创建与销毁是需要很大开销的,因此RabbitMQ需要在当前TCP连接上建立一些虚拟管道用作通信,就如同一根电缆,外部那根大的橡胶线圈就如同在TCP上建立的连接,内部每一根铜线就如同每一个通信的虚拟管道。如果有多个通信,可以建立多个虚拟管道。创建一根虚拟管道的方法如下:

    //在当前连接上创建一根通信的虚拟管道
    using (var channel = connection.CreateModel()) {
         //TO-DO:do something  
    }

    现在通信的管道有了,接下来就开始发送消息。在发送消息之前,需要创建交换机和队列。因为可以通过交换机去找到队列,从而可以将消息存放到队列中,所以发送消息之前,必须先创建交换机和队列,除非该交换机和队列已经存在。

    创建交换机的方法如下:

    channel.ExchangeDeclare("e.log",  //交换机名称
                           "direct"); //交换机类型

    创建队列的方法如下: 

    channel.QueueDeclare(queue: "q.log.info", //队列名称
                         durable: false,      //当前队列是否持久化
                         exclusive: false,    //是否是私有队列
                         autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                         arguments: null);    //其他配置参数

    交换机怎么知道将消息放入哪个队列呢?因此需要将交换机和队列进行绑定,并产生一个RoutingKey,以后该交换机就可以通过这个RoutingKey找到对应的队列了。

    将交换机和队列绑定的方法如下:

    channel.QueueBind("q.log.info", //队列名称
                      "e.log",      //交换机名称
                      "log.info");  //自定义的RoutingKey

    重点来啦!前面所做的一切就是为了下面这句代码,发送消息到RabbitMQ的队列中。请记住:RabbitMQ的队列中存储的是二进制数据

    发送消息的方法如下:

    //生成二进制消息
    var body = Encoding.UTF8.GetBytes("hello world!");
    //将二进制消息通过交换机发送到队列中
    channel.BasicPublish(exchange: "e.log",      //交换机
                         routingKey: "log.info", //RoutingKey
                         basicProperties: null,  //先让它为空
                         body: body);            //发送的消息(二进制数据)

    至此,一个简单的消息发送操作就完成了。

    接下来开始接收消息操作的演示。

    接收端在接收消息之前,也需要连接RabbitMQ服务器和创建通信管道。

    但是有一个问题,如果该消息队列不存在,还能正常接收消息吗?答案是肯定不能的,而且还会报错!

    因此,我们在接收消息前,还得声明一个队列。如果该队列存在,就返回该队列,如果该队列不存在,就创建一个队列。

    对于同一个队列,接收端创建的方式必须要和发送端一样,每个参数都要一样。RabbitMQ不允许使用不同的参数声明同一个队列,否则会报错!

    接收端连接RabbitMQ服务器、创建通信管道和创建队列方式与发送端一样,如下方法:

    //首选创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
        //在当前连接上创建一根通信的虚拟管道
        using (var channel = connection.CreateModel()) {
             channel.QueueDeclare(queue: "q.log.info", //队列名称
                         durable: false,      //当前队列是否持久化
                         exclusive: false,    //是否是私有队列
                         autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                         arguments: null);    //其他配置参数
                         
                         
        }
    }

    接下来创建一个消费者,用来接收并处理从队列中获取的消息。

    创建消费者的方法如下:

    //新建一个事件驱动的消费者
    var consumer = new EventingBasicConsumer(channel);
    //消费者收到消息后的事件处理
    consumer.Received += (model, ea) => {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);                                     
    };
    //通知RabbitMQ服务器,我要接收q.log.info队列的消息,快点发消息给我
    channel.BasicConsume(queue: "q.log.info",
                         noAck: true,
                         consumer: consumer);

    至此,发送消息与接收消息就演示完毕。

    我们整合一下整个代码,如下所示:

    消息发送端(生产者):

    //首先创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
        //在当前连接上创建一根通信的虚拟管道
        using (var channel = connection.CreateModel()) {
            //声明一个交换机
            channel.ExchangeDeclare("e.log",  //交换机名称
                                "direct"); //交换机类型
            //声明一个队列
            channel.QueueDeclare(queue: "q.log.info", //队列名称
                                 durable: false,      //当前队列是否持久化
                                 exclusive: false,    //是否是私有队列
                                 autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                                 arguments: null);    //其他配置参数
            //将该队列绑定到交换机,并设置RoutingKey=log.info,以便交换机能通过RoutingKey找到该队列
            channel.QueueBind("q.log.info", //队列名称
                              "e.log",      //交换机名称
                              "log.info");  //自定义的RoutingKey
    
            //生成二进制消息
            var body = Encoding.UTF8.GetBytes("hello world!");
            //将二进制消息通过交换机发送到队列中
            channel.BasicPublish(exchange: "e.log",      //交换机
                                 routingKey: "log.info", //RoutingKey
                                 basicProperties: null,  //先让它为空
                                 body: body);            //发送的消息(二进制数据)
            
        }
    }

    消息接收端(消费者):

    //首选创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
        //在当前连接上创建一根通信的虚拟管道
        using (var channel = connection.CreateModel()) {    
            channel.QueueDeclare(queue: "q.log.info", //队列名称
                                 durable: false,      //当前队列是否持久化
                                 exclusive: false,    //是否是私有队列
                                 autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                                 arguments: null);    //其他配置参数
                         
            //新建一个事件驱动的消费者
            var consumer = new EventingBasicConsumer(channel);
            //消费者收到消息后的事件处理
            consumer.Received += (model, ea) => {
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body);
                                Console.WriteLine(" [x] Received {0}", message);                                     
            };
            //通知RabbitMQ服务器,我要接收q.log.info队列的消息,快点发消息给我
            channel.BasicConsume(queue: "q.log.info",
                                 noAck: true,
                                 consumer: consumer);
            
        }
    }

    以上就是一个关于RabbitMQ消息队列的简单的消息发送和接收过程。

    最后让我们喊出我们的口号:捉住那只兔子,炖了它!

  • 相关阅读:
    LeetCode15 3Sum
    LeetCode10 Regular Expression Matching
    LeetCode20 Valid Parentheses
    LeetCode21 Merge Two Sorted Lists
    LeetCode13 Roman to Integer
    LeetCode12 Integer to Roman
    LeetCode11 Container With Most Water
    LeetCode19 Remove Nth Node From End of List
    LeetCode14 Longest Common Prefix
    LeetCode9 Palindrome Number
  • 原文地址:https://www.cnblogs.com/williamwsj/p/8093093.html
Copyright © 2011-2022 走看看