zoukankan      html  css  js  c++  java
  • RabbitMQ Pub/Sub订阅模式


    实现一个生产者

    package com.mq.mqproducer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 发送消息
     */
    public class Producer_PubSub {
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //主机地址;默认为 localhost
            connectionFactory.setHost("localhost");
            //连接端口;默认为 5672
            connectionFactory.setPort(5672);
            //虚拟主机名称;默认为 /
            connectionFactory.setVirtualHost("/");
            //连接用户名;默认为guest
            connectionFactory.setUsername("guest");
            //连接密码;默认为guest
            connectionFactory.setPassword("guest");
    
            //创建连接
            Connection connection = null;
            connection = connectionFactory.newConnection();
    
            // 创建频道
            Channel channel = null;
    
            channel = connection.createChannel();
    
           /*
    
           exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
           参数:
            1. exchange:交换机名称
            2. type:交换机类型
                DIRECT("direct"),:定向
                FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
                TOPIC("topic"),通配符的方式
                HEADERS("headers");参数匹配
    
            3. durable:是否持久化
            4. autoDelete:自动删除
            5. internal:内部使用。 一般false
            6. arguments:参数
            */
    
            String exchangeName = "test_fanout";
            //5. 创建交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
            //6. 创建队列
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
            channel.queueDeclare(queue1Name,true,false,false,null);
            channel.queueDeclare(queue2Name,true,false,false,null);
            //7. 绑定队列和交换机
            /*
            queueBind(String queue, String exchange, String routingKey)
            参数:
                1. queue:队列名称
                2. exchange:交换机名称
                3. routingKey:路由键,绑定规则
                    如果交换机的类型为fanout ,routingKey设置为""
             */
            channel.queueBind(queue1Name,exchangeName,"");
            channel.queueBind(queue2Name,exchangeName,"");
    
            String body = "日志信息:张三调用了findAll方法...日志级别:info...";
            //8. 发送消息
            channel.basicPublish(exchangeName,"",null,body.getBytes());
    
            //9. 释放资源
    //        channel.close();
    //        connection.close();
    
        }
    }
    
    

    实现多个消费者(这里的例子实现两个消费者)

    消费者1

    package com.mq.mqconsumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_PubSub1 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //主机地址;默认为 localhost
            connectionFactory.setHost("localhost");
            //连接端口;默认为 5672
            connectionFactory.setPort(5672);
            //虚拟主机名称;默认为 /
            connectionFactory.setVirtualHost("/");
            //连接用户名;默认为guest
            connectionFactory.setUsername("guest");
            //连接密码;默认为guest
            connectionFactory.setPassword("guest");
    
            //创建连接
            Connection connection = null;
            connection = connectionFactory.newConnection();
    
            // 创建频道
            Channel channel = null;
    
            channel = connection.createChannel();
    
    
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
    
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息打印到控制台.....");
                }
            };
            channel.basicConsume(queue1Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    

    消费者2

    package com.mq.mqconsumer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer_PubSub2 {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //主机地址;默认为 localhost
            connectionFactory.setHost("localhost");
            //连接端口;默认为 5672
            connectionFactory.setPort(5672);
            //虚拟主机名称;默认为 /
            connectionFactory.setVirtualHost("/");
            //连接用户名;默认为guest
            connectionFactory.setUsername("guest");
            //连接密码;默认为guest
            connectionFactory.setPassword("guest");
    
            //创建连接
            Connection connection = null;
            connection = connectionFactory.newConnection();
    
            // 创建频道
            Channel channel = null;
    
            channel = connection.createChannel();
    
    
            String queue1Name = "test_fanout_queue1";
            String queue2Name = "test_fanout_queue2";
    
    
            /*
            basicConsume(String queue, boolean autoAck, Consumer callback)
            参数:
                1. queue:队列名称
                2. autoAck:是否自动确认
                3. callback:回调对象
    
             */
            // 接收消息
            Consumer consumer = new DefaultConsumer(channel){
                /*
                    回调方法,当收到消息后,会自动执行该方法
    
                    1. consumerTag:标识
                    2. envelope:获取一些信息,交换机,路由key...
                    3. properties:配置信息
                    4. body:数据
    
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  /*  System.out.println("consumerTag:"+consumerTag);
                    System.out.println("Exchange:"+envelope.getExchange());
                    System.out.println("RoutingKey:"+envelope.getRoutingKey());
                    System.out.println("properties:"+properties);*/
                    System.out.println("body:"+new String(body));
                    System.out.println("将日志信息保存数据库2.....");
                }
            };
            channel.basicConsume(queue2Name,true,consumer);
    
    
            //关闭资源?不要
    
        }
    }
    
    
  • 相关阅读:
    基于MQTT协议进行应用开发
    shell ss命令
    组播基本概念、IGMP、IGMP监听学习笔记
    关于Android的HAL的一些理解
    Android HAL层与Linux Kernel层驱动开发简介
    YPBPR_PC下图像有毛刺或者水纹干扰的处理办法
    CHAKRA3 UART2
    实际用户ID和有效用户ID (三) *****
    svn使用svnsync实现双机热备
    pyspider介绍及安装
  • 原文地址:https://www.cnblogs.com/lyd447113735/p/14678923.html
Copyright © 2011-2022 走看看