zoukankan      html  css  js  c++  java
  • RabbitMQ_3、发布/订阅

    发布订阅

    官方例子

    一次向多个消费者发送消息

    send类

    /**
    * Actively declare a non-autodelete exchange with no extra arguments
    * @see com.rabbitmq.client.AMQP.Exchange.Declare
    * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
    * @param exchange the name of the exchange
    * @param type the exchange type
    * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
    * @throws java.io.IOException if an error is encountered
    * @return a declaration-confirm method to indicate the exchange was successfully declared
    */
    Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

    /**
     * @PackageName : com.rzk
     * @FileName : Send
     * @Description : 发布订阅-消息生产者
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:21
     * @Version : 1.0.0
     */
    public class Send {
    
        //定义交换机名称
        private final static String EXCHANGE_NAME = "exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
    
            try (
                    //连接工厂创建连接
                    Connection connection = factory.newConnection();
                    //创建信道
                    Channel channel = connection.createChannel()) {
                    //绑定交换机
                    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                    String message = " 发布  ";
                    //队列消息的生产者:发送消息
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
                    System.out.println(" [x] Sent '" + message + "'");
                }
        }
    }
    

    Recv1

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 发布/订阅-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv01 {
        private final static String EXCHANGE_NAME = "exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //队列绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //获取队列(排他队列
            String queueName = channel.queueDeclare().getQueue();
            //绑定队列和交换机
            channel.queueBind(queueName,EXCHANGE_NAME,"");
    
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            /**
             * 监听队列消费消息
             * autoAck:自动应答
             * 当消费者收到该消息,会返回通知消息队列 我消费者已经收到消息了
             */
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        }
    }
    

    Recv02

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 发布/订阅-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv02 {
        private final static String EXCHANGE_NAME = "exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //获取队列(排他队列)
            String queueName = channel.queueDeclare().getQueue();
            //绑定队列和交换机
            channel.queueBind(queueName,EXCHANGE_NAME,"");
    
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            //监听队列消费消息
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        }
    }
    

  • 相关阅读:
    MySql存储过程学习
    自己用C语言写的扫雷算法
    Spring学习——Hello World
    ICE Service使用方法简介
    DevExpress学习笔记(一)Ribbon
    DevExpress学习笔记(二)NavBarControl
    ORACLE DBLINK无法使用问题
    vbs脚本读写INI文件
    Python操作INI文件:configobj 更好
    个人发展的误区:越广越好,还是越深越好?
  • 原文地址:https://www.cnblogs.com/rzkwz/p/14928397.html
Copyright © 2011-2022 走看看