1、RBMQ安装
rabbitmq-docker 20200402
https://www.cnblogs.com/smallfa/p/12617308.html
https://group.cnblogs.com/topic/92616.html
http://192.168.157.128:15672 root/root
2、MQ中间件
异步、应用解藕、消量削峰
消息:发送者-->代理(message broker)-->目的地(destination)
形式:队列(queue):点对点信息通信(point-to-point)
不限一个接收者
主题(topic):发布(pubbish)/订阅(subscribe)通信
多接收者(订阅者)监听(订阅)这个主题
JMS:于JVM消息代理规范;ActiveMQ,HomeMQ
Java Api 非跨语言非跨平台
model:1)、Peer-2-Peer
2)、Pub/Sub
消息类型:TextMessage、MapMessage、BytesMessage、
StreamMessage、ObjectMessage、Message(消息头和属性)
评价:JavaApi,多Client通JMS进行交互
AMQP:高级信息队列协义;兼容JMS;RabbitMQ
网络协义 跨语言跨平台
model:1)、direct exchange Peer-2-Peer
2)、fanout exchange Pub/Sub+路由细分
3)、topic exchange Pub/Sub+路由细分
4)、headers exchange Pub/Sub+路由细分
5)、system exchange Pub/Sub+路由细分
消息类型:byte[]
当实际应用复杂消息可以序列化发送
评价:Write-level协义
Spring:
spring-jms提供了对JMS的支持
spring-rabbit提供了对AMQP的支持
ConnectionFactory连消息代理
提供JmsTemplate、RabbitTemplate发消息
@JmsListener(JMS)、RabbitListener(AMQP)注解在
方法监听消息代理发布的信息
@EnableJms、@EnableRabbit 开启支持
Sping Boot自配置
JmsAutoConfiguration
RabbitAutoCOnfiguration
3、RabbitMQ
####erlang##################################
####Exchange路由健表########################
####Publisher>代理(message broker)-->Consumer
Massage:消息头+消息体
消化头:可选属性(routing-key路由键、priority优先权、delivery-mode是否持久性
Publisher:生产者=>交换器
Exchange:生产者=>交换器=>队列
direct(默认)、fanout、topic、headers 策略
Queue:队列(容器、终点)
Binding:Exchange<=n:n=>Queue
Connection:TCP(Channel1....Channeln)
Channel:双向数道(多路复用),虚连(复用TCP)信道
Consumer:消费者 取消息
Virtual Host:虚交换机
Broker:队列服务器实体
Virtual Host(Exchange-Binding-Queue)
4、RabbitMQ机制
Binding:Exchange<=n:n=>Queue
Exchange:生产者=>交换器=>队列
direct(默认)、fanout、topic、headers 策略
headers:header非路由键,与direct完全一致但性能差多基本不用
direct:信息-路由键《=》binding key(Queue名)
信息-dog《=》binding key(dog) 完匹配单播
routing key=key5
brker.......................................
exchange:...................................
bindings:...................................
queues:.....key1..key2...key3...key4...key5.
fanout:信息《=》exchange binding (Queue_n)
信息-dog《=》exchange binding (Queue_n) 速度广播
Message
brker.......................................
exchange:...................................
bindings:...................................
queues:.....queue1..queue2...queue3...queue4.
fanout广播 与Routing key 无关可以不填如下图
topic:信息-模式匹配_路由键《=》binding key(模式匹配_Queue名)
通配符匹配单词数:
#:>=0个
*:1个
Message
routing key= routing key=
usa.news usa.weather
brker.......................................
exchange:...................................
bindings:...................................
queues:.....bindings key= bindings key=
usa.# #.news
5、RabbitMQ SpringRoot
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
spring.rabbitmq.host=192.168.157.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
@EnableRabbit
@SpringBootApplication
package com.daihaiwuliang.rabbitmq;
import com.daihaiwuliang.rabbitmq.bean.Book;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
//rabbitTemplate.convertAndSend("exchange.direct","US.news","exchange.direct Springboot发送信息");
// rabbitTemplate.convertAndSend("exchange.direct","China",new Book("新西记","吴大因"));
}
@Test//exchange.fanout
public void sendMsg(){
rabbitTemplate.convertAndSend("exchange.fanout",new Book("新西记","吴大因"));
}
@Test
public void receiveMsg(){
Object object=rabbitTemplate.receiveAndConvert("China");
System.out.println(object.getClass());
System.out.println(object);
}
/***
* 管理组件
*/
@Test
public void testAmqpAdmin(){
//amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
//amqpAdmin.declareQueue(new Queue("amqpAdmin.queuel",true));
//amqpAdmin.declareBinding(new Binding("amqpAdmin.queuel", Binding.DestinationType.QUEUE,"amqpAdmin.exchange",
// "amqpAdmin.aa",null));
// amqpAdmin.declareQueue(new Queue("amqpAdmin.queuel2",true));
//System.out.println("创建完成");
amqpAdmin.deleteQueue("amqpAdmin.queuel");
System.out.println("删除成功");
}
}
####实体序利化
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
@Configuration
public class AmqpConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
源码rabbitmq
package com.daihaiwuliang.rabbitmq.service;
import com.daihaiwuliang.rabbitmq.bean.Book;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class BookService {
@RabbitListener(queues = "China")
public void receive(Book book){
System.out.println("收到信息:"+book.toString());
}
@RabbitListener(queues = "China.news")
public void receive2(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}
}