zoukankan      html  css  js  c++  java
  • SpringBoot整合rabbitmq

    使用springboot整合rabbitmq

       逻辑梳理:

            1.创建连接:ConnectionFactory  (host、port、username、password)

            2.创建队列并绑定 (queue、exchange、routingkey)

            3.创建RabbitAdmin类用来管理

            4.  生产者:设置消息发送类 RabbitTemplate 重发类 RetryTemplate    

               消费者:设置消息接收监听类,方法

            第一步:创建连接

            

    		String host = StringUtils.hasText(env().getProperty(ENV_CONNECTION_HOST)) ? 
    				env().getProperty(ENV_CONNECTION_HOST) : DEFAULT_CONNECTION_HOST;
    		int port = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) >= 1024 &&
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) < 65535 ?
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) : DEFAULT_CONNECTION_PORT;
    	    String username = StringUtils.hasText(env().getProperty(ENV_CONNECTION_NAME)) ? 
    				env().getProperty(ENV_CONNECTION_NAME) : DEFAULT_CONNECTION_NAME;
    		String password = StringUtils.hasText(env().getProperty(ENV_CONNECTION_PWD)) ? 
    				env().getProperty(ENV_CONNECTION_PWD) : DEFAULT_CONNECTION_PWD;
    	    int cachesize = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) > 5 &&
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) < 25 ?
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) : DEFAULT_CONNECTION_CACHESIZE;
    		
    		CachingConnectionFactory factory = new CachingConnectionFactory(host,port);
    		factory.setUsername(username);
    		factory.setPassword(password);
    		
    		//打印连接消息
    		logger.info("******rabbit connection*******");
    		logger.info("[主机地址:]"+host);
    		logger.info("[端口号:]"+port);
    		logger.info("[登录用户:]"+username);
    		logger.info("[系统默认连接数:]"+cachesize);
    		
    		return factory;
    	}
    

            

            第二步:创建队列并绑定

    //创建队列,并将队列与交换机和路由键绑定
    	@Bean
    	public Queue queue(){
    		return new Queue(StringUtils.hasText(env().getProperty(ENV_QUEUE_NAME)) ? 
    				env().getProperty(ENV_QUEUE_NAME) : DEFAULT_QUEUE_NAME); 
    	}
    	//创建交换机,根据创建的类来指定交换机类型:TopicExchange、D
    	@Bean
    	public TopicExchange topicExchange(){
    		return new TopicExchange(StringUtils.hasText(env().getProperty(ENV_EXCHANGE_NAME)) ? 
    				env().getProperty(ENV_EXCHANGE_NAME) : DEFAULT_EXCHANGE_NAME);
    	}
    	//将创建好的队列和交换机通过指定routingkey绑定
    	@Bean
    	public Binding queueBinding(){
    		return BindingBuilder.bind(queue()).
    								to(topicExchange()).
    								with(StringUtils.hasText(env().getProperty(ENV_ROUTINGKEY))?
    										env().getProperty(ENV_ROUTINGKEY) : DEFAULT_ROUTINGKEY);
    	}
    

          第三步:创建RabbitAdmin类用来管理

     

    //管理类 
    	@Bean
    	public AmqpAdmin rabbitAdmin() {
    		return new RabbitAdmin(connectionFactory());
    	}
    

          第四步:1.设置生产者消息发送类和断连重连类

    //设置断连重发类
    	@Bean
    	public RetryTemplate retryTemplate(){
    		long interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) >=100?
    					NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) :
    					DEFAULT_BACKOFF_INTERVAL;
    		double multiplier =  NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER),0) >0?
    			NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER)) :
    			DEFAULT_BACKOFF_MULTIPLIER;
    		long max_interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL),0) >1000?
    				NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL)) :
    		    DEFAULT_BACKOFF_MAXINTERVAL;
    		
    		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    		
    		//initial-interval(>100)
    		backOffPolicy.setInitialInterval(interval);
    		
    		backOffPolicy.setMultiplier(multiplier);
    		//max-interval(>1000)
    		backOffPolicy.setMaxInterval(max_interval);
    		RetryTemplate retryTemplate = new RetryTemplate();
    		retryTemplate.setBackOffPolicy(backOffPolicy);
    		
    		logger.info("*******rabbit retry-template******");		
    		logger.info("[interval]{}",interval);
    		logger.info("[multiplier]{}",multiplier);
    		logger.info("[max_interval]{}",max_interval);
    		return retryTemplate;
    	}
    	//将生产者发送类和断连重发类绑定在一起的方法
    	private void setRetryTemplate(RabbitTemplate _template){
    		if(StringUtils.hasText(env().getProperty(ENV_BACKOFF4ASYN_OPEN)) && 
    		   0 == env().getProperty(ENV_BACKOFF4ASYN_OPEN).compareToIgnoreCase("true")){
    		   _template.setRetryTemplate(retryTemplate());		
    		   logger.info("[retry]{}",true);
    		}else{
    			logger.info("[retry]{}",false);
    		}
    	}
    	//生产者操作类
    	@Bean
    	public RabbitTemplate rabbitTemplate(){
    		RabbitTemplate template = new RabbitTemplate(connectionFactory());
    		//向操作类中设置已经和队列绑定好了的交换机
    		template.setExchange(topicExchange().getName());
    		//设置发出去的信息转化的方式,特别注意,发送者的消息处理方式必须和接收方一致
    		template.setMessageConverter(new FastJsonMessageConvert());
    		//向template中设置如果断连的重发类
    		setRetryTemplate(template);
    		
    		logger.info("*******rabbit template*****");
    		logger.info("[exchange]:",topicExchange().getName());
    		logger.info("[message.convert]",FastJsonMessageConvert.class.getSimpleName());
    		return template;
    	}
    	
    

            2.设置消费者接收消息处理类

    //设置消费者[SimpleMessageListenerContainer<--MessageListenerAdapter<--FastJsonMessageConvert]
    	@Bean
    	public SimpleMessageListenerContainer messageListenerContainer(){
    		//设置消费者连接信息
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    		container.setQueueNames(queue().getName());
    		//设置监听模板
    		container.setMessageListener(messageListenerAdapter());
    		//设置应答方式
    		container.setAcknowledgeMode(AcknowledgeMode.NONE);
    		return container;
    	}
    	@Bean
    	 public MessageListenerAdapter messageListenerAdapter() {
    		DataPackageHandler handler = new DataPackageHandler();
    		//设置消费者处理信息的方式和转化信息的方式
    		MessageListenerAdapter adapter = new MessageListenerAdapter(handler,new FastJsonMessageConvert());
    		//指定是消息处理类中的哪一个方法
    		adapter.setDefaultListenerMethod("messageHandler");
    		return adapter;
    	}
    

      

    以下是完整代码:MyRabbitConfiguration类

    package com.sunland.demo.configuration;
    
    import java.util.Properties;
    
    import org.apache.commons.lang.math.NumberUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.retry.backoff.ExponentialBackOffPolicy;
    import org.springframework.retry.support.RetryTemplate;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.util.StringUtils;
    
    import com.sunland.common.cfg.AtomRoot;
    import com.sunland.common.cfg.XmlConfiguration;
    /**
     * 三个工具类:
     * BindingBuilder:用来通过绑定指定的队列和交换机
     * StringUtils:用来操作字符串
     * NumberUtils:用来操作int
     * @author qiu
     *
     */
    @Configuration
    @EnableScheduling
    public class MyRabbitConfiguration {
    	private static final Logger logger = LoggerFactory.getLogger(MyRabbitConfiguration.class.getName());
    	private Properties prop = new Properties();
    	private String xmlConfigure = "conf/myRabbit.xml";
    	private Properties env(){
    		if(prop.size() <= 0){
    			AtomRoot root = XmlConfiguration.getConfiguration(xmlConfigure);
    			if(root != null){
    				prop.putAll(XmlConfiguration.getAllProperties(root));
    			}
    		}
    		return prop;
    	}
    	
    	//[rabbit设置]
    	//[connection设置]
    	private static String ENV_CONNECTION_HOST = "mq.host";
    	private static String ENV_CONNECTION_PORT = "mq.port";
    	private static String ENV_CONNECTION_NAME = "mq.name";
    	private static String ENV_CONNECTION_PWD = "mq.password";
    	private static String ENV_CONNECTION_CACHESIZE = "mq.cachesize";
    	//[默认连接设置]
    	private static String DEFAULT_CONNECTION_HOST = "localhost";
    	private static Integer DEFAULT_CONNECTION_PORT = 5672;
    	private static String DEFAULT_CONNECTION_NAME = "qiuhangxiang";
    	private static String DEFAULT_CONNECTION_PWD = "qiu1994825";
    	private static Integer DEFAULT_CONNECTION_CACHESIZE = 5;
    	//	
    	private Integer MAX_CACHESIZE = 25;
    	
    	//[生产者发送消息交换机、队列和路由键设置]
    	private static String ENV_EXCHANGE_NAME = "mq.exchange";
    	private static String ENV_QUEUE_NAME = "mq.queue";
    	private static String ENV_ROUTINGKEY = "mq.routingkey";
    	//[生产者发送消息默认交换机、队列和路由键设置]
    	private static String DEFAULT_EXCHANGE_NAME = "i.am.default.exchange";
    	private static String DEFAULT_QUEUE_NAME = "i.am.default.queue";
    	private static String DEFAULT_ROUTINGKEY = "i.am.default.routingkey";
    	//[消费者默认交换机和队列]
    	private static String DEFAULT_CUSTOMER_QUEUE_NAME = "i.am.default.customer.queue";
    	private static String DEFAULT_CUSTOMER_ROUTINGKEY = "i.am.default.customer.routingkey";
    	
    	//异步报文发送重试模板
    	public  static String ENV_BACKOFF4ASYN_OPEN = "mq.retry";
    	public static String ENV_BACKOFF4ASYN_INTERVAL = "mq.retry.interval";
    	public static String ENV_BACKOFF4ASYN_MULTIPLIER = "mq.retry.multiplier";
    	public static String ENV_BACKOFF4ASYN_MAXINTERVAL = "mq.retry.maxinterval";
    	public static long DEFAULT_BACKOFF_INTERVAL = 500;
    	public static double DEFAULT_BACKOFF_MULTIPLIER = 10.0;
    	public static long DEFAULT_BACKOFF_MAXINTERVAL = 10000;
    	
    	@Bean
    	public ConnectionFactory connectionFactory(){
    		
    		String host = StringUtils.hasText(env().getProperty(ENV_CONNECTION_HOST)) ? 
    				env().getProperty(ENV_CONNECTION_HOST) : DEFAULT_CONNECTION_HOST;
    		int port = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) >= 1024 &&
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) < 65535 ?
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_PORT)) : DEFAULT_CONNECTION_PORT;
    	    String username = StringUtils.hasText(env().getProperty(ENV_CONNECTION_NAME)) ? 
    				env().getProperty(ENV_CONNECTION_NAME) : DEFAULT_CONNECTION_NAME;
    		String password = StringUtils.hasText(env().getProperty(ENV_CONNECTION_PWD)) ? 
    				env().getProperty(ENV_CONNECTION_PWD) : DEFAULT_CONNECTION_PWD;
    	    int cachesize = NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) > 5 &&
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) < 25 ?
    				   NumberUtils.toInt(env().getProperty(ENV_CONNECTION_CACHESIZE)) : DEFAULT_CONNECTION_CACHESIZE;
    		
    		CachingConnectionFactory factory = new CachingConnectionFactory(host,port);
    		factory.setUsername(username);
    		factory.setPassword(password);
    		
    		//打印连接消息
    		logger.info("******rabbit connection*******");
    		logger.info("[主机地址:]"+host);
    		logger.info("[端口号:]"+port);
    		logger.info("[登录用户:]"+username);
    		logger.info("[系统默认连接数:]"+cachesize);
    		
    		return factory;
    	}
    	//创建队列,并将队列与交换机和路由键绑定
    	@Bean
    	public Queue queue(){
    		return new Queue(StringUtils.hasText(env().getProperty(ENV_QUEUE_NAME)) ? 
    				env().getProperty(ENV_QUEUE_NAME) : DEFAULT_QUEUE_NAME); 
    	}
    	//创建交换机,根据创建的类来指定交换机类型:TopicExchange、D
    	@Bean
    	public TopicExchange topicExchange(){
    		return new TopicExchange(StringUtils.hasText(env().getProperty(ENV_EXCHANGE_NAME)) ? 
    				env().getProperty(ENV_EXCHANGE_NAME) : DEFAULT_EXCHANGE_NAME);
    	}
    	//将创建好的队列和交换机通过指定routingkey绑定
    	@Bean
    	public Binding queueBinding(){
    		return BindingBuilder.bind(queue()).
    								to(topicExchange()).
    								with(StringUtils.hasText(env().getProperty(ENV_ROUTINGKEY))?
    										env().getProperty(ENV_ROUTINGKEY) : DEFAULT_ROUTINGKEY);
    	}
    	//设置断连重发类
    	@Bean
    	public RetryTemplate retryTemplate(){
    		long interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) >=100?
    					NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_INTERVAL)) :
    					DEFAULT_BACKOFF_INTERVAL;
    		double multiplier =  NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER),0) >0?
    			NumberUtils.toDouble(env().getProperty(ENV_BACKOFF4ASYN_MULTIPLIER)) :
    			DEFAULT_BACKOFF_MULTIPLIER;
    		long max_interval = NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL),0) >1000?
    				NumberUtils.toLong(env().getProperty(ENV_BACKOFF4ASYN_MAXINTERVAL)) :
    		    DEFAULT_BACKOFF_MAXINTERVAL;
    		
    		ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    		
    		//initial-interval(>100)
    		backOffPolicy.setInitialInterval(interval);
    		
    		backOffPolicy.setMultiplier(multiplier);
    		//max-interval(>1000)
    		backOffPolicy.setMaxInterval(max_interval);
    		RetryTemplate retryTemplate = new RetryTemplate();
    		retryTemplate.setBackOffPolicy(backOffPolicy);
    		
    		logger.info("*******rabbit retry-template******");		
    		logger.info("[interval]{}",interval);
    		logger.info("[multiplier]{}",multiplier);
    		logger.info("[max_interval]{}",max_interval);
    		return retryTemplate;
    	}
    	//将生产者发送类和断连重发类绑定在一起的方法
    	private void setRetryTemplate(RabbitTemplate _template){
    		if(StringUtils.hasText(env().getProperty(ENV_BACKOFF4ASYN_OPEN)) && 
    		   0 == env().getProperty(ENV_BACKOFF4ASYN_OPEN).compareToIgnoreCase("true")){
    		   _template.setRetryTemplate(retryTemplate());		
    		   logger.info("[retry]{}",true);
    		}else{
    			logger.info("[retry]{}",false);
    		}
    	}
    	//生产者操作类
    	@Bean
    	public RabbitTemplate rabbitTemplate(){
    		RabbitTemplate template = new RabbitTemplate(connectionFactory());
    		//向操作类中设置已经和队列绑定好了的交换机
    		template.setExchange(topicExchange().getName());
    		//设置发出去的信息转化的方式,特别注意,发送者的消息处理方式必须和接收方一致
    		template.setMessageConverter(new FastJsonMessageConvert());
    		//向template中设置如果断连的重发类
    		setRetryTemplate(template);
    		
    		logger.info("*******rabbit template*****");
    		logger.info("[exchange]:",topicExchange().getName());
    		logger.info("[message.convert]",FastJsonMessageConvert.class.getSimpleName());
    		return template;
    	}
    	
    	//管理类 
    	@Bean
    	public AmqpAdmin rabbitAdmin() {
    		return new RabbitAdmin(connectionFactory());
    	}
    	
    	//设置消费者[SimpleMessageListenerContainer<--MessageListenerAdapter<--FastJsonMessageConvert]
    	@Bean
    	public SimpleMessageListenerContainer messageListenerContainer(){
    		//设置消费者连接信息
    		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    		container.setQueueNames(queue().getName());
    		//设置监听模板
    		container.setMessageListener(messageListenerAdapter());
    		//设置应答方式
    		container.setAcknowledgeMode(AcknowledgeMode.NONE);
    		return container;
    	}
    	@Bean
    	 public MessageListenerAdapter messageListenerAdapter() {
    		DataPackageHandler handler = new DataPackageHandler();
    		//设置消费者处理信息的方式和转化信息的方式
    		MessageListenerAdapter adapter = new MessageListenerAdapter(handler,new FastJsonMessageConvert());
    		//指定是消息处理类中的哪一个方法
    		adapter.setDefaultListenerMethod("messageHandler");
    		return adapter;
    	}
    }
    

      DataPackageHandler处理消息类

    package com.sunland.demo.configuration;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.localhost.demo.PojoTest;
    
    public class DataPackageHandler {
    	private static Logger logger = LoggerFactory.getLogger(DataPackageHandler.class.getName());
    	
    	public void messageHandler(PojoTest obj){
    		try{
    			System.out.println("DataPackageHandler"+obj.getAge()+":"+obj.getName());
    		}catch(Exception e){
    			e.printStackTrace();
    		}
    	}
    	
    }
    

      

    生产者发送消息类

    package com.sunland.demo.configuration;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitGatewaySupport;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    
    
    public class AsyncGateway extends RabbitGatewaySupport {
    	private Logger log = LoggerFactory.getLogger(AsyncGateway.class);
    	
    
    	public void send(final Object obj){
    		try {
    			String routingKey;
    //			if(obj instanceof IllegalVehicle){
    //				routingKey = "queue.itaxi.illegal";
    //			}
    //			//[dispatch]
    //			else 
    				routingKey = "mq_routingkey";
    //				rabbitTemplate.convertAndSend(routingKey, obj);
    				this.getRabbitTemplate().convertAndSend(routingKey, obj,
    				new MessagePostProcessor() {
    					public Message postProcessMessage(Message message)
    							throws AmqpException {
    						//
    						// ---set correlation data so that the replies can
    						// be correlated to the request
    						message.getMessageProperties().setType(obj.getClass().getName());
    						return message;
    					}
    				});
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    

      一个基本的spring整合rabbitmq的例子完成

           

          

          

  • 相关阅读:
    php程序员面试经验
    模板引擎(smarty)知识点总结五
    模板引擎(smarty)知识点总结三
    js计算字数
    phpexcel导出成绩表
    phpexcel用法(转)
    模板引擎(smarty)知识点总结
    apache泛域名解析
    对象的解构赋值
    解构赋值
  • 原文地址:https://www.cnblogs.com/qiuhx/p/6425066.html
Copyright © 2011-2022 走看看