zoukankan      html  css  js  c++  java
  • 工作队列work queues 公平分发(fair dispatch) And 消息应答与消息持久化

    生产者

     1 package cn.wh.work;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class Send {
     8     private static final String QUEVE_NAME = "test_work_queue";
     9 
    10     public static void main(String[] args) throws Exception {
    11  ;
    12         Connection connection = RabbitMqConnectionUtil.getConnection();
    13         Channel channel = connection.createChannel();
    14         channel.queueDeclare(QUEVE_NAME, false, false, false, null);
    15 
    16         int i1 =1 ;
    17         channel.basicQos(i1);
    18         for (int i = 0; i < 50; i++) {
    19             String msg = "hello " + i;
    20             System.out.println(msg);
    21             channel.basicPublish("", QUEVE_NAME, null, msg.getBytes());
    22             Thread.sleep(i * 20);
    23         }
    24         channel.close();
    25         connection.close();
    26     }
    27 }

     消费者 1

     1 package cn.wh.work;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Recv1 {
     9     private static final String QUEVE_NAME = "test_work_queue";
    10     public static void main(String[] args) throws Exception {
    11         Connection connection = RabbitMqConnectionUtil.getConnection();
    12       final   Channel channel = connection.createChannel();
    13         int i1 =1 ;
    14         channel.basicQos(i1);
    15         DefaultConsumer consumer = new DefaultConsumer(channel) {
    16             @Override
    17             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    18                 String msg = new String(body);
    19                 System.out.println("recv1"+msg);
    20                 try {
    21                     Thread.sleep(2000);
    22                 } catch (InterruptedException e) {
    23                     e.printStackTrace();
    24                 }finally {
    25                     System.out.println(1+"OK");
    26                     channel.basicAck(envelope.getDeliveryTag(),false);
    27                 }
    28             }
    29         };
    30         boolean autoAck=false;
    31         channel.basicConsume(QUEVE_NAME,autoAck,consumer);
    32     }
    33 }

    消费者2

     1 package cn.wh.work;
     2 
     3 import cn.wh.util.RabbitMqConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class Recv2 {
     9 
    10     private static final String QUEVE_NAME = "test_work_queue";
    11     public static void main(String[] args) throws Exception {
    12         Connection connection = RabbitMqConnectionUtil.getConnection();
    13      final    Channel channel = connection.createChannel();
    14         int i1 =1 ;
    15         channel.basicQos(i1);
    16         DefaultConsumer consumer = new DefaultConsumer(channel) {
    17             @Override
    18             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19                 String msg = new String(body);
    20                 System.out.println("recv2"+msg);
    21 
    22                 try {
    23                     Thread.sleep(1000);
    24                 } catch (InterruptedException e) {
    25                     e.printStackTrace();
    26                 }finally {
    27                     System.out.println(2+"OK");
    28                     channel.basicAck(envelope.getDeliveryTag(),false);
    29                 }
    30             }
    31         };
    32         boolean autoAck=false;
    33         channel.basicConsume(QUEVE_NAME,autoAck,consumer);
    34     }
    35 
    36 }

     这时候现象就是消费者 1 速度大于消费者

    Message acknowledgment(消息应答)

    • boolean autoAck = true;(自动确认模式)一旦 RabbitMQ 将消息分发给了消费者,就会从内存中删除。在这种情况下,如果杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但尚未处理的消息。
    • boolean autoAck = false; (手动确认模式) 我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。 为了确保消息不会丢失,RabbitMQ 支持消息应答。消费者送一个消息应答,告诉 RabbitMQ 这个消息已经接收并且处理完毕了。RabbitMQ 可以删除它了。
    • 消息应答是默认打开的。也就是 boolean autoAck =false

     Message durability(消息持久化)

    我们已经了解了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍将失去!当 RabbitMQ 退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:我们必须把“队列”和“消息”设为持久化。

      1. boolean durable = true;

      2. channel.queueDeclare("test_queue_work", durable, false, false, null);那么我们直接将程序里面的 false 改成 true 就行了?? 不可以会 报异常 channel error; protocol method: #method<channel.close>(reply-code=406, replytext=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue_work'

      尽管这行代码是正确的,他不会运行成功。因为我们已经定义了一个名叫 test_queue_work 的未持久化的队列。RabbitMQ 不允许使用不同的参数设定重新定义已经存在的队列,并且会返回一个错误。一个快速的解决方案——就是声明一个不同名字的队列,比如 task_queue。或者我们登录控制台将队列删除就可

  • 相关阅读:
    http请求消息体和响应消息体
    整型常量
    C语言中字符串后面的'\0'
    String类
    二进制转成十六进制
    http消息头
    NULL和NUL
    拷贝构造函数和赋值表达式
    awk中的FS
    之前给女性网增加的一个滚动展示
  • 原文地址:https://www.cnblogs.com/wh1520577322/p/10065758.html
Copyright © 2011-2022 走看看