zoukankan      html  css  js  c++  java
  • 三.RabbitMQ之异步消息队列(Work Queue)

      上一篇文章简要介绍了RabbitMQ的基本知识点,并且写了一个简单的发送和接收消息的demo.这一篇文章继续介绍关于Work Queue(工作队列)方面的知识点,用于实现多个工作进程的分发式任务。

      一.Work Queues:我们可以把它翻译成工作队列,他有什么用呢?它的主要作用就是规避了实时的执行资源密集型任务( resource-intensive task),因为这会造成响应时间过长,影响用户体验。Work Queues通过把一些实时性不强的任务存储到消息队列中,然后后台的工作者(worker)会在特定的情况下完成这些任务。

      举个例子来说,用户注册是一个资源密集型的任务,因为它需要经过存储用户基本信息(用户名,邮箱,密码),发送邮箱验证码、或者更有甚者,存入注册日志(操作日志)等步骤。传统的串行做法如下所示。

      

      可以看到,在用户填写完注册信息并点击提交以后,需要经历3个步骤,其中第一个步骤,判断注册信息是否合法,合法则存入数据库,这是注册的核心步骤,而后面两个步骤并不是十分迫切,无需在这个请求中马上完成。而传统的串行模式一般都是在一个请求中塞满逻辑处理,无论是否迫切的逻辑请求。这样会大大加重一个请求的负担,无论是用户等待时间,程序的压力上,都不是一种好的做法。

      尤其是对于web应用,我们知道一个web请求是一个短连接,在一个短连接中做过于复杂的逻辑运算操作,显然是不合适的。所以消息分布队列在web应用中尤为有用。

      我们将上述串行的的方式改为用消息队列的形式来实现,可以看到此时我把一个请求里面做的事情分解到三个请求来实现,这样每个请求的时间都降低了,特别对于用户而言,他的等待时间大大减少,而这样也可以充分利用了cpu的性能。

    以上便是工作队列主要的原理及优点。

    二.一个work queues的demo

      延续上一个demo的轨迹,并结合我们举的注册的例子,模拟用户注册业务。

      1.首先,我们编写一个生产者,它除了执行将注册数据存储进数据库的方法外,还向RabbitMQ队列里发送了两条消息,分别用于存储有关邮箱验证和日志存储的内容。代码如下。

      

    package com.xdx.learn;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import net.sf.json.JSONObject;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class NewTask {
        private final static String QUEUE_NAME="register";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.1.195");//服务器ip
            factory.setPort(5672);//端口
            factory.setUsername("xdx");//登录名
            factory.setPassword("xxxxx");//密码
            Connection connection=factory.newConnection();//建立连接
            Channel channel=connection.createChannel();//建立频道
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);//建立一个队列
            System.out.println("首先,保存用户注册数据到数据库");
            JSONObject jsonObjet1=new JSONObject();
            jsonObjet1.put("msgType", "email");//该消息是针对发送验证邮件的。
            jsonObjet1.put("content", "执行发送验证邮件到邮箱操作");
            String message1=jsonObjet1.toString();
            channel.basicPublish("", QUEUE_NAME, null, message1.getBytes());//发布第一个异步消息
            System.out.println(channel+" Sent '"+message1+"'");
            JSONObject jsonObject2=new JSONObject();
            jsonObject2.put("msgType", "log");//该消息针对存储操作日志
            jsonObject2.put("content", "执行存储操作日志的操作");
            String message2=jsonObject2.toString();
            channel.basicPublish("", QUEUE_NAME, null, message2.getBytes());//发布第二个异步消息
            System.out.println(channel+" Sent '"+message2+"'");
            channel.close();
            connection.close();
        }
    }

      由上面的代码我们知道我们可以传输较为复杂的消息,我们用一个json类型对象来封装消息,并将该消息存储到消息队列中。执行上述代码,得到结果如下。

      首先,保存用户注册数据到数据库
      AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作"}'
      AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作"}'

      然后我们再到RabbitMQ的后台看看现在的queue的情况,发现多了一个名叫register的queue,并且在该queue中有两个消息,如下图所示。

      2.接下来我们编写一个消费者worker1,在worker1中,根据接收到的消息类型,调用不同的处理方法来处理消息中的任务。如下所示。

      

    package com.xdx.learn;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import net.sf.json.JSONObject;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Envelope;
    
    public class Worker1 {
        private final static String QUEUE_NAME="register";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //下面的配置与生产者相对应
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.1.195");//服务器ip
            factory.setPort(5672);//端口
            factory.setUsername("xdx");//登录名
            factory.setPassword("xxxxx");//密码
            Connection connection=factory.newConnection();
            final Channel channel=connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" worker1 Waiting for messages. To exit press CTRL+C");
            //每次从队列获取的数量
            channel.basicQos(1);
            //defaultConsumer实现了Consumer,我们将使用它来缓存生产者发送过来储存在队列中的消息。当我们可以接收消息的时候,从中获取。
            Consumer consumer=new DefaultConsumer(channel){
                 @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                                             AMQP.BasicProperties properties, byte[] body)
                      throws IOException {
                    String message = new String(body, "UTF-8");
                    try {
                         JSONObject jsonObject=JSONObject.fromObject(message);
                            String msgType=jsonObject.get("msgType").toString();
                            System.out.println(" wokrer1 Received message,msgType is " + msgType);
                            if(msgType.equals("email")){
                                //调用邮箱验证代码
                                System.out.println("worker1 do "+jsonObject.get("content"));
                            }else{
                                //调用日志保存代码
                                System.out.println("worker1 do "+jsonObject.get("content"));
                            }
                    } catch (Exception e) {
                         channel.abort();
                    }finally{
                        System.out.println("Worker1 Done");
                        //注意这句为必须,否则会造成RabbitMQ因为重复的重新发送已处理的消息而内存溢出
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                   
                  }
            };
            //接收到消息以后,推送给RabbitMQ,确认收到了消息。第二个参数为false,表示手动确认消息处理完毕
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    
    }

      执行上述的代码,可以得到如下结果

       worker1 Waiting for messages. To exit press CTRL+C
       wokrer1 Received message,msgType is email
      worker1 do 执行发送验证邮件到邮箱操作
      Worker1 Done
       wokrer1 Received message,msgType is log
      worker1 do 执行存储操作日志的操作
      Worker1 Done

      可以看到我们能够解析到消息里面的内容,并且根据不同的消息类别调用不同的处理逻辑,上述代码需要注意的知识点均有注释。执行完毕后,再到RabbitMQ后台查看,发现待处理消息已经为0.

      3.并发处理,我们稍微改动一下NewTask方法,让它一次性发送多条消息到队列中。

      

    package com.xdx.learn;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import net.sf.json.JSONObject;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class NewTask {
        private final static String QUEUE_NAME="register";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.1.195");//服务器ip
            factory.setPort(5672);//端口
            factory.setUsername("xdx");//登录名
            factory.setPassword("xxxxx");//密码
            Connection connection=factory.newConnection();//建立连接
            Channel channel=connection.createChannel();//建立频道
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);//建立一个队列
            System.out.println("向消息队列中插入10条邮箱验证消息和10条日志存储消息");
            for(int i=0;i<10;i++){
                JSONObject jsonObjet1=new JSONObject();
                jsonObjet1.put("msgType", "email");//该消息是针对发送验证邮件的。
                jsonObjet1.put("content", "执行发送验证邮件到邮箱操作"+i);
                String message1=jsonObjet1.toString();
                channel.basicPublish("", QUEUE_NAME, null, message1.getBytes());//发布第一个异步消息
                System.out.println(channel+" Sent '"+message1+"'");
                JSONObject jsonObject2=new JSONObject();
                jsonObject2.put("msgType", "log");//该消息针对存储操作日志
                jsonObject2.put("content", "执行存储操作日志的操作"+i);
                String message2=jsonObject2.toString();
                channel.basicPublish("", QUEUE_NAME, null, message2.getBytes());
                System.out.println(channel+" Sent '"+message2+"'");
            }
            
            channel.close();
            connection.close();
        }
    }

      执行结果如下:

    向消息队列中插入10条邮箱验证消息和10条日志存储消息
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作0"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作0"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作1"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作1"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作2"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作2"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作3"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作3"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作4"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作4"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作5"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作5"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作6"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作6"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作7"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作7"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作8"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作8"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"执行发送验证邮件到邮箱操作9"}'
    AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"执行存储操作日志的操作9"}'

      这样以后,现在队列中已经有了20条的数据,如下所示。

      

      可以看到生产者已经生产成功,接下来我再编写一个消费者Worker2,用于分担Worker1的负担,它的代码与worker2基本类似,我们修改了worker1和worker2的代码,加入睡眠机制,每一个worker执行完消息的任务以后,如下。

    package com.xdx.learn;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import net.sf.json.JSONObject;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Envelope;
    
    public class Worker2 {
        private final static String QUEUE_NAME="register";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //下面的配置与生产者相对应
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.1.195");//服务器ip
            factory.setPort(5672);//端口
            factory.setUsername("xdx");//登录名
            factory.setPassword("xxxx");//密码
            Connection connection=factory.newConnection();
            final Channel channel=connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //每次从队列获取的数量
            channel.basicQos(1);
            //defaultConsumer实现了Consumer,我们将使用它来缓存生产者发送过来储存在队列中的消息。当我们可以接收消息的时候,从中获取。
            Consumer consumer=new DefaultConsumer(channel){
                 @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                                             AMQP.BasicProperties properties, byte[] body)
                      throws IOException {
                    String message = new String(body, "UTF-8");
                    try {
                         JSONObject jsonObject=JSONObject.fromObject(message);
                            String msgType=jsonObject.get("msgType").toString();
                            if(msgType.equals("email")){
                                //调用邮箱验证代码
                                System.out.println("worker2 do "+jsonObject.get("content"));
                            }else{
                                //调用日志保存代码
                                System.out.println("worker2 do "+jsonObject.get("content"));
                            }
                    } catch (Exception e) {
                         channel.abort();
                    }finally{
                        channel.basicAck(envelope.getDeliveryTag(),false);
                      //执行以后睡一会,好让其他的worker有机会执行任务
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                   
                  }
            };
            //接收到消息以后,推送给RabbitMQ,告诉他确认收到了消息。第二个参数为false,表示手动确认消息处理完毕
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    
    }

      现在我同时执行worker1和worker2的代码。

    这样以后,在RabbitMQ的控制台,已经没有未处理的消息了。

       可以看到worker1和worker2确实分工合作,共同处理了这些消息队列中的任务。

      三.扩展

      1.message acknowledgment(消息确认):如果消费者在没有处理完一个消息就挂掉了,则这个消息就会遗失,所以必须在消费者代码中通知给RabbitMQ。默认是手动通知的,这样可以确保消息不会遗失。如果没有接收到确认,RabbitMQ会指派另外一个消费者处理任务。 channel.basicAck(envelope.getDeliveryTag(),false);和channel.basicConsume(QUEUE_NAME, false, consumer);都是必须的,否则会造成RabbitMQ无法释放已经处理过的消息和导致内存溢出。

      2.Message durability(消息持久化):可修改channel.queueDeclare("task_queue", durable, false, false, null);及channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());来使消息能持久化。需要注意的是,如果一个queue已经定义为非持久化,则不能再改为持久化,会出错,此时必须定义一个新的queue(换个名字)

      3.Fair dispatch(分配策略):默认情况下,RabbitMQ会平均的分配消息给消费者,它不会管这个消费者目前手上有多少未完成的任务,这可能会造成有的消费者很忙,有的消费者很闲。通过channel.basicQos(1);可以指定消费者每次只接收一条消息,只有当这条消息已经处理完毕,并且确认以后,才接收下一条的消息。

  • 相关阅读:
    保持URL不变和数字验证
    centOS ftp key?
    本地环境测试二级域名
    linux 解决You don't have permission to access 问题
    php smarty section loop
    php header Cannot modify header information headers already sent by ... 解决办法
    linux部分命令
    Linux 里面的文件操作权限说明
    用IT网络和安全专业人士视角来裁剪云的定义
    SQL Server 2008 R2炫酷报表"智"作有方
  • 原文地址:https://www.cnblogs.com/roy-blog/p/8026435.html
Copyright © 2011-2022 走看看