首先编写一个工作队列的生产者:
生产者发送10条消息后就关闭连接,这10条消息先由RabbitMQ保存在队列中
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class WorkQueueInProducer { /** * 工作队列 * @param args */ public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.2.121"); connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672 connectionFactory.setVirtualHost("/dzz"); // 虚拟主机? 默认值 / connectionFactory.setUsername("test"); // guest connectionFactory.setPassword("123456"); // guest Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queue", true, false, false, null); for (int i = 0; i < 10; i++) { String body = "send workQueue msg" + i; channel.basicPublish("", "work_queue", null, body.getBytes(StandardCharsets.UTF_8)); } channel.close(); connection.close(); } }
然后创建两个消费者:
两个消费者的代码完全相同(只有类名不同),接收到消息后打印出来即可
但是消费者不要关闭连接,需要一直保持监听
package cn.dzz.workQueue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Work-queue consumer: subscribes to {@code work_queue} and prints the body of
 * every delivery. The channel and connection are intentionally left open.
 */
public class WorkQueueInConsumer2 {

    /**
     * Work queue consumer entry point.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, declaring the queue, or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.2.121");
        factory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
        factory.setVirtualHost("/dzz"); // virtual host; the client default is "/"
        factory.setUsername("test");    // the client default is "guest"
        factory.setPassword("123456");  // the client default is "guest"

        Connection conn = factory.newConnection();
        Channel ch = conn.createChannel();

        // Same declaration as the producer: durable, non-exclusive, no auto-delete.
        ch.queueDeclare("work_queue", true, false, false, null);

        // Second argument true = autoAck; the handler prints the decoded payload
        // followed by a separator line.
        ch.basicConsume("work_queue", true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String text = new String(body, StandardCharsets.UTF_8);
                System.out.println("body(message) " + text);
                System.out.println("- - - - - over - - - - -");
            }
        });
    }
}
先打开两个消费者:
然后再执行生产者发送消息:
RabbitMQ立即把生产者的消息分配过来给两个消费者:
消费者1输出:
"C:\Program Files (x86)\Java\jdk1.8.0_291\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\lib\idea_rt.jar=50684:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\rt.jar;C:\Users\Administrator\IdeaProjects\RabbitMQ\ConsumerService\target\classes;C:\Users\Administrator\.m2\repository\com\rabbitmq\amqp-client\5.6.0\amqp-client-5.6.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar" cn.dzz.workQueue.WorkQueueInConsumer1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
body(message) send workQueue msg0
- - - - - over - - - - -
body(message) send workQueue msg2
- - - - - over - - - - -
body(message) send workQueue msg4
- - - - - over - - - - -
body(message) send workQueue msg6
- - - - - over - - - - -
body(message) send workQueue msg8
- - - - - over - - - - -
消费者2输出:
"C:\Program Files (x86)\Java\jdk1.8.0_291\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\lib\idea_rt.jar=50693:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_291\jre\lib\rt.jar;C:\Users\Administrator\IdeaProjects\RabbitMQ\ConsumerService\target\classes;C:\Users\Administrator\.m2\repository\com\rabbitmq\amqp-client\5.6.0\amqp-client-5.6.0.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar" cn.dzz.workQueue.WorkQueueInConsumer2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
body(message) send workQueue msg1
- - - - - over - - - - -
body(message) send workQueue msg3
- - - - - over - - - - -
body(message) send workQueue msg5
- - - - - over - - - - -
body(message) send workQueue msg7
- - - - - over - - - - -
body(message) send workQueue msg9
- - - - - over - - - - -
工作队列的主要作用是让多个消费者分摊这10条消息
消费者之间是处于竞争关系,争夺消息的接收
工作队列用于任务过重的场景,用来提高任务处理速度