zoukankan      html  css  js  c++  java
  • RabbitMQ 的工作模式

    简介

    • MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法; 
    • RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言)编写的,可复用的企业级消息系统; 
    • AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端、消息中间件、不同的开发语言环境等条件的限制; 
    • 支持主流操作系统:Linux、Windows,MacOX等; 
    • 支持多种客户端开发语言:Java、Python、Ruby、.NET,PHP、C/C++、Node.js等

    RabbitMQ实例

    1.1.  简单模式

    特点:一个生产者P发送消息到队列Q,一个消费者C接收

    生产者实现步骤: 

      1. 创建连接工厂
      2. 设置服务地址、端口、用户名、密码、vHost
      3. 从连接工厂中获取连接
      4. 从连接获取通道channel
      5. 使用channle 声明队列
      6. 使用channle 向队列发送消息
      7. 关闭channle
      8. 关闭连接

     代码如下:

    public class Send {
    
        private final static String QUEUE_NAME = "test_queue";
        
        public static void main(String[] args) {
            
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("127.0.0.1");
            //端口
            factory.setPort(5672);
            //设置 vhost ,用户名称,账号
            factory.setVirtualHost("taotao");
            factory.setUsername("taotao");
            factory.setPassword("123456");
            try {
                //通过工厂获取连接
                Connection conn= factory.newConnection();
                //连接中获取通道中
                Channel channel = conn.createChannel();
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //设置消息内容
                String msg = "Hello World! - 02";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                
                System.out.println("Send :发送成功!");
                channel.close();
                conn.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    当生产者代码成功运行后,我们通过管理工具查看会发现一个队列,并且队列中有一条信息:

    消费者实现步骤:

      1. 创建连接工厂
      2. 设置服务地址、端口、用户名、密码、vHost
      3. 从连接工厂中获取连接
      4. 从连接获取通道channel
      5. 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
      6. 定义队列消费者
      7. 监听队列
      8. 获取消息

    代码如下:

    public class Revc {
        
        private final static String QUEUE_NAME = "test_queue";
        
        public static void main(String[] args) throws Exception {
            
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("127.0.0.1");
            //设置端口
            factory.setPort(5672);
            //设置vhost 用户名称  密码
            factory.setUsername("taotao");
            factory.setPassword("123456");
            factory.setVirtualHost("taotao");
            
            //创建连接
            Connection conn = factory.newConnection();
            //从连接中获取通道
            Channel channel = conn.createChannel();
            
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //定义队列的消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            
            //监听队列
            channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
            //获取消息
            while (true) 
            {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("Revc :"+msg);
            }
        }
    }

    当消费者代码成功运行后,我们通过管理工具会发现队列中消息的数量从1变为0,这就说明消息被消费者获取以后,会被队列删除。

    连接工具类:

    /**
     * rabbitmq 工具类
     * @author Administrator
     *
     */
    public class ConnectionUtils {
    
        public static Connection getConnection() throws Exception
        {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("127.0.0.1");
            //设置端口
            factory.setPort(5672);
            //设置vhost
            factory.setVirtualHost("taotao");
            //设置用户名称
            factory.setUsername("taotao");
            //设置密码
            factory.setPassword("123456");
            //从工厂中获取连接
            Connection conn =  factory.newConnection();
            return conn;
        }
    }

    1.2.  工作队列模式 Work Queue

     

    特点:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

    生产者实现步骤【参考1.1】

    public class Send {
    
        private final static String QUEUE_NAME = "test_queue_work";
        public static void main(String[] args) {
            
            try
            {
                //获取连接
                Connection conn = ConnectionUtils.getConnection();
                //从连接中获取管道
                Channel channel = conn.createChannel();
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //设置消息
                for(int i = 0; i < 50; i++)
                {
                    String msg = "Holle Work ! -"+i;
                    channel.basicPublish("", QUEUE_NAME, null,msg.getBytes());
                    Thread.sleep(10*i);
                }
                channel.close();
                conn.close();
            }
            catch(Exception e)
            {
                
            }
        }
    }

    消费者实现步骤:

      1. 创建连接工厂
      2. 设置服务地址、端口、用户名、密码、vHost
      3. 从连接工厂中获取连接
      4. 从连接获取通道channel
      5. 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
      6. 定义队列消费者
      7. 监听队列并手动返回确认
      8. 获取消息【如果设置了手动返回确认则需要返回确消息】
    public class Revc {
    
        private final static String QUEUE_NAME = "test_queue_work";
        public static void main(String[] args) {
            try
            {
                //获取连接
                Connection conn =  ConnectionUtils.getConnection();
                //从连接中获取管道
                Channel channel = conn.createChannel();
                //声明队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //同一时刻服务器只会发一条消息费着
                channel.basicQos(1);
                
                //定义队列消费者
                QueueingConsumer consumer = new QueueingConsumer(channel);
                //监听队列,手动返回完成
                channel.basicConsume(QUEUE_NAME, false,consumer);
                //获取消息
                while(true)
                {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String msg = new String(delivery.getBody());
                    System.out.println("Revc-->"+msg);
                    
                    //返回确认状态
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                    Thread.sleep(10);
                }
            }
            catch(Exception e)
            {
                
            }    
        }
    }

    可以设置多个消费者。并设置其休眠时间不同。可以得出,休眠时间越长,获取消息越少。这也体现了work模式的【能者多劳】模式。

    注意:设置能者多劳模式必须设置:同一时刻服务器只会发一条消息费者

    channel.basicQos(1);

    1.3.  发布/订阅模式Publish/Subscribe

    特点:

    1、 一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

    2、 生产者:可以将消息发送到队列或者是交换机

    3、 消费者:只能从队列中获取消息。

    4、 如果消息发送到没有队列绑定的交换机上,那么消息将丢失。交换机不能存储消息,消息存储在队列中

    生产者实现步骤:  

      1. 创建连接工厂
      2. 设置服务地址、端口、用户名、密码、vHost
      3. 从连接工厂中获取连接
      4. 从连接获取通道channel
      5. 使用channle 声明交换机
      6. 发送消息到交换机
      7. 关闭channel
      8. 关闭连接
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";    
        
        public static void main(String[] args) throws Exception {
            
             // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
    
            // 声明exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 消息内容
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            channel.close();
            connection.close();
        }
    }

    消费者实现步骤:

    1.     创建连接工厂

    2.     设置服务地址、端口、用户名、密码、vHost

    3.     从连接工厂中获取连接

    4.     从连接获取通道channel

    5.     使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】

    6.     将队列绑定到交换机

    7.     定义队列消费者

    8.     监听队列并手动返回确认

     

    9.     获取消息【如果设置了手动返回确认则需要返回确消息】

    public class Revc {
        private final static String QUEUE_NAME = "test_queue_work";
    
        private final static String EXCHANGE_NAME = "test_exchange_fanout";
    
        public static void main(String[] argv) throws Exception {
    
            //获取连接
            Connection conn = ConnectionUtils.getConnection();
            //从连接中获取管道
            Channel ch = conn.createChannel();
            //声明队列
            ch.queueDeclare(QUEUE_NAME, false, false, false, null);
            //将队列绑定到交换机
            ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //定义队列消费者
            QueueingConsumer consumer = new QueueingConsumer(ch);
            //监听队列,手动返回完成
            ch.basicConsume(QUEUE_NAME, false,consumer);
            
            //获取消息
            while(true)
            {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println(" [ Revc ]"+msg);
                Thread.sleep(100);
                ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    1.4.路由模式Routing

     

    特点:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key 

    生产者实现步骤:

      1. 创建连接工厂
      2. 设置服务地址、端口、用户名、密码、vHost
      3. 从连接工厂中获取连接
      4. 从连接获取通道channel
      5. 使用channle 声明交换机
      6. 发送消息到交换机并指定key
      7. 关闭channel
      8. 关闭连接
    public class Send {
    
        private final static String EXCHANGE_NAME = "test_exchange_direct";
        public static void main(String[] args) throws Exception {
            
            //获取连接
            Connection conn = ConnectionUtils.getConnection();
            //从连接中获取管道
            Channel ch = conn.createChannel();
            //声明exchange
            ch.exchangeDeclare(EXCHANGE_NAME, "direct");
            //设置消息内容
            String msg = "Hello Work !";
            ch.basicPublish(EXCHANGE_NAME, "key", null,msg.getBytes());
            System.out.println(" [x] Sent '" + msg + "'");
            ch.close();
            conn.close();
            
        }
    }

    消费者的实现步骤:

      1. 创建连接工厂
      2. 设置服务地址、端口、用户名、密码、vHost
      3. 从连接工厂中获取连接
      4. 从连接获取通道channel
      5. 使用channle 声明队列【可以不声明队列,但是要保证该队列存在,否则会报错】
      6. 将队列绑定到交换机并指定key
      7. 定义队列消费者
      8. 监听队列并手动返回确认
      9. 获取消息【如果设置了手动返回确认则需要返回确消息】
    public class Revc {
    
        private final static String QUEUE_NAME = "test_queue_work";
    
        private final static String EXCHANGE_NAME = "test_exchange_direct";
        
        public static void main(String[] args) throws Exception{
            
            //获取连接
            Connection conn = ConnectionUtils.getConnection();
            //获取chanel 通道
            Channel ch = conn.createChannel();
            //声明队列
            ch.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机
            ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
            //同一时刻服务器只发送一条信息给消费者
            ch.basicQos(1);
            //定义队列消费者
            QueueingConsumer consumer = new QueueingConsumer(ch);
            //监听队列,并手动返回信息
            ch.basicConsume(QUEUE_NAME, false,consumer);
            
            //获取信息
            while(true)
            {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println(" [x] Received '" + msg + "'");
                Thread.sleep(10);
    
                ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    1.5. 通配符模式Topics

    特点:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor

    “#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

    实现步骤参考路由模式

     

     

     

     

     

     

     

  • 相关阅读:
    2020牛客多校第二场A.All with Pairs hash+kmp
    2020杭电多校第三场
    2020牛客多校第六场K.K-Bag (思维?)
    2020牛客多校第六场 G.Grid Coloring 构造
    2020杭电多校第一场
    2020牛客暑期多校训练营(第三场)D.Points Construction Problem 构造
    ACM模板_axiomofchoice
    关于deque的行为分析
    Codeforces Round #665 (Div. 2) 题解 (CDEF)
    Codeforces Global Round 10 题解 (DEF)
  • 原文地址:https://www.cnblogs.com/wujiaofen/p/11084249.html
Copyright © 2011-2022 走看看