zoukankan      html  css  js  c++  java
  • RabbitMQ 实现延迟队列

    一. 使用原生 API

    1.RabbitMQ 相关

          <dependency>
              <groupId>com.rabbitmq</groupId>
              <artifactId>amqp-client</artifactId>
              <version>5.7.0</version>
          </dependency>
    /**
     * Static helpers that create RabbitMQ client objects for a broker at a fixed address.
     *
     * <p>Nothing is cached: every call builds new objects. In particular {@link #getChannel()}
     * opens a new connection through {@link #getFactoryConnection()} each time it is invoked,
     * and nothing in this class closes that connection.
     */
    class RabbitMqContex {
        private static final String BROKER_HOST = "127.0.0.1";
        private static final int BROKER_PORT = 5672;

        /**
         * Creates a connection factory configured with the fixed host and port and the
         * "guest" user name and password.
         *
         * @return a newly created, configured factory
         */
        public static ConnectionFactory getFactory() {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost(BROKER_HOST);
            connectionFactory.setPort(BROKER_PORT);
            return connectionFactory;
        }

        /**
         * Opens a new connection using a factory obtained from {@link #getFactory()}.
         *
         * @return the newly opened connection
         * @throws IOException      propagated from {@code newConnection()}
         * @throws TimeoutException propagated from {@code newConnection()}
         */
        public static Connection getFactoryConnection() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = getFactory();
            return connectionFactory.newConnection();
        }

        /**
         * Opens a new connection and creates a channel on it.
         *
         * @return the channel created on the new connection
         * @throws IOException      propagated from opening the connection or creating the channel
         * @throws TimeoutException propagated from opening the connection
         */
        public static Channel getChannel() throws IOException, TimeoutException {
            Connection freshConnection = getFactoryConnection();
            return freshConnection.createChannel();
        }
    }
    /**
     * Publishes messages that should reach a target exchange only after a delay.
     *
     * <p>The message is sent with a per-message expiration to a holding queue that has no
     * consumer in this code; the queue is declared with {@code x-dead-letter-exchange} set to
     * the target exchange.
     */
    class DelayProducer {
        /**
         * Declares the target exchange, the delay exchange/queue/binding, and publishes
         * {@code content} to the delay exchange with the given expiration.
         *
         * <p>Any exception raised while declaring or publishing is printed to standard error
         * and not rethrown, so a caller cannot tell from the return that the publish failed.
         *
         * @param exchange         name of the fanout exchange the message should finally reach
         * @param content          message body; encoded as UTF-8
         * @param delayMillseconds per-message expiration in milliseconds
         * @throws Exception declared for backward compatibility with existing callers
         */
        public static void publish(String exchange, String content, int delayMillseconds) throws Exception {
            String delayExchangeName = exchange + "_delay";
            String delayQueueName = delayExchangeName + "->queue_delay";
            String delayRouteKey = "dead";
            // Dead-letter settings for the holding queue: point it at the target exchange.
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", exchange);

            Channel channel = null;
            try {
                channel = RabbitMqContex.getChannel();
                channel.exchangeDeclare(exchange, "fanout", true, false, null);
                channel.exchangeDeclare(delayExchangeName, "direct", true, false, null);
                channel.queueDeclare(delayQueueName, true, false, false, arguments);
                channel.queueBind(delayQueueName, delayExchangeName, delayRouteKey);
                // Per-message expiration carries the requested delay.
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .expiration(Integer.toString(delayMillseconds))
                        .build();
                // Explicit UTF-8 so the encoded bytes do not depend on the platform default charset.
                channel.basicPublish(delayExchangeName, delayRouteKey, properties,
                        content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (Exception ignored) {
                        // Best-effort cleanup: the publish outcome has already been decided.
                    }
                    try {
                        // RabbitMqContex.getChannel() opens a dedicated connection for every call;
                        // closing only the channel would leave that connection open.
                        channel.getConnection().close();
                    } catch (Exception ignored) {
                        // Best-effort cleanup: the connection may already be closed.
                    }
                }
            }
        }
    }

    2.调用如下

      public static void main(String[] args) throws Exception {
        
            DelayProducer.publish("rewardSuccess", "{"customerId":123032}", 60 * 1000);
        }

    二. 使用 Spring 的 RabbitTemplate 和 RabbitAdmin

    1.注入bean

    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that registers the RabbitMQ beans used by the application:
     * a connection factory, a {@link RabbitTemplate}, a {@link RabbitAdmin} and a default
     * {@link MessageProperties} instance.
     *
     * @date 2017/10/19
     */
    @Configuration
    public class RabbitMqConfig {

        /** Broker host, read from {@code spring.rabbitmq.host}. */
        @Value("${spring.rabbitmq.host}")
        private String host;

        /** Broker port, read from {@code spring.rabbitmq.port}. */
        @Value("${spring.rabbitmq.port}")
        private int port;

        /** User name, read from {@code spring.rabbitmq.username}; "guest" when unset. */
        @Value("${spring.rabbitmq.username:guest}")
        private String username;

        /** Password, read from {@code spring.rabbitmq.password}; "guest" when unset. */
        @Value("${spring.rabbitmq.password:guest}")
        private String password;

        /**
         * Creates the caching connection factory from the injected host, port and credentials.
         *
         * @return the connection factory, registered as {@code rabbitMq.connectionFactory}
         */
        @Bean(name = "rabbitMq.connectionFactory")
        public ConnectionFactory getConnectionFactory() {
            CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
            cachingFactory.setHost(host);
            cachingFactory.setPort(port);
            cachingFactory.setUsername(username);
            cachingFactory.setPassword(password);
            return cachingFactory;
        }

        /**
         * Creates the template used to send messages.
         *
         * @param connectionFactory the {@code rabbitMq.connectionFactory} bean
         * @return a template bound to that factory
         */
        @Bean
        public RabbitTemplate getRabbitTemplate(
                @Qualifier("rabbitMq.connectionFactory") ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            return template;
        }

        /**
         * Creates the admin object used to declare exchanges, queues and bindings.
         *
         * @param connectionFactory the {@code rabbitMq.connectionFactory} bean
         * @return an admin bound to that factory
         */
        @Bean
        public RabbitAdmin getRabbitAdmin(
                @Qualifier("rabbitMq.connectionFactory") ConnectionFactory connectionFactory) {
            RabbitAdmin admin = new RabbitAdmin(connectionFactory);
            return admin;
        }

        /**
         * Creates the default message properties: persistent delivery mode plus a
         * {@code content_encoding} header set to {@code JSON}.
         *
         * @return the default message properties bean
         */
        @Bean
        public MessageProperties getRabbitTemplateMessageProperties() {
            MessageProperties defaults = new MessageProperties();
            defaults.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            defaults.setHeader("content_encoding", "JSON");
            return defaults;
        }
    }

    2.发送工具类

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import javax.annotation.PostConstruct;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * Sends text messages to RabbitMQ exchanges, either immediately or after a delay.
     *
     * <p>Every exchange is declared as a durable fanout exchange through {@link RabbitAdmin}
     * before a message is sent to it; for delayed messages the holding queue and its binding
     * are declared as well.
     *
     * @version 1.0
     * @createDate: 2017/10/19
     */
    @Configuration
    @Slf4j
    public class RabbitMqSender {
        /**
         * Template used to send messages.
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Admin used to declare exchanges, queues and bindings.
         */
        @Autowired
        private RabbitAdmin rabbitAdmin;

        /**
         * Default message properties, used when the caller supplies no headers.
         */
        @Autowired
        private MessageProperties messageProperties;

        /**
         * Post-construction hook; currently does nothing.
         */
        @PostConstruct
        public void init() {
        }

        /**
         * Declares {@code exchange} as a durable, non-auto-delete fanout exchange.
         *
         * @param exchange exchange name
         */
        private void declareExchange(String exchange) {
            rabbitAdmin.declareExchange(new FanoutExchange(exchange, true, false, null));
        }

        /**
         * Resolves the properties for an outgoing message.
         *
         * @param header extra headers to set, or {@code null} for the defaults
         * @return the shared injected instance when {@code header} is {@code null} (callers must
         *         not modify it); otherwise a new persistent instance carrying the
         *         {@code content_encoding} header plus every entry of {@code header}
         */
        private MessageProperties getMessageProperties(Map<String, String> header) {
            if (header == null) {
                return this.messageProperties;
            }

            MessageProperties customMessageProperties = new MessageProperties();
            customMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            customMessageProperties.setHeader("content_encoding", "JSON");
            for (Map.Entry<String, String> item : header.entrySet()) {
                customMessageProperties.setHeader(item.getKey(), item.getValue());
            }
            return customMessageProperties;
        }

        /**
         * Sends {@code content} to {@code exchange} with the default message properties.
         *
         * @param exchange target exchange name
         * @param content  message body; encoded as UTF-8
         */
        public void publish(String exchange, String content) {
            // The typed cast selects the Map overload without resorting to a raw type.
            publish(exchange, content, (Map<String, String>) null);
        }

        /**
         * Sends {@code content} to {@code exchange} with additional message headers.
         *
         * @param exchange   target exchange name
         * @param content    message body; encoded as UTF-8
         * @param properties headers to attach, or {@code null} for the default properties
         */
        public void publish(String exchange, String content, Map<String, String> properties) {
            publish(exchange, content, getMessageProperties(properties));
        }

        /**
         * Declares the exchange and sends the message with an empty routing key.
         * A send failure is logged and then rethrown.
         *
         * @param exchange          target exchange name
         * @param content           message body; encoded as UTF-8
         * @param messageProperties properties attached to the message
         */
        private void publish(String exchange, String content, MessageProperties messageProperties) {
            declareExchange(exchange);
            Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
            try {
                rabbitTemplate.send(exchange, "", message);
                log.debug("推送给exchange:{},消息体:{} 成功", exchange, content);
            } catch (Exception e) {
                log.error("推送给exchange:{},消息体:{} 失败!!", exchange, content, e);
                throw e;
            }
        }

        /**
         * Sends {@code content} so that it reaches {@code exchange} only after a delay.
         *
         * <p>The message is published with a per-message expiration to the holding queue
         * {@code <exchange>_delay->queue_delay}, which is declared with
         * {@code x-dead-letter-exchange} set to {@code exchange}.
         *
         * <p>NOTE(review): RabbitMQ documents that per-message TTL only takes effect when a
         * message reaches the head of its queue, so mixing different delays on the same
         * exchange may hold a short-delay message behind a long-delay one — confirm whether
         * callers rely on mixed delays.
         *
         * @param exchange    exchange the message should finally reach
         * @param content     message body; encoded as UTF-8
         * @param millseconds delay in milliseconds; must not be negative
         * @throws IllegalArgumentException if {@code millseconds} is negative
         */
        public void delayPublish(String exchange, String content, int millseconds) {
            // RabbitMQ requires a non-negative TTL; fail fast here, before any topology is
            // declared, instead of handing the broker an invalid expiration.
            if (millseconds < 0) {
                throw new IllegalArgumentException("millseconds must not be negative: " + millseconds);
            }

            String delayExchangeName = exchange + "_delay";
            String delayQueueName = delayExchangeName + "->queue_delay";
            String delayRouteKey = "dead";
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", exchange);

            declareExchange(exchange);
            declareExchange(delayExchangeName);
            rabbitAdmin.declareQueue(new Queue(delayQueueName, true, false, false, arguments));
            rabbitAdmin.declareBinding(new Binding(delayQueueName, Binding.DestinationType.QUEUE, delayExchangeName,
                    delayRouteKey, Collections.emptyMap()));

            // An empty (non-null) header map yields a fresh MessageProperties instance, so the
            // expiration set below never touches the shared default properties.
            MessageProperties delayedProperties = getMessageProperties(Collections.emptyMap());
            delayedProperties.setExpiration(Integer.toString(millseconds));
            publish(delayExchangeName, content, delayedProperties);
        }
    }
  • 相关阅读:
    人工智能技术导论——博弈树搜索
    人工智能技术导论——基于产生式规则的机器推理
    人工智能技术导论——基于遗传算法的随机优化搜索
    人工智能技术导论——使用PROLOG逻辑语言解决爱因斯坦斑马问题
    人工智能技术导论——逻辑程序设计语言PROLOG
    操作系统——页面调度算法
    操作系统——页式存储管理
    人工智能技术导论复习大纲
    计算机图形学复习大纲
    算法设计与分析——算法复杂性分析
  • 原文地址:https://www.cnblogs.com/zhshlimi/p/10913586.html
Copyright © 2011-2022 走看看