zoukankan      html  css  js  c++  java
  • RabittMQ实践(二): RabbitMQ 与spring、springmvc框架集成

    一、RabbitMQ简介

    1.1、rabbitMQ的优点(适用范围)
    1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器。
    2. 健壮、稳定、易用、跨平台、支持多种语言、文档齐全。
    3. 有消息确认机制和持久化机制,可靠性高。
    4. 开源
    其他MQ的优势:
    1. Apache ActiveMQ曝光率最高,但是可能会丢消息。
    2. ZeroMQ延迟很低、支持灵活拓扑,但是不支持消息持久化和崩溃恢复。

    1.2、几个概念说明
    producer&consumer
    producer指的是消息生产者,consumer消息的消费者。
    Queue
    消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。
    设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
    设置为临时队列,queue中的数据在系统重启之后就会丢失
    设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除Exchange

    Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。
    Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:
    Direct
    直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
    fanout
    广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。
    topic
    主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
    headers
    消息体的header匹配(ignore)
    Binding
    所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。
    virtual host
    在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

    二、Rbbitmq与Spring、springMVC结合使用实例

    1. MessageProducer

     1 MessageProducerpackage com.hjp.rabbitmq.rabbitmq.samples3;
     2 
     3 import javax.annotation.Resource;
     4 
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 import org.springframework.amqp.core.AmqpTemplate;
     8 import org.springframework.stereotype.Service;
     9 
    10 @Service
    11 public class MessageProducer {
    12 
    13     private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
    14 
    15     @Resource
    16     private AmqpTemplate amqpTemplate;
    17 
    18     public void sendMessage(Object message) {
    19         
    20         logger.info("to send message:{}", message);
    21         amqpTemplate.convertAndSend("queueTestKey", message);
    22 
    23     }
    24 }

    2.MessageConsumer

    package com.hjp.rabbitmq.rabbitmq.samples3;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class MessageConsumer implements MessageListener {
        
        private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    
        @Override
        public void onMessage(Message message) {
            logger.info("receive message:{}",message);
        }
    
    }

    3.rabbitmq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
        <!--配置connection-factory,指定连接rabbit server参数 -->
        <rabbit:connection-factory id="connectionFactory"
            username="guest" password="guest" host="localhost" port="5672" />
            
        <!--定义rabbit template用于数据的接收和发送 -->
        <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory" 
            exchange="exchangeTest" />
            
        <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
        <rabbit:admin connection-factory="connectionFactory" />
    
        <!--定义queue -->
        <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
    
        <!-- 定义direct exchange,绑定queueTest -->
        <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:direct-exchange>
        
        <!-- 消息接收者 -->
        <bean id="messageReceiver" class="com.hjp.rabbitmq.rabbitmq.samples3.MessageConsumer"></bean>
        
        <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
        <rabbit:listener-container connection-factory="connectionFactory">
                 <rabbit:listener queues="queueTest" ref="messageReceiver"/>
        </rabbit:listener-container>
        
    </beans>

    4.application.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
        
        <import resource="classpath*:rabbitmq.xml" />
        <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->
        <context:component-scan base-package="com.hjp.rabbitmq.rabbitmq.samples3" />            
        <!-- 激活annotation功能 -->
        <context:annotation-config />
        <!-- 激活annotation功能 -->
        <context:spring-configured />
            
    </beans>

    5.log4j配置

    log4j.rootLogger=DEBUG,Console,Stdout
    
    #Console
    log4j.appender.Console=org.apache.log4j.ConsoleAppender
    log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
    
    log4j.logger.java.sql.ResultSet=INFO
    log4j.logger.org.apache=INFO
    log4j.logger.java.sql.Connection=DEBUG
    log4j.logger.java.sql.Statement=DEBUG
    log4j.logger.java.sql.PreparedStatement=DEBUG 
    
    log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender  
    #log4j.appender.Stdout.File = E://logs/log.log  
    log4j.appender.Stdout.Append = true  
    log4j.appender.Stdout.Threshold = DEBUG   
    log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout  
    log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n  

    代码下载:https://github.com/jianpingh/rabbitmq-samples

  • 相关阅读:
    Path Sum
    Intersection of Two Linked Lists (求两个单链表的相交结点)
    Nginx入门资料
    赛马问题
    翻转单词顺序 VS 左旋转字符串
    重建二叉树
    Fibonacci相关问题
    Find Minimum in Rotated Sorted Array(旋转数组的最小数字)
    常用查找算法总结
    Contains Duplicate
  • 原文地址:https://www.cnblogs.com/huangjianping/p/8340643.html
Copyright © 2011-2022 走看看