zoukankan      html  css  js  c++  java
  • RabbitMQ入门-发布订阅模式

    兔子的Publish/Subscribe是这样的:

    有个生产者PX代表交换机,交换机绑定队列,消费者从队列中取得消息。每次有消息,先发到交换机中,然后由交换机负责发送到它已知的队列中。

    生产者代码:

    package com.example.demo;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     * 4种交换类型
     * There are a few exchange types available: direct, topic, headers and fanout.
     * 扇出交换:将收到的消息广播到它所知道的所有队列里
     */
    public class PSSend {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // 当我们发送时,需要一个路由密钥,但是对于扇出交换,他的值将被忽略
            // 第一个参数为交换的名字,第二个为交换的类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            String msg = "发布订阅";
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
            System.out.println("PS-Send:" + msg);
    
            channel.close();
            connection.close();
    
        }
    
    }

     消费者:

    package com.example.demo;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 
     *
     * There are a few exchange types available: direct, topic, headers and fanout.
     * 扇出交换:将收到的消息广播到它所知道的所有队列里
     */
    public class PSReceive {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
            factory.setHost("localhost");
            Connection connection = factory.newConnection();        // 获取连接
            Channel channel = connection.createChannel();
    
            // 当我们发送时,需要一个路由密钥,但是对于扇出交换,他的值将被忽略
            // 第一个参数为交换的名字,第二个为交换的类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称
            String queueA = channel.queueDeclare().getQueue();
            //String queueB = channel.queueDeclare().getQueue();
            System.out.println("临时队列:" + queueA);
    
            // 下面绑定交换与队列
            channel.queueBind(queueA, EXCHANGE_NAME, "");
            //channel.queueBind(queueB, EXCHANGE_NAME, "");
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String recv = new String(body, "UTF-8");
                    System.out.println("PS-Receive:" + recv);
                }
            };
    
            channel.basicConsume(queueA, true, consumer);
            //channel.basicConsume(queueB, true, consumer);
    
    
        }
    
    }

     启动消费者和生产者,控制台打印 

     

  • 相关阅读:
    HTTP协议--详解
    汇编语言前五章总结
    [转] vscode C/C++ 插件预定义环境变量(linux)
    C++小细节
    CC++ 如何确定一个变量的类型(恶心的指针)
    Ubuntu下安装tensorflow
    github 上不去
    应用层01-HTTP
    C++ 传递数组引用
    C++ 命令行窗口打印二叉树(图形)
  • 原文地址:https://www.cnblogs.com/LUA123/p/8472371.html
Copyright © 2011-2022 走看看