zoukankan      html  css  js  c++  java
  • 消息队列之RabbitMQ

    消息队列之activeMQ

    消息队列之kafka

    1.rabbitMQ介绍

    rabbitMQ是由erlang语言开发的,基于AMQP协议实现的消息队列。他是一种应用程序之间的通信方法,在分布式系统开发中应用非常广泛。

    rabbitMq的有点:

    1. 使用简单,功能强大
    2. 基于AMQP协议
    3. 社区活跃,文档完善
    4. 高并发性能好,erlang语言是专门用于开发高并发程序的
    5. springBoot默认集成rabbitMq

    AMQP(advanced Message Queuing Protocol),是一个提供统一消息服务的应用标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件的产品不同和开发语言不同的限制。JMS和AMQP的区别在于:JMS是java语言专属的消息服务标准,他是在api层定义标准,并且只能用于java应用,而AMQP是在协议层定义的标准,是可以跨语言的。

    2.工作流程

    发送消息:

    1. 生产者和broker建立TCP连接
    2. 生产者和broker建立通道
    3. 生产者通过通道消息发送给broker,由exchange将消息转发
    4. exchange将消息转发给指定的queue

    接受消息:

    1. 消费者和broker建立TCP连接
    2. 消费者和broker建立通道
    3. 消费者监听指定的queue
    4. 当有消息到达queue的时候broker默认将消息推送给消费者
    5. 消费者接受到消息并消费

    3.安装

    如果不想自己下载,需要我这里的软件的,可以在下面评论邮箱,我私发给你。

    1.安装erlang的环境,双击otp的运行程序,然后一路点击下一步(next)。

    配置环境变量

    在path中添加erlang的路径

    2.安装rabbitMq,双击rabbitmq的运行程序

    安装完成之后在菜单页面可以看到

    安装完RabbitMQ如果想要访问管理页面需要在rabbitmq的sbin目录中使用cmd执行:rabbitmq-plugins.bat enable rabbitmq_management(管理员身份运行此命令)添加可视化插件。

    点击上图中的start/stop来开启/停止服务。然后在浏览器上输入地址查看,rabbitMq的默认端口是15672。默认的用户名和密码都是guest

    如果安装失败,需要卸载重装的时候或者出现rabbitMq服务注册失败时,此时需要进入注册表清理erlang(搜索rabbitMQ,erlsrv将对应的项删除)

    4.代码实现

    1.添加依赖

    <!--添加rabbitMq的依赖-->
    <dependency>
    	<groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>5.7.3</version>
    </dependency>
    

    2.生产者代码实现

    package rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.net.ConnectException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: producer
     * @description: rabbitmq的生产者代码实现
     * @author: charon
     * @create: 2021-01-03 23:10
     */
    public class Producer {
        /**
         * 声明队列名
         */
        private static final String QUEUE = "hello charon";
    
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = null;
            Channel channel = null;
            try {
                connection = connectionFactory.newConnection();
                // 创建通道
                channel = connection.createChannel();
                // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
                channel.queueDeclare(QUEUE, true, false, false, null);
                String message = "hello charon good evening";
                // 发布消息(交换机,RoutingKey即队列名,额外的消息属性,消息内容)
                channel.basicPublish("", QUEUE, null, message.getBytes());
                System.out.println("发送消息给mq:" + message);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                // 关闭资源
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    3.消费者代码实现

    package rabbitmq;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: Consumer
     * @description: 消费者的代码实现
     * @author: charon
     * @create: 2021-01-05 08:28
     */
    public class Consumer {
        /**
         * 声明队列名
         */
        private static final String QUEUE = "hello charon";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = connectionFactory.newConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE, true, false, false, null);
            // 实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者标签
                 * @param envelope 信封,可以获取交换机等信息
                 * @param properties 消息属性
                 * @param body 消费内容,字节数组,可以转成字符串
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //                String exchange = envelope.getExchange();
    //                long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body,"utf-8");
                    System.out.println("收到的消息是:"+message);
                }
            };
            // 消费消息(队列名,是否自动确认,消费方法)
            channel.basicConsume(QUEUE,true,defaultConsumer);
        }
    }
    

    5.rabbitMq的工作模式

    1. Work queues 工作队列(资源竞争)

    ​ 生产者将消息放入到队列中,消费者可以有多个,同时监听同一个队列。如上图,消费者c1,c2共同争抢当前消息队列的内容,谁先拿到谁负责消费消息,缺点是在高并发的情况下,默认会产品一个消息被多个消费者共同使用,可以设置一个锁开关,保证一条消息只能被一个消费者使用。

    上面的代码,可以再添加一个消费者,这样就可以实现工作队列的工作模式。

    2.Publish/Subscribe 发布订阅(共享资源)

    X代表rabbitMq内部组件交换机,生产者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应的消费者拿到消息进行消费,对比工作队列而言,发布订阅可以实现工作队列的功能,但是比工作队列更强大。

    特点:
    1.每个消费者监听自己的队列
    2.生产者将消息发送给Broker,由交换机将消息转发到绑定的此交换机的每个队列,每个绑定交换机的队列都将接收到消息;

    生产者:

    package rabbitmq.publish;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: Producer
     * @description: 发布订阅的生产者
     * @author: charon
     * @create: 2021-01-07 22:02
     */
    public class Producer {
    
        /**邮件的队列*/
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    
        /**短信的队列*/
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    
        /**交换机*/
        public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
    
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = null;
            Channel channel = null;
            try {
                connection = connectionFactory.newConnection();
                // 创建通道
                channel = connection.createChannel();
                // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
                channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
                channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
                // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式))
                channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
                // 绑定交换机(队列名称,交换机名称,routingKey(发布订阅设置为空))
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
                // 发送多条消息
                for (int i = 0; i < 5; i++) {
                    String message = "hello charon good evening by publish";
                    // 指定交换机(交换机,RoutingKey即队列名,额外的消息属性,消息内容)
                    channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
                    System.out.println("发送消息给mq:" + message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                // 关闭资源
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    消费email的消费者:

    package rabbitmq.publish;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: EmailConsumer
     * @description: 邮件的消息消费者
     * @author: charon
     * @create: 2021-01-07 22:14
     */
    public class EmailConsumer {
    
        /**邮件的队列*/
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    
        /**短信的队列*/
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    
        /**交换机*/
        public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = connectionFactory.newConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            // 实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者标签
                 * @param envelope 信封,可以获取交换机等信息
                 * @param properties 消息属性
                 * @param body 消费内容,字节数组,可以转成字符串
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //                String exchange = envelope.getExchange();
    //                long deliveryTag = envelope.getDeliveryTag();
                    String message = new String(body,"utf-8");
                    System.out.println("收到的email消息是:"+message);
                }
            };
            // 消费消息(队列名,是否自动确认,消费方法)
            channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
        }
    }
    

    消费短信的消费者:

    package rabbitmq.publish;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: SmsConsumer
     * @description:
     * @author: charon
     * @create: 2021-01-07 22:17
     */
    public class SmsConsumer {
    
    
        /**邮件的队列*/
        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    
        /**短信的队列*/
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    
        /**交换机*/
        public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = connectionFactory.newConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            // 实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者标签
                 * @param envelope 信封,可以获取交换机等信息
                 * @param properties 消息属性
                 * @param body 消费内容,字节数组,可以转成字符串
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println("收到的短信消息是:"+message);
                }
            };
            // 消费消息(队列名,是否自动确认,消费方法)
            channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
        }
    }
    

    3.Routing 路由模式

    生产者将消息发送给交换机按照路由判断,交换机根据路由的key,只能匹配上路由key的对应的消息队列,对应的消费者才能消费消息。

    如上图,rabbitMq根据对应的key,将消息发送到对应的队列中,error通知将发送到amqp.gen-S9b上,由消费者c1消费。error,info,warning通知将发送到amqp.gen-Ag1上,由消费者c2消费。

    特点:
    1.每个消费者监听自己的队列,并且设置路由key
    2.生产者将消息发送给交换机,由交换机根据路由key来转发消息到指定的队列

    生产者:

    package rabbitmq.routing;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: Producer
     * @description: 路由模式下的生成者
     * @author: charon
     * @create: 2021-01-07 22:34
     */
    public class Producer {
    
        /**邮件的队列*/
        public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";
    
        /**短信的队列*/
        public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";
    
        /**交换机*/
        public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    
        /** 设置email的路由key */
        public static final String ROUTING_EMAIL = "routing_email";
    
        /** 设置sms的路由key */
        public static final String ROUTING_SMS = "routing_sms";
    
        public static void main(String[] args) {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = null;
            Channel channel = null;
            try {
                connection = connectionFactory.newConnection();
                // 创建通道
                channel = connection.createChannel();
                // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
                channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
                channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
                // 交换机(交换机名称,交换机类型(fanout:发布订阅,direct:routing,topic:主题,headers:header模式))
                channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
                // 绑定交换机(队列名称,交换机名称,routingKey)
                channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
                channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
                // 发送多条消息
                for (int i = 0; i < 5; i++) {
                    String message = "hello charon good evening by routing --email";
                    // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容)
                    channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL, null, message.getBytes());
                    System.out.println("发送消息给mq:" + message);
                }
                // 发送多条消息
                for (int i = 0; i < 5; i++) {
                    String message = "hello charon good evening by routing --sms";
                    // 指定交换机(交换机,RoutingKey,额外的消息属性,消息内容)
                    channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_SMS, null, message.getBytes());
                    System.out.println("发送消息给mq:" + message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                // 关闭资源
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    消费email的消费者:

    package rabbitmq.routing;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: EmailConsumer
     * @description: 路由模式下的email消费者
     * @author: charon
     * @create: 2021-01-07 22:40
     */
    public class EmailConsumer {
        /**邮件的队列*/
        public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";
    
        /**交换机*/
        public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    
        /** 设置email的路由key */
        public static final String ROUTING_EMAIL = "routing_email";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = connectionFactory.newConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            // 绑定队列并指明路由key
            channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
            // 实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者标签
                 * @param envelope 信封,可以获取交换机等信息
                 * @param properties 消息属性
                 * @param body 消费内容,字节数组,可以转成字符串
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println("收到的email消息是:"+message);
                }
            };
            // 消费消息(队列名,是否自动确认,消费方法)
            channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer);
        }
    }
    

    消费短信的消费者:

    package rabbitmq.routing;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @className: EmailConsumer
     * @description: 路由模式下的email消费者
     * @author: charon
     * @create: 2021-01-07 22:40
     */
    public class SmsConsumer {
        /**邮件的队列*/
        public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";
    
        /**交换机*/
        public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
    
        /** 设置email的路由key */
        public static final String ROUTING_SMS = "routing_sms";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置ip,端口,因为是本机,所以直接设置为127.0.0.1
            connectionFactory.setHost("127.0.0.1");
            // web端口默认为15672,通信端口为5672
            connectionFactory.setPort(5672);
            // 设置用户名和密码
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            // 设置虚拟ip,默认为/,一个rabbitmq的服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            Connection connection = connectionFactory.newConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列(队列名称,是否持久化,是否排它,是否自动删除,队列的扩展参数比如设置存活时间等)
            channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            // 绑定队列并指明路由key
            channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
            // 实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 *
                 * @param consumerTag 消费者标签
                 * @param envelope 信封,可以获取交换机等信息
                 * @param properties 消息属性
                 * @param body 消费内容,字节数组,可以转成字符串
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println("收到的短信消息是:"+message);
                }
            };
            // 消费消息(队列名,是否自动确认,消费方法)
            channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer);
        }
    }
    

    4.Topic 主题模式

    1. 星号井号代表通配符
    2. 星号代表一个单词,井号代表一个或多个单词
    3. 路由功能添加模糊匹配
    4. 消息产生者产生消息,把消息交给交换机
    5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

    特点:
    1.每个消费者监听自己的队列,并且设置带通配符的routingkey
    2.生产者将消息发送给broker,由交换机及根据路由key来转发消息到指定的队列

    5.Header 转发器

    取消了路由key,使用header中的key/value(键值对)来匹配队列。

    6.RPC 远程调用

    基于direct类型交换机实现。生产者将消息远程发送给rpc队列,消费者监听rpc消息队列的消息并消息,然后将返回结果放入到响应队列中,生产者监听响应队列中的消息,拿到消费者的处理结果,实现远程RPC远程调用。

    参考文件:

    https://www.cnblogs.com/Jeely/p/10784013.html
    https://lovnx.blog.csdn.net/article/details/70991021

    本文版权归Charon和博客园共有,原创文章,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    11-27:项目进度
    11-23:项目进度
    Scrum团队成立,阅读《构建之法》第6~7章,并参考以下链接,发布读后感、提出问题、并简要说明你对Scrum的理解
    团队项目:二次开发
    实验二 作业调度
    欣赏并且评价同学的复利计算博客
    对于我们的复利计算程序的升级
    用汉堡包的方式评价一下自己的合作伙伴
    结对项目:复利计算
    软件工程构建之法第四章读后感
  • 原文地址:https://www.cnblogs.com/pluto-charon/p/14288765.html
Copyright © 2011-2022 走看看