zoukankan      html  css  js  c++  java
  • rabbitmq (三) 发布/订阅

    rabbitmq的目的并不是让生产者把消息直接发到队列里面去,

    这样不能实现解耦的目的,也不利于程序的扩展.

    所以就有交换机(exchanges)的概念.

    交换机有几种类型:direct, topic, headers 和fanout,

    可以为交换机命名,还有一种没有命名的交换机,上几章的消息都是发布到没有命名的交换机.

    channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    var
    message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);

    感觉交换机主要控制消息的投递方式.

    临时队列:

    可以通过创建队列的方式对消息的存储等方式进行管理.

    var queueName = channel.QueueDeclare().QueueName;

    绑定:

    最后通过绑定的方式把交换机和队列进行关联.

    channel.QueueBind(queue: queueName,
                                      exchange: "logs",
                                      routingKey: "");

    publish:

    using RabbitMQ.Client;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RMQ_Publish
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    
                    var message = GetMessage(args);
                    int i = 100;
                    while (true)
                    {
                        var body = Encoding.UTF8.GetBytes(message + ":" + i.ToString());
                        channel.BasicPublish(exchange: "logs",
                                             routingKey: "",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine(" [x] Sent {0}", message);
                        
                        if (i-- == 0)
                            i = 100;
                    }
                }
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
    
            private static string GetMessage(string[] args)
            {
                return ((args.Length > 0)
                       ? string.Join(" ", args)
                       : "info: Hello World!");
            }
        }
    }

    subscribe:

    var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName,
                                      exchange: "logs",
                                      routingKey: "");
    
                    Console.WriteLine(" [*] Waiting for logs.");
    
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        
                        Console.WriteLine(" [x] {0}", message);
                        Thread.Sleep(100);
                    };
                    channel.BasicConsume(queue: queueName,
                                         autoAck: true,
                                         consumer: consumer);
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
  • 相关阅读:
    Yield Usage Understanding
    Deadclock on calling async methond
    How to generate file name according to datetime in bat command
    Run Unit API Testing Which Was Distributed To Multiple Test Agents
    druid的关键参数+数据库连接池运行原理
    修改idea打开新窗口的默认配置
    spring boot -thymeleaf-url
    @pathvariable和@RequestParam的区别
    spring boot -thymeleaf-域对象操作
    spring boot -thymeleaf-遍历list和map
  • 原文地址:https://www.cnblogs.com/weichao975/p/8079381.html
Copyright © 2011-2022 走看看