如果一个消费者需要处理一个耗时的任务,那么队列中其他的任务将被迫等待这个消费者处理完成,所以为了避免这样的情况,可以建立对个消费者进行工作。
本例中使用Thread.sleep()方法来假装消费者在处理一个耗时的任务。我们将把字符串中的点的个数作为其复杂度; 每个点都将占“工作”的一秒钟。例如,由Hello ...描述的假任务 将需要三秒钟。我们在启动这个程序的时候,设置java参数,如 java NewTask hello ...
定义一个NewTask.java:
1 package com.rabbitMQ; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 public class NewTask { 8 9 private final static String QUEUE_NAME = "work"; 10 11 public static void main(String[] args) throws Exception { 12 // 创建连接工厂 13 ConnectionFactory factory = new ConnectionFactory(); 14 // 设置连接rabbitMQ服务器的ip 15 factory.setHost("localhost"); 16 // factory.setPort(5672); 17 // 创建一个连接到服务器的链接 18 Connection connection = factory.newConnection(); 19 // 创建连接通道 20 Channel channel = connection.createChannel(); 21 28 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 29 30 String message = getMessage(args); 31 37 channel.basicPublish("", "hello", null, message.getBytes()); 38 39 System.out.println(" [x] Sent '" + message + "'"); 40 41 channel.close(); 42 43 connection.close(); 44 } 45 46 private static String getMessage(String[] strings) { 47 if (strings.length < 1) 48 return "Hello World!"; 49 return joinStrings(strings, " "); 50 } 51 52 private static String joinStrings(String[] strings, String delimiter) { 53 int length = strings.length; 54 if (length == 0) 55 return ""; 56 StringBuilder words = new StringBuilder(strings[0]); 57 for (int i = 1; i < length; i++) { 58 words.append(delimiter).append(strings[i]); 59 } 60 return words.toString(); 61 } 62 63 }
定义一个消费工作者Worker.java:
1 package com.rabbitMQ; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.Consumer; 10 import com.rabbitmq.client.DefaultConsumer; 11 import com.rabbitmq.client.Envelope; 12 13 /** 14 15 * @author may 16 * 17 */ 18 public class Worker { 19 20 private final static String QUEUE_NAME = "work"; 21 22 public static void main(String[] argv) throws Exception { 23 ConnectionFactory factory = new ConnectionFactory(); 24 factory.setHost("localhost"); 25 Connection connection = factory.newConnection(); 26 Channel channel = connection.createChannel(); 27 /** 28 * queue the name of the queue durable true if we are declaring a 29 * durable queue (the queue will survive a server restart) exclusive 30 * true if we are declaring an exclusive queue (restricted to this 31 * connection) autoDelete true if we are declaring an autodelete queue 32 * (server will delete it when no longer in use) arguments other 33 * properties (construction arguments) for the queue 34 */ 35 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 36 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 37 // 定义一个消费者 38 final Consumer consumer = new DefaultConsumer(channel) { 39 @Override 40 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 41 byte[] body) throws IOException { 42 String message = new String(body, "UTF-8"); 43 44 System.out.println(" [x] Received '" + message + "'"); 45 try { 46 doWork(message); 47 } catch (InterruptedException e) { 48 // TODO Auto-generated catch block 49 e.printStackTrace(); 50 } finally { 51 System.out.println(" [x] Done"); 52 } 53 } 54 }; 55 // 异步 56 /** 57 * queue the name of the queue 队列名 autoAck true if the server should 58 * consider messages acknowledged once delivered; false if the server 59 * should expect explicit acknowledgements callback an interface to the 60 * consumer object 61 * 可以通过以下命令去查看队列中没有返回ack的消息个数 62 * rabbitmqctl list_queues name messages_ready messages_unacknowledged 63 */ 64 boolean autoAck = true; 65 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 66 67 // rabbitmqctl.bat list_queues 可以列出当前有多少个队列 68 } 69 70 private static void doWork(String task) throws InterruptedException { 71 for (char ch : task.toCharArray()) { 72 if (ch == '.') 73 Thread.sleep(1000); 74 } 75 } 76 77 }
第70行的doWork方法对收到的消息字符串进行遍历,有多少个.就会休眠多少秒。以此来模拟耗时任务。
循环调度
启动两个work,然后多次启动NewTask,每次发送的字符串消息不同
在Linux环境下
export CP=.:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar
# shell 3 java -cp $CP NewTask # => First message. java -cp $CP NewTask # => Second message.. java -cp $CP NewTask # => Third message... java -cp $CP NewTask # => Fourth message.... java -cp $CP NewTask # => Fifth message.....
查看两个work的输出情况
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
如果是在windows环境下,那么使用以下的命令
set CP=.;amqp-client-4.0.2.jar;slf4j-api-1.7.21.jar;slf4j-simple-1.7.22.jar java -cp %CP% NewTask
....(把$CP改成%CP%,其他一样)
eclipse环境下右键run as 选择run configurations...
可以看出,默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者将获得相同数量的消息。这种分发消息的方式叫做round-robin。
消息确认
执行任务可能需要几秒钟。你可能会想,如果一个消费者开始一个非常耗时的任务,并且只运行了一部分时间,就被异常终止了,比如down机。上面的代码,一旦RabbitMQ向客户发送消息,它立即将这个消息从内存中删除。在这种情况下,如果你杀死一个消费者,我们将丢失正在处理的消息。我们还会丢失所有发送给该特定消费者但尚未处理的消息。
但是我们不想失去任何任务。如果一个消费者终止,我们希望把这个任务交给另一个消费者。
为了确保消息永远不会丢失,RabbitMQ支持消息确认。从消费者发送一个确认信息(ack)告诉RabbitMQ已经收到,处理了特定的消息,并且RabbitMQ可以删除它。
如果消费者死机(其通道关闭,连接关闭或TCP连接丢失),而不发送确认信息,RabbitMQ将会知道消息未被完全处理需要重新排队。如果同时有其他消费者在线,则会迅速将其重新提供给另一个消费者。这样就可以确保没有消息丢失。
为了防止消费者意外终止造成消息的丢失,我们可以设置autoAck为false,禁止自动确认消息,我们应该在消息处理成功后手动确认消息。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false);//任务处理完成后手动确认消息 } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
使用这个代码,我们可以确定即使在处理消息时,使用CTRL + C杀死一个消费者,也不会丢失任何东西。消费者被杀死之后不久,所有未确认的消息将被重新发送。
忘记确认
如果忘记手动确认消息,那么这些消息将被堆积在队列中,会消耗内存。我们可以通过rabbitmqctl list_queues name messages_ready messages_unacknowledged命令来查看有多少消息未被确认的。
第三个数字就表示相应队列中已被读取但未被正确处理的消息有多少个。
消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你不告诉它。需要两件事来确保消息不会丢失:我们需要将队列和消息标记为持久。
首先,我们需要确保RabbitMQ不会失去我们的队列。为了这样做,我们需要将其声明为持久的:
boolean durable = true ;
channel.queueDeclare(“hello”,durable,false,false,null);
虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。这是因为我们已经定义了一个名为hello的非持久性队列。RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。但是有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如task_queue:
boolean durable = true ;
channel.queueDeclare(“task_queue”,durable,false,false,null);
生产者和消费者的queueDeclare都要更改成持久性队列。
在这一点上,我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要通过将MessageProperties(实现了BasicProperties)设置PERSISTENT_TEXT_PLAIN来标记我们的消息是哪种类型的持久化消息。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish(“”,“task_queue”,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
公平分派
前面代码实现的消息队列是平均地将任务分发给每个消费者,如果此时有其中一个消费者处理消息非常的耗时,而另外的一个消费者可以很快地处理完消息,这个时候就出问题了,如果队列中存在三条消息,rabbitMQ将第一条给了耗时的消费者,把第二条给了不耗时的消费者,最后把第三条给了耗时的消费者,这个时候,耗时的消费者一直在忙碌,而不耗时的消费者没事干。
这是因为当消息进入队列时,RabbitMQ只会盲目地平均分派消息,不会检查被分派任务的消费者是否已经将消息处理完成。
为了避免这种问题,在消费者的代码中设置以下代码。消费者告诉RabbitMQ不要一次性给我多个消息。或者换句话说,在处理并确认前一个消息之前,不要向我发送新消息,你应该将消息发给不忙的其他消费者。
int prefetchCount = 1 ;
channel.basicQos(prefetchCount);
注意队列大小
如果所有的消费者都忙,队列会被填满。这个时候你应该增加新的消费者或者其他的方式去消耗队列中的消息。
NewTask.java类的最终代码:
1 package com.rabbitMQ; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 import com.rabbitmq.client.MessageProperties; 7 8 public class NewTask_fairDispatch { 9 10 private final static String QUEUE_NAME = "task_queue"; 11 12 public static void main(String[] args) throws Exception { 13 // 创建连接工厂 14 ConnectionFactory factory = new ConnectionFactory(); 15 // 设置连接rabbitMQ服务器的ip 16 factory.setHost("localhost"); 17 // factory.setPort(5672); 18 // 创建一个连接到服务器的链接 19 Connection connection = factory.newConnection(); 20 // 创建连接通道 21 Channel channel = connection.createChannel(); 22 23 24 boolean durable = true; 25 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 26 27 String message = getMessage(args); 28 29 //将队列中的信息定义为可持久化的纯文本 30 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 31 32 System.out.println(" [x] Sent '" + message + "'"); 33 34 channel.close(); 35 36 connection.close(); 37 } 38 39 private static String getMessage(String[] strings) { 40 if (strings.length < 1) 41 return "Hello World!"; 42 return joinStrings(strings, " "); 43 } 44 45 private static String joinStrings(String[] strings, String delimiter) { 46 int length = strings.length; 47 if (length == 0) 48 return ""; 49 StringBuilder words = new StringBuilder(strings[0]); 50 for (int i = 1; i < length; i++) { 51 words.append(delimiter).append(strings[i]); 52 } 53 return words.toString(); 54 } 55 56 }
Worker.java:
1 package com.rabbitMQ; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 import com.rabbitmq.client.Consumer; 10 import com.rabbitmq.client.DefaultConsumer; 11 import com.rabbitmq.client.Envelope; 12 13 /** 14 * @author may 15 * 16 */ 17 public class Worker_fairDispatch { 18 19 private final static String QUEUE_NAME = "hello"; 20 21 public static void main(String[] argv) throws Exception { 22 ConnectionFactory factory = new ConnectionFactory(); 23 factory.setHost("localhost"); 24 Connection connection = factory.newConnection(); 25 Channel channel = connection.createChannel(); 26 int prefetchCount = 1; 27 //服务传送的最大消息数量,0表示不限制 28 channel.basicQos(prefetchCount); 29 30 boolean durable = true; 31 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 32 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 33 // 定义一个消费者 34 final Consumer consumer = new DefaultConsumer(channel) { 35 @Override 36 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 37 byte[] body) throws IOException { 38 String message = new String(body, "UTF-8"); 39 40 System.out.println(" [x] Received '" + message + "'"); 41 try { 42 doWork(message); 43 } catch (InterruptedException e) { 44 // TODO Auto-generated catch block 45 e.printStackTrace(); 46 } finally { 47 System.out.println(" [x] Done"); 48 //确认消息,表示任务已经处理完成 49 channel.basicAck(envelope.getDeliveryTag(), false); 50 } 51 } 52 }; 53 54 boolean autoAck = false; 55 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 56 57 } 58 59 private static void doWork(String task) throws InterruptedException { 60 for (char ch : task.toCharArray()) { 61 if (ch == '.') 62 Thread.sleep(10000); 63 } 64 } 65 66 }