zoukankan      html  css  js  c++  java
  • MessageConsumer

    /**
     * Polls the PPC request queue with {@code basicGet} and hands each message to
     * {@link PpcRequestMessageListener} on a small worker pool.
     *
     * <p>Messages are fetched in batches with manual acknowledgement. A message is acked
     * only after its handler returned normally and is requeued otherwise. All ack/nack calls
     * are made on the thread that runs the {@link ChannelCallback}, while the channel handed
     * to the callback is still in use by it.
     */
    @Slf4j
    @Component
    public class MessageConsumer {
        @Autowired
        private PpcRequestMessageListener ppcRequestMessageListener;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private MessageConverter messageConverter;

        @Value("${app.rabbitmq.schedule.queue.ppc-request}")
        private String ppcRequestQueue;

        // 1..4 worker threads with direct hand-off; a submission is rejected (AbortPolicy)
        // when all four threads are busy.
        private ExecutorService executor = new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

        /**
         * Starts the polling loop on a dedicated single-thread executor.
         *
         * <p>Each iteration pulls a batch of at most 4 messages. When nothing was received the
         * loop sleeps for one second. A failed iteration is logged and retried after the same
         * pause instead of ending the loop; the loop ends only when its thread is interrupted.
         */
        public void start() {
            Executors.newSingleThreadExecutor().execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        ChannelCallbackImpl<PpcRequestMessage> callback = new ChannelCallbackImpl<>(ppcRequestQueue, 4);
                        List<PpcRequestMessage> list = rabbitTemplate.execute(callback);
                        if (list == null || list.isEmpty()) {
                            TimeUnit.MILLISECONDS.sleep(1000);
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition sees it and exits.
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        log.error("MessageConsumer handle error", e);
                        try {
                            // Back off so a persistent failure does not turn into a busy loop.
                            TimeUnit.MILLISECONDS.sleep(1000);
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }

        /**
         * Channel callback that fetches up to {@code maxCount} messages from one queue,
         * dispatches them to the worker pool and settles them.
         *
         * @param <T> payload type produced by the message converter
         */
        private class ChannelCallbackImpl<T> implements ChannelCallback<List<T>> {
            private final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

            private final String queueName;

            private final int maxCount;

            /**
             * @param queueName queue to poll
             * @param maxCount  upper bound on the number of messages taken per invocation
             */
            public ChannelCallbackImpl(String queueName, int maxCount) {
                this.queueName = queueName;
                this.maxCount = maxCount;
            }

            /**
             * Fetches one batch, runs every message through the listener and acks or requeues it.
             *
             * <p>The method blocks until every dispatched handler has finished, so that each
             * delivery tag is settled exactly once and on this thread.
             *
             * @param channel channel supplied by the template; used only on the calling thread
             * @return the converted messages of the batch, or {@code null} when the queue was empty
             * @throws Exception if fetching, converting, dispatching or settling fails; deliveries
             *                   not yet settled are requeued (best effort) before rethrowing, so a
             *                   message whose handler is still running may be delivered again
             */
            @Override
            public List<T> doInRabbit(Channel channel) throws Exception {
                GetResponse response = channel.basicGet(queueName, false);
                if (response == null) {
                    return null;
                }

                List<Long> tags = new ArrayList<>();
                List<T> messages = new ArrayList<>();
                List<ConsumerAcceptRunnable> dispatched = new ArrayList<>();
                int settled = 0;
                try {
                    // If there is no idle process, sleep until a thread is available.
                    Integer tid = getSetIdleTid();

                    // The first message is already in hand; getMessageCount() is what remains queued.
                    int total = Math.min(response.getMessageCount() + 1, maxCount);

                    // Stop as soon as the batch is full or the queue runs dry, so that no
                    // message is fetched without being processed.
                    while (response != null) {
                        tags.add(response.getEnvelope().getDeliveryTag());
                        messages.add(convert(response));
                        if (messages.size() >= total) {
                            break;
                        }
                        response = channel.basicGet(queueName, false);
                    }

                    // accept: one worker id per message, requested only when a message needs it
                    for (int i = 0; i < messages.size(); i++) {
                        if (i > 0) {
                            tid = getSetIdleTid();
                        }
                        ConsumerAcceptRunnable task =
                                new ConsumerAcceptRunnable(tid, channel, tags.get(i), messages.get(i));
                        executor.execute(task);
                        // Track only after execute() accepted the task; a rejected task never completes.
                        dispatched.add(task);
                    }

                    // ack / nack according to each handler's outcome
                    for (ConsumerAcceptRunnable task : dispatched) {
                        task.settle();
                        settled++;
                    }
                    return messages;
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    for (long tag : tags.subList(settled, tags.size())) {
                        try {
                            channel.basicNack(tag, false, true);
                        } catch (Exception ioe) {
                            log.error("basicNack error", ioe);
                        }
                    }
                    throw e;
                }
            }

            /** Converts a raw {@code basicGet} response into the payload type. */
            @SuppressWarnings("unchecked") // the converter's result type is fixed by the caller's choice of T
            private T convert(GetResponse response) {
                MessageProperties props = messagePropertiesConverter.toMessageProperties(
                        response.getProps(), response.getEnvelope(), "UTF-8");
                if (response.getMessageCount() >= 0) {
                    props.setMessageCount(response.getMessageCount());
                }
                return (T) messageConverter.fromMessage(new Message(response.getBody(), props));
            }

            /**
             * Polls {@code PpcProcessStatus.getSetIdleTid()} every 500 ms until it returns a
             * non-null id.
             * NOTE(review): presumably the call also marks the returned id as busy — confirm
             * against PpcProcessStatus.
             *
             * @return the first non-null id obtained
             * @throws Exception if the waiting thread is interrupted
             */
            private Integer getSetIdleTid() throws Exception {
                Integer tid = PpcProcessStatus.getSetIdleTid();
                while (tid == null) {
                    log.info("ppc process has no idle thread, sleep");
                    TimeUnit.MILLISECONDS.sleep(500);
                    tid = PpcProcessStatus.getSetIdleTid();
                }
                return tid;
            }

            /**
             * Runs the listener for one message on a worker thread and records the outcome.
             * The worker thread never touches the channel; {@link #settle()} does that on the
             * callback thread.
             */
            public class ConsumerAcceptRunnable implements Runnable {
                private final Integer tid;
                private final Channel channel;
                private final long tag;
                private final T t;
                private final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
                // Set only after the listener returned normally, so any Throwable counts as failure.
                private volatile boolean succeeded;

                /**
                 * @param tid     worker id passed through to the listener
                 * @param channel channel the message was received on
                 * @param tag     delivery tag of the message
                 * @param t       converted message payload
                 */
                public ConsumerAcceptRunnable(Integer tid, Channel channel, long tag, T t) {
                    this.tid = tid;
                    this.channel = channel;
                    this.tag = tag;
                    this.t = t;
                }

                @Override
                public void run() {
                    try {
                        ppcRequestMessageListener.handleMessage(tid, t);
                        succeeded = true;
                    } catch (Exception e) {
                        log.error("accept message error {}", t, e);
                    } finally {
                        done.countDown();
                    }
                }

                /**
                 * Waits for {@link #run()} to finish, then acks the delivery on success or
                 * requeues it on failure. Must be called on the thread executing the callback.
                 *
                 * @throws Exception if the wait is interrupted or the channel operation fails
                 */
                private void settle() throws Exception {
                    done.await();
                    if (succeeded) {
                        channel.basicAck(tag, false);
                    } else {
                        channel.basicNack(tag, false, true);
                    }
                }
            }
        }
    }
  • 相关阅读:
    jMeter 里 CSV Data Set Config Sharing Mode 的含义详解
    如何使用 jMeter Parallel Controller
    使用 Chrome 开发者工具 coverage 功能分析 web 应用的渲染阻止资源的执行分布情况
    使用 Chrome 开发者工具的 lighthouse 功能分析 web 应用的性能问题
    关于 SAP 电商云首页加载时触发的 OCC API 请求
    SAP UI5 确保控件 id 全局唯一的实现方法
    SAP 电商云 Accelerator 和 Spartacus UI 的工作机制差异
    介绍一个好用的能让网页变成黑色背景的护眼 Chrome 扩展应用
    Chrome 开发者工具 performance 标签页的用法
    Client Side Cache 和 Server Side Cache 的区别
  • 原文地址:https://www.cnblogs.com/exmyth/p/13823763.html
Copyright © 2011-2022 走看看