官网:https://www.rabbitmq.com/
参考:https://blog.csdn.net/hellozpc/article/details/81436980#52_204
一.消息中间件的作用
异步处理
应用解耦
流量削峰
日志处理
二.rabbitmq 安装与配置
下载:https://www.rabbitmq.com/download.html
在cmd 窗口输入:
rabbitmq-service start
rabbitmq-plugins enable rabbitmq_management
浏览器输入:localhost:15672 guest/guest
三. Java 操作 rabbitmq
(1) simple 简单队列
添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency>
定义连接工具类:
/**
 * Helper that opens connections to the RabbitMQ broker used by the examples.
 */
public class ConnectionUtil {
    /**
     * Creates a new broker connection using the hard-coded settings below:
     * host "localhost", port 5672, virtual host "testhost", user "admin".
     *
     * @return the connection produced by the configured factory
     * @throws Exception if the factory fails to create the connection
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Broker address and port.
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);

        // Account information: user name, password and virtual host.
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("testhost");

        // Obtain the connection from the factory.
        return connectionFactory.newConnection();
    }
}
定义生产者:
/**
 * Simple-queue producer: publishes one message to the queue "q_test_01"
 * through the default ("") exchange and then exits.
 */
public class Send {
    private final static String QUEUE_NAME = "q_test_01";

    /**
     * Declares the queue, publishes "Hello World!" and releases the channel
     * and connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                // Declare (create) the queue: non-durable, non-exclusive, no auto-delete.
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);

                String message = "Hello World!";
                // With the default exchange the routing key is the queue name.
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            } finally {
                // Close even when declare/publish throws, so the channel is not leaked.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            // Always release the connection, on success and on failure alike.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
定义消费者:
/**
 * Simple-queue consumer: prints every message that arrives on "q_test_01".
 */
public class Recv {
    private final static String QUEUE_NAME = "q_test_01";

    /**
     * Declares the queue, registers an auto-acknowledging consumer and prints
     * deliveries in an endless loop.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring or receiving fails
     */
    public static void main(String[] argv) throws Exception {
        // Open a channel on a fresh connection.
        Channel channel = ConnectionUtil.getConnection().createChannel();

        // Declare the queue with the same arguments the producer uses.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Register the consumer; "true" selects automatic acknowledgement.
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);

        // nextDelivery() blocks until a message is available.
        for (;;) {
            QueueingConsumer.Delivery next = queueingConsumer.nextDelivery();
            String text = new String(next.getBody());
            System.out.println(" [x] Received '" + text + "'");
        }
    }
}
(2)work queue 工作队列
一个生产者,多个消费者
消费者01:
/**
 * Work-queue consumer 01: reads from "my_queue" with automatic
 * acknowledgement and pauses 10 ms per message.
 */
public class Consumer01 {
    private static final String queue = "my_queue";

    /**
     * Declares the queue and prints every delivery in an endless loop.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or receiving fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = MqConnection.getConnect().createChannel();
        channel.queueDeclare(queue, false, false, false, null);

        // Fair-dispatch variant (disabled): channel.basicQos(1);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // Manual-ack variant (disabled): channel.basicConsume(queue, false, queueingConsumer);
        channel.basicConsume(queue, true, queueingConsumer);

        for (;;) {
            QueueingConsumer.Delivery next = queueingConsumer.nextDelivery();
            String text = new String(next.getBody());
            System.out.println("Consum-01-[Receive]:" + text);
            // Simulated processing time.
            Thread.sleep(10);
            // Manual-ack variant (disabled):
            // channel.basicAck(next.getEnvelope().getDeliveryTag(), false);
        }
    }
}
消费者02:
public class Consumer02 { private static final String queue = "my_queue"; public static void main(String []args) throws Exception{ Connection connection = MqConnection.getConnect(); Channel channel = connection.createChannel(); channel.queueDeclare(queue,false,false,false,null); //channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); //channel.basicConsume(queue,false,consumer); channel.basicConsume(queue,true,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("Consum-02-[Receive]:"+msg); Thread.sleep(500); //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
生产者:
/**
 * Work-queue producer: publishes 50 messages to "my_queue" through the
 * default ("") exchange, pausing 10 ms between messages.
 */
public class Producer {
    private static final String queue = "my_queue";

    /**
     * Declares the queue, publishes the messages and releases the channel and
     * connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = MqConnection.getConnect();
        try {
            Channel channel = connection.createChannel();
            try {
                channel.queueDeclare(queue, false, false, false, null);
                for (int i = 0; i < 50; i++) {
                    String msg = "第" + i + "条消息。。。。。";
                    // NOTE(review): getBytes() uses the platform default charset;
                    // the consumers decode with the platform default as well.
                    channel.basicPublish("", queue, null, msg.getBytes());
                    System.out.println("[发送" + i + "消息]:" + msg);
                    Thread.sleep(10);
                }
            } finally {
                // Close even when publishing fails or the sleep is interrupted.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            // Always release the connection, on success and on failure alike.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
结果:
1.消费者01 与消费者02 获取的内容不同,同一个消息只能被一个消费者获取
2.消费者01 与消费者02 获取的消息数量是相等的
不合理: 消费者01 处理的时间更短,可以获取更多的消息
机制:
轮询分发(round-robin):使用任务队列可以并行地工作。默认情况下,RabbitMQ 会将消息逐个发送给序列中的下一个消费者,不考虑每个任务的耗时,且是提前一次性分配。每个
消费者获取相等数量的消息,这种分发消息的机制称为 Round-Robin 轮询
虽然上面的分配方法可行,但如果某个任务时间长,别的消费者比较闲
解决方法:
使用basicQos(prefetchCount = 1) ,限制 RabbitMQ 只发送不超过 1条的消息给同一个消费者。当消息处理完毕后才发送第二条。
能者多劳:
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 开启这行表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);
(3)publish/subscribe 发布/订阅
1.一个生产者,多个消费者
2.每个消费者都有自己的一个队列
3.每个队列都要绑定到交换机
4.生产者将消息发送到交换机
5.当消息发送到没有队列绑定的交换机时,消息丢失。因为交换机没有存储消息的能力,消息只能存在队列中。
生产者:向交换机发送消息
/**
 * Publish/subscribe producer: sends one message to the fanout exchange
 * "text_exchange_fanout".
 */
public class Product {
    private static final String EXCHANGE_Name = "text_exchange_fanout";

    /**
     * Declares the exchange, publishes "hello,world" and releases the channel
     * and connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = MqConnection.getConnect();
        try {
            Channel channel = connection.createChannel();
            try {
                channel.exchangeDeclare(EXCHANGE_Name, "fanout");
                String msg = "hello,world";
                // Published to the exchange with an empty routing key.
                channel.basicPublish(EXCHANGE_Name, "", null, msg.getBytes());
                System.out.println("[Send]:" + msg);
            } finally {
                // Close even when declare/publish throws, so the channel is not leaked.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            // Always release the connection, on success and on failure alike.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者01:
/**
 * Publish/subscribe consumer 01: binds its own queue "my_queue01" to the
 * exchange "text_exchange_fanout" and prints every delivery.
 */
public class Consumer01 {
    private static final String EXCHANGE_Name = "text_exchange_fanout";
    private static final String QUEUE_Name = "my_queue01";

    /**
     * Declares and binds the queue, then consumes with automatic
     * acknowledgement in an endless loop, pausing 10 ms per message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or receiving fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = MqConnection.getConnect().createChannel();

        channel.queueDeclare(QUEUE_Name, false, false, false, null);
        // Bind this consumer's queue to the exchange with an empty routing key.
        channel.queueBind(QUEUE_Name, EXCHANGE_Name, "");

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_Name, true, queueingConsumer);

        for (;;) {
            QueueingConsumer.Delivery next = queueingConsumer.nextDelivery();
            String text = new String(next.getBody());
            System.out.println("[Recive01]:" + text);
            Thread.sleep(10);
        }
    }
}
消费者02:
/**
 * Publish/subscribe consumer 02: binds its own queue "my_queue02" to the
 * exchange "text_exchange_fanout" and prints every delivery.
 */
public class Consumer02 {
    private static final String EXCHANGE_Name = "text_exchange_fanout";
    private static final String QUEUE_Name = "my_queue02";

    /**
     * Declares and binds the queue, then consumes with automatic
     * acknowledgement in an endless loop, pausing 100 ms per message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or receiving fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = MqConnection.getConnect().createChannel();

        channel.queueDeclare(QUEUE_Name, false, false, false, null);
        // Bind this consumer's queue to the exchange with an empty routing key.
        channel.queueBind(QUEUE_Name, EXCHANGE_Name, "");

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_Name, true, queueingConsumer);

        for (;;) {
            QueueingConsumer.Delivery next = queueingConsumer.nextDelivery();
            String text = new String(next.getBody());
            System.out.println("[Recive02]:" + text);
            Thread.sleep(100);
        }
    }
}
结果:当生产者发送一条消息时,多个消费者可以获取到消息。一个消费者队列可以有多个消费者实例,但只有其中一个消费者实例会消费到该消息。
(4)routing 路由选择
生产者:
/**
 * Routing producer: sends one message to the direct exchange
 * "test_exchange_direct" with routing key "update".
 */
public class Producer {
    private static final String EXCHANGE_NAME = "test_exchange_direct";

    /**
     * Declares the exchange, publishes the message and releases the channel
     * and connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = MqConnection.getConnect();
        try {
            Channel channel = connection.createChannel();
            try {
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                String msg = "hello,你哈";
                // NOTE(review): getBytes() uses the platform default charset;
                // the consumers decode with the platform default as well.
                channel.basicPublish(EXCHANGE_NAME, "update", null, msg.getBytes());
                System.out.println("[send]:" + msg);
            } finally {
                // Close even when declare/publish throws, so the channel is not leaked.
                if (channel.isOpen()) {
                    channel.close();
                }
            }
        } finally {
            // Always release the connection, on success and on failure alike.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者01:
1 public class Consumer01 { 2 private static final String EXCHANGE_NAME = "test_exchange_direct"; 3 private static final String Queue_name = "queue_01"; 4 5 public static void main(String []args) throws Exception{ 6 Connection connection = MqConnection.getConnect(); 7 Channel channel = connection.createChannel(); 8 9 channel.queueDeclare(Queue_name,false,false,false,null); 10 11 channel.queueBind(Queue_name,EXCHANGE_NAME,"select"); 12 channel.queueBind(Queue_name,EXCHANGE_NAME,"delete"); 13 14 channel.basicQos(1); 15 16 QueueingConsumer consumer = new QueueingConsumer(channel); 17 18 channel.basicConsume(Queue_name,true,consumer); 19 20 while(true){ 21 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 22 String msg = new String(delivery.getBody()); 23 System.out.println("[queue_01]:"+msg); 24 Thread.sleep(10); 25 26 } 27 } 28 }
消费者02:
1 public class Consumer02 { 2 private static final String EXCHANGE_NAME = "test_exchange_direct"; 3 private static final String Queue_name = "queue_02"; 4 5 public static void main(String []args) throws Exception{ 6 Connection connection = MqConnection.getConnect(); 7 Channel channel = connection.createChannel(); 8 9 channel.queueDeclare(Queue_name,false,false,false,null); 10 11 channel.queueBind(Queue_name,EXCHANGE_NAME,"select"); 12 channel.queueBind(Queue_name,EXCHANGE_NAME,"update"); 13 14 channel.basicQos(1); 15 16 QueueingConsumer consumer = new QueueingConsumer(channel); 17 18 channel.basicConsume(Queue_name,true,consumer); 19 20 while(true){ 21 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 22 String msg = new String(delivery.getBody()); 23 System.out.println("[queue_02]:"+msg); 24 Thread.sleep(10); 25 26 } 27 } 28 }
结果:生产者产生的消息会附带key, 消费者也会附带key,当生产者的key == 消费者的key 才能让消费者获得消息。
(5)Topics 主题
同一个消息被多个消费者获取。
消息生产者:
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 5 public static void main(String []args) throws Exception{ 6 7 Connection connection = MqConnection.getConnect(); 8 Channel channel = connection.createChannel(); 9 10 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); 11 12 String msg = "Hello,World"; 13 14 channel.basicPublish(EXCHANGE_NAME,"emp.update.del",null,msg.getBytes()); 15 System.out.println("[Send]:"+msg); 16 17 channel.close(); 18 connection.close(); 19 } 20 21 }
消费者01:
1 public class Rec01 { 2 private final static String EXCHANGE_NAME = "test_exchange_topic"; 3 private final static String QUEUE_NAME = "test_queue_topic01"; 4 5 6 public static void main(String []args) throws Exception{ 7 Connection connection = MqConnection.getConnect(); 8 Channel channel = connection.createChannel(); 9 10 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 11 12 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"emp.*"); 13 14 channel.basicQos(1); 15 16 QueueingConsumer consumer = new QueueingConsumer(channel); 17 channel.basicConsume(QUEUE_NAME,false,consumer); 18 19 while(true){ 20 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 21 String msg = new String(delivery.getBody()); 22 System.out.println("[rec]01:"+msg); 23 Thread.sleep(100); 24 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 25 } 26 } 27 }
消费者02:
1 public class Rec02 { 2 private final static String EXCHANGE_NAME = "test_exchange_topic"; 3 private final static String QUEUE_NAME = "test_queue_topic02"; 4 5 6 public static void main(String []args) throws Exception{ 7 Connection connection = MqConnection.getConnect(); 8 Channel channel = connection.createChannel(); 9 10 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 11 12 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"emp.#"); 13 14 channel.basicQos(1); 15 16 QueueingConsumer consumer = new QueueingConsumer(channel); 17 channel.basicConsume(QUEUE_NAME,false,consumer); 18 19 while(true){ 20 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 21 String msg = new String(delivery.getBody()); 22 System.out.println("[rec]02:"+msg); 23 Thread.sleep(500); 24 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 25 } 26 } 27 }
*:匹配一个单词;#:匹配零个或多个单词。例如路由键 emp.update.del 能匹配 emp.#,但不能匹配 emp.*(emp.* 只能匹配 emp.update 这类两段的路由键)。