zoukankan      html  css  js  c++  java
  • MQ的订阅模式

    一:介绍

    1.模式

      

    2.使用场景

      一个生产者,多个消费者

      每一个消费者都有自己的队列

      生产者没有直接把消息发送给队列,而是发送到了交换机

      每一个队列都要绑定到交换机

      可以实现一个消息被多个消费者消费。

    二:程序

    1.生产者

     1 package com.mq.PubSubFanout;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class FanoutSend {
     8     private static final String EXCHANGE_NAME="text_exchange_fanout";
     9     public static void main(String[] args) throws Exception {
    10         //获取一个连接
    11         Connection connection= ConnectionUtil.getConnection();
    12         //从连接中获取一个通道
    13         Channel channel=connection.createChannel();
    14         //创建交换机
    15         channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    16 
    17         //消息
    18         String msg="hello pubsub";
    19 
    20         //发送
    21         channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
    22 
    23         System.out.println("send msg:"+msg);
    24         //关闭连接
    25         channel.close();
    26         connection.close();
    27     }
    28 }

    2.消费者一

     1 package com.mq.PubSubFanout;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class FanoutReceive1 {
     9     private static final String EXCHANGE_NAME="text_exchange_fanout";
    10     private static final String QUENE_NAME="test_fanout_queue_email";
    11     public static void main(String[] args)throws Exception{
    12         //获取一个连接
    13         Connection connection = ConnectionUtil.getConnection();
    14         //创建通道
    15         final Channel channel = connection.createChannel();
    16         //创建队列声明
    17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    18 
    19         //绑定交换机
    20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
    21 
    22         //一次只能发送一个消息
    23         channel.basicQos(1);
    24 
    25         //创建消费者
    26         DefaultConsumer consumer=new DefaultConsumer(channel){
    27             @Override
    28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29                 String msg=new String(body,"utf-8");
    30                 System.out.println("[1]receive msg:"+msg);
    31                 try {
    32                     Thread.sleep(200);
    33                 } catch (InterruptedException e) {
    34                     e.printStackTrace();
    35                 }finally {
    36                     System.out.println("done");
    37                     //手动应答
    38                     channel.basicAck(envelope.getDeliveryTag(),false);
    39                 }
    40             }
    41         };
    42         //监听队列,不是自动应答
    43         boolean autoAck=false;
    44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
    45     }
    46 }

    3.消费者二

     1 package com.mq.PubSubFanout;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class FanoutReceive2 {
     9     private static final String EXCHANGE_NAME="text_exchange_fanout";
    10     private static final String QUENE_NAME="test_fanout_queue_ems";
    11     public static void main(String[] args)throws Exception{
    12         //获取一个连接
    13         Connection connection = ConnectionUtil.getConnection();
    14         //创建通道
    15         final Channel channel = connection.createChannel();
    16         //创建队列声明
    17         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    18 
    19         //绑定交换机
    20         channel.queueBind(QUENE_NAME,EXCHANGE_NAME,"");
    21 
    22         //一次只能发送一个消息
    23         channel.basicQos(1);
    24 
    25         //创建消费者
    26         DefaultConsumer consumer=new DefaultConsumer(channel){
    27             @Override
    28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29                 String msg=new String(body,"utf-8");
    30                 System.out.println("[2]receive msg:"+msg);
    31                 try {
    32                     Thread.sleep(200);
    33                 } catch (InterruptedException e) {
    34                     e.printStackTrace();
    35                 }finally {
    36                     System.out.println("done");
    37                     //手动应答
    38                     channel.basicAck(envelope.getDeliveryTag(),false);
    39                 }
    40             }
    41         };
    42         //监听队列,不是自动应答
    43         boolean autoAck=false;
    44         channel.basicConsume(QUENE_NAME,autoAck,consumer);
    45     }
    46 }

    4.效果

      send:

      

      receive1:

      

      receive2:

      

    5.运行注意点

      如果之间运行receive类,会发现报错,因为没有交换机。

      所以,可以先运行send类,虽然交换机不能存储发送的消息,但是可以创建交换机。

      然后,就可以按照原来的方式。

      先启动两个消费者进行监听,然后启动生产者。

      现象:就是消费者都获取到了生产者生产的消息。

  • 相关阅读:
    《Advanced Bash-scripting Guide》学习(四):一个显示时间日期登录用户的脚本
    《Advanced Bash-scripting Guide》学习(三):自删除脚本和自读取内容的脚本
    51nod 1005 大数加法
    51nod1019 逆序数
    scoi2010 幸运数字
    COGS 513 八
    [HNOI2006]超级英雄Hero
    NOIP2010 关押罪犯
    [Scoi2010]游戏
    bzoj 2820: YY的GCD
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8605874.html
Copyright © 2011-2022 走看看