zoukankan      html  css  js  c++  java
  • RabittMQ实践(二): RabbitMQ 与spring、springmvc框架集成

    一、RabbitMQ简介

    1.1、rabbitMQ的优点(适用范围)
    1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器。
    2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全。
    3. 有消息确认机制和持久化机制,可靠性高。
    4. 开源
    其他MQ的优势:
    1. Apache ActiveMQ曝光率最高,但是可能会丢消息。
    2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。

    1.2、几个概念说明
    producer&consumer
    producer指的是消息生产者,consumer消息的消费者。
    Queue
    消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
    设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
    设置为临时队列,queue中的数据在系统重启之后就会丢失
    设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除Exchange

    Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
    Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
    Direct
    直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
    fanout
    广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
    topic
    主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
    headers
    消息体的header匹配(ignore)
    Binding
    所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
    virtual host
    在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

    二、Rbbitmq与Spring、springMVC结合使用实例

    1. MessageProducer

     1 MessageProducerpackage com.hjp.rabbitmq.rabbitmq.samples3;
     2 
     3 import javax.annotation.Resource;
     4 
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 import org.springframework.amqp.core.AmqpTemplate;
     8 import org.springframework.stereotype.Service;
     9 
    10 @Service
    11 public class MessageProducer {
    12 
    13     private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
    14 
    15     @Resource
    16     private AmqpTemplate amqpTemplate;
    17 
    18     public void sendMessage(Object message) {
    19         
    20         logger.info("to send message:{}", message);
    21         amqpTemplate.convertAndSend("queueTestKey", message);
    22 
    23     }
    24 }

    2.MessageConsumer

    package com.hjp.rabbitmq.rabbitmq.samples3;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class MessageConsumer implements MessageListener {
        
        private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    
        @Override
        public void onMessage(Message message) {
            logger.info("receive message:{}",message);
        }
    
    }

    3.rabbitmq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
        <!--配置connection-factory,指定连接rabbit server参数 -->
        <rabbit:connection-factory id="connectionFactory"
            username="guest" password="guest" host="localhost" port="5672" />
            
        <!--定义rabbit template用于数据的接收和发送 -->
        <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory" 
            exchange="exchangeTest" />
            
        <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin connection-factory="connectionFactory" />
    
        <!--定义queue -->
        <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
    
        <!-- 定义direct exchange,绑定queueTest -->
        <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
        
        <!-- 消息接收者 -->
        <bean id="messageReceiver" class="com.hjp.rabbitmq.rabbitmq.samples3.MessageConsumer"></bean>
        
        <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
        <rabbit:listener-container connection-factory="connectionFactory">
                 <rabbit:listener queues="queueTest" ref="messageReceiver"/>
        </rabbit:listener-container>
        
    </beans>

    4.application.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
        
        <import resource="classpath*:rabbitmq.xml" />
        <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->
        <context:component-scan base-package="com.hjp.rabbitmq.rabbitmq.samples3" />            
        <!-- 激活annotation功能 -->
        <context:annotation-config />
        <!-- 激活annotation功能 -->
        <context:spring-configured />
            
    </beans>

    5.log4j配置

    log4j.rootLogger=DEBUG,Console,Stdout
    
    #Console
    log4j.appender.Console=org.apache.log4j.ConsoleAppender
    log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
    
    log4j.logger.java.sql.ResultSet=INFO
    log4j.logger.org.apache=INFO
    log4j.logger.java.sql.Connection=DEBUG
    log4j.logger.java.sql.Statement=DEBUG
    log4j.logger.java.sql.PreparedStatement=DEBUG 
    
    log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender  
    #log4j.appender.Stdout.File = E://logs/log.log  
    log4j.appender.Stdout.Append = true  
    log4j.appender.Stdout.Threshold = DEBUG   
    log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout  
    log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n  

    代码下载:https://github.com/jianpingh/rabbitmq-samples

  • 相关阅读:
    ADF中遍历VO中的行数据(Iterator)
    程序中实现两个DataTable的Left Join效果(修改了,网上第二个DataTable为空,所处的异常)
    ArcGIS api for javascript——鼠标悬停时显示信息窗口
    ArcGIS api for javascript——查询,然后单击显示信息窗口
    ArcGIS api for javascript——查询,立刻打开信息窗口
    ArcGIS api for javascript——显示多个查询结果
    ArcGIS api for javascript——用图表显示查询结果
    ArcGIS api for javascript——查询没有地图的数据
    ArcGIS api for javascript——用第二个服务的范围设置地图范围
    ArcGIS api for javascript——显示地图属性
  • 原文地址:https://www.cnblogs.com/huangjianping/p/8340643.html
Copyright © 2011-2022 走看看