zoukankan      html  css  js  c++  java
  • RocketMQ 以广播方式实现消费者消费消息

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    /**
     * Demo push consumer: subscribes to topic {@code test_model_topic2} with the
     * tag expression {@code TagA}, sets the message model to
     * {@link MessageModel#BROADCASTING}, and prints every message handed to its
     * listener.
     */
    public class Consumer1 {

        private static final String GROUP_NAME = "test_model_consumer_name1";
        private static final String TOPIC = "test_model_topic2";
        private static final String TAG_EXPRESSION = "TagA";

        /** {@code true} only if setup and {@code start()} completed without an exception. */
        private final boolean started;

        /**
         * Creates, configures and starts the underlying {@link DefaultMQPushConsumer}.
         *
         * <p>Any exception raised during setup is printed and not rethrown, so
         * construction always completes; the outcome is recorded in
         * {@code started} so that {@link #main(String[])} can report it truthfully.
         */
        public Consumer1() {
            boolean ok = false;
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME);
                consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
                consumer.subscribe(TOPIC, TAG_EXPRESSION);
                // This demo is about broadcast consumption; MessageModel.CLUSTERING is
                // the alternative model to compare against.
                consumer.setMessageModel(MessageModel.BROADCASTING);
                consumer.registerMessageListener(new Listener());
                consumer.start();
                ok = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
            started = ok;
        }


        /**
         * Concurrent message listener that prints topic, tags and body of each
         * received message.
         */
        class Listener implements MessageListenerConcurrently {
            /**
             * Prints every message in the batch.
             *
             * @param msgs    the batch of messages delivered by the client
             * @param context the consume context supplied by the client (unused here)
             * @return {@code CONSUME_SUCCESS} when the whole batch was printed,
             *         {@code RECONSUME_LATER} if any exception occurred
             */
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        // Charset constant instead of the "utf-8" name: no per-message
                        // charset lookup and no checked UnsupportedEncodingException.
                        String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        String tags = msg.getTags();
                        // No tag check here: the subscription above already passes "TagA"
                        // as its tag expression.
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // NOTE(review): RocketMQ documentation indicates that failed messages are
                    // not retried in broadcasting mode, so this status may have no effect
                    // here - confirm before relying on redelivery.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        }

        /**
         * Starts the demo consumer and reports whether startup succeeded.
         *
         * @param args command-line arguments (ignored)
         */
        public static void main(String[] args) {
            Consumer1 c1 = new Consumer1();
            if (c1.started) {
                System.out.println("c1 start..");
            } else {
                // Previously "c1 start.." was printed even after a failed startup.
                System.err.println("c1 failed to start, see the stack trace above");
            }
        }
    }

    //=========================

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    /**
     * Demo push consumer: subscribes to topic {@code test_model_topic2} with the
     * tag expression {@code TagB}, sets the message model to
     * {@link MessageModel#BROADCASTING}, and prints every message handed to its
     * listener.
     */
    public class Consumer2 {

        private static final String GROUP_NAME = "test_model_consumer_name2";
        private static final String TOPIC = "test_model_topic2";
        private static final String TAG_EXPRESSION = "TagB";

        /** {@code true} only if setup and {@code start()} completed without an exception. */
        private final boolean started;

        /**
         * Creates, configures and starts the underlying {@link DefaultMQPushConsumer}.
         *
         * <p>Any exception raised during setup is printed and not rethrown, so
         * construction always completes; the outcome is recorded in
         * {@code started} so that {@link #main(String[])} can report it truthfully.
         */
        public Consumer2() {
            boolean ok = false;
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME);
                consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
                consumer.subscribe(TOPIC, TAG_EXPRESSION);
                // This demo is about broadcast consumption; MessageModel.CLUSTERING is
                // the alternative model to compare against.
                consumer.setMessageModel(MessageModel.BROADCASTING);
                consumer.registerMessageListener(new Listener());
                consumer.start();
                ok = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
            started = ok;
        }


        /**
         * Concurrent message listener that prints topic, tags and body of each
         * received message.
         */
        class Listener implements MessageListenerConcurrently {
            /**
             * Prints every message in the batch.
             *
             * @param msgs    the batch of messages delivered by the client
             * @param context the consume context supplied by the client (unused here)
             * @return {@code CONSUME_SUCCESS} when the whole batch was printed,
             *         {@code RECONSUME_LATER} if any exception occurred
             */
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        // Charset constant instead of the "utf-8" name: no per-message
                        // charset lookup and no checked UnsupportedEncodingException.
                        String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        String tags = msg.getTags();
                        // No tag check here: the subscription above already passes "TagB"
                        // as its tag expression.
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // NOTE(review): RocketMQ documentation indicates that failed messages are
                    // not retried in broadcasting mode, so this status may have no effect
                    // here - confirm before relying on redelivery.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        }

        /**
         * Starts the demo consumer and reports whether startup succeeded.
         *
         * @param args command-line arguments (ignored)
         */
        public static void main(String[] args) {
            Consumer2 c2 = new Consumer2();
            if (c2.started) {
                System.out.println("c2 start..");
            } else {
                // Previously "c2 start.." was printed even after a failed startup.
                System.err.println("c2 failed to start, see the stack trace above");
            }
        }
    }

  • 相关阅读:
    通过user-agent判断h5页面是在哪个手机App(QQ、微信、支付宝)下打开的
    vscode格式化插件
    简单直接,“NODE_ENV”总结
    NativeScript又一个Hybrid技术(附与Weex,ReactNative比较)
    ReactNative开发中遇到的问题记录
    两个在线编辑网站runjs和jsbin
    go 如何单测
    go语法-type等
    go语法-结构体和接口-细节
    解决 Webstorm 每次更新 Git 代码都要输入密码的问题
  • 原文地址:https://www.cnblogs.com/zhangzhiqin/p/10351591.html
Copyright © 2011-2022 走看看