zoukankan      html  css  js  c++  java
  • rabbitMQ基于spring-rabbitnq

    一、什么是MQ

      MQ全称为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。

    二、MQ特点

      MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

    三、使用场景

      在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

    四、相关基础概念

      Exchange:交换机,决定了消息路由规则;

      Queue:消息队列;

      Channel:进行消息读写的通道;

      Bind:绑定了Queue和Exchange,意即为符合什么样路由规则的消息,将会放置入哪一个消息队列;

    五、本人是用的是spring-rabbitmq实现的相关配置如下:

    1、生产者配置

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     3        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     4        xmlns:util="http://www.springframework.org/schema/util"
     5        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
     6         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
     7 
     8     <!--<util:properties id="rabbit" location="classpath*:rabbitmq.properties"/>-->
     9 
    10     <!-- 配置ConnectionFactory -->
    11     <rabbit:connection-factory id="connectionFactory"
    12                                host="#{config['rabbit.host']}"
    13                                username="#{config['rabbit.user']}"
    14                                password="#{config['rabbit.pwd']}"
    15                                port="#{config['rabbit.port']}"
    16                                virtual-host="#{config['rabbit.virtualHost']}"/>
    17     <!-- 等同new一个RabbitAdmin -->
    18     <rabbit:admin connection-factory="connectionFactory"/>
    19     <!-- 声明一个队列 -->
    20     <rabbit:queue id="handle" name="#{config['rabbit.queue.handle.name']}"/>
    21 
    22     <!-- 声明一个topic类型的exchange,并把上面声明的队列绑定在上面,routingKey="foo.*" -->
    23     <rabbit:topic-exchange name="ex_sshmsgcentor">
    24         <rabbit:bindings>
    25             <rabbit:binding queue="handle" pattern="#{config['rabbit.queue.handle.name']}"/>
    26             <!-- 这里还可以继续绑定其他队列 -->
    27         </rabbit:bindings>
    28     </rabbit:topic-exchange>
    29 
    30     <!-- 声明一个rabbitTemplate,指定连接信息,发送消息到myExchange上,routingKey在程序中设置,此处的配置在程序中可以用set修改 -->
    31     <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="ex_sshmsgcentor"/>
    32 
    33 </beans>
    1 # rabbitMQ 相关配置
    2 rabbit.host=127.0.0.1
    3 rabbit.user=test
    4 rabbit.pwd=test
    5 rabbit.port=5672
    6 rabbit.virtualHost=vir_test
    7 #需要发送到mqtt的消息,通配符
    8 rabbit.queue.handle.name=test

    调用方法:

     @Resource
     private AmqpTemplate amqpTemplate;
    
    public void pushRabbitMQ{
         logger.info("push rabbitMq message:[" + messageCentent + "]");
            MessageProperties messageProperties = new MessageProperties();
            Message message = new Message(messageCentent.getBytes(), messageProperties);
            logger.info("message:[" + messageCentent + "]");
            logger.info("PATTERN:" + ConfigUtils.RABBIT_QUEUE_HANDLE_NAME);
            amqpTemplate.send(EXCHANGE_NAME, ConfigUtils.RABBIT_QUEUE_HANDLE_NAME, message);
    }

     ps:本项目生产者和消费者是不同的项目

    2、消费者配置

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     5        xmlns:util="http://www.springframework.org/schema/util"
     6        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
     7         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
     8 
     9     <!--<util:properties id="rabbit" location="classpath*:rabbitmq.properties"/>-->
    10 
    11     <!-- 配置ConnectionFactory -->
    12     <rabbit:connection-factory id="connectionFactory"
    13                                host="#{config['rabbit.host']}"
    14                                username="#{config['rabbit.user']}"
    15                                password="#{config['rabbit.pwd']}"
    16                                port="#{config['rabbit.port']}"
    17                                virtual-host="#{config['rabbit.virtualHost']}"/>
    18     <!-- 等同new一个RabbitAdmin -->
    19     <rabbit:admin connection-factory="connectionFactory"/>
    20     <!-- 声明一个队列 -->
    21     <rabbit:queue id="handle" name="#{config['rabbit.queue.handle.name']}"/>
    22 
    23     <!-- 声明一个topic类型的exchange,并把上面声明的队列绑定在上面,routingKey="foo.*" -->
    24     <rabbit:topic-exchange name="ex_sshmsgcentor">
    25         <rabbit:bindings>
    26             <rabbit:binding queue="handle" pattern="#{config['rabbit.queue.handle.name']}"/>
    27             <!-- 这里还可以继续绑定其他队列 -->
    28         </rabbit:bindings>
    29     </rabbit:topic-exchange>
    30 
    31     <!-- 声明一个rabbitTemplate,指定连接信息,发送消息到myExchange上,routingKey在程序中设置,此处的配置在程序中可以用set修改 -->
    32     <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="ex_sshmsgcentor"/>
    33 
    34     <!-- 配置监听容器,指定消息处理类,处理方法,还可以配置自动确认等-->
    35     <rabbit:listener-container connection-factory="connectionFactory"
    36                                acknowledge="manual" max-concurrency="#{config['rabbit.consumer.max.concurrency']}"
    37                                concurrency="#{config['rabbit.consumer.concurrency']}" prefetch="#{config['rabbit.consumer.prefetch']}">
    38         <rabbit:listener ref="messageReceiver" queue-names="#{config['rabbit.queue.handle.name']}"/>
    39         <!-- 可以继续注册监听 -->
    40     </rabbit:listener-container>
    41     <bean id="messageReceiver" class="com.ssh.servicenode.pushtaskserver.consumer.MessageConsumer"/>
    42 
    43 
    44 </beans>
     1 # rabbitMQ 相关配置
     2 rabbit.host=192.168.1.211
     3 rabbit.user=devmsgcentor
     4 rabbit.pwd=devmsgcentor
     5 rabbit.port=5672
     6 rabbit.virtualHost=vir_devmsgcentor
     7 #需要发送到mqtt的消息,通配符
     8 rabbit.queue.handle.name=queue_devmsgcentor
     9 #监听者并发数
    10 rabbit.consumer.concurrency = 1
    11 #监听者最大并发数
    12 rabbit.consumer.max.concurrency = 2
    13 #每个监听者从队列中预取数
    14 rabbit.consumer.prefetch = 100

    辅助网页

  • 相关阅读:
    解决FileReader读取文件乱码问题
    comparable 与 comparator
    JavaScript添加水印
    mybatis里oracle与MySQL的insert_update
    关于回车换行
    solr创建collection
    hbase相关配置说明
    java基础(十一)--- IO
    java基础(十)--- 异常
    c++中的vector原理
  • 原文地址:https://www.cnblogs.com/lxn0216/p/10676629.html
Copyright © 2011-2022 走看看