zoukankan      html  css  js  c++  java
  • RabbitMQ(四)Spring集成RabbitMQ

    前言

    在使用Spring整合RabbitMQ时我们主要关注三个核心接口:

    • RabbitAdmin: 用于声明交换机 队列 绑定等
    • RabbitTemplate: 用于RabbitMQ消息的发送和接收
    • MessageListenerContainer: 监听容器 为消息入队提供异步处理

    依赖

    <dependency>
         <groupId>org.springframework.amqp</groupId>
         <artifactId>spring-rabbit</artifactId>
         <version>1.7.9.RELEASE</version>
    </dependency>

    配置

    可通过以下两种方式进行配置。

    • rabbitmq.xml配置文件
    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:rabbit="http://www.springframework.org/schema/rabbit"
      xsi:schemaLocation="http://www.springframework.org/schema/beans 
         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
    
        <!-- 消费者和生产者通用配置(begin) -->
    <!-- 创建连接工厂 --> <rabbit:connection-factory id="rabbitmqConnectionFactory" host="${rabbitmq_host}" port="${rabbitmq_port}" username="${rabbitmq_user}" password="${rabbitmq_passwd}" virtual-host="${rabbitmq_vhost}" /> <!-- 创建rabbitAdmin --> <rabbit:admin id="connectAdmin" connection-factory="rabbitmqConnectionFactory"/> <!-- 定义消息对象json转换类 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 消费者和生产者通用配置(end) --> <!-- 财务记账接口配置(begin) --> <!-- 创建rabbitTemplate消息发送模版 --> <rabbit:template id="rabbitTemplate" exchange="${mmc_topic_exchange_name}" routing-key="${mmc_routingkey_name}" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"/> <!-- 财务记账接口配置(end) -->
    <!-- 关闭订单配置(begin) --> <!-- 创建rabbitTemplate消息发送模版--> <rabbit:template id="orderDelayTemplate" exchange="mmc.order.delay.exchange-delay" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter" routing-key="mmc.order.delay.routingkey" /> <!-- 声明监听的Queue的名称 --> <rabbit:queue id="orderQueue" name="mmc.order.delay.queue"/> <!-- 声明exchange的类型为topic --> <rabbit:direct-exchange name="mmc.order.delay.exchange-delay" declared-by="connectAdmin" delayed="true"> <rabbit:bindings> <rabbit:binding queue="mmc.order.delay.queue" key="mmc.order.delay.routingkey"/> </rabbit:bindings> </rabbit:direct-exchange>
    <!-- 创建消息监控容器 --> <bean id="messageHandler" class="com.zat.mmc.service.rabbitmq.MessageHandler"/> <rabbit:listener-container connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"> <rabbit:listener queues="mmc.order.delay.queue" ref="messageHandler" /> </rabbit:listener-container> <!-- 关闭订单配置(end) --> </beans>

     配置文件其实还可以更简单点:

        <!-- 消费者和生产者通用配置(begin) -->
        <!-- 创建连接 -->
        <rabbit:connection-factory  id="rabbitmqConnectionFactory" host="${rabbitmq_host}" port="${rabbitmq_port}" username="${rabbitmq_user}" password="${rabbitmq_passwd}" virtual-host="${rabbitmq_vhost}" />
    
        <!-- 创建rabbitAdmin -->
        <rabbit:admin id="connectAdmin" connection-factory="rabbitmqConnectionFactory"/>
    
        <!-- 消息对象json转换类 -->
        <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
        <!-- 消费者和生产者通用配置(end) -->
    
    
        <!-- 财务记账接口配置(begin) -->
        <!-- 创建rabbitTemplate消息发送模版 -->
        <rabbit:template id="rabbitTemplate"  exchange="${mmc_topic_exchange_name}" routing-key="${mmc_routingkey_name}" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"/>
        <!-- 财务记账接口配置(end) -->

    中间定义了交换机,定义了队列,其实都可以省略的,可以到mq的管理后台创建。在代码中定义交换机和队列的好处是在使用时会自动创建它。

    •  Java配置类

    spring在启动时会扫描到Configuration这个注解是一个配置文件的注解。

    @Configuration
    public class RabbitMQConfig {
    
      public final static String QUEUE_NAME = "spring-queue";
      public final static String EXCHANGE_NAME = "spring-exchange";
      public final static String ROUTING_KEY = "spring-key";
    
      // 创建队列
      @Bean
      public Queue queue() {
        return new Queue(QUEUE_NAME);
      }
    
      // 创建一个 topic 类型的交换器
      @Bean
      public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
      }
    
      // 使用路由键(routingKey)把队列(Queue)绑定到交换器(Exchange)
      @Bean
      public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
      }
    
      @Bean
      public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("xx", 5670);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
      }

      @Bean
      public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 注意,autoStartup 必须设置为 true,否则 Spring 容器不会加载 RabbitAdmin 类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;

      } 

      @Bean
      public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
      }
    }

    生产者

    @Component
    public class DemoProducer {
    
        @Resource(name = "rabbitTemplate")
        private RabbitTemplate rabbitTemplate; 
    
        public void sendProducer() {
            // 生产者发送消息(因为在配置文件里面已经为rabbitTemplate指定了交换机和routing,所以可以省去它们)
            rabbitTemplate.convertAndSend(jsonObject.toJSONString());   
    // 生产者往solutionInfo_exchange这个交换机,这个info_queue_key路由中发送消息 rabbitTemplate.convertAndSend("solutionInfo_exchange","info_queue_key", jsonObject.toJSONString()); } }

    消费者

    package com.rabbitmq.demo
    
    @Component
    public class DemoReceiver extends AbstractAdaptableMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) {
            try {
                String result=new String(message.getBody(),"UTF-8")
                log.info("bodyJson:" + result);
             } catch (Throwable e) {
                log.error("WechatMsgReceiver exception", e);
             }
        }
    
    }

    其它

    • 批量自动声明交换机、队列和绑定

    可以批量创建Queue和Exchange,批量创建绑定关系并将其放进List集合中返回使用:

    •  手动声明交换机、队列和绑定

    可通过RabbitAdmin来实现:

    /**
     * 声明一个direct类型的、持久化、非排他的交换器
     */
    rabbitAdmin.declareExchange(new DirectExchange(EXCHANGE_NAME, true, false, new HashMap<String, Object>()));
    /** * 声明一个持久化、非排他、非自动删除的队列 */ rabbitAdmin.declareQueue(new Queue(QUEUE_NAME, true, false, false, new HashMap<String, Object>()));
    /** * 将交换器和队列绑定 */ rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_NAME)). to(new DirectExchange(EXCHANGE_NAME)).with(ROUTING_KEY));
  • 相关阅读:
    nvm安装及使用(windon/mac)
    JVM学习笔记
    Java多线程
    OkHttpClient调优案例
    Java各版本新增特性, Since Java 8
    Linux下MySQL数据库的备份与恢复
    算法和数据结构学习笔记
    联想台式机安装网卡驱动指南
    解决「现有新的ios更新可用,请从ios14 beta 版更新」问题
    linux 命令英文全称(转帖)
  • 原文地址:https://www.cnblogs.com/caoweixiong/p/12910543.html
Copyright © 2011-2022 走看看