zoukankan      html  css  js  c++  java
  • RocketMQ 以集群模式(MessageModel.CLUSTERING)实现消费者集群消费消息,实现负载均衡

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    public class Consumer1 {

    public Consumer1() {
    try {
    String group_name = "test_model_consumer_name";
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
    consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
    consumer.subscribe("test_model_topic2", "TagA");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener(new Listener());
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    class Listener implements MessageListenerConcurrently {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for(MessageExt msg : msgs){
    String topic = msg.getTopic();
    String msgBody = new String(msg.getBody(),"utf-8");
    String tags = msg.getTags();
    //if(tags.equals("TagA")) {
    System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
    //}
    }
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    }

    public static void main(String[] args) {
    Consumer1 c1 = new Consumer1();
    System.out.println("c1 start..");

    }
    }

    //==========================================================

    package com.bfxy.rocketmq.model;

    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

    import com.bfxy.rocketmq.constants.Const;

    public class Consumer2 {

    public Consumer2() {
    try {
    String group_name = "test_model_consumer_name";
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
    consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
    consumer.subscribe("test_model_topic2", "TagA");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener(new Listener());
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }


    class Listener implements MessageListenerConcurrently {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
    for(MessageExt msg : msgs){
    String topic = msg.getTopic();
    String msgBody = new String(msg.getBody(),"utf-8");
    String tags = msg.getTags();
    //if(tags.equals("TagA")) {
    System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
    //}
    }
    } catch (Exception e) {
    e.printStackTrace();
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    }

    public static void main(String[] args) {
    Consumer2 c2 = new Consumer2();
    System.out.println("c2 start..");

    }
    }

  • 相关阅读:
    16061109-第0次个人作业
    面向对象第四次总结
    面向对象5-7次作业总结
    2018 OO第一次总结(作业1-3)
    (最终作业)面向对象先导课课程总结
    HTML学习笔记
    实验八 进程间通信
    信号
    进程基础
    shell脚本编程
  • 原文地址:https://www.cnblogs.com/zhangzhiqin/p/10351557.html
Copyright © 2011-2022 走看看