zoukankan      html  css  js  c++  java
  • springboot集成rabbitmq(实战)

    RabbitMQ简介
    RabbitMQ使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现(AMQP的主要特征是面向消息、队列、路由、可靠性、安全)。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现很出色。

    相关概念

    消息队列通常有三个概念:发送消息(生产者)、队列、接收消息(消费者)。RabbitMQ在这个基本概念之上,多做了一层抽象,在发送消息和队列之间,加入了交换机。这样发送消息和队列就没有直接关系,而是通过交换机来做转发,交换机会根据分发策略把消息转给队列。

    图一(MQ基本模型):

    P为发送消息(生产者)、Q为消息队列、C为接收消息(消费者)

    图二(RabbitMQ模型):

    P为发送消息(生产者)、X为交换机、Q为消息队列、C为接收消息(消费者)

       

                                                  图一                             图二

    RabbitMQ比较重要的几个概念:

    虚拟主机:RabbitMQ支持权限控制,但是最小控制粒度为虚拟主机。一个虚拟主机可以包含多个交换机、队列、绑定。

    交换机:RabbitMQ分发器,根据不同的策略将消息分发到相关的队列。

    队列:缓存消息的容器。

    绑定:设置交换机与队列的关系。

    是时候表演真正的技术了

    为了方便演示,我们分别创建两个springboot项目:

    spring-boot-rabbitmq-producer(生产者)

    spring-boot-rabbitmq-consumer(消费者)

    注意:实际项目中,一个系统可能即为生产者、又为消费者。

    1.添加基础配置

    生产者、消费者基础配置相同。

    1.1)集成rabbitmq,添加maven依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

     1.2)添加rabbitmq服务配置(application.properties)

    #rabbitmq相关配置
    spring.rabbitmq.host=192.168.15.131
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123456

    2.交换机——DirectExchange

    DirectExchange是RabbitMQ的默认交换机,直接使用routingKey匹配队列。

    2.1)添加一个配置类(消费者)

    配置一个routingKey为notify.payment的消息队列

    @Configuration
    public class DirectConfig {
    	@Bean
    	public Queue paymentNotifyQueue() {
    		return new Queue("notify.payment");
    	}
    }

    2.2)添加一个消息监听类(消费者)

    监听routingKey为notify.payment的队列消息

    @Component
    @RabbitListener(queues = "notify.payment")
    public class PaymentNotifyReceive {
    	@RabbitHandler
    	public void receive(String msg) {
    		LogUtil.info("notify.payment receive message: "+msg);
    	}
    }

      

    2.3)添加一个消息发送类(生产者)

    将消息发送至默认的交换机且routingKey为notify.payment

    @Component
    public class PaymentNotifySender {
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	public void sender(String msg){
    		LogUtil.info("notify.payment send message: "+msg);
    		rabbitTemplate.convertAndSend("notify.payment", msg);
    	}
    }
    

      

    2.4)添加一个测试类(生产者)

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class PaymentNotifySenderTests {
    	@Autowired
    	private PaymentNotifySender sender;
    	
    	@Test
    	public void test_sender() {
    		sender.sender("支付订单号:"+System.currentTimeMillis());
    	}
    }
    

    2.5)执行test_sender()方法

    生产者日志:

    2018-05-14 16:28:53.264  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 10624 (started by lianjinsoft...
    2018-05-14 16:28:53.265  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
    2018-05-14 16:28:53.305  INFO 10624 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@...
    2018-05-14 16:28:54.133  INFO 10624 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:28:55.104  INFO 10624 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:28:55.114  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.246 seconds (JVM running for 3.199)
    2018-05-14 16:28:55.343  INFO 10624 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:1526286535342
    2018-05-14 16:28:55.383  INFO 10624 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:28:55.444  INFO 10624 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
    2018-05-14 16:28:55.483  INFO 10624 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
    2018-05-14 16:28:55.485  INFO 10624 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:28:55.490  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:1526286535342

    分析日志:

    从生产者日志第7行可以看出,消息已经成功发送。

    从消费者日志可以看出,消息已经成功接收。

    3.交换机——TopicExchange

    TopicExchange是按规则转发消息,是交换机中最灵活的一个。也是最常用的一个。

    3.1)添加一个配置类(消费者)

    配置一个routingKey为api.core的消息队列并绑定在coreExchange交换机上(交换机的匹配规则为api.core.*)

    配置一个routingKey为api.payment的消息队列并绑定在paymentExchange交换机上(交换机的匹配规则为api.payment.#)

    @Configuration
    public class TopicConfig {
    	@Bean
    	public Queue coreQueue() {
    		return new Queue("api.core");
    	}
    	
    	@Bean
    	public Queue paymentQueue() {
    		return new Queue("api.payment");
    	}
    	
    	@Bean
    	public TopicExchange coreExchange() {
    		return new TopicExchange("coreExchange");
    	}
    	
    	@Bean
    	public TopicExchange paymentExchange() {
    		return new TopicExchange("paymentExchange");
    	}
    	
    	@Bean
    	public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) {
    		return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*");
    	}
    	
    	@Bean
    	public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) {
    		return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#");
    	}
    }
    

      

    3.2)添加两个消息监听类(消费者)

    监听routingKey为api.core的队列消息

    @Component
    public class ApiCoreReceive {
    	@RabbitHandler
    	@RabbitListener(queues = "api.core")
    	public void user(String msg) {
    		LogUtil.info("api.core receive message: "+msg);
    	}
    }
    

    监听routingKey为api.payment的队列消息

    @Component
    public class ApiPaymentReceive {
    	@RabbitHandler
    	@RabbitListener(queues = "api.payment")
    	public void order(String msg) {
    		LogUtil.info("api.payment.order receive message: "+msg);
    	}
    }
    

      

    3.3)添加两个消息发送类(生产者)

    添加一个user()方法,发送消息至coreExchange交换机且routingKey为api.core.user

    添加一个userQuery()方法,发送消息至coreExchange交换机且routingKey为api.core.user.query

    @Component
    public class ApiCoreSender {
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	public void user(String msg){
    		LogUtil.info("api.core.user send message: "+msg);
    		rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg);
    	}
    	
    	public void userQuery(String msg){
    		LogUtil.info("api.core.user.query send message: "+msg);
    		rabbitTemplate.convertAndSend("coreExchange", "api.core.user.query", msg);
    	}
    }

    添加一个order()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order

    添加一个orderQuery()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order.query

    添加一个orderDetailQuery()方法,发送消息至paymentExchange交换机且routingKey为api.payment.order.detail.query

    @Component
    public class ApiPaymentSender {
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	public void order(String msg){
    		LogUtil.info("api.payment.order send message: "+msg);
    		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg);
    	}
    	
    	public void orderQuery(String msg){
    		LogUtil.info("api.payment.order.query send message: "+msg);
    		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg);
    	}
    	
    	public void orderDetailQuery(String msg){
    		LogUtil.info("api.payment.order.detail.query send message: "+msg);
    		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg);
    	}
    }
    

      

    3.4)添加两个测试类(生产者)

    测试ApiCoreSender类中的相关方法

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiCoreSenderTests {
    	@Autowired
    	private ApiCoreSender sender;
    	
    	@Test
    	public void test_user() {
    		sender.user("用户管理!");
    	}
    	
    	@Test
    	public void test_userQuery() {
    		sender.userQuery("查询用户信息!");
    	}
    }
    

    测试ApiPaymentSender类中的相关方法

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiPaymentSenderTests {
    	@Autowired
    	private ApiPaymentSender sender;
    	
    	@Test
    	public void test_order() {
    		sender.order("订单管理!");
    	}
    	
    	@Test
    	public void test_orderQuery() {
    		sender.orderQuery("查询订单信息!");
    	}
    	
    	@Test
    	public void test_orderDetailQuery() {
    		sender.orderDetailQuery("查询订单详情信息!");
    	}
    }
    

     

    3.5)验证

    3.5.1)执行ApiCoreSenderTests测试类

    生产者日志:

    2018-05-14 16:30:05.804  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : Starting ApiCoreSenderTests on LAPTOP-1DF7S904 with PID 7340 (started by lianjinsoft in ...
    2018-05-14 16:30:05.805  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : No active profile set, falling back to default profiles: default
    2018-05-14 16:30:05.851  INFO 7340 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:30:06.553  INFO 7340 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:30:07.375  INFO 7340 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:30:07.385  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : Started ApiCoreSenderTests in 1.922 seconds (JVM running for 2.846)
    2018-05-14 16:30:07.431  INFO 7340 --- [           main] com.lianjinsoft.util.LogUtil             : api.core.user send message: 用户管理!
    2018-05-14 16:30:07.463  INFO 7340 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:30:07.578  INFO 7340 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
    2018-05-14 16:30:07.647  INFO 7340 --- [           main] com.lianjinsoft.util.LogUtil             : api.core.user.query send message: 查询用户信息!
    2018-05-14 16:30:07.716  INFO 7340 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:30:07.728  INFO 7340 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:30:07.609  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.core receive message: 用户管理!

    分析日志:

    从生产者日志第7、10行可以看出,api.core.user和api.core.user.query消息均已发送成功。

    从消费者日志可以看出,只有api.core.user发送的消息被收到了。

    问题:

    为什么api.core.user.query发送的消息没有被api.core队列监听消费?

    答:因为在TopicConfig配置类中,我们对api.core队列绑定的交换机规则是api.core.*,而通配符“*”只能向后多匹配一层路径。

    3.5.2)执行ApiPaymentSenderTests测试类

    生产者日志:

    2018-05-14 16:31:12.823  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : Starting ApiPaymentSenderTests on LAPTOP-1DF7S904 with PID 6460 (started by lianjinsoft in ...
    2018-05-14 16:31:12.823  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : No active profile set, falling back to default profiles: default
    2018-05-14 16:31:12.857  INFO 6460 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:31:13.718  INFO 6460 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:31:14.530  INFO 6460 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:31:14.539  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : Started ApiPaymentSenderTests in 2.05 seconds (JVM running for 2.945)
    2018-05-14 16:31:14.592  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order.query send message: 查询订单信息!
    2018-05-14 16:31:14.638  INFO 6460 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:31:14.762  INFO 6460 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#47404bea:0/SimpleConnection@6b54655f ...
    2018-05-14 16:31:14.819  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order.detail.query send message: 查询订单详情信息!
    2018-05-14 16:31:14.825  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order send message: 订单管理!
    2018-05-14 16:31:14.836  INFO 6460 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:31:14.840  INFO 6460 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:31:14.809  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 查询订单信息!
    2018-05-14 16:31:14.821  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 查询订单详情信息!
    2018-05-14 16:31:14.829  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 订单管理!

    分析日志:

    从生产者日志第7、10、11行可以看出,api.payment.order.query、api.payment.order.detail.query、api.payment.order消息均发送成功。

    从消费者日志可以看出,api.payment.order队列监听到了所有消息并均处理成功了。

    知识点:

    TopicExchange交换机支持使用通配符*、#

    *号只能向后多匹配一层路径。

    #号可以向后匹配多层路径。

    4.交换机——HeadersExchange

    HeadersExchange交换机是根据请求消息中设置的header attribute参数类型来匹配的(和routingKey没有关系)。

    4.1)添加一个配置类(消费者)

    配置一个routingKey为credit.bank的消息队列并绑定在creditBankExchange交换机上

    配置一个routingKey为credit.finance的消息队列并绑定在creditFinanceExchange交换机上

    @Configuration
    public class HeadersConfig {
    	@Bean
    	public Queue creditBankQueue() {
    		return new Queue("credit.bank");
    	}
    	
    	@Bean
    	public Queue creditFinanceQueue() {
    		return new Queue("credit.finance");
    	}
    	
    	@Bean
    	public HeadersExchange creditBankExchange() {
    		 return new HeadersExchange("creditBankExchange");
    	}
    	
    	@Bean
    	public HeadersExchange creditFinanceExchange() {
    		 return new HeadersExchange("creditFinanceExchange");
    	}
    	
    	@Bean
    	public Binding bindingCreditAExchange(Queue creditBankQueue, HeadersExchange creditBankExchange) {
    		Map<String,Object> headerValues = new HashMap<>();
    		headerValues.put("type", "cash");
    		headerValues.put("aging", "fast");
    		return BindingBuilder.bind(creditBankQueue).to(creditBankExchange).whereAll(headerValues).match();
    	}
    	
    	@Bean
    	public Binding bindingCreditBExchange(Queue creditFinanceQueue, HeadersExchange creditFinanceExchange) {
    		Map<String,Object> headerValues = new HashMap<>();
    		headerValues.put("type", "cash");
    		headerValues.put("aging", "fast");
    		return BindingBuilder.bind(creditFinanceQueue).to(creditFinanceExchange).whereAny(headerValues).match();
    	}
    }
    

      

    4.2)添加一个消息监听类(消费者)

    添加creditBank()方法,监听routingKey为credit.bank的队列消息

    添加creditFinance()方法,监听routingKey为credit.finance的队列消息

    @Component
    public class ApiCreditReceive {
    	@RabbitHandler
    	@RabbitListener(queues = "credit.bank")
    	public void creditBank(String msg) {
    		LogUtil.info("credit.bank receive message: "+msg);
    	}
    	
    	@RabbitHandler
    	@RabbitListener(queues = "credit.finance")
    	public void creditFinance(String msg) {
    		LogUtil.info("credit.finance receive message: "+msg);
    	}
    }
    

      

    4.3)添加一个消息发送类(生产者)

    添加一个creditBank()方法,发送消息至creditBankExchange交换机且routingKey为credit.bank

    添加一个creditFinance()方法,发送消息至creditFinanceExchange交换机且routingKey为credit.finance

    @Component
    public class ApiCreditSender {
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	public void creditBank(Map<String, Object> head, String msg){
    		LogUtil.info("credit.bank send message: "+msg);
    		rabbitTemplate.convertAndSend("creditBankExchange", "credit.bank", getMessage(head, msg));
    	}
    	
    	public void creditFinance(Map<String, Object> head, String msg){
    		LogUtil.info("credit.finance send message: "+msg);
    		rabbitTemplate.convertAndSend("creditFinanceExchange", "credit.finance", getMessage(head, msg));
    	}
    }
    

      

    4.4)添加一个测试类(生产者)

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiCreditSenderTests {
    	@Autowired
    	private ApiCreditSender sender;
    	
    	@Test
    	public void test_creditBank_type() {
    		Map<String,Object> head = new HashMap<>();
    		head.put("type", "cash");
    		sender.creditBank(head, "银行授信(部分匹配)");
    	}
    	
    	@Test
    	public void test_creditBank_all() {
    		Map<String,Object> head = new HashMap<>();
    		head.put("type", "cash");
    		head.put("aging", "fast");
    		sender.creditBank(head, "银行授信(全部匹配)");
    	}
    	
    	@Test
    	public void test_creditFinance_type() {
    		Map<String,Object> head = new HashMap<>();
    		head.put("type", "cash");
    		sender.creditFinance(head, "金融公司授信(部分匹配)");
    	}
    	
    	@Test
    	public void test_creditFinance_all() {
    		Map<String,Object> head = new HashMap<>();
    		head.put("type", "cash");
    		head.put("aging", "fast");
    		sender.creditFinance(head, "金融公司授信(全部匹配)");
    	}
    }
    

      

    4.5)执行ApiCreditSenderTests测试类

    生产者日志:

    2018-05-14 16:32:18.954  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : Starting ApiCreditSenderTests on LAPTOP-1DF7S904 with PID 5204 (started by lianjinsoft in...
    2018-05-14 16:32:18.964  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : No active profile set, falling back to default profiles: default
    2018-05-14 16:32:19.007  INFO 5204 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:32:19.609  INFO 5204 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:32:20.437  INFO 5204 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:32:20.446  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : Started ApiCreditSenderTests in 1.839 seconds (JVM running for 2.759)
    2018-05-14 16:32:20.566  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.bank send message: 银行授信(部分匹配)
    2018-05-14 16:32:20.574  INFO 5204 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:32:20.666  INFO 5204 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
    2018-05-14 16:32:21.064  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.finance send message: 金融公司授信(全部匹配)
    2018-05-14 16:32:21.070  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.bank send message: 银行授信(全部匹配)
    2018-05-14 16:32:21.077  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.finance send message: 金融公司授信(部分匹配)
    2018-05-14 16:32:21.109  INFO 5204 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:32:21.114  INFO 5204 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:32:21.093  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.finance receive message: 金融公司授信(全部匹配)
    2018-05-14 16:32:21.094  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.bank receive message: 银行授信(全部匹配)
    2018-05-14 16:32:21.097  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.finance receive message: 金融公司授信(部分匹配)

    分析日志:

    通过生产者日志第7、10、11、12可以看出,测试的4个方法均已成功发送消息。

    通过消费者日志可以看出,credit.bank监听的队列有一条消息没有接收到。

    问题:

    为什么ApiCreditSenderTests.test_creditBank_type()发送的消息,没有被处理?

    答:因为在HeadersConfig配置类中,creditBankExchange交换机的匹配规则是完全匹配,即header attribute参数必须完成一致。

    5.交换机——FanoutExchange

    FanoutExchange交换机是转发消息到所有绑定队列(广播模式,和routingKey没有关系)。

    5.1)添加一个配置类(消费者)

    配置一个routingKey为api.report.payment的消息队列并绑定在reportExchange交换机上

    配置一个routingKey为api.report.refund的消息队列并绑定在reportExchange交换机上

    @Configuration
    public class FanoutConfig {
    	@Bean
    	public Queue reportPaymentQueue() {
    		return new Queue("api.report.payment");
    	}
    	
    	@Bean
    	public Queue reportRefundQueue() {
    		return new Queue("api.report.refund");
    	}
    	
    	@Bean
    	public FanoutExchange reportExchange() {
    		 return new FanoutExchange("reportExchange");
    	}
    	
    	@Bean
    	public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) {
    		return BindingBuilder.bind(reportPaymentQueue).to(reportExchange);
    	}
    	
    	@Bean
    	public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) {
    		return BindingBuilder.bind(reportRefundQueue).to(reportExchange);
    	}
    }
    

      

    5.2)添加一个消息监听类(消费者)

    添加payment()方法,监听routingKey为api.report.payment的队列消息

    添加refund()方法,监听routingKey为api.report.refund的队列消息

    @Component
    public class ApiReportReceive {
    	@RabbitHandler
    	@RabbitListener(queues = "api.report.payment")
    	public void payment(String msg) {
    		LogUtil.info("api.report.payment receive message: "+msg);
    	}
    	
    	@RabbitHandler
    	@RabbitListener(queues = "api.report.refund")
    	public void refund(String msg) {
    		LogUtil.info("api.report.refund receive message: "+msg);
    	}
    }
    

      

    5.3)添加一个消息发送类(生产者)

    添加一个generateReports()方法,发送消息至reportExchange交换机

    @Component
    public class ApiReportSender {
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	public void generateReports(String msg){
    		LogUtil.info("api.generate.reports send message: "+msg);
    		rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg);
    	}
    }
    

      

    5.4)添加一个测试类(生产者)

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApiReportSenderTests {
    	@Autowired
    	private ApiReportSender sender;
    	
    	@Test
    	public void test_generateReports() {
    		sender.generateReports("开始生成报表!");
    	}
    }
    

      

    5.5)执行ApiReportSenderTests测试类

    生产者日志:

    2018-05-14 16:33:41.453  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : Starting ApiReportSenderTests on LAPTOP-1DF7S904 with PID 14356 (started by lianjinsoft in ...
    2018-05-14 16:33:41.454  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : No active profile set, falling back to default profiles: default
    2018-05-14 16:33:41.490  INFO 14356 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:33:42.094  INFO 14356 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
    2018-05-14 16:33:42.960  INFO 14356 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:33:42.972  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : Started ApiReportSenderTests in 1.939 seconds (JVM running for 2.843)
    2018-05-14 16:33:43.037  INFO 14356 --- [           main] com.lianjinsoft.util.LogUtil             : api.generate.reports send message: 开始生成报表!
    2018-05-14 16:33:43.054  INFO 14356 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:33:43.174  INFO 14356 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
    2018-05-14 16:33:43.237  INFO 14356 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
    2018-05-14 16:33:43.240  INFO 14356 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:33:43.205  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.report.payment receive message: 开始生成报表!
    2018-05-14 16:33:43.207  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.report.refund receive message: 开始生成报表!

    分析日志:

    通过生产者日志第7行可以看出,消息已发送成功。

    通过消费者日志可以看出,api.report.payment和api.report.refund队列均收到了同一个消息。

    6.多对一

    在实际项目中,我们的系统通常会做集群、分布式或灾备部署。那么就会出现一对多、多对一或多对多的场景。

    那么咱们本地如何模拟多对一呢?

    6.1)为了测试方便,我们复用PaymentNotifyReceive案例,在PaymentNotifySenderTests测试类中,增加测试方法

    test_sender_many2one_1:请求参数为偶数

    test_sender_many2one_2:请求参数为奇数

    @Test
    public void test_sender_many2one_1() throws Exception {
    	for (int i = 0; i < 20; i+=2) {
    		sender.sender("支付订单号:"+i);
    		Thread.sleep(1000);
    	}
    }
    
    @Test
    public void test_sender_many2one_2() throws Exception {
    	for (int i = 1; i < 20; i+=2) {
    		sender.sender("支付订单号:"+i);
    		Thread.sleep(1000);
    	}
    }
    

      

    6.2)执行test_sender_many2one_1()、test_sender_many2one_2()方法

    生产者1日志:

    2018-05-14 16:34:49.249  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 5064 (started by lianjinsoft in...
    2018-05-14 16:34:49.250  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
    2018-05-14 16:34:49.297  INFO 5064 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
    2018-05-14 16:34:49.989  INFO 5064 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:34:51.267  INFO 5064 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:34:51.293  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.449 seconds (JVM running for 3.366)
    2018-05-14 16:34:51.357  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:0
    2018-05-14 16:34:51.370  INFO 5064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:34:51.817  INFO 5064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f ...
    2018-05-14 16:34:52.866  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:2
    2018-05-14 16:34:53.870  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:4
    2018-05-14 16:34:54.870  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:6
    2018-05-14 16:34:55.871  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:8
    2018-05-14 16:34:56.872  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:10
    2018-05-14 16:34:57.872  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:12
    2018-05-14 16:34:58.873  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:14
    2018-05-14 16:34:59.875  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:16
    2018-05-14 16:35:00.876  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:18
    2018-05-14 16:35:01.882  INFO 5064 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
    2018-05-14 16:35:01.883  INFO 5064 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    生产者2日志:

    2018-05-14 16:34:52.689  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 13988 (started by lianjinsoft in...
    2018-05-14 16:34:52.690  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
    2018-05-14 16:34:52.738  INFO 13988 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
    2018-05-14 16:34:53.444  INFO 13988 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:34:54.567  INFO 13988 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:34:54.575  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.237 seconds (JVM running for 4.18)
    2018-05-14 16:34:54.788  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:1
    2018-05-14 16:34:54.796  INFO 13988 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:34:54.870  INFO 13988 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#305f7627:0/SimpleConnection@665e9289...
    2018-05-14 16:34:55.903  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:3
    2018-05-14 16:34:56.903  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:5
    2018-05-14 16:34:57.904  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:7
    2018-05-14 16:34:58.904  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:9
    2018-05-14 16:34:59.905  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:11
    2018-05-14 16:35:00.906  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:13
    2018-05-14 16:35:01.907  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:15
    2018-05-14 16:35:02.907  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:17
    2018-05-14 16:35:03.910  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:19
    2018-05-14 16:35:04.928  INFO 13988 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
    2018-05-14 16:35:04.932  INFO 13988 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:34:51.853  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:0
    2018-05-14 16:34:52.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:2
    2018-05-14 16:34:53.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:4
    2018-05-14 16:34:54.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:6
    2018-05-14 16:34:54.902  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:1
    2018-05-14 16:34:55.872  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:8
    2018-05-14 16:34:55.904  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:3
    2018-05-14 16:34:56.873  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:10
    2018-05-14 16:34:56.905  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:5
    2018-05-14 16:34:57.873  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:12
    2018-05-14 16:34:57.905  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:7
    2018-05-14 16:34:58.878  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:14
    2018-05-14 16:34:58.906  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:9
    2018-05-14 16:34:59.877  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:16
    2018-05-14 16:34:59.906  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:11
    2018-05-14 16:35:00.877  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:18
    2018-05-14 16:35:00.909  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:13
    2018-05-14 16:35:01.909  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:15
    2018-05-14 16:35:02.911  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:17
    2018-05-14 16:35:03.914  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:19

    分析日志:

    从生产者1、生产者2日志中可以看出,所有消息均已经发送成功。

    从消费者日志可以看出,所有消息均被成功接收处理。

    7.一对多

    如何模拟一对多?

    7.1)为了测试方便,我们继续复用PaymentNotifyReceive案例,在PaymentNotifySenderTests测试类中,增加测试方法

    test_sender_one2many:循环调用20次。

    @Test
    public void test_sender_one2many() {
    	for (int i = 0; i < 20; i++) {
    		sender.sender("支付订单号:"+i);
    	}
    }
    

      

    7.2)测试

    为了达到一对多的效果,我们需要多启动一个(或多个)消费者。然后执行test_sender_one2many()测试方法。

    生产者日志:

    2018-05-14 16:36:27.703  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 7508 (started by lianjinsoft in...
    2018-05-14 16:36:27.704  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
    2018-05-14 16:36:27.729  INFO 7508 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
    2018-05-14 16:36:28.391  INFO 7508 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
    2018-05-14 16:36:29.285  INFO 7508 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:36:29.303  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.197 seconds (JVM running for 3.097)
    2018-05-14 16:36:29.504  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:0
    2018-05-14 16:36:29.516  INFO 7508 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:36:29.635  INFO 7508 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#4f071df8:0/SimpleConnection@42a9e5d1...
    2018-05-14 16:36:29.672  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:1
    2018-05-14 16:36:29.679  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:2
    2018-05-14 16:36:29.705  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:3
    2018-05-14 16:36:29.707  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:4
    2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:5
    2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:6
    2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:7
    2018-05-14 16:36:29.713  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:8
    2018-05-14 16:36:29.719  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:9
    2018-05-14 16:36:29.728  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:10
    2018-05-14 16:36:29.729  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:11
    2018-05-14 16:36:29.733  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:12
    2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:13
    2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:14
    2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:15
    2018-05-14 16:36:29.741  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:16
    2018-05-14 16:36:29.742  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:17
    2018-05-14 16:36:29.742  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:18
    2018-05-14 16:36:29.744  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付订单号:19
    2018-05-14 16:36:29.762  INFO 7508 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
    2018-05-14 16:36:29.767  INFO 7508 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者1日志:

    2018-05-14 16:36:29.675  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:0
    2018-05-14 16:36:29.690  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:2
    2018-05-14 16:36:29.712  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:4
    2018-05-14 16:36:29.714  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:6
    2018-05-14 16:36:29.728  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:9
    2018-05-14 16:36:29.733  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:11
    2018-05-14 16:36:29.737  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:13
    2018-05-14 16:36:29.740  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:15
    2018-05-14 16:36:29.743  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:16
    2018-05-14 16:36:29.745  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:18

    消费者2日志:

    2018-05-14 16:36:29.705  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:1
    2018-05-14 16:36:29.709  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:3
    2018-05-14 16:36:29.712  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:5
    2018-05-14 16:36:29.718  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:7
    2018-05-14 16:36:29.722  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:8
    2018-05-14 16:36:29.731  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:10
    2018-05-14 16:36:29.736  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:12
    2018-05-14 16:36:29.738  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:14
    2018-05-14 16:36:29.744  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:17
    2018-05-14 16:36:29.747  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付订单号:19

    分析日志:

    从生产者日志可以看出,所有的消息均已经发送成功。

    从消费者1、消费者2日志可以看出,消息被两个消费者均衡消费了。

    8.发送对象

    实际项目中,请求信息可能包含多个字段。为了保证生产者与消费者两端的字段一致性,通常会传递一个对象。

    8.1)为了测试方便,我们在DirectConfig中增加一个消息队列

    @Bean
    public Queue refundNotifyQueue() {
    	return new Queue("notify.refund");
    }
    

      

    8.2)添加一个消息监听类(消费者)

    监听routingKey为notify.refund的队列消息

    @Component
    @RabbitListener(queues = "notify.refund")
    public class RefundNotifyReceive {
    	@RabbitHandler
    	public void receive(Order order) {
    		LogUtil.info("notify.refund receive message: "+order);
    	}
    }
    

      

    8.3)添加一个消息发送类(生产者)

    @Component
    public class RefundNotifySender {
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	public void sender(Order order){
    		LogUtil.info("notify.refund send message: "+order);
    		rabbitTemplate.convertAndSend("notify.refund", order);
    	}
    }
    

      

    8.4)添加一个测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RefundNotifySenderTests {
    	@Autowired
    	private RefundNotifySender sender;
    	
    	@Test
    	public void test_sender() {
    		Order order = new Order();
    		order.setId(100001);
    		order.setOrderId(String.valueOf(System.currentTimeMillis()));
    		order.setAmount(new BigDecimal("1999.99"));
    		order.setCreateTime(new Date());
    		sender.sender(order);
    	}
    }
    

      

    8.5)执行RefundNotifySenderTests测试类

    生产者日志:

    2018-05-14 16:37:47.038  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : Starting RefundNotifySenderTests on LAPTOP-1DF7S904 with PID 13672 (started by lianjinsoft in...
    2018-05-14 16:37:47.041  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : No active profile set, falling back to default profiles: default
    2018-05-14 16:37:47.070  INFO 13672 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
    2018-05-14 16:37:47.715  INFO 13672 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:37:48.779  INFO 13672 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:37:48.802  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : Started RefundNotifySenderTests in 2.082 seconds (JVM running for 2.967)
    2018-05-14 16:37:49.085  INFO 13672 --- [           main] com.lianjinsoft.util.LogUtil             : notify.refund send message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...
    2018-05-14 16:37:49.104  INFO 13672 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:37:49.170  INFO 13672 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#6b5894c8:0/SimpleConnection@38f57b3d [delegate=...
    2018-05-14 16:37:49.265  INFO 13672 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
    2018-05-14 16:37:49.266  INFO 13672 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:37:49.242  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.refund receive message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...

    分析日志:

    从生产者日志可以看出,order对象已经发送成功。

    从消费者日志可以看出,order对象已经接受成功并可以直接使用。

    注意:

    传递的对象必须支持序列化(实现了Serializable接口)

    9.RPC

    RabbitMQ支持RPC远程调用,同步返回结果。

    9.1)为了测试方便,我们在DirectConfig中增加一个消息队列

    @Bean
    public Queue queryOrderQueue() {
    	return new Queue("query.order");
    }
    

      

    9.2)添加一个消息监听类(消费者)

    监听routingKey为query.order的队列消息

    @Component
    @RabbitListener(queues = "query.order")
    public class QueryOrderReceive {
    	@RabbitHandler
    	public Order receive(String orderId) {
    		LogUtil.info("notify.refund receive message: "+orderId);
    		
    		Order order = new Order();
    		order.setId(100001);
    		order.setOrderId(orderId);
    		order.setAmount(new BigDecimal("2999.99"));
    		order.setCreateTime(new Date());
    		return order;
    	}
    }
    

      

    9.3)添加一个消息发送类(生产者)

    @Component
    public class QueryOrderSender {
    	@Autowired
    	private AmqpTemplate rabbitTemplate;
    	
    	public void sender(String orderId){
    		LogUtil.info("query.order send message: "+orderId);
    		Order order = (Order) rabbitTemplate.convertSendAndReceive("query.order", orderId);
    		LogUtil.info("query.order return message: "+order);
    	}
    }
    

      

    9.4)添加一个测试类

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class QueryOrderSenderTests {
    	@Autowired
    	private QueryOrderSender sender;
    	
    	@Test
    	public void test_sender() {
    		sender.sender("900000001");
    	}
    }
    

      

    9.5)执行QueryOrderSenderTests测试类

    生产者日志:

    2018-05-14 16:38:14.163  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : Starting QueryOrderSenderTests on LAPTOP-1DF7S904 with PID 2024 (started by lianjinsoft in...
    2018-05-14 16:38:14.164  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : No active profile set, falling back to default profiles: default
    2018-05-14 16:38:14.197  INFO 2024 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
    2018-05-14 16:38:14.848  INFO 2024 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
    2018-05-14 16:38:15.705  INFO 2024 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
    2018-05-14 16:38:15.715  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : Started QueryOrderSenderTests in 1.927 seconds (JVM running for 3.079)
    2018-05-14 16:38:15.793  INFO 2024 --- [           main] com.lianjinsoft.util.LogUtil             : query.order send message: 900000001
    2018-05-14 16:38:15.812  INFO 2024 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
    2018-05-14 16:38:15.988  INFO 2024 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
    2018-05-14 16:38:16.057  INFO 2024 --- [           main] com.lianjinsoft.util.LogUtil             : query.order return message: Order [id=100001, orderId=900000001, amount=2999.99, createTime=...
    2018-05-14 16:38:16.079  INFO 2024 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
    2018-05-14 16:38:16.097  INFO 2024 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

    消费者日志:

    2018-05-14 16:38:16.028  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.refund receive message: 900000001

    分析日志:

    从生产者日志第7行可以看出,消息已经发送成功。从第10行日志可以看出,已经收取到返回的消息,并成功转化为Order对象。

    从消费者日志可以看出,已经成功接收到消息并处理完成。

    虽然RabbitMQ支持RPC接口调用,但不推荐使用。

    原因:

    1)RPC默认为单线程阻塞模型,效率极低。

    2)需要手动实现多线程消费。

    安装RabbitMQ请参考:CentOS在线安装RabbitMQ3.7

    本帖源代码:https://gitee.com/skychenjiajun/spring-boot

  • 相关阅读:
    C++ 获取图片文件信息
    java中redis的分布式锁工具类
    java中的redis工具类
    mysql中的sql查询优化
    利用Linux中的crontab实现分布式项目定时任务
    MYSQL的REPLACE和ON DUPLICATE KEY UPDATE使用
    redis学习三,Redis主从复制和哨兵模式
    redis学习五,redis集群搭建及添加主从节点
    String 转化成java.sql.Date和java.sql.Time
    SpringMVC配置双数据源,一个java项目同时连接两个数据库
  • 原文地址:https://www.cnblogs.com/skychenjiajun/p/9037324.html
Copyright © 2011-2022 走看看