zoukankan      html  css  js  c++  java
  • rabbitmq-01

    MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQPAdvanced Message
    Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
    发中应用非常广泛

    应用场景
    1、任务异步处理。
    将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
    2、应用程序解耦合
    MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合

     Jms和AMQP区别

    JMSjava提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java
    jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jmsjava语言专属的消
    息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的

    Broker:消息队列服务进程,此进程包括两个部分:ExchangeQueue
    Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

    Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
    Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ
    Consumer:消息消费者,即消费方客户端,接收MQ转发的消息

    安装

    管理员运行rabbitmq-plugins.bat enable rabbitmq_management

    输入:http://localhost:15672  guest guest

    2.Demo演示 hello world

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp‐client</artifactId>
      <version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring‐boot‐starter‐logging</artifactId>
    </dependency>

    生产者

     1 public class Producer01 {
     2 
     3     private static final String QUEUE = "helloworld";
     4 
     5     public static void main(String[] args) {
     6         //通过连接工厂
     7         ConnectionFactory connectionFactory = new ConnectionFactory();
     8         connectionFactory.setHost("127.0.0.1");
     9         connectionFactory.setPort(5672);
    10         connectionFactory.setUsername("guest");
    11         connectionFactory.setPassword("guest");
    12 
    13         //设置虚拟机,一个mq设置多个虚拟机
    14         connectionFactory.setVirtualHost("/");
    15         Connection connection = null;
    16         Channel channel = null;
    17 
    18         try {
    19             connection = connectionFactory.newConnection();
    20              channel = connection.createChannel();
    21 
    22             /**
    23              * QueryDeclare
    24              * 1.query 队列名称
    25              * 2.durable 是否持久化
    26              * 3.exclusive 是否独占链路
    27              * 4.autoDelete 自动删除
    28              * 5.argument 扩展参数
    29              *
    30              */
    31 
    32             channel.queueDeclare(QUEUE,true,false,false,null);
    33 
    34             /**
    35              * 参数明细channel.basicPublish()
    36              * 1.exchange 交换机,不指定为"".
    37              * 2.routingkey 路由key 交换机根据路由key来将消息转发到指定队列
    38              * 3.props 消息的属性
    39              * 4.body 消息内容
    40              */
    41             String message = "hello rabbitmq";
    42             channel.basicPublish("",QUEUE,null,message.getBytes());
    43             System.out.println("send to mq"+message);
    44 
    45         } catch (Exception e) {
    46             e.printStackTrace();
    47         } finally {
    48             //关闭连接
    49             //关闭通道
    50             try {
    51                 channel.close();
    52             } catch (IOException e) {
    53                 e.printStackTrace();
    54             } catch (TimeoutException e) {
    55                 e.printStackTrace();
    56             }
    57             try {
    58                 connection.close();
    59             } catch (IOException e) {
    60                 e.printStackTrace();
    61             }
    62         }
    63     }
    64 }

    消费者

     1 public class Consumer01 {
     2 
     3     private static final String QUEUE = "helloworld";
     4 
     5     public static void main(String[] args) throws IOException,TimeoutException{
     6         ConnectionFactory factory = new ConnectionFactory();
     7         //设置MabbitMQ所在服务器的ip和端口
     8         factory.setHost("127.0.0.1");
     9         factory.setPort(5672);
    10 
    11         Connection connection = factory.newConnection();
    12         Channel channel = connection.createChannel();
    13 
    14         channel.queueDeclare(QUEUE, true, false, false, null);
    15 
    16         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    17 
    18 
    19 
    20             /**
    21              * toString方法,接受到方法此方法调用
    22              * @param consumerTag:消费者标签,用来标识消费者
    23              * @param envelope:信封,
    24              * @param properties:消息属性
    25              * @param body:消费内容
    26              * @throws IOException
    27              */
    28 
    29             @Override
    30             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    31 
    32                 String exchange = envelope.getExchange();
    33 
    34                 long deliveryTag = envelope.getDeliveryTag();
    35 
    36                 String message = new String(body,"utf-8");
    37                 System.out.println("receive message:" + message);
    38 
    39             }
    40         };
    41         /**监听队列String queue, boolean autoAck,Consumer callback
    42          * 1.queue 队列名称
    43          * 2.autoack 自动回复
    44          * 3. callback 消费方法,当消费者接收到的消息执行的方法
    45          */
    46         channel.basicConsume(QUEUE,true,defaultConsumer);
    47 
    48     }
    49 }

    1、发送端操作流程
    1)创建连接,2)创建通道,3)声明队列,4)发送消息
    2、接收端
    1)创建连接,2)创建通道,3)声明队列,4)监听队列,5)接收消息 

    工作模式

    1.workqueue

    多了一个消费端,两个消费端共同消费同一个队列中的消息。
    应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

    2.publish/subscribe

    发布订阅模式:
    1、每个消费者监听自己的队列。
    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
    到消息 

    案例:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。

    声明Exchange_fanout_inform交换机 ,声明两个队列并且绑定到此交换机, 且不需指定routeing key

    生产者

     1 public class Producer02 {
     2 
     3     private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
     4     private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
     5     private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
     6 
     7     public static void main(String[] args) {
     8         //通过连接工厂
     9         ConnectionFactory connectionFactory = new ConnectionFactory();
    10         connectionFactory.setHost("127.0.0.1");
    11         connectionFactory.setPort(5672);
    12         connectionFactory.setUsername("guest");
    13         connectionFactory.setPassword("guest");
    14 
    15         //设置虚拟机,一个mq设置多个虚拟机
    16         connectionFactory.setVirtualHost("/");
    17         Connection connection = null;
    18         Channel channel = null;
    19 
    20         try {
    21             connection = connectionFactory.newConnection();
    22              channel = connection.createChannel();
    23 
    24             /**
    25              * QueryDeclare
    26              * 1.query 队列名称
    27              * 2.durable 是否持久化
    28              * 3.exclusive 是否独占链路
    29              * 4.autoDelete 自动删除
    30              * 5.argument 扩展参数
    31              *
    32              */
    33             //声明两个队列
    34             channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    35             channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    36 
    37             //声明一个交换机
    38 
    39             /**
    40              * String exchange String type
    41              * 1.交换机的名称
    42              * 2.交换机的类型
    43              *fanout 对应的工作模式 publish/subscribe
    44              * direct 对应Routing
    45              * topic 对应Topics
    46              * headers 对应headers
    47              */
    48             channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
    49             //交换机和队列进行绑定
    50             /** 1.queue 队列名称
    51              * 2.exchange 交换机名称
    52              * 3.routingKey 在发布订阅模式中为空字符串,交换机根据路由key的值将消息转发到指定队列
    53              */
    54             channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
    55             channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
    56 
    57 
    58 
    59             /**
    60              * 参数明细channel.basicPublish()
    61              * 1.exchange 交换机,不指定为"".
    62              * 2.routingkey 路由key 交换机根据路由key来将消息转发到指定队列
    63              * 3.props 消息的属性
    64              * 4.body 消息内容
    65              */
    66             for (int i = 0;i<5;i++){
    67                 String message = "send inform to user";
    68                 channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());
    69                 System.out.println("send to mq"+message);
    70             }
    71 
    72         } catch (Exception e) {
    73             e.printStackTrace();
    74         } finally {
    75             //关闭连接
    76             //关闭通道
    77             try {
    78                 channel.close();
    79             } catch (IOException e) {
    80                 e.printStackTrace();
    81             } catch (TimeoutException e) {
    82                 e.printStackTrace();
    83             }
    84             try {
    85                 connection.close();
    86             } catch (IOException e) {
    87                 e.printStackTrace();
    88             }
    89         }
    90     }
    91 }

    消费者-邮件消费者

     1 public class Consumer02_subscribe_sms {
     2     //队列名称
     3     private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
     4     private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
     5 
     6 
     7     public static void main(String[] args) throws IOException, TimeoutException {
     8         //通过连接工厂创建新的连接和mq建立连接
     9         ConnectionFactory factory = new ConnectionFactory();
    10         factory.setHost("127.0.0.1");
    11         factory.setPort(5672);//端口
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
    15         factory.setVirtualHost("/");
    16 
    17         //建立新连接
    18         Connection connection = factory.newConnection();
    19         //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
    20         Channel channel = connection.createChannel();
    21 
    22         /**
    23          * 参数明细
    24          * 1、queue 队列名称
    25          * 2、durable 是否持久化,如果持久化,mq重启后队列还在
    26          * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
    27          * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
    28          * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
    29          */
    30         channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
    31         //声明一个交换机
    32         //参数:String exchange, String type
    33         /**
    34          * 参数明细:
    35          * 1、交换机的名称
    36          * 2、交换机的类型
    37          * fanout:对应的rabbitmq的工作模式是 publish/subscribe
    38          * direct:对应的Routing    工作模式
    39          * topic:对应的Topics工作模式
    40          * headers: 对应的headers工作模式
    41          */
    42         channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
    43         //进行交换机和队列绑定
    44         //参数:String queue, String exchange, String routingKey
    45         /**
    46          * 参数明细:
    47          * 1、queue 队列名称
    48          * 2、exchange 交换机名称
    49          * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
    50          */
    51         channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
    52 
    53         //实现消费方法
    54         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    55 
    56             /**
    57              * 当接收到消息后此方法将被调用
    58              * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
    59              * @param envelope 信封,通过envelope
    60              * @param properties 消息属性
    61              * @param body 消息内容
    62              * @throws IOException
    63              */
    64             @Override
    65             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    66                 //交换机
    67                 String exchange = envelope.getExchange();
    68                 //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
    69                 long deliveryTag = envelope.getDeliveryTag();
    70                 //消息内容
    71                 String message= new String(body,"utf-8");
    72                 System.out.println("receive message:"+message);
    73             }
    74         };
    75 
    76         //监听队列
    77         //参数:String queue, boolean autoAck, Consumer callback
    78         /**
    79          * 参数明细:
    80          * 1、queue 队列名称
    81          * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
    82          * 3、callback,消费方法,当消费者接收到消息要执行的方法
    83          */
    84         channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    85 
    86     }
    87 }

    消费者-email消费者

     1 public class Consumer02_subscribe_email {
     2 
     3     private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
     4     private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
     5 
     6 
     7     public static void main(String[] args) throws IOException,TimeoutException {
     8         ConnectionFactory factory = new ConnectionFactory();
     9     //设置MabbitMQ所在服务器的ip和端口
    10         factory.setHost("127.0.0.1");
    11         factory.setPort(5672);
    12         factory.setUsername("guest");
    13         factory.setPassword("guest");
    14         factory.setVirtualHost("/");
    15 
    16         Connection connection = factory.newConnection();
    17         Channel channel = connection.createChannel();
    18 
    19         channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
    20         //声明一个交换机
    21 
    22         channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
    23 
    24         channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
    25 
    26         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    27 
    28             /**
    29              * toString方法,接受到方法此方法调用
    30              * @param consumerTag:消费者标签,用来标识消费者
    31              * @param envelope:信封,
    32              * @param properties:消息属性
    33              * @param body:消费内容
    34              * @throws IOException
    35              */
    36 
    37             @Override
    38             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    39 
    40                 String exchange = envelope.getExchange();
    41 
    42                 long deliveryTag = envelope.getDeliveryTag();
    43 
    44                 String message = new String(body,"utf-8");
    45                 System.out.println("receive message:" + message);
    46 
    47             }
    48         };
    49 
    50 
    51 
    52 
    53         /**
    54          * 1.queue 队列名称
    55          * 2.autoack 自动回复
    56          * 3. callback 消费方法,当消费者接收到的消息执行的方法
    57          */
    58         channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    59     }
    60 }

    1publish/subscribework queues有什么区别。
    区别:
    1work queues不用定义交换机,而publish/subscribe需要定义交换机。
    2publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认
    交换机)
    3publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实质上work queues会将队列绑
    定到默认的交换机 。
    相同点:
    所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。

    Routing

    路由模式:
    1、每个消费者监听自己的队列,并且设置routingkey
    2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列

    案例:

    声明exchange_routing_inform交换机。
    声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
    发送消息时需要指定routingkey

    生产者

      1 public class Producer03 {
      2 
      3     private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
      4     private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
      5     private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
      6     private static final String ROUTINGKEY_EMAIL="inform_email";
      7     private static final String ROUTINGKEY_SMS="inform_sms";
      8 
      9     public static void main(String[] args) {
     10         //通过连接工厂
     11         ConnectionFactory connectionFactory = new ConnectionFactory();
     12         connectionFactory.setHost("127.0.0.1");
     13         connectionFactory.setPort(5672);
     14         connectionFactory.setUsername("guest");
     15         connectionFactory.setPassword("guest");
     16 
     17         //设置虚拟机,一个mq设置多个虚拟机
     18         connectionFactory.setVirtualHost("/");
     19         Connection connection = null;
     20         Channel channel = null;
     21 
     22         try {
     23             connection = connectionFactory.newConnection();
     24             channel = connection.createChannel();
     25 
     26             /**
     27              * QueryDeclare
     28              * 1.query 队列名称
     29              * 2.durable 是否持久化
     30              * 3.exclusive 是否独占链路
     31              * 4.autoDelete 自动删除
     32              * 5.argument 扩展参数
     33              *
     34              */
     35             //声明两个队列
     36             channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
     37             channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
     38 
     39             //声明一个交换机
     40 
     41             /**
     42              * String exchange String type
     43              * 1.交换机的名称
     44              * 2.交换机的类型
     45              *fanout 对应的工作模式 publish/subscribe
     46              * direct 对应Routing
     47              * topic 对应Topics
     48              * headers 对应headers
     49              */
     50             channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
     51             //交换机和队列进行绑定
     52             /** 1.queue 队列名称
     53              * 2.exchange 交换机名称
     54              * 3.routingKey 在发布订阅模式中为空字符串,交换机根据路由key的值将消息转发到指定队列
     55              */
     56             channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
     57             channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
     58 
     59 
     60 
     61             /**
     62              * 参数明细channel.basicPublish()
     63              * 1.exchange 交换机,不指定为"".
     64              * 2.routingkey 路由key 交换机根据路由key来将消息转发到指定队列
     65              * 3.props 消息的属性
     66              * 4.body 消息内容
     67              */
     68             for (int i = 0;i<5;i++){
     69                 String message = "send email inform to user";
     70                 channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
     71                 System.out.println("send to mq"+message);
     72             }
     73 
     74             for (int i = 0;i<5;i++){
     75                 String message = "send sms inform to user";
     76                 channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());
     77                 System.out.println("send to mq"+message);
     78             }
     79 
     80 
     81 
     82         } catch (Exception e) {
     83             e.printStackTrace();
     84         } finally {
     85             //关闭连接
     86             //关闭通道
     87             try {
     88                 channel.close();
     89             } catch (IOException e) {
     90                 e.printStackTrace();
     91             } catch (TimeoutException e) {
     92                 e.printStackTrace();
     93             }
     94             try {
     95                 connection.close();
     96             } catch (IOException e) {
     97                 e.printStackTrace();
     98             }
     99 
    100 
    101         }
    102 
    103 
    104     }
    105 
    106 }

    消费者 email消费者

     1 public class Consumer03_routing_email {
     2 
     3     private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
     4     private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
     5     private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
     6     private static final String ROUTINGKEY_EMAIL="inform_email";
     7     private static final String ROUTINGKEY_SMS="inform_sms";
     8 
     9 
    10     public static void main(String[] args) throws IOException,TimeoutException {
    11         ConnectionFactory factory = new ConnectionFactory();
    12     //设置MabbitMQ所在服务器的ip和端口
    13         factory.setHost("127.0.0.1");
    14         factory.setPort(5672);
    15         factory.setUsername("guest");
    16         factory.setPassword("guest");
    17         factory.setVirtualHost("/");
    18 
    19         Connection connection = factory.newConnection();
    20         Channel channel = connection.createChannel();
    21 
    22         channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
    23         //声明一个交换机
    24         channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
    25 
    26         channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
    27 
    28         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    29 
    30 
    31 
    32             /**
    33              * toString方法,接受到方法此方法调用
    34              * @param consumerTag:消费者标签,用来标识消费者
    35              * @param envelope:信封,
    36              * @param properties:消息属性
    37              * @param body:消费内容
    38              * @throws IOException
    39              */
    40 
    41             @Override
    42             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    43 
    44                 String exchange = envelope.getExchange();
    45 
    46                 long deliveryTag = envelope.getDeliveryTag();
    47 
    48                 String message = new String(body,"utf-8");
    49                 System.out.println("receive message:" + message);
    50 
    51             }
    52         };
    53 
    54         /**
    55          * 1.queue 队列名称
    56          * 2.autoack 自动回复
    57          * 3. callback 消费方法,当消费者接收到的消息执行的方法
    58          */
    59         channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    60 
    61     }
    62 
    63 }

    Topics

    通配符模式

    1、每个消费者监听自己的队列,并且设置带统配符的routingkey
    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

    Topic模式更多加强大,它可以实现Routingpublish/subscirbe模式的功能。

    案例: 根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种
    通知类型都接收的则两种通知都有效

    生产者

      1 public class Producer04 {
      2 
      3     private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
      4     private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
      5     private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
      6     private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
      7     private static final String ROUTINGKEY_SMS="inform.#.sms.#";
      8 
      9 
     10 
     11     public static void main(String[] args) {
     12         //通过连接工厂
     13         ConnectionFactory connectionFactory = new ConnectionFactory();
     14         connectionFactory.setHost("127.0.0.1");
     15         connectionFactory.setPort(5672);
     16         connectionFactory.setUsername("guest");
     17         connectionFactory.setPassword("guest");
     18 
     19         //设置虚拟机,一个mq设置多个虚拟机
     20         connectionFactory.setVirtualHost("/");
     21         Connection connection = null;
     22         Channel channel = null;
     23 
     24         try {
     25             connection = connectionFactory.newConnection();
     26             channel = connection.createChannel();
     27 
     28             /**
     29              * QueryDeclare
     30              * 1.query 队列名称
     31              * 2.durable 是否持久化
     32              * 3.exclusive 是否独占链路
     33              * 4.autoDelete 自动删除
     34              * 5.argument 扩展参数
     35              *
     36              */
     37             //声明两个队列
     38             channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
     39             channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
     40 
     41             //声明一个交换机
     42 
     43             /**
     44              * String exchange String type
     45              * 1.交换机的名称
     46              * 2.交换机的类型
     47              *fanout 对应的工作模式 publish/subscribe
     48              * direct 对应Routing
     49              * topic 对应Topics
     50              * headers 对应headers
     51              */
     52             channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
     53             //交换机和队列进行绑定
     54             /** 1.queue 队列名称
     55              * 2.exchange 交换机名称
     56              * 3.routingKey 在发布订阅模式中为空字符串,交换机根据路由key的值将消息转发到指定队列
     57              */
     58             channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
     59             channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
     60 
     61 
     62 
     63             /**
     64              * 参数明细channel.basicPublish()
     65              * 1.exchange 交换机,不指定为"".
     66              * 2.routingkey 路由key 交换机根据路由key来将消息转发到指定队列
     67              * 3.props 消息的属性
     68              * 4.body 消息内容
     69              */
     70             for (int i = 0;i<5;i++){
     71                 String message = "send email inform to user";
     72                 channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
     73                 System.out.println("send to mq"+message);
     74             }
     75 
     76             for (int i = 0;i<5;i++){
     77                 String message = "send sms inform to user";
     78                 channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
     79                 System.out.println("send to mq"+message);
     80             }
     81             for (int i = 0;i<5;i++){
     82                 String message = "send sms and email inform to user";
     83                 channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email.sms",null,message.getBytes());
     84                 System.out.println("send to mq"+message);
     85             }
     86 
     87 
     88 
     89         } catch (Exception e) {
     90             e.printStackTrace();
     91         } finally {
     92             //关闭连接
     93             //关闭通道
     94             try {
     95                 channel.close();
     96             } catch (IOException e) {
     97                 e.printStackTrace();
     98             } catch (TimeoutException e) {
     99                 e.printStackTrace();
    100             }
    101             try {
    102                 connection.close();
    103             } catch (IOException e) {
    104                 e.printStackTrace();
    105             }
    106         }
    107 
    108     }
    109 
    110 }

    消费者

    队列绑定交换机指定通配符:
    统配符规则:
    中间以“.”分隔。
    符号#可以匹配多个词,符号*可以匹配一个词语。

    消费者email

     1 public class Consumer04_topics_email {
     2 
     3     private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
     4     private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
     5     private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
     6 
     7 
     8     public static void main(String[] args) throws IOException,TimeoutException {
     9         ConnectionFactory factory = new ConnectionFactory();
    10     //设置MabbitMQ所在服务器的ip和端口
    11         factory.setHost("127.0.0.1");
    12         factory.setPort(5672);
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setVirtualHost("/");
    16 
    17         Connection connection = factory.newConnection();
    18         Channel channel = connection.createChannel();
    19 
    20         channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
    21         //声明一个交换机
    22         channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
    23         channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
    24 
    25         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    26 
    27             /**
    28              * toString方法,接受到方法此方法调用
    29              * @param consumerTag:消费者标签,用来标识消费者
    30              * @param envelope:信封,
    31              * @param properties:消息属性
    32              * @param body:消费内容
    33              * @throws IOException
    34              */
    35 
    36             @Override
    37             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    38 
    39                 String exchange = envelope.getExchange();
    40 
    41                 long deliveryTag = envelope.getDeliveryTag();
    42 
    43                 String message = new String(body,"utf-8");
    44                 System.out.println("receive message:" + message);
    45 
    46             }
    47         };
    48 
    49 
    50 
    51 
    52         /**监听队列
    53          * 1.queue 队列名称
    54          * 2.autoack 自动回复
    55          * 3. callback 消费方法,当消费者接收到的消息执行的方法
    56          */
    57         channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    58 
    59     }
    60 
    61 
    62 
    63 }

    消费者 sms

     1 public class Consumer04_topics_sms {
     2 
     3     private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
     4     private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
     5     private static final String ROUTINGKEY_SMS="inform.#.sms.#";
     6 
     7 
     8     public static void main(String[] args) throws IOException,TimeoutException {
     9         ConnectionFactory factory = new ConnectionFactory();
    10     //设置MabbitMQ所在服务器的ip和端口
    11         factory.setHost("127.0.0.1");
    12         factory.setPort(5672);
    13         factory.setUsername("guest");
    14         factory.setPassword("guest");
    15         factory.setVirtualHost("/");
    16 
    17         Connection connection = factory.newConnection();
    18         Channel channel = connection.createChannel();
    19 
    20         channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
    21         //声明一个交换机
    22         channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
    23         channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
    24 
    25         DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    26 
    27             /**
    28              * toString方法,接受到方法此方法调用
    29              * @param consumerTag:消费者标签,用来标识消费者
    30              * @param envelope:信封,
    31              * @param properties:消息属性
    32              * @param body:消费内容
    33              * @throws IOException
    34              */
    35 
    36             @Override
    37             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    38 
    39                 String exchange = envelope.getExchange();
    40 
    41                 long deliveryTag = envelope.getDeliveryTag();
    42 
    43                 String message = new String(body,"utf-8");
    44                 System.out.println("receive message:" + message);
    45 
    46             }
    47         };
    48 
    49 
    50 
    51 
    52         /**监听队列
    53          * 1.queue 队列名称
    54          * 2.autoack 自动回复
    55          * 3. callback 消费方法,当消费者接收到的消息执行的方法
    56          */
    57         channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    58 
    59 
    60 
    61     }

    header模式

    routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配
    队列

    rpc模式

    RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
    1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
    2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
    3、服务端将RPC方法 的结果发送到RPC响应队列
    4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

    Springboot 整合rabbitmq

    添加依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter‐amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter‐test</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>

    application.yml

    server:
        port: 44000
    spring:
        application:
            name: test‐rabbitmq‐producer
        rabbitmq:
            host: 127.0.0.1
            port: 5672
            username: guest
            password: guest
            virtualHost: /            

     定义RabbitConfig类,配置ExchangeQueue、及绑定交换机

     1 @Configuration
     2 public class RabbitmqConfig {
     3 
     4     public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
     5     public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
     6     public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
     7     public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
     8     public static final String ROUTINGKEY_SMS="inform.#.sms.#";
     9 
    10     //声明交换机
    11     @Bean(EXCHANGE_TOPICS_INFORM)
    12     public Exchange EXCHANGE_TOPICS_INFORM(){
    13         //durable(true) 持久化,mq重启之后交换机还在
    14         return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    15     }
    16 
    17     //声明QUEUE_INFORM_EMAIL队列
    18     @Bean(QUEUE_INFORM_EMAIL)
    19     public Queue QUEUE_INFORM_EMAIL(){
    20         return new Queue(QUEUE_INFORM_EMAIL);
    21     }
    22     //声明QUEUE_INFORM_SMS队列
    23     @Bean(QUEUE_INFORM_SMS)
    24     public Queue QUEUE_INFORM_SMS(){
    25         return new Queue(QUEUE_INFORM_SMS);
    26     }
    27 
    28     //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    29     @Bean
    30     public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
    31                                               @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
    32         return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    33     }
    34     //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    35     @Bean
    36     public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
    37                                           @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
    38         return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    39     }
    40 
    41 }

    生产者

    使用RabbitmqTemplate

     1 @SpringBootTest
     2 @RunWith(SpringRunner.class)
     3 public class Producer05 {
     4 
     5     @Autowired
     6     RabbitTemplate rabbitTemplate;
     7 
     8     @Test
     9     public void testSendEmail(){
    10 
    11         String message = "send email message to user";
    12         /**
    13          * 参数:
    14          * 1、交换机名称
    15          * 2、routingKey
    16          * 3、消息内容
    17          */
    18         rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.email",message);
    19 
    20     }
    21 
    22 }

    消费者

     1 @Component
     2 public class Receivehander {
     3 
     4     @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
     5     public void send_email(String msg, Message message, Channel channel){
     6         System.out.println(msg);
     7     }
     8 
     9 
    10 
    11 }



     

     

  • 相关阅读:
    [mysql] 删除唯一约束unique
    onethink 路由规则无效问题解决
    mysql source 乱码
    NLPIR
    词性标记集--计算所汉语
    [thinkphp] 无限极分类
    UITableViewCell在非Nib及Cell重用下设置CellStyle
    UIViewController的初始化
    转:NSString / NSData / char* 类型之间的转换
    转:苹果Xcode帮助文档阅读指南
  • 原文地址:https://www.cnblogs.com/quyangyang/p/10995992.html
Copyright © 2011-2022 走看看