zoukankan      html  css  js  c++  java
  • springboot使用配置文件的方式集成RabbitMQ

    RabbitMQ可以使用编程式和申明式的方式,网上很多都是直接使用编程的方式来使用,这里主要讲如何使用springboot配置文件的方式来使用RabbitMQ。

    demo可以参考(https://github.com/Little-Orange7/rabbitMQ 和 https://github.com/Little-Orange7/rabbitMQ-async)

    一.搭建环境:

    由于RabbitMQ是使用Erlang语言编写的,所以要先安装Erlang;

    1.下载Erlang:http://www.erlang.org/downloads,注意 rabbitMq 和erlang的版本要对应,可以在http://www.rabbitmq.com/which-erlang.html页面查看对应版本。

    2.下载RabbitMQ:RabbitMq Server http://www.rabbitmq.com/releases/rabbitmq-server/

    下载完之后,安装;

    二.RabbitMQ配置

    rabbitMQ配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    
        <description>rabbitmq 服务配置</description>
        <!-- 公共部分 -->
        <!-- 创建连接类 连接安装好的 rabbitmq -->
        <bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
            <constructor-arg value="localhost" />
            <property name="virtualHost" value="${rabbitmq.virtualHost}" />
            <!-- host,RabbitMQ服务器地址,默认值"localhost" -->
            <property name="host" value="${rabbitmq.ip}" />
            <!-- port,RabbitMQ服务端口,默认值为5672 -->
            <property name="port" value="${rabbitmq.port}" />
            <!-- username,访问RabbitMQ服务器的账户,默认是guest -->
            <property name="username" value="${rabbitmq.username}" />
            <!-- username,访问RabbitMQ服务器的密码,默认是guest -->
            <property name="password" value="${rabbitmq.password}" />
            <!-- channel-cache-size,channel的缓存数量,默认值为25 -->
            <property name="channelCacheSize" value="${rabbitmq.channelCacheSize}" />
            <!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
            <property name="cacheMode" value="CHANNEL" />
        </bean>
        <!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例-->
        <!--<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.ip}" port="${rabbitmq.port}"
        username="${rabbitmq.manager.user}" password="${rabbitmq.manager.password}" />-->
    
        <!-- RabbitAdmin主要用于在Java代码中对用于创建、绑定、管理队列与交换机,可以直接通过RabbitAdmin接口来管理 -->
    <!--    <rabbit:admin connection-factory="connectionFactory"/>-->
    
        <!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
        <!--管理平台中显示是name,消费者监听的也是name,此处定义的id在绑定生产者时使用-->
        <rabbit:queue id="queue_one" name="${test_queue_one}" durable="true" auto-delete="false" exclusive="false" />
        <rabbit:queue id="queue_two" name="${test_queue_two}" durable="true" auto-delete="false" exclusive="false" />
    
        <!--定义交换机,绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
        <!--管理平台中显示的是name,绑定amqpTemplate也是使用name-->
        <rabbit:direct-exchange id="direct_Exchange" name="directExchange" durable="true" auto-delete="false">
            <rabbit:bindings>
                <!--queue对应上面定义的queue的id,此处的key是routingKey,调用生产者发送消息到对应队列需要传入相应的routringKey-->
                <rabbit:binding queue="queue_one" key="${routingKey.one}"></rabbit:binding>
                <rabbit:binding queue="queue_two" key="${routingKey.two}"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!--定义 amqpTemplate 将交换机和amqp绑定,其中exchange对应的是上面定义的交换机的name-->
        <rabbit:template id="amqpTemplate" exchange="directExchange" connection-factory="connectionFactory" channel-transacted="true"/>
    
    
        <!-- 生产者部分 -->
        <!-- 发送消息的producer类,也就是生产者 -->
        <bean id="testProducer" class="com.example.rabbitdemo.producer.impl.TestProducerImpl">
            <!-- spring注入 -->
            <property name="amqpTemplate" ref="amqpTemplate"/>
        </bean>
    
    
        <!-- 消费者部分 -->
        <!-- 自定义消费者类 -->
        <bean id="testConsumerOne" class="com.example.rabbitdemo.consumer.TestConsumerOne"></bean>
        <bean id="testConsumerTwo" class="com.example.rabbitdemo.consumer.TestConsumerTwo"></bean>
        <!-- 配置监听,默认是acknowledge="auto",可以设置手动应答acknowledge="manual"-->
        <rabbit:listener-container connection-factory="connectionFactory">
            <!--此处queue-names对应的是定义queue的name而不是id-->
            <rabbit:listener queue-names="${test_queue_one}" ref="testConsumerOne" />
            <rabbit:listener queue-names="${test_queue_two}" ref="testConsumerTwo" />
        </rabbit:listener-container>
    
    
    </beans>

    上述的这个配置文件包含生产者和消费者的配置,这里是同一个项目用作异步调用使用,所以将生产者和消费者配置文件合并到一起,实际生产者和消费者分开的情况是最常见的,所以,可以根据项目需要,将这个配置文件拆分成生产者和消费者两个配置文件(可以参考这个项目:https://github.com/Little-Orange7/rabbitMQ);

    使用:

    生产者:

    @Service
    public class TestProducerImpl implements TestProducer {
    
        //由配置文件配置方式注入
        private AmqpTemplate amqpTemplate;
    
        public AmqpTemplate getAmqpTemplate() {
            return amqpTemplate;
        }
    
        public void setAmqpTemplate(AmqpTemplate amqpTemplate) {
            this.amqpTemplate = amqpTemplate;
        }
    
        private void send(String routingKey, String json){
            System.out.println("--------------Producer-----start-------");
            System.out.println("routingKey:"+routingKey+",json:"+json);
            amqpTemplate.convertAndSend(routingKey,json);
            System.out.println("--------------Producer------end------");
        }
    
        @Override
        public void sendMessage(String routingKey, Object obj) {
            String json= JSON.toJSONString(obj);
            send(routingKey,json);
        }
    }

    生产者通过AmqpTemplate接口来发送消息,其中需要传入routingKey,这个routingKey是定义交换机的时候绑定的queue的key值,生产者通过这个routingKey来确定要将发送的消息放入到哪个queue中;

    消费者:

    public class TestConsumerOne implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            String messageJson=new String(message.getBody());
            System.out.println("ConsumerOne  messageJson:"+messageJson);
            TestPojo tj =JSON.parseObject(messageJson, new TypeReference<TestPojo>() {});
        }
    }

    消费者实现MessageListener这个接口,可以监听到queue中的消息,如果有待处理的message,则执行onMessage方法,可以从Message中获取到数据;这个消费者监听的queue在配置文件中就已经声明过了,绑定要监听的queue的name,只要这个监听的queue中存在待处理的消息,则会执行onMessage方法;

    三.Springboot集成RabbitMQ配置文件

    要读取此配置文件,必须加此注解,可以在启动类上加,也可以在放配置类上;

    @SpringBootApplication
    @ImportResource(locations = {"classpath:spring-rabbitmq.xml"})
    public class RabbitDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RabbitDemoApplication.class, args);
        }
    
    }
  • 相关阅读:
    数字图像处理(一)之灰度转换和卷积python实现
    ArcEngine+C# 森林资源仿真系统 核心代码
    Dijkstra和Floyd算法遍历图的核心
    python像操作文件一样操作内存的模块 StringIO
    python操作Redis方法速记
    python中时间处理标准库DateTime加强版库:pendulum
    utittest和pytest中mock的使用详细介绍
    《金字塔原理》 读书笔记
    python轻量级orm框架 peewee常用功能速查
    docker中安装的mysql无法远程连接问题解决
  • 原文地址:https://www.cnblogs.com/littleorange7/p/13536125.html
Copyright © 2011-2022 走看看