zoukankan      html  css  js  c++  java
  • rocketmq的以集群模式MessageModel.CLUSTERING实现消费者集群消费消息,实现负载均衡

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    public class Consumer1 {

    public Consumer1() {
    try {
    String group_name = "test_model_consumer_name";
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
    consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
    consumer.subscribe("test_model_topic2", "TagA");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener(new Listener());
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    class Listener implements MessageListenerConcurrently {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for(MessageExt msg : msgs){
    String topic = msg.getTopic();
    String msgBody = new String(msg.getBody(),"utf-8");
    String tags = msg.getTags();
    //if(tags.equals("TagA")) {
    System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
    //}
    }
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    }

    public static void main(String[] args) {
    Consumer1 c1 = new Consumer1();
    System.out.println("c1 start..");

    }
    }

    //==========================================================

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    public class Consumer2 {

    public Consumer2() {
    try {
    String group_name = "test_model_consumer_name";
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
    consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
    consumer.subscribe("test_model_topic2", "TagA");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener(new Listener());
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    class Listener implements MessageListenerConcurrently {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for(MessageExt msg : msgs){
    String topic = msg.getTopic();
    String msgBody = new String(msg.getBody(),"utf-8");
    String tags = msg.getTags();
    //if(tags.equals("TagA")) {
    System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
    //}
    }
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    }

    public static void main(String[] args) {
    Consumer2 c2 = new Consumer2();
    System.out.println("c2 start..");

    }
    }

  • 相关阅读:
    多个自定义覆盖物注册点击事件,点击某个覆盖物后获得它的坐标
    C# 文件操作(全部) 追加、拷贝、删除、移动文件、创建目录 修改文件名、文件夹名
    如何添加EXEStealth 2.5x 壳
    PACS系统简易
    好用的后端模版
    E信通项目总结[转]
    平台型产品的设计思路[转]
    baidu思维脑图在线编辑器
    Web 前端攻防(2014版)-baidu ux前端研发部
    访谈标叔:给新人设计师的建议【转】
  • 原文地址:https://www.cnblogs.com/zhangzhiqin/p/10351557.html
Copyright © 2011-2022 走看看