zoukankan      html  css  js  c++  java
  • 《RabbitMQ 消息中间件》RabbitMQ详解

    前言

    安装完RabbitMQ之后,我们就来了解一下RabbitMQ的管理命令。

    详情

    第一条命令:帮助命令

    rabbitmqctl help

    第二条命令:插件管理

    rabbitmq-plugins

    list 插件列表,enable启用插件,disable禁用插件,set重置插件。

    管理界面

    RabbitMQ使用的是AMQP的协议。

    RabbitMQ的7中消息模型:

     

    第一种消息模型-直连(SpringBoot演练)

    加入依赖Jar包(Gradle)

    compile('org.springframework.boot:spring-boot-starter-amqp')

    封装类:

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class MqUtil {
        public static Connection getConnection() throws IOException, TimeoutException {
            // 创建rabbitmq的连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机IP
            connectionFactory.setHost("47.105.72.224");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置主机名
            connectionFactory.setVirtualHost("/my");
            // 设置访问虚拟主机的IP和端口
            connectionFactory.setUsername("rabbitmq");
            connectionFactory.setPassword("rabbitmq");
            return connectionFactory.newConnection();
        }
    }

    发送消息代码:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class Provoder {
    
        public void sendMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
            /**
             * 参数1:队列名称,不存在自动创建
             * 参数2:是否持久化队列
             * 参数3:exclusive 是否独占队列(当前队列只能被当前连接使用)
             * 参数4:autoDelete 是否需要消费完之后,删除参数。
             * 参数5:额外信息
             */
            channel.queueDeclare("hello",true,false,false,null);
    
            /**
             * 发布消息
             * 参数1:交互机
             * 参数2:队列
             * 参数3:消息额外设置(MessageProperties.MINIMAL_PERSISTENT_BASIC)
             * 参数4:具体内容
             */
            channel.basicPublish("","hello",MessageProperties.MINIMAL_PERSISTENT_BASIC,"Hello World!".getBytes());
            channel.close();
        }
    }

    测试Controller

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @RestController
    public class TestController3 {
    
        @Autowired
        SendMqProvoder sendMqProvoder;
    
        @RequestMapping("/rabbitmq.do")
        public String event() throws IOException, TimeoutException {
            sendMqProvoder.sendMessage();
            return "success";
        }
    }

    效果展示:

    网页访问后的RabbitMq消息情况,rabbitmq的管理界面中就出现了Hello队列,并且有两条未消费的消息。

    消费者:

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class Customer {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
            /**
             * 参数1:队列名称,不存在自动创建
             * 参数2:是否持久化队列
             * 参数3:exclusive 是否独占队列(当前队列只能被当前连接使用)
             * 参数4:autoDelete 是否需要消费完之后,删除参数。
             * 参数5:额外信息
             */
            channel.queueDeclare("hello",true,false,false,null);
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume("hello",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
                    System.out.println(new String(body));
                }
            });
            
            //消费者需要一直监听消息。
            //channel.close();
            //connection.close();
        }
    }

    测试类:

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @RestController
    public class TestController3 {
    
        @Autowired
        Provoder provoder;
    
        @Autowired
        Customer customer;
    
        @RequestMapping("/rabbitmq.do")
        public String event() throws IOException, TimeoutException {
            provoder.sendMessage();
            return "success";
        }
    
        @RequestMapping("/receive.do")
        public String receive() throws IOException, TimeoutException {
            customer.receiveMessage();
            return "success";
        }
    }

    接收消息的结果:

    第二种消息模型-work queue(SpringBoot演练)

    1. 支持多个消费者,一个消息只能被消费一次。

    2. 消息被多个消费者平均分配消费。

    生产者:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class WorkProvoder {
        public void sendMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
            /**
             * 参数1:队列名称,不存在自动创建
             * 参数2:是否持久化队列
             * 参数3:exclusive 是否独占队列(当前队列只能被当前连接使用)
             * 参数4:autoDelete 是否需要消费完之后,删除参数。
             * 参数5:额外信息
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * 发布消息
             * 参数1:交互机
             * 参数2:队列
             * 参数3:消息额外设置(MessageProperties.MINIMAL_PERSISTENT_BASIC)
             * 参数4:具体内容
             */
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("","work",MessageProperties.MINIMAL_PERSISTENT_BASIC,(i+"Hello Work Queue!").getBytes());
            }
            channel.close();
            connection.close();
        }
    }

    消费者1

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class WorkCustomer {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
            /**
             * 参数1:队列名称,不存在自动创建
             * 参数2:是否持久化队列
             * 参数3:exclusive 是否独占队列(当前队列只能被当前连接使用)
             * 参数4:autoDelete 是否需要消费完之后,删除参数。
             * 参数5:额外信息
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                    System.out.println("消费者-1:"+new String(body));
                }
            });
        }
    }

    消费者2

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class WorkCustomer2 {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
            /**
             * 参数1:队列名称,不存在自动创建
             * 参数2:是否持久化队列
             * 参数3:exclusive 是否独占队列(当前队列只能被当前连接使用)
             * 参数4:autoDelete 是否需要消费完之后,删除参数。
             * 参数5:额外信息
             */
            channel.queueDeclare("work",true,false,false,null);
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                    System.out.println("消费者-2:"+new String(body));
                }
            });
        }
    }

    测试类:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @RestController
    public class TestController4 {
    
        @Autowired
        WorkProvoder workProvoder;
    
        @Autowired
        WorkCustomer customer1;
    
        @Autowired
        WorkCustomer2 customer2;
    
        @RequestMapping("/workSend.do")
        public String workSend() throws IOException, TimeoutException {
            workProvoder.sendMessage();
            return "success";
        }
    
        @RequestMapping("/workReceive1.do")
        public String workReceive1() throws IOException, TimeoutException {
            customer1.receiveMessage();
            return "success";
        }
    
        @RequestMapping("/workReceive2.do")
        public String workReceive2() throws IOException, TimeoutException {
            customer2.receiveMessage();
            return "success";
        }
    }

    运行结果:

    现在的模式会出现消息丢失,平均分配的问题。

    修改一下代码,加入手动确认和每次消费一条消息,来保证消息的完整。

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class WorkCustomer2 {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
            /**
             * 参数1:队列名称,不存在自动创建
             * 参数2:是否持久化队列
             * 参数3:exclusive 是否独占队列(当前队列只能被当前连接使用)
             * 参数4:autoDelete 是否需要消费完之后,删除参数。
             * 参数5:额外信息
             */
            channel.queueDeclare("work",true,false,false,null);
    
            // 一次处理一个
            channel.basicQos(1);
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume("work",false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                    System.out.println("消费者-2:"+new String(body));
                    // 传一个标记确认消息。
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    }

    第三种消息模型-广播(fanout)

    生产者:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class FanProvider {
    
        public void sendMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
    
            /**
             * 设置交换机
             * 参数1:交换机的名字(自取)。
             * 参数2:交换机类型:fanout:就是广播。
             */
            channel.exchangeDeclare("exchange1","fanout");
    
            /**
             * 发布消息
             * 参数1:交互机
             * 参数2:队列
             * 参数3:消息额外设置(MessageProperties.MINIMAL_PERSISTENT_BASIC:消息持久化)
             * 参数4:具体内容
             */
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("exchange1","",MessageProperties.MINIMAL_PERSISTENT_BASIC,(i+"Hello Fan Queue!").getBytes());
            }
            channel.close();
            connection.close();
        }
    }

    消费者:

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class FanCustomer {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
    
            /**
             * 绑定交换机
             * 参数1:交换机名
             * 参数2:交换机类型
             */
            channel.exchangeDeclare("exchange1","fanout");
    
            /********* 创建一个临时队列 *********/
            String queue = channel.queueDeclare().getQueue();
    
            /********* 绑定交换机和队列 *********/
            channel.queueBind(queue,"exchange1","");
    
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                    System.out.println("消费者-1:"+new String(body));
                }
            });
        }
    }

    运行结果:

    两个消费者都能消费到消息。

    第四种消息模型-订阅(Routing):实现不同的人去订阅不同的消息。

    生产者:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class DirectProvider {
    
        public void sendMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
    
            /**
             * 设置交换机
             * 参数1:交换机的名字(自取)。
             * 参数2:交换机类型:direct:路由模式。
             */
            channel.exchangeDeclare("logs","direct");
    
            /******* 设置路由key ******/
            String routingKey = "info";
            String routingKey1 = "warn";
    
            /**
             * 发布消息
             * 参数1:交互机
             * 参数2:队列
             * 参数3:消息额外设置(MessageProperties.MINIMAL_PERSISTENT_BASIC:消息持久化)
             * 参数4:具体内容
             */
            channel.basicPublish("logs",routingKey,MessageProperties.MINIMAL_PERSISTENT_BASIC,(routingKey+":Hello Fan Queue!").getBytes());
            channel.basicPublish("logs",routingKey1,MessageProperties.MINIMAL_PERSISTENT_BASIC,(routingKey1+":Hello Fan Queue!").getBytes());
            channel.basicPublish("logs",routingKey1,MessageProperties.MINIMAL_PERSISTENT_BASIC,(routingKey1+":Hello Fan Queue!").getBytes());
    
            channel.close();
            connection.close();
        }
    }

    消费者:

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class DirectCustomer {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
    
            /**
             * 绑定交换机
             * 参数1:交换机名
             * 参数2:交换机类型
             */
            channel.exchangeDeclare("logs","direct");
    
            /********* 创建一个临时队列 *********/
            String queue = channel.queueDeclare().getQueue();
    
            /********* 绑定交换机和队列和路由key *********/
            channel.queueBind(queue,"logs","info");
    
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                    System.out.println("消费者-1:"+new String(body));
                }
            });
        }
    }

    运行结果:

    第五种消息模型-动态订阅(Topic):实现不同的人去订阅不同的消息,支持设置通配符。

    * 匹配任意一个单词。  *.rabbitmq

    # 匹配多个任意单词。 user.#

    生产者:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.MessageProperties;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class TopicProvider {
    
        public void sendMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
    
            /**
             * 设置交换机
             * 参数1:交换机的名字(自取)。
             * 参数2:交换机类型:topic:动态路由模式。
             */
            channel.exchangeDeclare("topic_logs","topic");
    
            /******* 设置路由key ******/
            String routingKey = "user.info";
            String routingKey1 = "user.warn";
    
            /**
             * 发布消息
             * 参数1:交互机
             * 参数2:队列
             * 参数3:消息额外设置(MessageProperties.MINIMAL_PERSISTENT_BASIC:消息持久化)
             * 参数4:具体内容
             */
            channel.basicPublish("topic_logs",routingKey,MessageProperties.MINIMAL_PERSISTENT_BASIC,(routingKey+":Hello Topic Queue!").getBytes());
            channel.basicPublish("topic_logs",routingKey1,MessageProperties.MINIMAL_PERSISTENT_BASIC,(routingKey1+":Hello Topic Queue!").getBytes());
    
            channel.close();
            connection.close();
        }
    }

    消费者1:

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class TopicCustomer {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
    
            /**
             * 绑定交换机
             * 参数1:交换机名
             * 参数2:交换机类型
             */
            channel.exchangeDeclare("topic_logs","topic");
    
            /********* 创建一个临时队列 *********/
            String queue = channel.queueDeclare().getQueue();
    
            /********* 绑定交换机和队列和路由key *********/
            channel.queueBind(queue,"topic_logs","*.info");
    
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                    System.out.println("消费者-1:"+new String(body));
                }
            });
        }
    }

    消费者2:

    import com.rabbitmq.client.*;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    @Component
    public class TopicCustomer1 {
    
        public void receiveMessage() throws IOException, TimeoutException {
            Connection connection = MqUtil.getConnection();
            // 创建MQ渠道
            Channel channel = connection.createChannel();
    
            /**
             * 绑定交换机
             * 参数1:交换机名
             * 参数2:交换机类型
             */
            channel.exchangeDeclare("topic_logs","topic");
    
            /********* 创建一个临时队列 *********/
            String queue = channel.queueDeclare().getQueue();
    
            /********* 绑定交换机和队列和路由key *********/
            channel.queueBind(queue,"topic_logs","user.*");
    
            /**
             * queue 队列名称
             * autoAck 开启消息自动确认机制
             * callback 消费消息的回调函数
             */
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
                    System.out.println("消费者-2:"+new String(body));
                }
            });
        }
    }

    运行结果:

    第六种通道协议(RPC)略。

    第7种消息模型-发布确认模型 略。

    总结

    1. rabbitmq 目前支持6中消息模型,一种通道协议。

    2. rabbitmq 消息支持队列和消息持久化

    3. 结构:生产者,消费者,队列,交换机,虚拟主机。

    4. 消息确认机制,默认平均分配给每一个消费者。

  • 相关阅读:
    redis学习
    win2008下c#调用directshow问题
    vs2005升级到vs2010相关问题
    spark-shell 启动失败,显示端口问题
    监控spark-sql 等脚本
    spark 相关配置 shuffle 相关配置选项
    spark on Yarn 语句
    使用hive thriftserver 连接spark sql
    HBase 报错系列之region is not online
    HBase 表迁移中对丢失的表检查使用的语句
  • 原文地址:https://www.cnblogs.com/jssj/p/13942827.html
Copyright © 2011-2022 走看看