  • RocketMQ consumer queue code

            // Fragment: logger, cometutil and CHANNEL are assumed to be fields of the enclosing class.
            // Needs java.nio.charset.StandardCharsets in addition to the RocketMQ client imports.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constant.operationLogGroup);
            try {
                consumer.setNamesrvAddr(Constant.rocketQueneAddr);
                // With no stored offset, start from the earliest available message.
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                // Broadcasting: every consumer instance in the group receives every message.
                consumer.setMessageModel(MessageModel.BROADCASTING);
                consumer.subscribe(Constant.operationLogTopic, Constant.operationLogTag);
            } catch (MQClientException e) {
                logger.error("consume operation log MQ error", e);
            }
    
            cometutil = Comet4jUtil.getInstance(CHANNEL);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // Push every message in the batch to the Comet channel, not only the first one.
                    for (MessageExt msg : msgs) {
                        // StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException.
                        cometutil.sendMesToAllConnsWithString(CHANNEL, new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            try {
                consumer.start();
                logger.info("operationLogController's MQ consumer started.");
            } catch (MQClientException e) {
                logger.error("consume operation log MQ start error", e);
            }
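
    The listener callback receives a List of messages, so the decoding step can be written to handle the whole batch. Below is a minimal sketch of that step in plain Java with no RocketMQ dependency, so it can be run on its own. The class name BatchBodyDecoder and the method decodeAll are hypothetical names made up for this illustration and are not part of the original code or of RocketMQ; skipping null bodies is likewise an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RocketMQ or the original post.
public class BatchBodyDecoder {

    // Decode every message body in a batch as UTF-8 text.
    // Assumption: a null body is skipped instead of failing the whole batch.
    public static List<String> decodeAll(List<byte[]> bodies) {
        List<String> texts = new ArrayList<>();
        for (byte[] body : bodies) {
            if (body == null) {
                continue;
            }
            // StandardCharsets.UTF_8 never throws UnsupportedEncodingException.
            texts.add(new String(body, StandardCharsets.UTF_8));
        }
        return texts;
    }

    public static void main(String[] args) {
        List<byte[]> batch = new ArrayList<>();
        batch.add("user login".getBytes(StandardCharsets.UTF_8));
        batch.add("操作日志".getBytes(StandardCharsets.UTF_8));
        for (String text : decodeAll(batch)) {
            System.out.println(text);
        }
    }
}
```

    Inside consumeMessage, the bodies would come from calling getBody() on each MessageExt, and each decoded string would then be passed to cometutil.sendMesToAllConnsWithString.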
  • Original article: https://www.cnblogs.com/llguanli/p/8520437.html