zoukankan      html  css  js  c++  java
  • RocketMQ 以集群模式 MessageModel.CLUSTERING 实现消费者集群消费消息,实现负载均衡

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    public class Consumer1 {

    public Consumer1() {
    try {
    String group_name = "test_model_consumer_name";
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
    consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
    consumer.subscribe("test_model_topic2", "TagA");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener(new Listener());
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    class Listener implements MessageListenerConcurrently {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for(MessageExt msg : msgs){
    String topic = msg.getTopic();
    String msgBody = new String(msg.getBody(),"utf-8");
    String tags = msg.getTags();
    //if(tags.equals("TagA")) {
    System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
    //}
    }
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    }

    public static void main(String[] args) {
    Consumer1 c1 = new Consumer1();
    System.out.println("c1 start..");

    }
    }

    //==================== Consumer2.java (separate source file) ====================

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    public class Consumer2 {

    public Consumer2() {
    try {
    String group_name = "test_model_consumer_name";
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
    consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
    consumer.subscribe("test_model_topic2", "TagA");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener(new Listener());
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    class Listener implements MessageListenerConcurrently {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for(MessageExt msg : msgs){
    String topic = msg.getTopic();
    String msgBody = new String(msg.getBody(),"utf-8");
    String tags = msg.getTags();
    //if(tags.equals("TagA")) {
    System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
    //}
    }
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    }

    public static void main(String[] args) {
    Consumer2 c2 = new Consumer2();
    System.out.println("c2 start..");

    }
    }

  • 相关阅读:
    面试官:反射都不会,还敢说自己会Java?
    nginx 开启x-forward
    不写代码,从0到1教你制作炫酷可视化大屏
    5G 专网部署方案
    Mac运行pygame一直显示空白屏幕
    数据库大咖解读“新基建”,墨天轮四重好礼相送!
    Oracle 20c 新特性:自动的区域图
    4000多人全靠报表自动化,效率提高60%,这套数据平台方法论真强
    EBS开发性能优化之查找需要优化的程序
    EBS开发性能优化之SQL语句优化
  • 原文地址:https://www.cnblogs.com/zhangzhiqin/p/10351557.html
Copyright © 2011-2022 走看看