zoukankan      html  css  js  c++  java
  • RabbitMQ

    这次我们试试publish / subscribe模式,
    也就是将一个消息发送给多个consumer。

    这里用一个简单的小程序来说明publish / subscribe。
    由一个provider提供消息,这个消息会被多个consumer接收。
    consumer对同一个消息做出不同的反应,比如打印、保存到文件、数据库什么的。

    之前的例子可能会给人这种感觉:
    producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息。

    但这并不正确。
    在rabbit中,producer从来不会直接将消息发送到队列中。
    producer根本无从得知消息是否会发送到某个队列中。

    事实上,producer只能将消息发送到exchange中。
    这么一说虽然感觉多了个东西,但exchange并不复杂。
    exchange只是从producer获取消息并将消息推送到队列中。

    但为什么多了这么个步骤?
    比如exchange收到消息后,它应该将消息推送给某个特定的队列? 或者可以将消息推送给多个队列? 再或者直接抛弃该消息?
    这些规则取决于exchange的类型。

    以下是一些可用的exchange type(org.springframework.amqp.core.ExchangeTypes):

    public static final String DIRECT = "direct";
    public static final String TOPIC = "topic";
    public static final String FANOUT = "fanout";
    public static final String HEADERS = "headers";
    public static final String SYSTEM = "system";

    我们可以用以下方式定义一个exchange:

    channel.exchangeDeclare("logs", "fanout");

    正如其名,fanout就是将收到的消息发送给所有可访问的队列。

    如何查看已定义的exchange?
    查看已定义的exchange,我们可以用rabbitmqctl list_exchanges命令,如图:

    图中名为amq.*和没有名字的exchange都是默认自带的。
    (PS:之前的例子中我们还没有用到exchange的概念,但仍然成功地将消息发送到了队列中。
    这是因为我们使用的是默认的exchange。)

    我们需要将消息发送到指定的exchange中。
    basicPublish的第一个参数就是exchange的名称(重写的几个都是)。
    空字符串表示默认的exchange:

    channel.basicPublish( "logs", "", null, message.getBytes());

    队列的命名很重要,比如多个worker共享一个队列,producer和consumer的关系用队列名维系。
    但并不是所有的场景都需要我们亲自去命名。
    比如我们需要获得所有消息,而不是它的某个子集。
    或者我们更关心最新的消息,而不是更早放到队列的那些。

    我们需要让server随机命名队列,并且队列在consumer连接断开时自动删除。

    我们只需要一行代码来做这些:

    String queueName = channel.queueDeclare().getQueue();

    调用不带参数的queueDeclare()可以创建一个临时队列。


    到此我们就创建好了exchange和队列。
    我们需要用什么东西将他们联系起来,这个东西叫作"binding"。

    通过以下代码将他们联系起来:

    channel.queueBind(queueName, "logs", "");

    正如查看exchange那样,我们可以用rabbitmqctl list_bindings命令查看binding。
    如图:

    从producer到queue的关系图如下:

    写了个Channel静态工厂,写的不好。
    我打算在静态初始化块中定义两个exchange:

    final class ChannelFactory {
         
        //consumer的temporary queue与这两个exchange绑定
        final static String EXCHANGE_NAME = "log";
        final static String EXCHANGE_NAME_ = "log2";
         
        private static final ConnectionFactory factory = new ConnectionFactory();
         
        static{
            try {
                Channel temp = getChannel();
                temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
                temp.exchangeDeclare(EXCHANGE_NAME_, ExchangeTypes.FANOUT);
                closeChannel(temp);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
     
        private ChannelFactory() {
        }
     
        public static Channel getChannel() {
            try {
                return factory.newConnection().createChannel();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
     
        public static Channel getChannel(int channelNumber) {
            try {
                return factory.newConnection().createChannel();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
     
        public static void closeChannel(Channel channel) {
            try {
                channel.close();
                channel.getConnection().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
         
    }

    producer类,同一个producer给两个exchange发消息:

    class Publisher {
        public static void main(String[] args) throws IOException {
     
            Channel channel = ChannelFactory.getChannel();
     
            String message = "Here is the content";
            channel.basicPublish(ChannelFactory.EXCHANGE_NAME, StringUtils.EMPTY, null,
                    ("EXCHANGE_NAME 1:::"+message).getBytes());
            channel.basicPublish(ChannelFactory.EXCHANGE_NAME_, StringUtils.EMPTY, null,
                    ("EXCHANGE_NAME 2:::"+message).getBytes());
             
            ChannelFactory.closeChannel(channel);
        }
    }

    consumer类,临时队列需要和两个exchange进行绑定:

    class Logger {
        public static void main(String[] args) throws IOException,
                ShutdownSignalException, ConsumerCancelledException,
                InterruptedException {
            Channel channel = ChannelFactory.getChannel();
     
            String queue = channel.queueDeclare().getQueue();
            System.out.println("temporary queue name::"+queue);
     
            channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME, "");
            channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME_, "");
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queue, true, consumer);
     
            while (true) {
                System.out.println(new String(consumer.nextDelivery().getBody()));
            }
        }
    }

    由于使用的是临时队列,需要先运行consumer再运行producer。
    运行结果输出:

  • 相关阅读:
    1451. Rearrange Words in a Sentence
    1450. Number of Students Doing Homework at a Given Time
    1452. People Whose List of Favorite Companies Is Not a Subset of Another List
    1447. Simplified Fractions
    1446. Consecutive Characters
    1448. Count Good Nodes in Binary Tree
    709. To Lower Case
    211. Add and Search Word
    918. Maximum Sum Circular Subarray
    lua 时间戳和时间互转
  • 原文地址:https://www.cnblogs.com/kavlez/p/4100117.html
Copyright © 2011-2022 走看看