zoukankan      html  css  js  c++  java
  • RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

      工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者。

      举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败)。如果在一个系统中,用户注册信息有邮箱、手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息。利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列。注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处理。这个时候就可以利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者。

    1、什么是发布/订阅模式(Publish/Subscribe)

      简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图取自于官方网站(RabbitMQ)的发布/订阅模式的图例

    P:消息的生产者

    X:交换机

    红色:队列

    C1,C2:消息消费者

    下面是利用用户注册解释的该模式。(先运行两个消费者,在运行生产者。如果没有提前将队列绑定到交换机,那么直接运行生产者的话,消息是不会发到任何队列里的

    2、生产者(Send)代码

    public class Send
    {
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
        
        public static void main(String[] args)
        {
            try
            {
                //获取连接
                Connection connection = ConnectionUtil.getConnection();
                //从连接中获取一个通道
                Channel channel = connection.createChannel();
                //声明交换机(分发:发布/订阅模式)
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                //发送消息
                for (int i = 0; i < 10; i++)
                {
                    String message = "this is user registe message" + i;
                    System.out.println("[send]:" + message);
                    //发送消息
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
                    Thread.sleep(5 * i);
                }
                channel.close();
                connection.close();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }

    运行结果:

    [send]:this is user registe message0
    [send]:this is user registe message1
    [send]:this is user registe message2
    [send]:this is user registe message3
    [send]:this is user registe message4
    [send]:this is user registe message5
    [send]:this is user registe message6
    [send]:this is user registe message7
    [send]:this is user registe message8
    [send]:this is user registe message9

    3、消费者1(ReceiveEmail)

    public class ReceiveEmail
    {
        //交换机名称
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
        
        //队列名称
        private static final String QUEUE_NAME    = "test_queue_email";
        
        public static void main(String[] args)
        {
            try
            {
                
                //获取连接
                Connection connection = ConnectionUtil.getConnection();
                //从连接中获取一个通道
                final Channel channel = connection.createChannel();
                //声明交换机(分发:发布/订阅模式)
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //将队列绑定到交换机
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
                //保证一次只分发一个  
                int prefetchCount = 1;
                channel.basicQos(prefetchCount);
                //定义消费者
                DefaultConsumer consumer = new DefaultConsumer(channel)
                {
                    //当消息到达时执行回调方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException
                    {
                        String message = new String(body, "utf-8");
                        System.out.println("[email] Receive message:" + message);
                        try
                        {
                            //消费者休息2s处理业务
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        finally
                        {
                            System.out.println("[1] done");
                            //手动应答
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //设置手动应答
                boolean autoAck = false;
                //监听队列
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
        
    }

    运行结果:
    [email] Receive message:this is user registe message0
    [1] done
    [email] Receive message:this is user registe message1
    [1] done
    [email] Receive message:this is user registe message2
    [1] done
    [email] Receive message:this is user registe message3
    [1] done
    [email] Receive message:this is user registe message4
    [1] done
    [email] Receive message:this is user registe message5
    [1] done
    [email] Receive message:this is user registe message6
    [1] done
    [email] Receive message:this is user registe message7
    [1] done
    [email] Receive message:this is user registe message8
    [1] done
    [email] Receive message:this is user registe message9
    [1] done

    4、消费者2(ReceivePhone)

    public class ReceivePhone
    {
        //交换机名称
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
        
        //队列名称
        private static final String QUEUE_NAME    = "test_queue_phone";
        
        public static void main(String[] args)
        {
            try
            {
                
                //获取连接
                Connection connection = ConnectionUtil.getConnection();
                //从连接中获取一个通道
                final Channel channel = connection.createChannel();
                //声明交换机(分发:发布/订阅模式)
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //将队列绑定到交换机
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
                //保证一次只分发一个  
                int prefetchCount = 1;
                channel.basicQos(prefetchCount);
                //定义消费者
                DefaultConsumer consumer = new DefaultConsumer(channel)
                {
                    //当消息到达时执行回调方法
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException
                    {
                        String message = new String(body, "utf-8");
                        System.out.println("[phone] Receive message:" + message);
                        try
                        {
                            //消费者休息2s处理业务
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        finally
                        {
                            System.out.println("[2] done");
                            //手动应答
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                };
                //设置手动应答
                boolean autoAck = false;
                //监听队列
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
        
    }

    运行结果:
    [phone] Receive message:this is user registe message0
    [2] done
    [phone] Receive message:this is user registe message1
    [2] done
    [phone] Receive message:this is user registe message2
    [2] done
    [phone] Receive message:this is user registe message3
    [2] done
    [phone] Receive message:this is user registe message4
    [2] done
    [phone] Receive message:this is user registe message5
    [2] done
    [phone] Receive message:this is user registe message6
    [2] done
    [phone] Receive message:this is user registe message7
    [2] done
    [phone] Receive message:this is user registe message8
    [2] done
    [phone] Receive message:this is user registe message9
    [2] done

    总结:

      1、该模式下生产者并不是直接操作队列,而是将数据发送给交换机,由交换机将数据发送给与之绑定的队列。从运行结果中可以看到,两中类型的消费者(Email,Phone)都收到相同数量的消息。

      2、该模式必须声明交换机,并且设置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout")  fanout指分发模式(将每一条消息都发送到与交换机绑定的队列。

      3、 队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    注意:本文仅代表个人理解和看法哟!和本人所在公司和团体无任何关系!

  • 相关阅读:
    .NET程序默认启动线程数
    TPL中Task执行的内联性线程重入
    Unity容器中的对象生存期管理
    C# 异步 TCP 服务器完整实现
    WPF中多源控制Button的状态
    C# 对 TCP 客户端的状态封装
    WPF异步MVVM等待窗体
    C#实现UDP分包组包
    C#实现RTP数据包传输
    PHP 传引用调用
  • 原文地址:https://www.cnblogs.com/wy697495/p/9614005.html
Copyright © 2011-2022 走看看