zoukankan      html  css  js  c++  java
  • rabbitMQ第五种模型(Topic)

    Routing之订阅模型-Topic

      Topic类型的Exchange和Direct相比,都是可以根据RoutingKey把消息路由到不同列队

    只不过Topic类型Exchange可以让列队再绑定RoutingKey的时候使用通配符!

    这种模型RoutingKey一般都是由一个或多个单词组成,多个单词以“.”分割,例如:item.insert

      

      

    生产者:

    public class Provider {
        public static void main(String[] args) throws IOException {
            Connection connection = rabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare("topics","topic");
            String routingkey="user.save.delete";
            channel.basicPublish("topics",routingkey,null,("这里是topics动态路由模型,routingkey["+routingkey+"]").getBytes());
            rabbitMQUtils.connectionAndchannelClose(connection,channel);
        }
    }

    消费者1:

    public class Customer1 {
        public static void main(String[] args) throws IOException {
            Connection connection = rabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare("topics","topic");
            String queue = channel.queueDeclare().getQueue();
            channel.queueBind(queue,"topics","user.*");
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    System.out.println("消费者-1"+new String(body));
                }
            });
        }
    }

    消费者2:

    public class Customer2 {
        public static void main(String[] args) throws IOException {
            Connection connection = rabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare("topics","topic");
            String queue = channel.queueDeclare().getQueue();
            channel.queueBind(queue,"topics","user.#");
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    System.out.println("消费者-2"+new String(body));
                }
            });
        }
    }
  • 相关阅读:
    Captura
    食用Win系统自带的PowerShell登录服务器
    uTools
    图片镜像缓存服务
    博客园自定义网站ico
    超星图床
    教程翻译-理解基于矢量场寻路算法
    CentOS防火墙命令
    CentOS7的vsftpd安装和配置
    Unity网路编程-TCP实现细节备忘
  • 原文地址:https://www.cnblogs.com/yz-bky/p/13060198.html
Copyright © 2011-2022 走看看