zoukankan      html  css  js  c++  java
  • Java使用RabbitMQ之订阅分发(Topic)

            使用RabbitMQ进行消息发布和订阅,生产者将消息发送给转发器(exchange),转发器根据路由键匹配已绑定的消息队列并转发消息,主题模式支持路由键的通配。

    生产者代码:

     1 package org.study.exchange3.topic3;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import org.junit.Test;
     6 import org.study.utils.ConnectionUtils;
     7 
     8 import java.io.IOException;
     9 import java.util.concurrent.TimeoutException;
    10 
    11 /**
    12  * topic-主题模式(分发订阅)
    13  * exchange只转发消息,但是没有存储能力,只有队列才有存储能力
    14  * 主题模式支持路由键的通配符
    15  * “#”表示0个或若干个关键字,“*”表示一个关键字。
    16  */
    17 public class Sender {
    18     public static final String QUEUE_NAME = "test_topic_queue";
    19     public static final String EXCHANGE_NAME = "topic_exchange";
    20 
    21     @Test
    22     public void send() throws IOException, TimeoutException, InterruptedException {
    23         // 获取连接
    24         Connection conn = ConnectionUtils.getConnection();
    25         // 获取通道
    26         Channel channel = conn.createChannel();
    27 //        //创建队列
    28 //        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    29         //声明转发器
    30         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    31         //每个消费者发送确认消息前,只发送一条消息
    32         channel.basicQos(1);
    33 
    34         String msg = "hello rabbitmq topic !";
    35         //发送消息至转发器,指定路由键
    36         channel.basicPublish(EXCHANGE_NAME, "key.key", null, msg.getBytes());
    37         System.out.println("[send] msg " + msg);
    38 
    39         channel.close();
    40         conn.close();
    41     }
    42 }

    消费者代码:

     1 package org.study.exchange3.topic3;
     2 
     3 import com.rabbitmq.client.*;
     4 import org.junit.Test;
     5 import org.study.utils.ConnectionUtils;
     6 
     7 import java.io.IOException;
     8 import java.util.concurrent.TimeoutException;
     9 
    10 /**
    11  * 主题模式-接收消息
    12  */
    13 public class Recv {
    14     public static final String QUEUE_NAME = "test_topic_queue";
    15     public static final String EXCHANGE_NAME = "topic_exchange";
    16 
    17     @Test
    18     public void recv() throws IOException, TimeoutException, InterruptedException {
    19         Connection conn = ConnectionUtils.getConnection();
    20         Channel channel = conn.createChannel();
    21         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    22         channel.basicQos(1);
    23         /*
    24          * 队列绑定转发器,路由键通配符#和*
    25          * #:表示0个或多个字符
    26          * *:表示一个字符
    27          * */
    28         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");
    29 
    30         //定义消费者
    31         DefaultConsumer consumer = new DefaultConsumer(channel) {
    32             //重写获取到达消息
    33             @Override
    34             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    35                 String msg = new String(body, "utf-8");
    36                 System.out.println("[1] recv: " + msg);
    37 
    38                 try {
    39                     Thread.sleep(100);
    40                 } catch (InterruptedException e) {
    41                     e.printStackTrace();
    42                 } finally {
    43                     System.out.println("[1] done!");
    44                     // 回执
    45                     channel.basicAck(envelope.getDeliveryTag(), false);
    46                 }
    47             }
    48         };
    49 
    50         while (true) {
    51             //监听队列
    52             channel.basicConsume(QUEUE_NAME, false, consumer);
    53             Thread.sleep(1000);
    54         }
    55 
    56     }
    57 }
  • 相关阅读:
    数据结构——数据结构的起源和研究内容
    数据结构——学习数据结构的意义
    C++中动态内存申请的结果
    C++中函数异常规格的说明
    C++异常处理的深入理解
    NOIP 2012 Day2
    NOIP 2012 Day1
    NOIP 2011 Day2
    NOIP 2011 Day 1
    NOIP 2010
  • 原文地址:https://www.cnblogs.com/gongxr/p/9646577.html
Copyright © 2011-2022 走看看