zoukankan      html  css  js  c++  java
  • RabbitMQ —— spring整合发送异步消息

    POM.xml

    <!-- Versions shared by the dependency declarations below. -->
    <properties>
        <rabbitmq.version>3.0.4</rabbitmq.version>
        <spring.amqp.version>1.1.1.RELEASE</spring.amqp.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>3.1.2.RELEASE</spring.version>
    </properties>
    <dependencies>
        <!-- RabbitMQ Java client. -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>${rabbitmq.version}</version>
        </dependency>

        <!-- Spring AMQP RabbitMQ support (CachingConnectionFactory, RabbitAdmin, RabbitTemplate, listener container). -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>${spring.amqp.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
            <exclusions>
                <!-- Exclude Commons Logging in favor of SLF4j -->
                <exclusion>
                    <groupId>commons-logging</groupId>
                    <artifactId>commons-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- Test-only dependencies. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.1</version>
            <scope>test</scope>
        </dependency>

        <!-- Core Spring AMQP abstractions (for example AmqpTemplate, used directly by Producer).
             Declared without a classifier so the library jar itself is resolved;
             a "sources" classifier would put only the source archive on the classpath. -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>${spring.amqp.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>

        <!-- Logging: SLF4J API, with Commons Logging calls bridged to SLF4J and log4j as the backend. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.5.10</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.5.10</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.5.10</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.6.9</version>
        </dependency>
    </dependencies>
    View Code

    rabbitmq.properties

    # Broker connection settings read by the rabbitConnectionFactory bean
    rabbit.hosts=192.168.42.128
    rabbit.username=root
    rabbit.password=123456
    rabbit.virtualHost=/
    # Queue name used by both the rabbitTemplate bean and the listener container
    rabbit.queue=spring.queue.sync
    # The routing key name defaults to the queue name
    rabbit.routingKey=spring.queue.sync
    View Code

    applicationContext-rabbitmq-send-async.xml

    <!-- Resolves the rabbit.* placeholders used in this file from rabbitmq.properties on the classpath.
         SYSTEM_PROPERTIES_MODE_OVERRIDE lets JVM system properties take precedence over the file;
         ignoreResourceNotFound=true means a missing file is skipped instead of failing here. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />  
        <property name="ignoreResourceNotFound" value="true" />  
        <property name="locations">  
            <list>
                <!-- Standard configuration -->
                <value>classpath*:rabbitmq.properties</value>
            </list>
        </property>
    </bean> 
    
    <!-- Create the connectionFactory: host, credentials and virtual host come from rabbitmq.properties;
         the channel cache size is set to 5. -->
    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
        <constructor-arg value="${rabbit.hosts}"/>  
        <property name="username" value="${rabbit.username}"/>  
        <property name="password" value="${rabbit.password}"/>  
        <property name="virtualHost" value="${rabbit.virtualHost}"/>  
        <property name="channelCacheSize" value="5"/>  
    </bean>  
      
    <!-- RabbitAdmin declares the Queue, Exchange and Binding beans defined in this context
         on the RabbitMQ server when a connection is opened. -->
    <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
        <constructor-arg ref="rabbitConnectionFactory" />
    </bean>

    <!-- Queue definition for rabbitAdmin to declare; without a Queue bean nothing is created
         on the broker and messages routed to a missing queue are dropped.
         Queue(String) declares a durable, non-exclusive, non-auto-delete queue, so an
         already existing queue with the same name must have matching attributes. -->
    <bean id="rabbitQueue" class="org.springframework.amqp.core.Queue">
        <constructor-arg value="${rabbit.queue}" />
    </bean>
      
    <!-- Define the rabbit template used for receiving and sending data -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">  
        <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>  
        <!-- Default queue to receive from when no queue is given explicitly -->
        <property name="queue" value="${rabbit.queue}"></property>  
        <!-- Default routing key for sends that do not specify one, such as Producer's convertAndSend(Object) -->
        <property name="routingKey" value="${rabbit.routingKey}"></property>    
    </bean>
    View Code

    applicationContext-rabbitmq-receive-async.xml

    <!-- Resolves the rabbit.* placeholders used in this file from rabbitmq.properties on the classpath.
         SYSTEM_PROPERTIES_MODE_OVERRIDE lets JVM system properties take precedence over the file;
         ignoreResourceNotFound=true means a missing file is skipped instead of failing here. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />  
        <property name="ignoreResourceNotFound" value="true" />  
        <property name="locations">  
            <list>
                <!-- Standard configuration -->
                <value>classpath*:rabbitmq.properties</value>
            </list>
        </property>
    </bean> 
    
    <!-- Create the connectionFactory: host, credentials and virtual host come from rabbitmq.properties;
         the channel cache size is set to 5. -->
    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">  
        <constructor-arg value="${rabbit.hosts}"/>  
        <property name="username" value="${rabbit.username}"/>  
        <property name="password" value="${rabbit.password}"/>  
        <property name="virtualHost" value="${rabbit.virtualHost}"/>  
        <property name="channelCacheSize" value="5"/>  
    </bean>  
    
    <!-- Declare the message converter as a SimpleMessageConverter; it is referenced by the listener adapter below -->
    <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>
    
    <!-- Start listening to the producer -->
    <!-- Handler class used to process received messages -->
    <bean id="msgHandler" class="test.async.ReceiveMsgHandler"></bean>
    
    <!-- Delegating class used for message listening: MessageListenerAdapter.
         It wraps msgHandler; defaultListenerMethod must match the handler's method name (handlerMsg). -->
    <bean id="messageListenerAdatper" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="msgHandler"></constructor-arg>
        <property name="defaultListenerMethod" value="handlerMsg"></property>
        <property name="messageConverter" ref="messageConverter"></property>
    </bean>
    
    <!-- Container class used for message listening; queueNames must be the same as the value of the defined Queue -->
    <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="queueNames" value="${rabbit.queue}"></property>
        <property name="connectionFactory" ref="rabbitConnectionFactory"></property>
        <property name="messageListener" ref="messageListenerAdatper"></property>
    </bean>
    View Code

    Producer.java

    /**
     * Command-line producer that publishes test messages through the {@code RabbitTemplate}
     * defined in {@code async/applicationContext-rabbitmq-send-async.xml}.
     */
    public class Producer {
        /**
         * Sends 100 messages of the form {@code test spring async:<i>}, pausing two seconds
         * after each send, then closes the Spring context.
         *
         * @param args unused
         * @throws InterruptedException if the thread is interrupted while sleeping between sends
         */
        public static void main(String[] args) throws InterruptedException {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("async/applicationContext-rabbitmq-send-async.xml");
            try {
                AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);

                for (int i = 0; i < 100; i++) {
                    amqpTemplate.convertAndSend("test spring async:" + i);
                    System.out.println("send ok:" + i);
                    Thread.sleep(2000);
                }
            } finally {
                // Closing the context destroys its beans, which releases the broker connection
                // held by the connection factory even when the loop is interrupted or a send fails.
                context.close();
            }
        }
    }
    View Code

    ReceiveMsgHandler.java(消息接收处理类)

    /**
     * Handler whose {@link #handlerMsg(String)} method is the target of the listener adapter
     * configured in the receive-side Spring context.
     */
    public class ReceiveMsgHandler {
        /** Text printed in front of every received message. */
        private static final String PREFIX = "receive msg:";

        /**
         * Prints the received message to standard output, prefixed with {@code receive msg:}.
         *
         * @param msg the message payload; {@code null} is printed as the text {@code null}
         */
        public void handlerMsg(String msg) {
            final StringBuilder line = new StringBuilder(PREFIX);
            line.append(msg);
            System.out.println(line.toString());
        }
    }
    View Code

    Consumer.java

    /**
     * Command-line consumer that starts the listener configuration in
     * {@code async/applicationContext-rabbitmq-receive-async.xml}.
     */
    public class Consumer {
        /**
         * Loads the receive-side Spring context and registers a JVM shutdown hook for it.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("async/applicationContext-rabbitmq-receive-async.xml");
            // Without this the context is never closed; the hook closes it when the JVM shuts
            // down so the listener container and broker connection are released cleanly.
            context.registerShutdownHook();
        }
    }
    View Code
  • 相关阅读:
    MyEclipse启动时,报错Error: could not open `E:\Program Files\Java\JAVA\lib\amd64\jvm.cfg'
    换JDK以后,MyEclipse无法启动,报错:Failed to load the JNI...
    ORA-12514
    java语言
    基本数据类型
    C# 计算时间间隔,两个时间差(年月日时分秒)
    Java的Stream流
    Java的Lambda表达式和函数式接口
    Java中的Log
    Oracle的触发器Trigger
  • 原文地址:https://www.cnblogs.com/yifanSJ/p/9034658.html
Copyright © 2011-2022 走看看