zoukankan      html  css  js  c++  java
  • RabbitMQ 简单的消息发送与接收

    RabbitMQ是建立在AMQP(Advanced Message Queuing Protocol,高级消息队列协议)基础上的,而AMQP是建立在TCP协议之上的。

    因此,RabbitMQ是需要建立TCP连接的。其建立连接的方法如下:

    //首选创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
    
    }

    TCP连接的频繁创建与销毁是需要很大开销的,因此RabbitMQ需要在当前TCP连接上建立一些虚拟管道用作通信,就如同一根电缆,外部那根大的橡胶线圈就如同在TCP上建立的连接,内部每一根铜线就如同每一个通信的虚拟管道。如果有多个通信,可以建立多个虚拟管道。创建一根虚拟管道的方法如下:

    //在当前连接上创建一根通信的虚拟管道
    using (var channel = connection.CreateModel()) {
         //TO-DO:do something  
    }

    现在通信的管道有了,接下来就开始发送消息。在发送消息之前,需要创建交换机和队列。因为可以通过交换机去找到队列,从而可以将消息存放到队列中,所以发送消息之前,必须先创建交换机和队列,除非该交换机和队列已经存在。

    创建交换机的方法如下:

    channel.ExchangeDeclare("e.log",  //交换机名称
                           "direct"); //交换机类型

    创建队列的方法如下: 

    channel.QueueDeclare(queue: "q.log.info", //队列名称
                         durable: false,      //当前队列是否持久化
                         exclusive: false,    //是否是私有队列
                         autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                         arguments: null);    //其他配置参数

    交换机怎么知道将消息放入哪个队列呢?因此需要将交换机和队列进行绑定,并产生一个RoutingKey,以后该交换机就可以通过这个RoutingKey找到对应的队列了。

    将交换机和队列绑定的方法如下:

    channel.QueueBind("q.log.info", //队列名称
                      "e.log",      //交换机名称
                      "log.info");  //自定义的RoutingKey

    重点来啦!前面所做的一切就是为了下面这句代码,发送消息到RabbitMQ的队列中。请记住:RabbitMQ的队列中存储的是二进制数据

    发送消息的方法如下:

    //生成二进制消息
    var body = Encoding.UTF8.GetBytes("hello world!");
    //将二进制消息通过交换机发送到队列中
    channel.BasicPublish(exchange: "e.log",      //交换机
                         routingKey: "log.info", //RoutingKey
                         basicProperties: null,  //先让它为空
                         body: body);            //发送的消息(二进制数据)

    至此,一个简单的消息发送操作就完成了。

    接下来开始接收消息操作的演示。

    接收端在接收消息之前,也需要连接RabbitMQ服务器和创建通信管道。

    但是有一个问题,如果该消息队列不存在,还能正常接收消息吗?答案是肯定不能的,而且还会报错!

    因此,我们在接收消息前,还得声明一个队列。如果该队列存在,就返回该队列,如果该队列不存在,就创建一个队列。

    对于同一个队列,接收端创建的方式必须要和发送端一样,每个参数都要一样。RabbitMQ不允许使用不同的参数声明同一个队列,否则会报错!

    接收端连接RabbitMQ服务器、创建通信管道和创建队列方式与发送端一样,如下方法:

    //首选创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
        //在当前连接上创建一根通信的虚拟管道
        using (var channel = connection.CreateModel()) {
             channel.QueueDeclare(queue: "q.log.info", //队列名称
                         durable: false,      //当前队列是否持久化
                         exclusive: false,    //是否是私有队列
                         autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                         arguments: null);    //其他配置参数
                         
                         
        }
    }

    接下来创建一个消费者,用来接收并处理从队列中获取的消息。

    创建消费者的方法如下:

    //新建一个事件驱动的消费者
    var consumer = new EventingBasicConsumer(channel);
    //消费者收到消息后的事件处理
    consumer.Received += (model, ea) => {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);                                     
    };
    //通知RabbitMQ服务器,我要接收q.log.info队列的消息,快点发消息给我
    channel.BasicConsume(queue: "q.log.info",
                         noAck: true,
                         consumer: consumer);

    至此,发送消息与接收消息就演示完毕。

    我们整合一下整个代码,如下所示:

    消息发送端(生产者):

    //首先创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
        //在当前连接上创建一根通信的虚拟管道
        using (var channel = connection.CreateModel()) {
            //声明一个交换机
            channel.ExchangeDeclare("e.log",  //交换机名称
                                "direct"); //交换机类型
            //声明一个队列
            channel.QueueDeclare(queue: "q.log.info", //队列名称
                                 durable: false,      //当前队列是否持久化
                                 exclusive: false,    //是否是私有队列
                                 autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                                 arguments: null);    //其他配置参数
            //将该队列绑定到交换机,并设置RoutingKey=log.info,以便交换机能通过RoutingKey找到该队列
            channel.QueueBind("q.log.info", //队列名称
                              "e.log",      //交换机名称
                              "log.info");  //自定义的RoutingKey
    
            //生成二进制消息
            var body = Encoding.UTF8.GetBytes("hello world!");
            //将二进制消息通过交换机发送到队列中
            channel.BasicPublish(exchange: "e.log",      //交换机
                                 routingKey: "log.info", //RoutingKey
                                 basicProperties: null,  //先让它为空
                                 body: body);            //发送的消息(二进制数据)
            
        }
    }

    消息接收端(消费者):

    //首选创建一个连接工厂对象
    var factory = new ConnectionFactory() { HostName = "localhost", UserName = "yyt", Password = "yyt888888",VirtualHost="log" };
    //然后使用工厂对象创建一个TCP连接
    using (var connection = factory.CreateConnection()){
        //在当前连接上创建一根通信的虚拟管道
        using (var channel = connection.CreateModel()) {    
            channel.QueueDeclare(queue: "q.log.info", //队列名称
                                 durable: false,      //当前队列是否持久化
                                 exclusive: false,    //是否是私有队列
                                 autoDelete: false,   //该队列在最后一个consumer断开之后是否自动删除
                                 arguments: null);    //其他配置参数
                         
            //新建一个事件驱动的消费者
            var consumer = new EventingBasicConsumer(channel);
            //消费者收到消息后的事件处理
            consumer.Received += (model, ea) => {
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body);
                                Console.WriteLine(" [x] Received {0}", message);                                     
            };
            //通知RabbitMQ服务器,我要接收q.log.info队列的消息,快点发消息给我
            channel.BasicConsume(queue: "q.log.info",
                                 noAck: true,
                                 consumer: consumer);
            
        }
    }

    以上就是一个关于RabbitMQ消息队列的简单的消息发送和接收过程。

    最后让我们喊出我们的口号:捉住那只兔子,炖了它!

  • 相关阅读:
    特征选取1-from sklearn.feature_selection import SelectKBest
    使用K-S检验一个数列是否服从正态分布、两个数列是否服从相同的分布
    风控8-收码平台
    风控7-同盾设备指纹
    互联网黑产剖析_虚假号码
    风控3_iv算法详细解释
    woe_iv原理和python代码建模
    逾期30天和60天的回款概率
    (剑指Offer)面试题15:链表中倒数第k个结点
    (剑指Offer)面试题14:调整数组顺序使奇数位于偶数前面
  • 原文地址:https://www.cnblogs.com/williamwsj/p/8093093.html
Copyright © 2011-2022 走看看