zoukankan      html  css  js  c++  java
  • EXCHANGE中TOPIC的使用

    1、exchange [topic]
    1)direct 提前预知性的binding info、error、debug、warnning=》exchange
    2)fanout 群发性的binding
    3)headers and、or性质的binding x-match
    4)topic 归类性的binding【带有正则效果】
    queue1 *.com
    queue2 *.cn
    queue3 *.net

    符号:

    *:匹配一个或多个词

    #:匹配一个词

    2、代码实现

    1)生产者

                //基础配置
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "10.123.44.12",
                    UserName = "datamip",
                    Password = "datamip"
                };
    
                //第一步:创建connection
                using (var connection = factory.CreateConnection())
                {
                    //第二步:创建channel
                    using (var channel = connection.CreateModel())
                    {
                        //第三步:创建exchange
                        //channel.ExchangeDeclare("myfanoutexchange", ExchangeType.Fanout, true, false, null);
                        //第四步:发布消息
                        for (int i = 0; i < 100; i++)
                        {
                            var msg = Encoding.UTF8.GetBytes(string.Format("{0}:{1}", i, "你好"));
                            var routingkey = i % 13 == 0 ? "呵呵.com" : "哈哈.cn";
                            channel.BasicPublish("mytopicexchange", routingkey, null, body: msg);
                            Console.WriteLine(i);
                        }
    
                    }
                }
                Console.WriteLine("生产成功!");
                Console.ReadKey();
    

    生产者产生的routing会根据消费者队列绑定的routingkey对应关系去进入不同的队列

    2)消费者

    i)

                //创建连接工厂
                ConnectionFactory factory = new ConnectionFactory
                {
                    UserName = "datamip",//用户名
                    Password = "datamip",//密码
                    HostName = "10.123.44.12"//rabbitmq ip
                };
                //第一步:创建connection
                using (var connection = factory.CreateConnection())
                {
                    //第二步:创建channel
                    using (var channel = connection.CreateModel())
                    {
                        //第三步:生明交换机
                        channel.ExchangeDeclare("mytopicexchange", ExchangeType.Topic, true, false, null);
    
                        //第四步:声明队列,绑定交换机
                        channel.QueueDeclare("mytopicqueue1", true, false, false, null);
                        channel.QueueBind("mytopicqueue1", "mytopicexchange","*.com",null);
    
                        //处理消息
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (sender, e) => {
                            var msg = Encoding.UTF8.GetString(e.Body.ToArray());
                            Console.WriteLine(msg);
                        };
                        //消费
                        channel.BasicConsume("mytopicqueue1", true, consumer);
                        Console.ReadKey();
                    }
                }
    

      

    ii)

                //创建连接工厂
                ConnectionFactory factory = new ConnectionFactory
                {
                    UserName = "datamip",//用户名
                    Password = "datamip",//密码
                    HostName = "10.123.44.12"//rabbitmq ip
                };
                //第一步:创建connection
                using (var connection = factory.CreateConnection())
                {
                    //第二步:创建channel
                    using (var channel = connection.CreateModel())
                    {
                        //第三步:生明交换机
                        channel.ExchangeDeclare("mytopicexchange", ExchangeType.Topic, true, false, null);
    
                        //第四步:声明队列,绑定交换机
                        channel.QueueDeclare("mytopicqueue2", true, false, false, null);
                        channel.QueueBind("mytopicqueue2", "mytopicexchange","*.cn",null);
    
                        //处理消息
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (sender, e) =>
                        {
                            var msg = Encoding.UTF8.GetString(e.Body.ToArray());
                            Console.WriteLine(msg);
                        };
                        //消费
                        channel.BasicConsume("mytopicqueue2", true, consumer);
                        Console.ReadKey();
                    }
                }
    

     iii)效果图

    3、图形解释

    当前的图形解释有些问题,应该是会有不同的routingkey,routingkey会自动匹配,而不是视频中的直接判断,然后赋值*.com和*.cn,这样的话不需要topic交换机,也可以实现的(个人观点)。

    因此代码进行了调整,示意图还是有点问题,自己理解下。

  • 相关阅读:
    Postgresql HStore 插件试用小结
    postgres-xl 安装与部署 【异常处理】ERROR: could not open file (null)/STDIN_***_0 for write, No such file or directory
    GPDB 5.x PSQL Quick Reference
    postgresql 数据库schema 复制
    hive 打印日志
    gp与 pg 查询进程
    jquery table 发送两次请求 解惑
    python 字符串拼接效率打脸帖
    postgresql 日期类型处理实践
    IBM Rational Rose软件下载以及全破解方法
  • 原文地址:https://www.cnblogs.com/sailing92/p/13684133.html
Copyright © 2011-2022 走看看