zoukankan      html  css  js  c++  java
  • 消息中间件系列四:RabbitMQ与Spring集成

    一、RabbitMQ与Spring集成

     准备工作:

    分别新建名为RabbitMQSpringProducer和RabbitMQSpringConsumer的maven web工程

     在pom.xml文件里面引入如下依赖:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.study.demo</groupId>
        <artifactId>RabbitMQSpringProducer</artifactId>
        <packaging>war</packaging>
        <version>0.0.1-SNAPSHOT</version>
        <name>RabbitMQSpringProducer Maven Webapp</name>
        <url>http://maven.apache.org</url>
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>javax</groupId>
                <artifactId>javaee-web-api</artifactId>
                <version>7.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>commons-logging</groupId>
                <artifactId>commons-logging</artifactId>
                <version>1.2</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>jcl-over-slf4j</artifactId>
                <version>1.7.5</version>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>4.3.11.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-aop</artifactId>
                <version>4.3.11.RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>javax.servlet</groupId>
                <artifactId>jstl</artifactId>
                <version>1.2</version>
            </dependency>
    
            <!--日志 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.5</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>jcl-over-slf4j</artifactId>
                <version>1.7.5</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.0.13</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>1.0.13</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-access</artifactId>
                <version>1.0.13</version>
            </dependency>
    
            <!--JSON -->
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-core-asl</artifactId>
                <version>1.9.13</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.8.4</version>
            </dependency>
    
            <!-- RabbitMQ -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.0.0.RELEASE</version>
            </dependency>
        </dependencies>
        <build>
            <finalName>RabbitMQSpringProducer</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>${basedir}/src/main/java</directory>
                    <includes>
                        <include>**/*.xml</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>

    与Spring集成步骤:

    配置文件中增加命名空间:
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd

    配置文件中的配置
    1) 连接工厂配置
    2) <rabbit:admin>
    3) 声明队列
    4) 声明交换器
    5) 队列和交换器进行绑定
    6) 生产者端要声明RabbitmqTemplate

    1. 在工程RabbitMQSpringProducer里面新建/RabbitMQSpringProducer/src/main/java/applicationContext.xml的配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:mvc="http://www.springframework.org/schema/mvc"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:jee="http://www.springframework.org/schema/jee"
           xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:task="http://www.springframework.org/schema/task"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.0.xsd
            http://www.springframework.org/schema/jee
            http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
            http://www.springframework.org/schema/mvc
            http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
            http://www.springframework.org/schema/tx
            http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
            http://www.springframework.org/schema/aop
            http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
            http://www.springframework.org/schema/task
            http://www.springframework.org/schema/task/spring-task-4.0.xsd
            http://www.springframework.org/schema/rabbit
            http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">
         
    
        <!-- rabbitMQ配置 -->
        <bean id="rabbitConnectionFactory"
            class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
            <constructor-arg value="127.0.0.1"/>
            <property name="username" value="guest"/>
            <property name="password" value="guest"/>
            <property name="channelCacheSize" value="8"/>
            <property name="port" value="5672"></property>
        </bean>
        <!--Spring的rabbitmq admin-->
        <rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
        <!--生产者创建队列-->
        <rabbit:queue name="p_create_queue" durable="false"/>
    
        <!--fanout交换器-->
        <rabbit:fanout-exchange name="fanout-exchange"
            xmlns="http://www.springframework.org/schema/rabbit" durable="false">
            <rabbit:bindings>
                <rabbit:binding queue="p_create_queue"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
    
        <!--topic交换器-->
        <rabbit:topic-exchange name="topic-exchange"
             xmlns="http://www.springframework.org/schema/rabbit" durable="false">
        </rabbit:topic-exchange>
        <!-- rabbitTemplate 消息模板类 -->
    
        <!--定义Spring的RabbitMQ的连接模板  -->
        <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
            <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
        </bean>
    
    </beans>  

    3. 在工程RabbitMQSpringProducer里面新建/RabbitMQSpringProducer/src/main/java/spring-mvc.xml配置文件

    <?xml version="1.0" encoding="UTF-8"?>  
    <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
    <beans xmlns="http://www.springframework.org/schema/beans"   
           xmlns:aop="http://www.springframework.org/schema/aop"   
           xmlns:context="http://www.springframework.org/schema/context"  
           xmlns:mvc="http://www.springframework.org/schema/mvc"   
           xmlns:tx="http://www.springframework.org/schema/tx"   
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
           xsi:schemaLocation="http://www.springframework.org/schema/aop   
            http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
            http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context-4.0.xsd   
            http://www.springframework.org/schema/mvc   
            http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
            http://www.springframework.org/schema/tx   
            http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
    
    <!--    <mvc:default-servlet-handler />-->
        <mvc:resources mapping="/js/**" location="/js/"/>
        <mvc:annotation-driven
                content-negotiation-manager="contentNegotiationManager" />
    
        <context:component-scan base-package="com.study.demo">
            <context:include-filter type="annotation"
                                    expression="org.springframework.stereotype.Controller" />
        </context:component-scan>
    
    
        <bean id="stringHttpMessageConverter"
              class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes">
                <list>
                    <bean class="org.springframework.http.MediaType">
                        <constructor-arg index="0" value="text" />
                        <constructor-arg index="1" value="plain" />
                        <constructor-arg index="2" value="UTF-8" />
                    </bean>
                </list>
            </property>
        </bean>
        <bean id="mappingJacksonHttpMessageConverter"
              class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
    
        <bean
                class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
            <property name="messageConverters">
                <list>
                    <ref bean="stringHttpMessageConverter" />
                    <ref bean="mappingJacksonHttpMessageConverter" />
                </list>
            </property>
        </bean>
    
        <bean id="contentNegotiationManager"
              class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean">
            <property name="mediaTypes">
                <map>
                    <entry key="html" value="text/html" />
                    <entry key="pdf" value="application/pdf" />
                    <entry key="xsl" value="application/vnd.ms-excel" />
                    <entry key="xml" value="application/xml" />
                    <entry key="json" value="application/json" />
                </map>
            </property>
            <property name="defaultContentType" value="text/html" />
        </bean>
    
        <bean id="viewResolver"
              class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
            <property name="order" value="0" />
            <property name="contentNegotiationManager" ref="contentNegotiationManager" />
    
            <property name="viewResolvers">
                <list>
                    <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />
                    <bean
                            class="org.springframework.web.servlet.view.InternalResourceViewResolver">
                        <property name="viewClass"
                                  value="org.springframework.web.servlet.view.JstlView" />
                        <property name="prefix" value="/WEB-INF/pages/" />
                        <property name="suffix" value=".jsp"></property>
                    </bean>
                </list>
            </property>
    
            <property name="defaultViews">
                <list>
                    <bean
                            class="org.springframework.web.servlet.view.json.MappingJackson2JsonView">
                        <property name="extractValueFromSingleKeyModel" value="true" />
                    </bean>
                </list>
            </property>
        </bean>
        
    </beans>  

    3. 在工程RabbitMQSpringProducer里面新建一个RabbitMQ与Spring集成发送消息控制器

    package com.study.demo.controller;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成发送消息控制器
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Controller
    @RequestMapping("/rabbitmq")
    public class RabbitMqController {
    
        private Logger logger = LoggerFactory.getLogger(RabbitMqController.class);
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        @ResponseBody
        @RequestMapping("/fanoutSender")
        public String fanoutSender(@RequestParam("message")String message){
            String opt="";
            try {
                String str = "Fanout,the message_"+" is : "+message;
                logger.info("**************************Send Message:["+str+"]");
                rabbitTemplate.send("fanout-exchange","",
                        new Message(str.getBytes(),new MessageProperties()));
    
                opt = "suc";
            } catch (Exception e) {
                opt = e.getCause().toString();
            }
            return opt;
        }
    
        @ResponseBody
        @RequestMapping("/topicSender")
        public String topicSender(@RequestParam("message")String message){
            String opt="";
            try {
                String[] severities={"error","info","warning"};
                String[] modules={"email","order","user"};
                for(int i=0;i<severities.length;i++){
                    for(int j=0;j<modules.length;j++){
                        String routeKey = severities[i]+"."+modules[j];
                        String str = "the message is [rk:"+routeKey+"]["+message+"]";
                        rabbitTemplate.send("topic-exchange",routeKey,
                                new Message(str.getBytes(),new MessageProperties()));
    
                    }
                }
                opt = "suc";
            } catch (Exception e) {
                opt = e.getCause().toString();
            }
            return opt;
        }
    
    }

    4. 在工程RabbitMQSpringProducer里面新建一个发送消息的/RabbitMQSpringProducer/src/main/webapp/index.jsp页面

    <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
    <%
        String path = request.getContextPath();
        System.out.println(path);
        String basePath = request.getScheme() + "://"
                + request.getServerName() + ":" + request.getServerPort()
                + path + "/";
        System.out.println(basePath);
    %>
    
    <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
    <html>
    <head>
    <base href="<%=basePath%>">
    
    <title>RabbitMQ Demo程序</title>
    
    <meta http-equiv="pragma" content="no-cache">
    <meta http-equiv="cache-control" content="no-cache">
    <meta http-equiv="expires" content="0">
    <script type="text/javascript" src="<%--<%=basePath%>--%>js/jquery-1.11.0.min.js"></script>
    <style type="text/css">
    .h1 {
        margin: 0 auto;
    }
    
    #producer{
        width: 48%;
         border: 1px solid blue; 
        height: 80%;
        align:center;
        margin:0 auto;
    }
    
    body{
        text-align :center;
    } 
    div {
        text-align :center;
    }
    textarea{
        width:80%;
        height:100px;
        border:1px solid gray;
    }
    button{
        background-color: rgb(62, 156, 66);
        border: none;
        font-weight: bold;
        color: white;
        height:30px;
    }
    </style>
    <script type="text/javascript">
        
        function send(controller){
            if($("#message").val()==""){
                $("#message").css("border","1px solid red");
                return;
            }else{
                $("#message").css("border","1px solid gray");
            }
            $.ajax({
                type: 'post',
                url:'<%=basePath%>rabbitmq/'+controller,
                dataType:'text', 
                data:{"message":$("#message").val()},
                success:function(data){
                    if(data=="suc"){
                        $("#status").html("<font color=green>发送成功</font>");
                        setTimeout(clear,1000);
                    }else{
                        $("#status").html("<font color=red>"+data+"</font>");
                        setTimeout(clear,5000);
                    }
                },
                error:function(data){
                    $("#status").html("<font color=red>ERROR:"+data["status"]+","+data["statusText"]+"</font>");
                    setTimeout(clear,5000);
                }
                
            });
        }
        
        function clear(){
            $("#status").html("");
        }
    
    </script>
    </head>
    
    <body>
        <h1>Hello RabbitMQ</h1>
        <div id="producer">
            <h2>Producer</h2>
            <textarea id="message"></textarea>
            <br>
            <button onclick="send('fanoutSender')">发送Fanout消息</button>
            <button onclick="send('topicSender')">发送Topic消息</button>
            <br>
            <span id="status"></span>
        </div>
    </body>
    </html>

    5. 在工程RabbitMQSpringProducer里面新建/RabbitMQSpringProducer/src/main/webapp/WEB-INF/web.xml

    <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
             xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
             version="3.0">
      <display-name>RabbitMqSpringProducerDemo</display-name>
    
      <servlet-mapping>
        <servlet-name>default</servlet-name>
        <url-pattern>*.js</url-pattern>
      </servlet-mapping>
    
      <!-- Spring 编码过滤器 start -->
      <filter>
        <filter-name>characterEncoding</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
          <param-name>encoding</param-name>
          <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
          <param-name>forceEncoding</param-name>
          <param-value>true</param-value>
        </init-param>
      </filter>
      <filter-mapping>
        <filter-name>characterEncoding</filter-name>
        <url-pattern>/*</url-pattern>
      </filter-mapping>
      <!-- Spring 编码过滤器 End -->
    
      <!-- Spring Application Context Listener Start -->
      <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:applicationContext.xml</param-value>
      </context-param>
      <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
      </listener>
      <!-- Spring Application Context Listener End -->
    
    
      <!-- Spring MVC Config Start -->
      <servlet>
        <servlet-name>SpringMVC</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
          <param-name>contextConfigLocation</param-name>
          <param-value>classpath:spring-mvc.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
      </servlet>
      <servlet-mapping>
        <servlet-name>SpringMVC</servlet-name>
        <!-- Filter all resources -->
        <url-pattern>/</url-pattern>
      </servlet-mapping>
      <!-- Spring MVC Config End -->
    
    </web-app>

    到此生产者服务代码编写完成!

    6. 在Tomcat v8.5 8080里面启动RabbitMQSpringProducer,在浏览器输入地址http://localhost:8080/RabbitMQSpringProducer/访问

    6. 在工程RabbitMQSpringConsumer里面新建三个fanout消费者

    fanout消费者1:

    package com.study.demo.service.fanout;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成fanout消费者
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Component
    public class FanoutService_H1 implements MessageListener{
        private Logger logger = LoggerFactory.getLogger(FanoutService_H1.class);
        public void onMessage(Message message) {
            logger.info("Get message:"+new String(message.getBody()));
        }
    }

    fanout消费者2:

    package com.study.demo.service.fanout;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成fanout消费者
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Component
    public class FanoutService_H2 implements MessageListener{
        private Logger logger = LoggerFactory.getLogger(FanoutService_H2.class);
        public void onMessage(Message message) {
            logger.info("Get message:"+new String(message.getBody()));
        }
    }

    fanout消费者3:

    package com.study.demo.service.fanout;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成fanout消费者
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Component
    public class FanoutService_H3 implements MessageListener{
        private Logger logger = LoggerFactory.getLogger(FanoutService_H3.class);
        public void onMessage(Message message) {
            logger.info("Get message:"+new String(message.getBody()));
        }
    }

    7. 在工程RabbitMQSpringConsumer里面新建4个topic消费者

    topic消费者1:

    package com.study.demo.service.topic;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成topic消费者
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Component
    public class AllErrorTopicService implements MessageListener{
        private Logger logger = LoggerFactory.getLogger(AllErrorTopicService.class);
        public void onMessage(Message message) {
            logger.info("Get message:"+new String(message.getBody()));
        }
    }

    topic消费者2:

    package com.study.demo.service.topic;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成topic消费者
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Component
    public class AllLogTopicService implements MessageListener{
        private Logger logger = LoggerFactory.getLogger(AllLogTopicService.class);
        public void onMessage(Message message) {
            logger.info("Get message:"+new String(message.getBody()));
        }
    }

    topic消费者3:

    package com.study.demo.service.topic;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成topic消费者
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Component
    public class EmailAllTopicService implements MessageListener{
        private Logger logger = LoggerFactory.getLogger(EmailAllTopicService.class);
        public void onMessage(Message message) {
            logger.info("Get message:"+new String(message.getBody()));
        }
    }

    topic消费者4:

    package com.study.demo.service.topic;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 
     * @Description: RabbitMQ与Spring集成topic消费者
     * @author leeSmall
     * @date 2018年9月17日
     *
     */
    @Component
    public class EmailErrorTopicService implements MessageListener{
        private Logger logger = LoggerFactory.getLogger(EmailErrorTopicService.class);
        public void onMessage(Message message) {
            logger.info("Get message:"+new String(message.getBody()));
        }
    }

    8. 在工程RabbitMQSpringConsumer里面新建/RabbitMQSpringConsumer/src/main/java/applicationContext.xml配置文件,在里面配置RabbitMQ相关配置和消费者监听队列

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-4.0.xsd
               http://www.springframework.org/schema/rabbit
               http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">
    
         <!-- 配置扫描路径 -->
        <context:component-scan base-package="com.study.demo">
             <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
        </context:component-scan>
    
        <!-- rabbitMQ配置 -->
        <bean id="rabbitConnectionFactory"
              class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
            <constructor-arg value="127.0.0.1"/>
            <property name="username" value="guest"/>
            <property name="password" value="guest"/>
            <property name="channelCacheSize" value="8"/>
            <property name="port" value="5672"></property>
        </bean>
        <rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
        <!-- fanout交换器 begin-->
        <!-- 定义队列 -->
        <rabbit:queue name="h1_queue" durable="false"/>
        <rabbit:queue name="h2_queue" durable="false"/>
        <rabbit:queue name="h3_queue" durable="false"/>
    
        <!-- 把需要数据的队列与交换器绑定一起 -->
        <rabbit:fanout-exchange name="fanout-exchange"
                                xmlns="http://www.springframework.org/schema/rabbit"
                                durable="false">
            <rabbit:bindings>
                <rabbit:binding queue="h1_queue"></rabbit:binding>
                <rabbit:binding queue="h2_queue"></rabbit:binding>
                <rabbit:binding queue="h3_queue"></rabbit:binding>
            </rabbit:bindings>
        </rabbit:fanout-exchange>
        <!-- fanout交换器 end-->
    
    
        <!-- topic交换器 begin-->
        <!-- 定义队列 -->
        <rabbit:queue name="all_log_queue" durable="false"/>
        <rabbit:queue name="email_all_queue" durable="false"/>
        <rabbit:queue name="email_error_queue" durable="false"/>
        <rabbit:queue name="all_error_queue" durable="false"/>
    
        <!-- 把需要数据的队列通过路由键与交换器绑定一起 -->
        <rabbit:topic-exchange name="topic-exchange"
                               xmlns="http://www.springframework.org/schema/rabbit"
                               durable="false">
            <rabbit:bindings>
                <rabbit:binding queue="all_log_queue" pattern="#"></rabbit:binding>
                <rabbit:binding queue="email_all_queue" pattern="*.email"></rabbit:binding>
                <rabbit:binding queue="email_error_queue"  pattern="error.email"></rabbit:binding>
                <rabbit:binding queue="all_error_queue"  pattern="error.*"></rabbit:binding>
    
            </rabbit:bindings>
        </rabbit:topic-exchange>
    
        <!-- topic交换器 end-->
    
    
        <!--监听容器-->
        <rabbit:listener-container connection-factory="rabbitConnectionFactory">
            <rabbit:listener ref="fanoutService_H1" queues="h1_queue" method="onMessage" />
            <rabbit:listener ref="fanoutService_H2" queues="h2_queue" method="onMessage" />
            <rabbit:listener ref="fanoutService_H3" queues="h3_queue" method="onMessage" />
            <rabbit:listener ref="allLogTopicService" queues="all_log_queue" method="onMessage" />
            <rabbit:listener ref="emailAllTopicService" queues="email_all_queue" method="onMessage" />
            <rabbit:listener ref="emailErrorTopicService" queues="email_error_queue" method="onMessage" />
            <rabbit:listener ref="allErrorTopicService" queues="all_error_queue" method="onMessage" />
        </rabbit:listener-container>
    
    
    </beans>  

    9. 在工程RabbitMQSpringConsumer里面新建/RabbitMQSpringConsumer/src/main/java/spring-mvc.xml配置文件

    <?xml version="1.0" encoding="UTF-8"?>  
    <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
    <beans xmlns="http://www.springframework.org/schema/beans"   
           xmlns:aop="http://www.springframework.org/schema/aop"   
           xmlns:context="http://www.springframework.org/schema/context"  
           xmlns:mvc="http://www.springframework.org/schema/mvc"   
           xmlns:tx="http://www.springframework.org/schema/tx"   
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
           xsi:schemaLocation="http://www.springframework.org/schema/aop   
            http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
            http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context-4.0.xsd   
            http://www.springframework.org/schema/mvc   
            http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
            http://www.springframework.org/schema/tx   
            http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">  
      
          <!-- 启用MVC注解 -->
        <mvc:annotation-driven />
        
        <!-- 静态资源文件,不会被Spring MVC拦截 -->
        <mvc:resources location="/resources/" mapping="/resources/**"/>
        
        <!-- 指定Sping组件扫描的基本包路径 -->
        <context:component-scan base-package="com.study.demo" >
            <!-- 这里只扫描Controller,不可重复加载Service -->
            <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
        </context:component-scan>
        
          <!-- JSP视图解析器-->
        <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">  
            <property name="prefix" value="/WEB-INF/views/" />  
            <property name="suffix" value=".jsp" />
        <!--  定义其解析视图的order顺序为1 -->
            <property name="order" value="1" />
        </bean>
        
    </beans>  

    10. 在工程RabbitMQSpringConsumer里面新建/RabbitMQSpringConsumer/src/main/webapp/WEB-INF/web.xml配置文件

    <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
             xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
             version="3.0">
      <display-name>RabbitMqSpringConsumerDemo</display-name>
    
      <context-param>
        <param-name>logbackConfigLocation</param-name>
        <param-value>/WEB-INF/conf/logback.xml</param-value>
      </context-param>
    
      <!-- Spring 编码过滤器 start -->
      <filter>
        <filter-name>characterEncoding</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
          <param-name>encoding</param-name>
          <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
          <param-name>forceEncoding</param-name>
          <param-value>true</param-value>
        </init-param>
      </filter>
      <filter-mapping>
        <filter-name>characterEncoding</filter-name>
        <url-pattern>/*</url-pattern>
      </filter-mapping>
      <!-- Spring 编码过滤器 End -->
    
    
    
      <!-- Spring Application Context Listener Start -->
      <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:applicationContext.xml</param-value>
      </context-param>
      <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
      </listener>
      <!-- Spring Application Context Listener End -->
    
    
      <!-- Spring MVC Config Start -->
      <servlet>
        <servlet-name>SpringMVC</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    
        <init-param>
          <param-name>contextConfigLocation</param-name>
          <param-value>classpath:spring-mvc.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
      </servlet>
      <servlet-mapping>
        <servlet-name>SpringMVC</servlet-name>
        <!-- Filter all resources -->
        <url-pattern>/</url-pattern>
      </servlet-mapping>
      <!-- Spring MVC Config End -->
    
    </web-app>

     11. 生产者消费者共同日志配置文件/RabbitMQSpringConsumer/src/main/webapp/WEB-INF/conf/logback.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
    
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <!--<Pattern>%d{HH:mm:ss.SSS} [%T] %level %logger{36} - %msg%n</Pattern>-->
                <Pattern>%d{yyyy/MM/dd-HH:mm:ss} %level [%thread] %caller{1} - %msg%n</Pattern>
            </encoder>
        </appender>
    
        <logger name="com.study.demo" level="debug" addtivity="false"/>
        <logger name="org.springframework" level="error"  addtivity="false" />
    
        <root level="debug">
            <appender-ref ref="STDOUT"/>
        </root>
    
    </configuration>

     到此消费端代码编写完成!

    12. 在Tomcat v8.5 8081里面启动RabbitMQSpringConsumer消费者

    在生产者RabbitMQSpringProducer页面发送fanout消息

     查看消费者RabbitMQSpringConsumer的情况

     

    在生产者RabbitMQSpringProducer页面发送topic消息

     

    查看消费者RabbitMQSpringConsumer的情况

    示例代码获取地址

  • 相关阅读:
    正则表达式详解<一>
    multimap详讲
    map详讲<二>
    map详解<一>
    priority_queue详解
    容器适配器(一):queue
    用 input() 函数返回的数据是字符串类型
    学习python的基本了解
    学习oracle的SQL语句 练习
    oracle 练习题
  • 原文地址:https://www.cnblogs.com/leeSmall/p/9657687.html
Copyright © 2011-2022 走看看