zoukankan      html  css  js  c++  java
  • RabbitMQ

    这次我们试试publish / subscribe模式,
    也就是将一个消息发送给多个consumer。

    这里用一个简单的小程序来说明publish / subscribe。
    由一个provider提供消息,这个消息会被多个consumer接收。
    consumer对同一个消息做出不同的反应,比如打印、保存到文件、数据库什么的。

    之前的例子可能会给人这种感觉:
    producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息。

    但这并不正确。
    在rabbit中,producer从来不会直接将消息发送到队列中。
    producer根本无从得知消息是否会发送到某个队列中。

    事实上,producer只能将消息发送到exchange中。
    这么一说虽然感觉多了个东西,但exchange并不复杂。
    exchange只是从producer获取消息并将消息推送到队列中。

    但为什么多了这么个步骤?
    比如exchange收到消息后,它应该将消息推送给某个特定的队列? 或者可以将消息推送给多个队列? 再或者直接抛弃该消息?
    这些规则取决于exchange的类型。

    以下是一些可用的exchange type(org.springframework.amqp.core.ExchangeTypes):

    public static final String DIRECT = "direct";
    public static final String TOPIC = "topic";
    public static final String FANOUT = "fanout";
    public static final String HEADERS = "headers";
    public static final String SYSTEM = "system";

    我们可以用以下方式定义一个exchange:

    channel.exchangeDeclare("logs", "fanout");

    正如其名,fanout就是将收到的消息发送给所有可访问的队列。

    如何查看已定义的exchange?
    查看已定义的exchange,我们可以用rabbitmqctl list_exchanges命令,如图:

    图中名为amq.*和没有名字的exchange都是默认自带的。
    (PS:之前的例子中我们还没有用到exchange的概念,但仍然成功地将消息发送到了队列中。
    这是因为我们使用的是默认的exchange。)

    我们需要将消息发送到指定的exchange中。
    basicPublish的第一个参数就是exchange的名称(重写的几个都是)。
    空字符串表示默认的exchange:

    channel.basicPublish( "logs", "", null, message.getBytes());

    队列的命名很重要,比如多个worker共享一个队列,producer和consumer的关系用队列名维系。
    但并不是所有的场景都需要我们亲自去命名。
    比如我们需要获得所有消息,而不是它的某个子集。
    或者我们更关心最新的消息,而不是更早放到队列的那些。

    我们需要让server随机命名队列,并且队列在consumer连接断开时自动删除。

    我们只需要一行代码来做这些:

    String queueName = channel.queueDeclare().getQueue();

    调用不带参数的queueDeclare()可以创建一个临时队列。


    到此我们就创建好了exchange和队列。
    我们需要用什么东西将他们联系起来,这个东西叫作"binding"。

    通过以下代码将他们联系起来:

    channel.queueBind(queueName, "logs", "");

    正如查看exchange那样,我们可以用rabbitmqctl list_bindings命令查看binding。
    如图:

    从producer到queue的关系图如下:

    写了个Channel静态工厂,写的不好。
    我打算在静态初始化块中定义两个exchange:

    final class ChannelFactory {
         
        //consumer的temporary queue与这两个exchange绑定
        final static String EXCHANGE_NAME = "log";
        final static String EXCHANGE_NAME_ = "log2";
         
        private static final ConnectionFactory factory = new ConnectionFactory();
         
        static{
            try {
                Channel temp = getChannel();
                temp.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
                temp.exchangeDeclare(EXCHANGE_NAME_, ExchangeTypes.FANOUT);
                closeChannel(temp);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
     
        private ChannelFactory() {
        }
     
        public static Channel getChannel() {
            try {
                return factory.newConnection().createChannel();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
     
        public static Channel getChannel(int channelNumber) {
            try {
                return factory.newConnection().createChannel();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
     
        public static void closeChannel(Channel channel) {
            try {
                channel.close();
                channel.getConnection().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
         
    }

    producer类,同一个producer给两个exchange发消息:

    class Publisher {
        public static void main(String[] args) throws IOException {
     
            Channel channel = ChannelFactory.getChannel();
     
            String message = "Here is the content";
            channel.basicPublish(ChannelFactory.EXCHANGE_NAME, StringUtils.EMPTY, null,
                    ("EXCHANGE_NAME 1:::"+message).getBytes());
            channel.basicPublish(ChannelFactory.EXCHANGE_NAME_, StringUtils.EMPTY, null,
                    ("EXCHANGE_NAME 2:::"+message).getBytes());
             
            ChannelFactory.closeChannel(channel);
        }
    }

    consumer类,临时队列需要和两个exchange进行绑定:

    class Logger {
        public static void main(String[] args) throws IOException,
                ShutdownSignalException, ConsumerCancelledException,
                InterruptedException {
            Channel channel = ChannelFactory.getChannel();
     
            String queue = channel.queueDeclare().getQueue();
            System.out.println("temporary queue name::"+queue);
     
            channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME, "");
            channel.queueBind(queue, ChannelFactory.EXCHANGE_NAME_, "");
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(queue, true, consumer);
     
            while (true) {
                System.out.println(new String(consumer.nextDelivery().getBody()));
            }
        }
    }

    由于使用的是临时队列,需要先运行consumer再运行producer。
    运行结果输出:

  • 相关阅读:
    ASP.NET Core 发布 centos7 配置守护进程
    AutoMapper在asp.netcore中的使用
    git忽略文件并删除git仓库中的文件
    Animate.css 一款牛逼的css3动画库
    URL中特殊符号的处理
    efcore 配置链接sqlserver
    简单抓取小程序大全,并展示。
    UEditor上传图片到七牛C#(后端实现)
    软件项目管理三国启示录01 群雄争霸之项目经理的自我修养
    【调侃】IOC前世今生
  • 原文地址:https://www.cnblogs.com/kavlez/p/4100117.html
Copyright © 2011-2022 走看看