1、MQ介绍
1.1 什么是MQ
消息队列是一种通过典型的生产者和消费者模型,生产者向消息队列发送消息,消费者从消息队列获取消息。进行异步的发送和接受,实现系统解耦的方案。也较消息中间件。
1.2 常见MQ
常见的mq如下:
* ActiveMq
* Rabbitmq
* Kafka
* RocketMq
1.3 不同MQ的特点
2、RabbitMq介绍
2.1 RabbitMq
RabbitMq是基于AMQP协议,erlang语言开发,是部署最广泛的消息中间件之一。
AMQP协议
2.2 RabbitMq安装
也可以使用yum安装
当使用3.8时,没有config文件,我们可以新建一个用户赋予administrator权限,可以在非localhost地方访问。
步骤4:执行下面的命令创建一个用户
rabbitmqctl add_user 用户名 密码
步骤5:执行下面的命令设置用户为超级管理员。
rabbitmqctl set_user_tags 用户名 administrator
以上操作完成重启 service rabbitmq-server restart
用新账号登录即可.
2.3 RabbitMq管理命令行
2、rabbitMq消息模型
rabbitmq中的虚拟主机相当于数据库的database,web应用的/,是一个server下提供的不同区域,相互不干扰。
2.1 基础的发送接受编码
-
直连型
发送
` @Test
public void SendMessage() throws IOException, TimeoutException {//创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置连接的主机 connectionFactory.setHost("192.168.1.104"); //设置端口号 connectionFactory.setPort(5672); //设置访问的虚拟主机 connectionFactory.setVirtualHost("/ems"); //设置用户、密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("ems"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取连接中通道 Channel channel = connection.createChannel(); //通道绑定消息队列 //参数: 1 队列名称,没有则创建 ; // 2 定义是否持久化,true为需要持久化; // 3 是否为独占队列,true代表不可以被其余连接使用, // 4 是否在消费完成后删除,true为是 // 5 附加参数 channel.queueDeclare("hello",false,false,false,null); //发布消息 //1 交换机名称 //2 队列名称 //3 额外设置 //4 消息具体内容 channel.basicPublish("","hello",null,"hello rabbitmq".getBytes()); channel.close(); connection.close();`
接受
` public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接的主机
connectionFactory.setHost("192.168.1.104");
//设置端口号
connectionFactory.setPort(5672);
//设置访问的虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置用户、密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定消息队列
//参数: 1 队列名称,没有则创建 ;
// 2 定义是否持久化,true为需要持久化;
// 3 是否为独占队列,true代表不可以被其余连接使用,
// 4 是否在消费完成后删除,true为是
// 5 附加参数
channel.queueDeclare("hello",false,false,false,null);
//消费消息
//1 消费的队列名称
//2 消息的自动确认机制
//3 消费时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body"+new String(body));
}
});
Thread.sleep(3000);
channel.close();
connection.close();
}`
持久化改为true后队列没有丢失,消息丢失了?
-
work-queue模型
一个生产者,多个消费者的模式
` public static void main(String[] args) throws Exception {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();//一次消费一条 channel.basicQos(1); channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1====" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); /* try { Thread.sleep(1000); }catch (Exception e){ e.printStackTrace(); }*/ } });
}`
消息手动确认
-
Fanout模式、广播订阅模式
生产者
` public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();//定义交换机 channel.exchangeDeclare("logs","fanout"); //发布消息 channel.basicPublish("logs","",null,"fanout message".getBytes()); channel.close(); connection.close();
}
消费者
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();//绑定交换机,不是必要的 channel.exchangeDeclare("logs","fanout"); String queue = channel.queueDeclare().getQueue(); //绑定队列 channel.queueBind(queue,"logs",""); channel.basicConsume(queue,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2====" + new String(body)); } });
}`
-
路由模型
1、直连模型
2、Topic模型
发送
` public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//申明交换机为topic类型
channel.exchangeDeclare("topics","topic");
String key = "user.haha.xjtu";
channel.basicPublish("topics",key,null,"routingKey gag Message".getBytes());
channel.close();
connection.close();
}
接受
public static void main(String[] args) throws Exception {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1====" + new String(body));
}
});
}`
3 springboot整合rabbitmq
![](https://img2020.cnblogs.com/blog/1614406/202102/1614406-20210201201735719-1793305655.png)
4 MQ应用场景
1 异步
2 解耦
3 削峰
5 rabbitMQ集群
5.1 普通集群
当集群中某一时刻master节点宕机,可以对queue中的信息进行备份。从节点不能自动升级为主节点