zoukankan      html  css  js  c++  java
  • RabbitMQ ——与Spring集成及exchange的direct、topic方式实现和简单队列实现

    程序整体结构

    Maven依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.test</groupId>
    	<artifactId>springrabbitmq</artifactId>
    	<version>0.0.1-SNAPSHOT</version>

    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<!-- Single place to change the version shared by all Spring Framework modules -->
    		<spring.version>3.2.8.RELEASE</spring.version>
    		<!-- Versions of the logging libraries -->
    		<slf4j.version>1.7.7</slf4j.version>
    		<log4j.version>1.2.17</log4j.version>
    	</properties>

    	<dependencies>
    		<!-- Spring Framework modules (all kept on the same version) -->
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-core</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-webmvc</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context-support</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-aop</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-aspects</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-tx</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-jdbc</artifactId>
    			<version>${spring.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-web</artifactId>
    			<version>${spring.version}</version>
    		</dependency>

    		<!-- Test-only dependencies -->
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>4.9</version>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-test</artifactId>
    			<version>${spring.version}</version>
    			<!-- Only the JUnit test uses spring-test, so it belongs on the test classpath like junit -->
    			<scope>test</scope>
    		</dependency>

    		<!-- RabbitMQ (Spring AMQP) dependency -->
    		<dependency>
    			<groupId>org.springframework.amqp</groupId>
    			<artifactId>spring-rabbit</artifactId>
    			<version>1.3.5.RELEASE</version>
    		</dependency>
    		<!-- log start -->
    		<dependency>
    			<groupId>log4j</groupId>
    			<artifactId>log4j</artifactId>
    			<version>${log4j.version}</version>
    		</dependency>
    		<!-- Object formatting, convenient for log output -->
    		<dependency>
    			<groupId>com.alibaba</groupId>
    			<artifactId>fastjson</artifactId>
    			<version>1.1.41</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-api</artifactId>
    			<version>${slf4j.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.slf4j</groupId>
    			<artifactId>slf4j-log4j12</artifactId>
    			<version>${slf4j.version}</version>
    		</dependency>
    		<!-- log end -->
    	</dependencies>
    </project>

    application.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

        <!-- Pull in the RabbitMQ connection, exchange, queue, template and listener definitions -->
        <import resource="classpath*:rabbitMQ.xml" />

        <!-- Scan the consumer and producer packages and register stereotype-annotated classes
             (@Component, @Service, ...) as Spring beans. component-scan also switches on
             annotation-driven injection (@Resource, @Autowired, ...), so a separate
             <context:annotation-config/> element is not required. -->
        <context:component-scan base-package="com.gm.consumer, com.gm.producer" />

        <!-- Enables AspectJ-driven configuration of @Configurable objects that are created
             outside of the container (relies on spring-aspects being on the classpath). -->
        <context:spring-configured />

    </beans>

    rabbitMQ.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    	<!-- Shared RabbitMQ configuration: start -->

    	<!-- Connection factory with the parameters used to reach the RabbitMQ server.
    	     NOTE(review): credentials are hard-coded here for the demo; externalise them
    	     (e.g. via a property placeholder) before using this outside a local setup. -->
    	<rabbit:connection-factory id="connectionFactory"
    		virtual-host="/" username="admin" password="123456" host="127.0.0.1"
    		port="5672" />

    	<!-- With this admin in place, the exchanges and queues declared below are created
    	     automatically on the RabbitMQ server -->
    	<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

    	<!-- Shared RabbitMQ configuration: end -->

    	<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->

    	<!-- Direct exchange, its queue, and message send/receive: start -->

    	<!-- Queue definition -->
    	<rabbit:queue name="direct_queue_test" durable="true"
    		auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    	<!-- Direct exchange, bound to direct_queue_test with routing key direct_queue_key -->
    	<rabbit:direct-exchange name="exchange_direct"
    		durable="true" auto-delete="false" declared-by="connectAdmin">
    		<rabbit:bindings>
    			<rabbit:binding queue="direct_queue_test" key="direct_queue_key"></rabbit:binding>
    		</rabbit:bindings>
    	</rabbit:direct-exchange>

    	<!-- Rabbit template whose default exchange is exchange_direct -->
    	<rabbit:template id="directAmqpTemplate"
    		connection-factory="connectionFactory" exchange="exchange_direct" />

    	<!-- Message receiver -->
    	<bean id="directMessageReceiver" class="com.gm.consumer.DirectMessageReceiver"></bean>

    	<!-- Queue listener: the referenced receiver is notified when a message arrives on the queue -->
    	<rabbit:listener-container
    		connection-factory="connectionFactory">
    		<rabbit:listener queues="direct_queue_test" ref="directMessageReceiver" />
    	</rabbit:listener-container>

    	<!-- Direct exchange, its queue, and message send/receive: end -->


    	<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->

    	<!-- Topic exchange, its queue, and message send/receive: start -->

    	<!-- Queue definition -->
    	<rabbit:queue name="topic_queue_test" durable="true"
    		auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    	<!-- Topic exchange, bound to topic_queue_test for every routing key matching log.#.
    	     durable/auto-delete/declared-by are spelled out to match the direct exchange above. -->
    	<rabbit:topic-exchange name="exchange_topic"
    		durable="true" auto-delete="false" declared-by="connectAdmin">
    		<rabbit:bindings>
    			<rabbit:binding queue="topic_queue_test" pattern="log.#" />
    		</rabbit:bindings>
    	</rabbit:topic-exchange>

    	<!-- Rabbit template whose default exchange is exchange_topic -->
    	<rabbit:template id="topicAmqpTemplate"
    		connection-factory="connectionFactory" exchange="exchange_topic" />

    	<!-- Message receiver -->
    	<bean id="topicMessageReceiver" class="com.gm.consumer.TopicMessageReceiver"></bean>

    	<!-- Queue listener: the referenced receiver is notified when a message arrives on the queue -->
    	<rabbit:listener-container
    		connection-factory="connectionFactory">
    		<rabbit:listener queues="topic_queue_test" ref="topicMessageReceiver" />
    	</rabbit:listener-container>

    	<!-- Topic exchange, its queue, and message send/receive: end -->


    	<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->

    	<!-- Simple queue example: start -->

    	<!-- Plain RabbitTemplate with no exchange configured; used for the simple-queue example -->
    	<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    		<constructor-arg ref="connectionFactory"></constructor-arg>
    	</bean>

    	<!-- Queue definition -->
    	<rabbit:queue name="default_queue" durable="true"
    		auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    	<!-- Message receiver -->
    	<bean id="defaultMessageReceiver" class="com.gm.consumer.DefaultMessageReceiver"></bean>

    	<!-- Queue listener: the referenced receiver is notified when a message arrives on the queue -->
    	<rabbit:listener-container
    		connection-factory="connectionFactory">
    		<rabbit:listener queues="default_queue" ref="defaultMessageReceiver" />
    	</rabbit:listener-container>

    	<!-- Simple queue example: end -->
    </beans>

    exchange的direct消费者

    DirectMessageReceiver.java 

    package com.gm.consumer;
    
    import java.io.UnsupportedEncodingException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class DirectMessageReceiver implements MessageListener {
    	private Logger logger = LoggerFactory.getLogger(DirectMessageReceiver.class);
    
    	public void onMessage(Message message) {
    
    		logger.info("consumer receive message------->:{}", message);
    		String body;
    		try {
    			body = new String(message.getBody(), "UTF-8");
    			System.out.println("consumer receive message------->"+body);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    

    exchange的topic消费者

    TopicMessageReceiver.java

    package com.gm.consumer;
    
    import java.io.UnsupportedEncodingException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicMessageReceiver implements MessageListener {
    	private Logger logger = LoggerFactory.getLogger(TopicMessageReceiver.class);
    
    	public void onMessage(Message message) {
    
    		logger.info("consumer receive message------->:{}", message);
    		String body;
    		try {
    			body = new String(message.getBody(), "UTF-8");
    			System.out.println("consumer receive message------->" + body);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    

    简单队列消费者

    DefaultMessageReceiver.java

    package com.gm.consumer;
    
    import java.io.UnsupportedEncodingException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    public class DefaultMessageReceiver implements MessageListener {
    	private Logger logger = LoggerFactory.getLogger(DefaultMessageReceiver.class);
    
    	public void onMessage(Message message) {
    
    		logger.info("consumer receive message------->:{}", message);
    		String body;
    		try {
    			body = new String(message.getBody(), "UTF-8");
    			System.out.println("consumer receive message------->"+body);
    		} catch (UnsupportedEncodingException e) {
    			e.printStackTrace();
    		}
    
    	}
    
    }
    

    exchange的direct生产者

    DirectMessageProducer.java

    package com.gm.producer;
    
    import java.io.IOException;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    /**
     * Producer for the direct-exchange example: hands messages to the
     * {@code directAmqpTemplate} bean using a fixed routing key.
     */
    @Service
    public class DirectMessageProducer {
    	/** Routing key used for every message sent by this producer. */
    	private static final String ROUTING_KEY = "direct_queue_key";

    	private static final Logger LOG = LoggerFactory.getLogger(DirectMessageProducer.class);

    	@Resource(name = "directAmqpTemplate")
    	private AmqpTemplate directAmqpTemplate;

    	/**
    	 * Logs the payload and sends it through the template with the fixed routing key.
    	 *
    	 * @param message payload to convert and send
    	 * @throws IOException declared for callers; nothing in this method raises it directly
    	 */
    	public void sendMessage(Object message) throws IOException {
    		LOG.info("to send message:{}", message);
    		directAmqpTemplate.convertAndSend(ROUTING_KEY, message);
    	}
    }
    

    exchange的topic生产者

    TopicMessageProducer.java

    package com.gm.producer;
    
    import java.io.IOException;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    /**
     * Producer for the topic-exchange example: hands messages to the
     * {@code topicAmqpTemplate} bean using the routing key {@code log.info}.
     */
    @Service
    public class TopicMessageProducer {
    	/** Routing key used for every message sent by this producer. */
    	private static final String ROUTING_KEY = "log.info";

    	private static final Logger LOG = LoggerFactory.getLogger(TopicMessageProducer.class);

    	@Resource(name = "topicAmqpTemplate")
    	private AmqpTemplate topicAmqpTemplate;

    	/**
    	 * Logs the payload and sends it through the template with the fixed routing key.
    	 *
    	 * @param message payload to convert and send
    	 * @throws IOException declared for callers; nothing in this method raises it directly
    	 */
    	public void sendMessage(Object message) throws IOException {
    		LOG.info("to send message:{}", message);
    		topicAmqpTemplate.convertAndSend(ROUTING_KEY, message);
    	}
    }
    

    简单队列生产者

    DefaultMessageProducer.java

    package com.gm.producer;
    
    import java.io.IOException;
    
    import javax.annotation.Resource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    
    /**
     * Producer for the simple-queue example: hands messages to the
     * {@code rabbitTemplate} bean, passing {@code default_queue} as the routing key.
     */
    @Service
    public class DefaultMessageProducer {
    	/** Routing key passed to the template for every message. */
    	private static final String ROUTING_KEY = "default_queue";

    	private static final Logger LOG = LoggerFactory.getLogger(DefaultMessageProducer.class);

    	@Resource(name = "rabbitTemplate")
    	RabbitTemplate rabbitTemplate;

    	/**
    	 * Logs the payload and sends it through the template with the fixed routing key.
    	 *
    	 * @param message payload to convert and send
    	 * @throws IOException declared for callers; nothing in this method raises it directly
    	 */
    	public void sendMessage(Object message) throws IOException {
    		LOG.info("to send message:{}", message);
    		rabbitTemplate.convertAndSend(ROUTING_KEY, message);
    	}
    }
    

    单元测试

    ProducerMain.java

    package com.gm.producer;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    /**
     * Spring-backed JUnit test that sends one message through each of the three
     * producers and then keeps the application context alive for a short, bounded
     * time so the listeners defined in the context can consume them.
     */
    @RunWith(SpringJUnit4ClassRunner.class) // run the test with the Spring JUnit 4 runner
    @ContextConfiguration(locations = { "classpath:application.xml" }) // load the Spring configuration
    public class ProducerMain {

    	/** How long the test waits after sending, in milliseconds. */
    	private static final long CONSUME_WAIT_MILLIS = 5000L;

    	private Logger logger = LoggerFactory.getLogger(ProducerMain.class);

    	@Autowired
    	DirectMessageProducer directMessageProducer;

    	@Autowired
    	TopicMessageProducer topicMessageProducer;

    	@Autowired
    	DefaultMessageProducer defaultMessageProducer;

    	/**
    	 * Sends one message per producer, then waits a bounded time for consumption.
    	 *
    	 * @throws Exception if sending fails or the wait is interrupted
    	 */
    	@Test
    	public void send_messages() throws Exception {
    		directMessageProducer.sendMessage("我是directMessageProducer发出的消息");
    		topicMessageProducer.sendMessage("我是topicMessageProducer发出的消息");
    		defaultMessageProducer.sendMessage("我是defaultMessageProducer发出的消息");

    		// A bounded sleep replaces the former empty while(true) loop, which pinned a
    		// CPU core and prevented the test from ever finishing.
    		logger.info("waiting {} ms for the consumers to receive the messages", CONSUME_WAIT_MILLIS);
    		Thread.sleep(CONSUME_WAIT_MILLIS);
    	}
    }

    结果

  • 相关阅读:
    axios、ajax、fetch三者的区别
    React与Vue的相同与不同点
    react-redux
    redux【react】
    react高阶组件
    基于WebGL无插件虚拟场景漫游关键技术(完整版)ThingJS
    基于WebGL的三维交通监控可视化技术应用(实践版) ThingJS
    地下管线监控系统中互联网WebGL三维可视化构建技术 ThingJS
    基于WebGL实现智慧校园的全景漫游技术研究 三维可视化
    基于WebGL的3D可视化告警系统关键技术解析 ThingJS
  • 原文地址:https://www.cnblogs.com/gmhappy/p/11864032.html
Copyright © 2011-2022 走看看