zoukankan      html  css  js  c++  java
  • RabbitMQ ——与Spring集成及exchange的direct、topic方式实现和简单队列实现

    程序整体结构

    Maven依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.test</groupId>
    	<artifactId>springrabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<!-- log4j日志文件管理包版本 -->
    		<slf4j.version>1.7.7</slf4j.version>
    		<log4j.version>1.2.17</log4j.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-core</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-webmvc</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context-support</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-aop</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-aspects</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-tx</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-jdbc</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-web</artifactId>
    			<version>3.2.8.RELEASE</version>
    		</dependency>
    
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>4.9</version>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-test</artifactId>
    			<version> 3.2.8.RELEASE  </version>
    			<scope>provided</scope>
    		</dependency>
    
    		<!--rabbitmq依赖 -->
    		<dependency>
    			<groupId>org.springframework.amqp</groupId>
    			<artifactId>spring-rabbit</artifactId>
    			<version>1.3.5.RELEASE</version>
    		</dependency>
    		<!-- log start -->
    		<dependency>
    			<groupId>log4j</groupId>
    			<artifactId>log4j</artifactId>
    			<version>${log4j.version}</version>
    		</dependency>
    		<!-- 格式化对象,方便输出日志 -->
    		<dependency>
    			<groupId>com.alibaba</groupId>
    			<artifactId>fastjson</artifactId>
    			<version>1.1.41</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>${slf4j.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>${slf4j.version}</version>
    		</dependency>
    		<!-- log end -->
    	</dependencies>
    </project>

    application.xml

    <?xml version="1.0" encoding="UTF-8"?>  
    <beans xmlns="http://www.springframework.org/schema/beans"  
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">  
      
        <import resource="classpath*:rabbitMQ.xml" />  
          
        <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans -->  
        <context:component-scan base-package="com.gm.consumer, com.gm.producer" />  
          
        <!-- 激活annotation功能 -->  
        <context:annotation-config />  
        <!-- 激活annotation功能 -->  
        <context:spring-configured />  
      
    </beans>

    rabbitMQ.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
         http://www.springframework.org/schema/rabbit  
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
    
    	<!-- RabbitMQ公共配置部分 start -->
    
    	<!--配置connection-factory,指定连接rabbit server参数 -->
    	<rabbit:connection-factory id="connectionFactory"
    		virtual-host="/" username="admin" password="123456" host="127.0.0.1"
    		port="5672" />
    
    	<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    	<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
    
    	<!-- RabbitMQ公共配置部分 end -->
    
    	<!-- ~~~~~~~~~~~~~~~~~~~~~华丽的分割线~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    
    	<!-- 定义 direct方式的exchange、队列、消息收发 start -->
    
    	<!--定义queue -->
    	<rabbit:queue name="direct_queue_test" durable="true"
    		auto-delete="false" exclusive="false" declared-by="connectAdmin" />
    
    	<!-- 定义direct exchange,绑定direct_queue_test -->
    	<rabbit:direct-exchange name="exchange_direct"
    		durable="true" auto-delete="false" declared-by="connectAdmin">
    		<rabbit:bindings>
    			<rabbit:binding queue="direct_queue_test" key="direct_queue_key"></rabbit:binding>
    		</rabbit:bindings>
    	</rabbit:direct-exchange>
    
    	<!--定义rabbit template用于数据的接收和发送 -->
    	<rabbit:template id="directAmqpTemplate"
    		connection-factory="connectionFactory" exchange="exchange_direct" />
    
    	<!-- 消息接收者 -->
    	<bean id="directMessageReceiver" class="com.gm.consumer.DirectMessageReceiver"></bean>
    
    	<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    	<rabbit:listener-container
    		connection-factory="connectionFactory">
    		<rabbit:listener queues="direct_queue_test" ref="directMessageReceiver" />
    	</rabbit:listener-container>
    
    	<!-- 定义 direct方式的exchange、队列、消息收发 end -->
    
    
    	<!-- ~~~~~~~~~~~~~~~~~~~~~华丽的分割线~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    
    	<!-- 定义 topic方式的exchange、队列、消息收发 start -->
    
    	<!--定义queue -->
    	<rabbit:queue name="topic_queue_test" durable="true"
    		auto-delete="false" exclusive="false" declared-by="connectAdmin" />
    
    	<!--定义topic类型exchange,绑定direct_queue_test -->
    	<rabbit:topic-exchange name="exchange_topic">
    		<rabbit:bindings>
    			<rabbit:binding queue="topic_queue_test" pattern="log.#" />
    		</rabbit:bindings>
    	</rabbit:topic-exchange>
    
    	<!--定义rabbit template用于数据的接收和发送 -->
    	<rabbit:template id="topicAmqpTemplate"
    		connection-factory="connectionFactory" exchange="exchange_topic" />
    
    	<!-- 消息接收者 -->
    	<bean id="topicMessageReceiver" class="com.gm.consumer.TopicMessageReceiver"></bean>
    
    	<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    	<rabbit:listener-container
    		connection-factory="connectionFactory">
    		<rabbit:listener queues="topic_queue_test" ref="topicMessageReceiver" />
    	</rabbit:listener-container>
    
    	<!-- 定义 topic方式的exchange、队列、消息收发 end -->
    
    
    	<!-- ~~~~~~~~~~~~~~~~~~~~~华丽的分割线~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
    
    
    	<!-- 创建rabbitTemplate 消息模板类 -->
    	<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    		<constructor-arg ref="connectionFactory"></constructor-arg>
    	</bean>
    
    	<rabbit:queue name="default_queue" durable="true"
    		auto-delete="false" exclusive="false" declared-by="connectAdmin" />
    
    	<!-- 消息接收者 -->
    	<bean id="defaultMessageReceiver" class="com.gm.consumer.DefaultMessageReceiver"></bean>
    
    	<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    	<rabbit:listener-container
    		connection-factory="connectionFactory">
    		<rabbit:listener queues="default_queue" ref="defaultMessageReceiver" />
    	</rabbit:listener-container>
    </beans>

    exchange的direct消费者

    DirectMessageReceiver.java 

    package com.gm.consumer;
    
    import java.io.UnsupportedEncodingException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class DirectMessageReceiver implements MessageListener {
    	private Logger logger = LoggerFactory.getLogger(DirectMessageReceiver.class);
    
    	public void onMessage(Message message) {
    
    		logger.info("consumer receive message------->:{}", message);
    		String body;
    		try {
    			body = new String(message.getBody(), "UTF-8");
    			System.out.println("consumer receive message------->"+body);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    

    exchange的topic消费者

    TopicMessageReceiver.java

    package com.gm.consumer;
    
    import java.io.UnsupportedEncodingException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicMessageReceiver implements MessageListener {
    	private Logger logger = LoggerFactory.getLogger(TopicMessageReceiver.class);
    
    	public void onMessage(Message message) {
    
    		logger.info("consumer receive message------->:{}", message);
    		String body;
    		try {
    			body = new String(message.getBody(), "UTF-8");
    			System.out.println("consumer receive message------->" + body);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    

    简单队列消费者

    DefaultMessageReceiver.java

    package com.gm.consumer;
    
    import java.io.UnsupportedEncodingException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class DefaultMessageReceiver implements MessageListener {
    	private Logger logger = LoggerFactory.getLogger(DefaultMessageReceiver.class);
    
    	public void onMessage(Message message) {
    
    		logger.info("consumer receive message------->:{}", message);
    		String body;
    		try {
    			body = new String(message.getBody(), "UTF-8");
    			System.out.println("consumer receive message------->"+body);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    

    exchange的direct生产者

    DirectMessageProducer.java

    package com.gm.producer;
    
    import java.io.IOException;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class DirectMessageProducer {
    	private Logger logger = LoggerFactory.getLogger(DirectMessageProducer.class);
    
    	@Resource(name = "directAmqpTemplate")
    	private AmqpTemplate directAmqpTemplate;
    
    	public void sendMessage(Object message) throws IOException {
    		logger.info("to send message:{}", message);
    		directAmqpTemplate.convertAndSend("direct_queue_key", message);
    	}
    }
    

    exchange的topic生产者

    TopicMessageProducer.java

    package com.gm.producer;
    
    import java.io.IOException;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class TopicMessageProducer {
    	private Logger logger = LoggerFactory.getLogger(TopicMessageProducer.class);
    
    	@Resource(name = "topicAmqpTemplate")
    	private AmqpTemplate topicAmqpTemplate;
    
    	public void sendMessage(Object message) throws IOException {
    		logger.info("to send message:{}", message);
    		topicAmqpTemplate.convertAndSend("log.info", message);
    	}
    }
    

    简单队列生产者

    DefaultMessageProducer.java

    package com.gm.producer;
    
    import java.io.IOException;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class DefaultMessageProducer {
    	private Logger logger = LoggerFactory.getLogger(DefaultMessageProducer.class);
    
    	@Resource(name = "rabbitTemplate")
    	RabbitTemplate rabbitTemplate;
    
    	public void sendMessage(Object message) throws IOException {
    		logger.info("to send message:{}", message);
    		rabbitTemplate.convertAndSend("default_queue", message);
    	}
    }
    

    单元测试

    ProducerMain.java

    package com.gm.producer;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    @RunWith(SpringJUnit4ClassRunner.class) // 使用junit4进行测试
    @ContextConfiguration(locations = { "classpath:application.xml" }) // 加载配置文件
    public class ProducerMain {
    
    	private Logger logger = LoggerFactory.getLogger(ProducerMain.class);
    	
    	@Autowired
    	DirectMessageProducer directMessageProducer;
    	
    	@Autowired
    	TopicMessageProducer topicMessageProducer;
    	
    	@Autowired
    	DefaultMessageProducer defaultMessageProducer;
    
    	@Test
    	public void send_messages() throws Exception {
    		directMessageProducer.sendMessage("我是directMessageProducer发出的消息");
    		topicMessageProducer.sendMessage("我是topicMessageProducer发出的消息");
    		defaultMessageProducer.sendMessage("我是defaultMessageProducer发出的消息");
    		
    		while(true) {
    			
    		}
    	}
    }

    结果

  • 相关阅读:
    Constants and Variables
    随想
    C#基础篇之语言和框架介绍
    Python基础19 实例方法 类方法 静态方法 私有变量 私有方法 属性
    Python基础18 实例变量 类变量 构造方法
    Python基础17 嵌套函数 函数类型和Lambda表达式 三大基础函数 filter() map() reduce()
    Python基础16 函数返回值 作用区域 生成器
    Python基础11 List插入,删除,替换和其他常用方法 insert() remove() pop() reverse() copy() clear() index() count()
    Python基础15 函数的定义 使用关键字参数调用 参数默认值 可变参数
    Python基础14 字典的创建修改访问和遍历 popitem() keys() values() items()
  • 原文地址:https://www.cnblogs.com/gmhappy/p/11864032.html
Copyright © 2011-2022 走看看