什么是RabbitMQ就是一个队列
优点
1.提高系统响应速度
我们发送请求然后直接通过Rabbit获得响应不用看我们数据是否在数据库中查到了:如签票软件会你点击过后直接给你一个正在抢票的界面
2.提高系统稳定性
如果服务挂了不关紧要,如上面所述
3.服务异步调用
也如1.所述我们的响应和我们的反应没有了直接关系
4.服务解耦
差不多了
5.保证顺序FIFO
队列特征
6.消除峰值
把一次100行请求分为一次5个请求
安装
RabbitMQ由Erlang语言开发
RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
Erlang下载
地址:http://erlang.org/download/otp
安装可视化界面
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ ,进入到Rabbit的sbin目录,使用cmd执行命令: rabbitmq-plugins.bat enable rabbitmq_management , 安装成功后重新RabbitMQ
进入浏览器,输入:http://localhost:15672 ,初始账号和密码:guest/guest
如果你的C盘下:如我的中文路径
则需要修改参考如下
解决方案:https://blog.csdn.net/leoma2012/article/details/97636859
工作原理
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
1.1.1. 消息发布接收流程:
1.发送消息
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
2.接收消息
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
单模型
发送端操作流程
1)创建连接
2)创建通道
3)声明队列
4)发送消息
接收端
1)创建连接
2)创建通道
3)声明队列
4)监听队列
5)接收消息
6)ack回复
发送者

public class Sender { public static final String QUERY_HELLO_WORD="hello_word"; //创建连接对象 //通过连接对象获得通道 //通过通道创建队列 //发送消息 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //3.使用通道创建(声明)队列 ,参数如下: //String queue,:队列名字 // boolean durable:持久化队列 // boolean exclusive: 队列是否独占此连接 // boolean autoDelete:用完之后自动删除队列 // Map<String, Object> arguments :其他的参数 channel.queueDeclare(QUERY_HELLO_WORD,true,false,true,null); /** * 消息发布方法 *String exchange, :交换机名字 "" 使用默认 *String routingKey, :队列的路由key(对列的名字) *AMQP.BasicProperties props,:其他属性 *byte[] body :发送的消息的内容 */ String mes = "hellowrod"; channel.basicPublish("",QUERY_HELLO_WORD,null,mes.getBytes()); System.out.println("发送消息。。。。"); } }
接收者

public class Consutom { //创建连接对象 //通过连接对象获得通道 //接受消息 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //定义消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * 消费者接收消息调用此方法 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送) * @param properties * @param body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到消息。。。。。。。。。。。。"); System.out.println("consumerTagd的内容"+consumerTag); System.out.println("getExchange的内容"+envelope.getExchange()); System.out.println("getDeliveryTag的内容"+envelope.getDeliveryTag()); System.out.println("getRoutingKey的内容"+envelope.getRoutingKey()); System.out.println("properties"+properties); System.out.println("consumerTagd的内容"+new String(body)); } }; /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(Sender.QUERY_HELLO_WORD,true,consumer); } }
就是两个消费者一个队列
send

package cn.jiedada._02_word_quary; import cn.jiedada.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { public static final String QUERY_WORK_QUARY="query_work_quary"; //创建连接对象 //通过连接对象获得通道 //通过通道创建队列 //发送消息 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /** * 声明队列,如果Rabbit中没有此队列将自动创建 * param1:队列名称 * param2:是否持久化 * param3:队列是否独占此连接 * param4:队列不再使用时是否自动删除此队列 * param5:队列参数 */ channel.queueDeclare(QUERY_WORK_QUARY,true,false,true,null); /** * 消息发布方法 * param1:Exchange的名称,如果没有指定,则使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * param3:消息包含的属性 * param4:消息体 */ String mes = "杰大大真的帅"; channel.basicPublish("",QUERY_WORK_QUARY,null,mes.getBytes()); System.out.println("发送消息。。。。"); } }
接收者在复制一份就可以了

package cn.jiedada._02_word_quary; import cn.jiedada.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consutom2 { //创建连接对象 //通过连接对象获得通道 //接受消息 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //定义消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * 消费者接收消息调用此方法 * @param consumerTag 消费者的标签,在channel.basicConsume()去指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送) * @param properties * @param body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consutem2+收到了"); System.out.println("consumerTagd的内容"+consumerTag); System.out.println("getExchange的内容"+envelope.getExchange()); System.out.println("getDeliveryTag的内容"+envelope.getDeliveryTag()); System.out.println("getRoutingKey的内容"+envelope.getRoutingKey()); System.out.println("properties"+properties); System.out.println("consumerTagd的内容"+new String(body)); System.out.println("............................."); //手动签收消息 //long deliveryTag, :交货的ID //boolean multiple:false,签收当前则一个消息 channel.basicAck(envelope.getDeliveryTag(),false); } }; /** * 监听队列String queue, boolean autoAck,Consumer callback * 参数明细 * 1、队列名称 * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复 * 3、消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(Sender.QUERY_WORK_QUARY,false,consumer); } }
订阅模型基本逻辑都这样
这里我们在发送者中创建交换机
然后接收者创建队列来接收
只不过创建的交换机不一样罢了分为Faout,direct,topic三种
send

package cn.jiedada._03_fanout; import cn.jiedada.utils.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { public static final String NAME_EXCHANGE_FANOUT = "name_exchange_fanout"; //1.使用工具创建连接对象 //2.使用连接创建通道对象 //3.使用通道创建交换机Fanout类型 //4.准备消息内容 //5.发送消息到MQ不指定队列 public static void main(String[] args) throws Exception { //1.使用工具创建连接对象 Connection connection = ConnectionUtil.getConnection(); //2.使用连接创建通道对象 Channel channel = connection.createChannel(); //3.3.使用通道创建交换机[变化] :广播 channel.exchangeDeclare(NAME_EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT); //4.准备消息内容 String messgae = "is a fanout message over!!!"; //5.发送消息到MQ //String exchange, :交换机名字 "" 使用默认 //String routingKey, :队列的路由key(对列的名字) //AMQP.BasicProperties props,:其他属性 //byte[] body :发送的消息的内容 channel.basicPublish(NAME_EXCHANGE_FANOUT,"",null,messgae.getBytes()); System.out.println("Sender消息发送完毕..."); } }
接收者

package cn.jiedada._03_fanout; import cn.jiedada.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consutom { public static final String NAME_QUEUE_FANOUT_C1 = "name_queue_fanout_c1"; //1.使用工具创建连接对象 //2.使用连接创建通道对象 //3.使用通道创建队列【变化】 //4.把队列绑定到交换机【变化,routingkey不写】 //5.准备消息处理回调 //6.监听队列,消费消息 public static void main(String[] args) throws Exception { //1.使用工具创建连接对象 Connection connection = ConnectionUtil.getConnection(); //2.使用连接创建通道对象 final Channel channel = connection.createChannel(); //同时消费一个消息 channel.basicQos(1); //3.使用通道创建队列【变化】 channel.queueDeclare( NAME_QUEUE_FANOUT_C1, true, false, false, null ); //把队列绑定到交换机【变化】 //String queue,队列名 //String exchange,绑定的交换机名 //String routingKey,队列的routingkey,广播模式,不写 channel.queueBind( NAME_QUEUE_FANOUT_C1, Sender.NAME_EXCHANGE_FANOUT, ""); //5.准备消息处理回调 Consumer callback = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Receiver1-1:收到消息....."); //消费者ID System.out.println("consumerTag:"+consumerTag); //交换机 System.out.println("getExchange:"+envelope.getExchange()); System.out.println("getRoutingKey:"+envelope.getRoutingKey()); //收货ID(消息ID) System.out.println("getDeliveryTag:"+envelope.getDeliveryTag()); System.out.println("消息内容:"+new String(body)); System.out.println("Receiver1-1:消息完毕............."); //手动签收消息 //long deliveryTag, :交货的ID //boolean multiple:false,签收当前则一个消息 channel.basicAck(envelope.getDeliveryTag(),false); } }; //4.监听队列,消费消息 //String queue:消费的队列的名字 //boolean autoAck:自动签收消息,队列把消息推送给消费者之后,自动删除消息, // 可能会出问题:如果消费失败,消息也会被删除,会出现消息丢失,手动签收可以解决 //Consumer callback :拿到消息之后的处理器的回调 //channel.basicConsume(Sender.NAME_QUEUE_HELLOWORLD,true,callback); channel.basicConsume(NAME_QUEUE_FANOUT_C1,false,callback); } }
重要的三个持久化,一般来说需要设置
交换机持久化
队列持久化
消息持久化