zoukankan      html  css  js  c++  java
  • RabbitMQ+Spring 结合使用

    1:创建一个Maven工程,pom.xml文件如下:

    <!-- Maven build for the RabbitMQ + Spring demo: declares Spring, logging, test and Spring AMQP dependencies. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.liu</groupId>
      <artifactId>rabbitmq</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>rabbitmq</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Spring version -->
        <spring.version>3.2.8.RELEASE</spring.version>
        <!-- Logging library versions (slf4j and log4j) -->
        <slf4j.version>1.6.6</slf4j.version>
        <log4j.version>1.2.12</log4j.version>
        <!-- JUnit version -->
        <junit.version>4.10</junit.version>
      </properties>
    
      <dependencies>
        <!-- Spring dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
    
        <!-- Unit-test dependency -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
    
        <!-- Logging libraries -->
        <!-- log start -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- log end -->
    
        <!-- Spring test support -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
    
        <!-- RabbitMQ client integration (Spring AMQP) -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.1.3.RELEASE</version>
        </dependency>
    
        <dependency>
            <groupId>javax.validation</groupId>
            <artifactId>validation-api</artifactId>
            <version>1.1.0.Final</version>
        </dependency>
    
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>5.0.1.Final</version>
        </dependency>
        </dependencies>
        <build>
            <!-- The .properties and .xml files under src/main/resources are copied twice, with filtering:
                 once into target/classes and once into target/resources. -->
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <targetPath>${basedir}/target/classes</targetPath>
                    <includes>
                        <include>**/*.properties</include>
                        <include>**/*.xml</include>
                    </includes>
                    <filtering>true</filtering>
                </resource>
                <resource>
                    <directory>src/main/resources</directory>
                    <targetPath>${basedir}/target/resources</targetPath>
                    <includes>
                        <include>**/*.properties</include>
                        <include>**/*.xml</include>
                    </includes>
                    <filtering>true</filtering>
                </resource>
            </resources>
    
            <plugins>
                <!-- Compile sources as Java 1.6, reading them as UTF-8 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <!-- NOTE(review): packaging is jar and no warExcludes property is defined in this POM,
                     so this war plugin configuration looks unused; confirm before relying on it. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-war-plugin</artifactId>
                    <version>2.1.1</version>
                    <configuration>
                        <warSourceExcludes>${warExcludes}</warSourceExcludes>
                    </configuration>
                </plugin>
                <!-- NOTE(review): testFailureIgnore=true lets the build succeed even when tests fail. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.4.3</version>
                    <configuration>
                        <testFailureIgnore>true</testFailureIgnore>
                    </configuration>
                </plugin>
                <!-- Attach a sources jar to the build -->
                <plugin>
                    <inherited>true</inherited>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-source-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>attach-sources</id>
                            <goals>
                                <goal>jar</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- Process resource files as UTF-8 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-resources-plugin</artifactId>
                    <configuration>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>

    2:配置文件如下:

      log4j.properties:

    # Root logger: debug level, routed to the two appenders named Console and Stdout.
    log4j.rootLogger=debug,Console,Stdout
    
    # Console appender: pattern is date, thread, level, logger name, message.
    log4j.appender.Console=org.apache.log4j.ConsoleAppender
    log4j.appender.Console.layout=org.apache.log4j.PatternLayout
    log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
    
    # Per-logger level overrides.
    log4j.logger.java.sql.ResultSet=INFO
    log4j.logger.org.apache=INFO
    log4j.logger.java.sql.Connection=DEBUG
    log4j.logger.java.sql.Statement=DEBUG
    log4j.logger.java.sql.PreparedStatement=DEBUG 
    
    # The appender named "Stdout" is a daily rolling FILE appender, not standard output.
    # NOTE(review): the file path below is a hard-coded Windows drive location; adjust per environment.
    log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender  
    log4j.appender.Stdout.File = E://logs/log.log  
    log4j.appender.Stdout.Append = true  
    log4j.appender.Stdout.Threshold = DEBUG   
    log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout  
    log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n 

      rabbitmq.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans default-lazy-init="false"
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:p="http://www.springframework.org/schema/p"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
            http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
        <!-- Connection factory: parameters for connecting to the RabbitMQ server -->
        <rabbit:connection-factory id="connectionFactory"
            username="guest" password="guest" host="127.0.0.1" port="5672" />
            
            <!-- With the admin declared, the exchanges and queues defined in this context are created automatically on the RabbitMQ server -->
        <rabbit:admin connection-factory="connectionFactory" />
            
            <!-- Standard arguments used when declaring queues -->
        <rabbit:queue-arguments id="amqpQueueArguments">
            <!-- none for now -->
        </rabbit:queue-arguments>    
        
        <!-- Queue named "test"; referenced below as the template's reply queue -->
        <rabbit:queue queue-arguments="amqpQueueArguments" id="amqpTemplateReplyQueue" name="test"/>
        
        <!-- Rabbit template used for sending and receiving messages; replies arrive on the reply queue above -->
        <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"  reply-queue="amqpTemplateReplyQueue">
            <rabbit:reply-listener concurrency="2"/>
        </rabbit:template>
    
        <!-- Request queue definition -->
        <rabbit:queue name="queueTest" id="amqpTemplateRequestQueue" queue-arguments="amqpQueueArguments"/>
    
        <!-- Message receiver -->
        <bean id="messageReceiver" class="com.liu.rabbitmq.Consumer"></bean>
        
        <!-- Queue listener (observer style): when a message arrives, the listener registered on the corresponding queue is notified -->
        <rabbit:listener-container connection-factory="connectionFactory">
                 <rabbit:listener queues="amqpTemplateRequestQueue" ref="messageReceiver"/>
        </rabbit:listener-container>
        
    </beans>

      application-context.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
    
    
        <!-- Pull in the RabbitMQ wiring (connection factory, queues, template, listener).
             NOTE: a classpath*: import that matches no file is skipped silently, so the
             resource name must match the file name exactly, including letter case. -->
        <import resource="classpath*:rabbitmq.xml" />
        
        <!-- Register the stereotype-annotated classes (e.g. @Service) under this package as Spring beans -->
        <context:component-scan base-package="com.liu.rabbitmq" />
        
    
        <!-- Enable annotation-driven injection (e.g. @Resource) on the registered beans -->
        <context:annotation-config />
        <!-- Apply dependency injection to @Configurable objects created outside the container -->
        <context:spring-configured />
    
    </beans>

    3:创建消费者和生产者:

      Proceducer.java :

    package com.liu.rabbitmq;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.stereotype.Service;
    
    /**
     * Message producer: publishes a payload with the request routing key and
     * waits for the consumer's reply (request/reply style).
     *
     * @author liucc
     * @since  2016-12-01
     */
    @Service
    public class Proceducer  {
    
        private static final Logger logger = LoggerFactory.getLogger(Proceducer.class);
    
        /** Routing key the request message is published with. */
        private static final String REQUEST_ROUTING_KEY = "queueTest";
    
        /** Per-message TTL in milliseconds, written to the message's expiration property. */
        private static final int RPC_TTL_MILLIS = 30000;
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        /**
         * Sends {@code message} through the AMQP template, blocks for the reply and
         * prints the reply to standard output.
         *
         * @param message payload handed to the template for conversion and sending
         */
        public void sendMessage(Object message){
            // Send to the broker and wait for the consumer's response.
            Object reply = amqpTemplate.convertSendAndReceive(REQUEST_ROUTING_KEY, message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties props = message.getMessageProperties();
                    // Carry the version in a message header.
                    props.setHeader("header", "1.0.0");
                    props.setExpiration(String.valueOf(RPC_TTL_MILLIS));
                    logger.debug("设置RPC消息的TTL为{}", RPC_TTL_MILLIS);
                    return message;
                }
            });
            if (reply == null) {
                // A null result means no reply was obtained (for example the wait timed out);
                // make that visible instead of only printing "null".
                logger.warn("No reply received for message sent with routing key {}", REQUEST_ROUTING_KEY);
            }
            System.out.println(reply);
        }
    }

      Consumer.java:

    package com.liu.rabbitmq;
    
    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;
    
    import javax.annotation.Resource;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.amqp.support.converter.SimpleMessageConverter;
    
    /**
     * Message consumer: reads a {@link Person} from the incoming message body,
     * prints its age, then sends a fixed text reply to the address named in the
     * request's reply-to property.
     *
     * @author liucc
     * @since  2016-12-01
     */
    public class Consumer implements MessageListener {
    
        /** Text sent back to the producer for every request. */
        private static final String REPLY_TEXT = "收到返回消息";
    
        /** Converter used to build the reply message; created once instead of per message. */
        private final MessageConverter messageConverter = new SimpleMessageConverter();
    
        @Resource
        private AmqpTemplate amqpTemplate;
    
        /**
         * Handles one request message and answers it.
         *
         * @param message the message delivered from the listened queue
         */
        @Override
        public void onMessage(Message message) {
            printPersonAge(message.getBody());
    
            MessageProperties requestProperties = message.getMessageProperties();
            String replyTo = requestProperties.getReplyTo();
            if (replyTo == null) {
                // Nobody is waiting for an answer, so there is nowhere to send one.
                return;
            }
    
            MessageProperties replyProperties = new MessageProperties();
            replyProperties.getHeaders().putAll(requestProperties.getHeaders());
            // Echo the correlation id so the requester can match this reply to its request.
            replyProperties.setCorrelationId(requestProperties.getCorrelationId());
    
            // Wrap the reply text in a Message and send it back to the producer.
            Message reply = messageConverter.toMessage(REPLY_TEXT, replyProperties);
            amqpTemplate.send(replyTo, reply);
        }
    
        /**
         * Deserializes the body into a {@link Person} and prints its age.
         * Failures are reported on stderr and do not prevent the reply from being sent.
         *
         * NOTE(review): this uses native Java deserialization on a payload received
         * from the broker; that is only safe when every publisher is trusted.
         */
        private void printPersonAge(byte[] body) {
            ObjectInputStream in = null;
            try {
                in = new ObjectInputStream(new ByteArrayInputStream(body));
                Person person = (Person) in.readObject();
                System.out.println("年龄:" + person.getAge());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (Exception ignored) {
                        // Closing an in-memory stream; nothing further to release.
                    }
                }
            }
        }
    }

    4:测试

      创建实体类:Person.java

      

    package com.liu.rabbitmq;
    
    import java.io.Serializable;
    
    /**
     * Serializable payload used by the demo: a person's name and age.
     */
    public class Person implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        private String name;
        private int age;
    
        /**
         * @param name the person's name
         * @param age  the person's age
         */
        public Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    
        /** @return the person's name */
        public String getName() {
            return name;
        }
    
        /** @param name the new name */
        public void setName(String name) {
            this.name = name;
        }
    
        /** @return the person's age */
        public int getAge() {
            return age;
        }
    
        /** @param age the new age */
        public void setAge(int age) {
            this.age = age;
        }
    
        /**
         * Readable form for printing and logging, in place of the default identity hash.
         *
         * @return text of the form {@code Person{name='...', age=N}}
         */
        @Override
        public String toString() {
            return "Person{name='" + name + "', age=" + age + "}";
        }
    
    }

      RabbitmqTest.java:

    package com.liu.rabbitmq;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    /**
     * Demo entry point: starts the Spring context, sends one {@link Person}
     * through the producer, then shuts the context down.
     */
    public class RabbitmqTest {
    
        /**
         * @param args unused
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application-context.xml");
            try {
                Proceducer proceducer = context.getBean("proceducer", Proceducer.class);
                Person person = new Person("liucc", 22);
                System.out.println(person);
                proceducer.sendMessage(person);
            } finally {
                // Close the context so the beans it created are shut down when the demo finishes.
                context.close();
            }
        }
    }

    注:本demo主要实现了RabbitMQ结合Spring框架的基本使用,生产者向消息队列服务器发送消息,消费者接收消息,并经过简单的处理,将消息返回给生产者,完成一个基本的通信过程。消息的传输对象可以是任何对象,在内部传输时都会被转换成RabbitMQ特有的对象Message(类似报文对象),消息的主体在Message的body部分。

  • 相关阅读:
    Effective C++ 学习一
    JavaScript 定义类和继承类的基本步骤
    Vararg collection Factory Method
    apache之httpd启动、终止、重启小结
    Thinking in C++ 学习笔记[1]
    Creational Pattern 之 Abstract Factory
    Perl WEB 开发之 Template
    C语言博客作业数据类型
    C语言博客作业一二维数组
    C语言博客作业函数
  • 原文地址:https://www.cnblogs.com/cowboys/p/6163796.html
Copyright © 2011-2022 走看看