zoukankan      html  css  js  c++  java
  • RabbitMQ的使用

                接到的项目是:spring的项目做spring整合rabbitMQ的作生产者,而测试使用springboot整合RmQ做消费者,交换机模式---Topic,这里还涉及到队列和消息的持久化,这里稍作总结!

     1:设置了队列和消息的持久化之后,当broker服务重启的之后,消息依旧存在

    spring整合rabbitMQ的作生产者:

    pom.xml:

    <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.3.5.RELEASE</version>
            </dependency>
             <!--无此类会报错,具体原因不详-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aspects</artifactId>
                <version>3.2.8.RELEASE</version>
            </dependency>

    rabbitMq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
    
        <!-- RabbitMQ公共配置部分 start -->
    
        <!--配置connection-factory,指定连接rabbit server参数 -->
        <rabbit:connection-factory id="connectionFactory"
                                   virtual-host="/" username="guest" password="guest" host="172.24.245.90"
                                   port="5672" />
    
        <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
    
        <!-- RabbitMQ公共配置部分 end -->
    
    
    
        <!-- ~~~~~~~~~~~~~~~~~~~~~华丽的分割线~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    
        <!-- 定义 topic方式的exchange、队列、消息收发 start -->
    
        <!--定义queue -->
        <!--中durable是是否持久划的标志,默认是true-->
        <rabbit:queue name="topic_queue_t" durable="true"
                      auto-delete="false" exclusive="false" declared-by="connectAdmin" />
    
        <!--定义topic类型exchange,绑定direct_queue_test -->
        <rabbit:topic-exchange name="exchange_topic">
            <rabbit:bindings>
                <rabbit:binding queue="topic_queue_t" pattern="notice.*" />
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
        <!--定义rabbit template用于数据的接收和发送 -->
        <rabbit:template id="topicAmqpTemplate"
                         connection-factory="connectionFactory" exchange="exchange_topic" />
    
        <!-- 消息接收者 -->
    
        <!--<bean id="topicMessageReceiver" class="com.dcits.ensemble.service.sms.rabbit.TopicMessageReceiver"></bean>-->
    
        <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    
       <!-- <rabbit:listener-container
                connection-factory="connectionFactory">
            <rabbit:listener queues="topic_queue_t" ref="topicMessageReceiver" />
        </rabbit:listener-container>-->
    
        <!-- 定义 topic方式的exchange、队列、消息收发 end -->
    
    
        <!-- ~~~~~~~~~~~~~~~~~~~~~华丽的分割线~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    
    </beans>

    aapplication.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    
        <import resource="classpath*:rabbitMq.xml" />
    
        <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->
        <!--<context:component-scan base-package="com.dcits.ensemble.service.sms.rabbit, com.dcits.ensemble.service.sms.rabbit" />-->
        <context:component-scan base-package="com.dcits.ensemble.service.sms.rabbit" />
    
        <!-- 激活annotation功能 -->
        <context:annotation-config />
        <!-- 激活annotation功能 -->
        <context:spring-configured />
    
    </beans>

    生产者:

    package com.dcits.ensemble.service.sms.rabbit;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    
    import javax.annotation.Resource;
    import java.io.IOException;
     
    @Service
    public class TopicMessageProducer {
        private Logger logger = LoggerFactory.getLogger(TopicMessageProducer.class);
     
        @Resource(name = "topicAmqpTemplate")
        private AmqpTemplate topicAmqpTemplate;
        /**
         * @author:LiFangTao
         * @date: 2019/6/4
         * @description:
         * topicAmqpTemplate.convertAndSend(String routingKey, Object object),异步调用生产者
         */
    
        @Async
        public void sendMessage(Object message) throws IOException {
            logger.info("to send message:{}", message);
            //未持久化的消息
            topicAmqpTemplate.convertAndSend("notice.info", message);
    
           //rabbitMQ的消息持久化
            /*ConnectionFactory factory=new ConnectionFactory(); //创建连接工厂
            factory.setHost("172.24.245.90");
            Connection connection=factory.newConnection(); //创建连接
            Channel channel=connection.createChannel();//创建信道
            //将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("exchange_topic","notice.info", MessageProperties.PERSISTENT_TEXT_PLAIN,message.toString().getBytes());
            System.out.println("持久化结束");*/
    
        }
    }

    springboot整合rabbitMQ的作消费者:

    pom.xml:

    <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    Mqconfig:

    package com.example.test002.mq;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class Conf {
    
            @Bean(name="message")
            public Queue queueMessage() {
                return new Queue("topic_queue_t");
            }
    
            @Bean
            public TopicExchange exchange() {
                return new TopicExchange("exchange_topic");
            }
    
            @Bean
            Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
                return BindingBuilder.bind(queueMessage).to(exchange).with("notice.info");
            }
    
    
    }
    TopicReceiver:

    package com.example.test002.mq;
     
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
     
    /**
     * Created by Administrator on 2018/4/10.
     */
    @Component
    public class TopicReceiver {
     
        @RabbitListener(queues ="topic_queue_t" )
        public void receiveMessage1(String str){
            System.out.println("我是消费者----------- , "+str);
        }
     
    
    }

    测试持久化:

    一:未持久化的MQ:

    注意,RabbitMQ不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除

    步骤:
    1,启动rabbitmq server
    2,运行以上java代码
    3,使用rabbitmqctl查看消息

     

    4,关闭rabbitmq server,再启动
    5,使用rabbitmqctl查看消息

    A:只队列持久化,未消息持久化

     

       答:仍可别消费(已持久化)

    B: 未队列持久化,只消息持久化(不存在)

    C:都未持久化

     

    重启前;

     

    重启后:(无队列)

     

    二:持久化后的MQ:

     

    1队列持久化

     

    2消息持久化:

    3测试:

    步骤:
    1,启动rabbitmq server
    2,运行以上java代码
    3,使用rabbitmqctl查看消息

     


    4,关闭rabbitmq server,再启动
    5,使用rabbitmqctl查看消息

     

    6:关闭生产者项目,启动消费者项目:


      已消费(持久化成功)

  • 相关阅读:
    toFixed()与银行家舍入
    VScode链接服务器并配置公钥-SSH Keys
    改造@vue/cli项目为服务端渲染-ServerSideRender
    vue预渲染及其cdn配置
    界面优化--如何提升用户体验(Velocity.js和GSAP)
    eslint配置介绍-如何在uniapp中配置eslint
    babel 的介绍及其配置
    如何为我的VUE项目编写高效的单元测试--Jest
    計算幾何 學習
    Manacher
  • 原文地址:https://www.cnblogs.com/leeego-123/p/10980198.html
Copyright © 2011-2022 走看看