zoukankan      html  css  js  c++  java
  • MessageConsumer

    @Slf4j
    @Component
    public class MessageConsumer {
        @Autowired
        private PpcRequestMessageListener ppcRequestMessageListener;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private MessageConverter messageConverter;
    
        @Value("${app.rabbitmq.schedule.queue.ppc-request}")
        private String ppcRequestQueue;
    
        private ExecutorService executor = new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
    
        public void start() {
            Executors.newSingleThreadExecutor().execute(() -> {
                try {
                    while (true) {
                        ChannelCallbackImpl<PpcRequestMessage> callback = new ChannelCallbackImpl<>(ppcRequestQueue, 4);
                        List<PpcRequestMessage> list = rabbitTemplate.execute(callback);
                        if (list == null || list.isEmpty()) {
                            TimeUnit.MILLISECONDS.sleep(1000);
                        }
                    }
                } catch (Exception e) {
                    log.error("MessageConsumer handle error", e);
                }
            });
        }
    
        private class ChannelCallbackImpl<T> implements ChannelCallback<List<T>> {
            private final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    
            private final String queueName;
    
            private final int maxCount;
    
            public ChannelCallbackImpl(String queueName, int maxCount) {
                this.queueName = queueName;
                this.maxCount = maxCount;
            }
    
            @Override
            public List<T> doInRabbit(Channel channel) throws Exception {
                GetResponse response = channel.basicGet(queueName, false);
                if(response == null){
                    return null;
                }
                //如果没有空闲进程,睡眠直至有可用线程
                Integer tid = getSetIdleTid();
    
                int total = response.getMessageCount() + 1;
                if(total > maxCount){
                    total = maxCount;
                }
    
                long[] tags = new long[total];
                List<T> messages = new ArrayList<>(total);
    
                for (int i = 0; i < total; i++) {
                    MessageProperties props = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
                    if (response.getMessageCount() >= 0) {
                        props.setMessageCount(response.getMessageCount());
                    }
                    Message message = new Message(response.getBody(), props);
                    T t = (T) messageConverter.fromMessage(message);
                    messages.add(t);
                    tags[i] = response.getEnvelope().getDeliveryTag();
                    response = channel.basicGet(queueName, false);
                }
    
                //accept
                for (int i = 0; i < messages.size(); i++) {
                    executor.execute(new ConsumerAcceptRunnable(tid, channel, tags[i], messages.get(i)));
                    tid = getSetIdleTid();
                }
    
                //ack
                for (long tag : tags){
                    channel.basicAck(tag, false);
                }
                return messages;
            }
    
            private Integer getSetIdleTid() throws Exception {
                Integer tid = PpcProcessStatus.getSetIdleTid();
                while (tid == null) {
                    log.info("ppc process has no idle thread, sleep");
                    TimeUnit.MILLISECONDS.sleep(500);
                    continue;
                }
                return tid;
            }
    
            public class ConsumerAcceptRunnable implements Runnable{
                private final Integer tid;
                private final Channel channel;
                private final long tag;
                private final T t;
    
                public ConsumerAcceptRunnable(Integer tid, Channel channel, long tag, T t) {
                    this.tid = tid;
                    this.channel = channel;
                    this.tag = tag;
                    this.t = t;
                }
    
                @Override
                public void run() {
                    try {
                        ppcRequestMessageListener.handleMessage(tid, t);
                    } catch (Exception e) {
                        log.error("accept message error {}", t, e);
                        try {
                            channel.basicNack(tag, false, true);
                        } catch (Exception ioe) {
                            log.error("basicNack error", ioe);
                        }
                    }
                }
            }
        }
    }
  • 相关阅读:
    一个Bean属性拷贝的工具类
    java Integer parseInt()
    sql 预编译 in
    显著性图谱的评价
    如何优雅的在MFC中使用cvSetMouseCallback?
    为MFC界面添加一个Log Window
    最大流算法统计
    2014年秋 求职总结
    图论的常用算法
    常用的排序算法
  • 原文地址:https://www.cnblogs.com/exmyth/p/13823763.html
Copyright © 2011-2022 走看看