zoukankan      html  css  js  c++  java
  • RocketMQ 消费队列代码

            // Create the push consumer for the operation-log group and configure it:
            // name server address, start from the first offset, broadcasting message model.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constant.operationLogGroup);
            consumer.setNamesrvAddr(Constant.rocketQueneAddr);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setMessageModel(MessageModel.BROADCASTING);

            // Subscribe to the operation-log topic/tag. A subscription failure is logged
            // and not rethrown, so the statements that follow still run.
            try {
                consumer.subscribe(Constant.operationLogTopic, Constant.operationLogTag);
            } catch (MQClientException subscribeError) {
                logger.error("consume operation log MQ error", subscribeError);
            }

            // Helper used by the message listener to push payloads out on CHANNEL.
            cometutil = Comet4jUtil.getInstance(CHANNEL);
    
            // Register the listener that forwards every received message body, decoded as
            // UTF-8 text, to all connections on CHANNEL.
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * Forwards each message in the delivered batch to all connections on CHANNEL.
                 *
                 * <p>The whole batch is processed, not just its first element, because the
                 * batch is acknowledged as a unit: anything not forwarded here would be lost.
                 *
                 * @param msgs    messages delivered in this batch; a null or empty list is a no-op
                 * @param context consume context supplied by the client (not used)
                 * @return always {@code CONSUME_SUCCESS}
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    if (msgs == null || msgs.isEmpty()) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }

                    for (MessageExt msg : msgs) {
                        byte[] body = msg.getBody();
                        if (body == null) {
                            // Nothing to forward for a message without a payload.
                            continue;
                        }
                        try {
                            cometutil.sendMesToAllConnsWithString(CHANNEL, new String(body, "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            // Previously swallowed silently; log it so a dropped message is visible.
                            logger.error("consume operation log MQ error", e);
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // Start the consumer. The info line is only reached when start() returns
            // without throwing MQClientException.
            try {
                consumer.start();
                logger.info("operationLogController's MQ consumer started.");
            } catch (MQClientException e) {
                // Start failure is logged with its stack trace and not rethrown.
                logger.error("consume operation log MQ start error", e);
            }
  • 相关阅读:
    信息探测
    Hdu 1262 寻找素数对
    Hdu 1263 水果
    Hdu 1261字串数
    Hdu 1253 胜利大逃亡
    Hdu 1237简单计算器
    Hdu 1235 统计同成绩学生人数
    Hdu 1236 排名
    Hdu 1233 还是畅通工程
    Hdu 1234 开门人和关门人
  • 原文地址:https://www.cnblogs.com/llguanli/p/8520437.html
Copyright © 2011-2022 走看看