zoukankan      html  css  js  c++  java
  • Rocketmq使用 生产者、push消费者和pull消费者

    maven引用

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.7.0</version>
            </dependency>
            <!--一个好用的工具包,可以不引入-->
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>5.3.0</version>
            </dependency>

    说明

      不管是生产者还是消费者,都有很多参数可以配置,rocketmq命名比较好,基本可以从参数名上判断具体作用,还有注释可以看。

    下面例子中只给出了常用的一些参数设置。更多参数可自行探索。

     简单生产者实现

    注意:

    1、NamesrvAddr参数在多个节点时,用英文分号分隔,例: 192.168.9.58:9876;192.168.9.59:9876

    import cn.hutool.core.util.RandomUtil;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;
    
    public class Producer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.setNamesrvAddr("127.0.0.1:9876");
            //发送超时时间,默认3000 单位ms
            producer.setSendMsgTimeout(5000);
            producer.start();
    
            try {
                Message msg = new Message("TestTopic",// topic
                        "177",                       // tag 可以为空,用以简单的筛选。
                        RandomUtil.randomString(8),  // key 可以为空,可用以查询。
                        ("test" + RandomUtil.randomString(8)).getBytes());    // body ,我常将对象转json再获取byte[] 进行传输。
                SendResult send = producer.send(msg);
                if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
                    //发送成功处理
                }else {
                    //发送失败处理
                }
            } catch (Exception e) {
                //发送失败处理
                e.printStackTrace();
            }
            //正式环境不要发完就就shutdown,要在应用退出时再shutdown。
            producer.shutdown();
        }
    }

    多线程加批量生产者模拟实现

    注意:

    1、批量发送时,topic必须为同一个,否则发送会报异常。

    2、批量发送相较于单条发送速度提升很大。

    import cn.hutool.core.util.RandomUtil;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class Producer {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.setNamesrvAddr("127.0.0.1:9876");
            //发送超时时间,默认3000 单位ms
            producer.setSendMsgTimeout(5000);
            producer.start();
    
            int threadCount = 20;
            int forCount = 100000;
            CountDownLatch latch = new CountDownLatch(threadCount);
            long start = System.currentTimeMillis();
            for (int i = 0; i < threadCount; i++) {
                new Thread(() -> {
                    try {
                        List<Message> list = new ArrayList<>();
                        for (int j = 0; j < forCount; j++) {
                            try {
                                Message msg = new Message("TestTopic",// topic
                                        "177",                       // tag
                                        RandomUtil.randomString(8),                       // key
                                        ("test" + RandomUtil.randomString(8)).getBytes());    // body
                                list.add(msg);
                                if (list.size() >= 100) {
                                    SendResult send = producer.send(list);
                                    if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
                                        //发送成功处理
                                        list.clear();
                                    }else {
                                        //发送失败处理
                                    }
                                }
                            } catch (Exception e) {
                                //发送失败处理
                                e.printStackTrace();
                            }
                        }
                        if (list.size() > 0) {
                            SendResult send = producer.send(list);
                            if (!send.getSendStatus().equals(SendStatus.SEND_OK)) {
                                System.out.println(send);
                            }
                            list.clear();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }).start();
            }
            latch.await();
            long hs = System.currentTimeMillis() - start;
            System.out.println(hs);
    
            long speed = (threadCount * forCount) / (hs >= 0 ? 1 : hs / 1000);
            System.out.println("速度" + speed);
            //正式环境不要发完就就shutdown,要在应用退出时再shutdown。
            producer.shutdown();
        }
    }

    push消费者

    import cn.hutool.core.lang.Console;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    public class PushConsumer {
        public static void main(String[] args) throws Exception {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumerGroupName");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            //一个GroupName第一次消费时的位置
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            consumer.setConsumeThreadMin(20);
            consumer.setConsumeThreadMax(20);
            //要消费的topic,可使用tag进行简单过滤
            consumer.subscribe("TestTopic", "*");
            //一次最大消费的条数
            consumer.setConsumeMessageBatchMaxSize(100);
            //消费模式,广播或者集群,默认集群。
            consumer.setMessageModel(MessageModel.CLUSTERING);
            //在同一jvm中 需要启动两个同一GroupName的情况需要这个参数不一样。
            consumer.setInstanceName("InstanceName");
            //配置消息监听
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                try {
                    //业务处理
                    msgs.forEach(msg -> {
                        Console.log(msg);
                    });
                } catch (Exception e) {
                    System.err.println("接收异常" + e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }

    pull消费者

    从4.6之后,提供了DefaultLitePullConsumer  大大简化了pull的操作。以下为新实现,4.6之前的版本不支持。

    import cn.hutool.core.collection.CollUtil;
    import cn.hutool.core.lang.Console;
    import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class PullConsumer {
        private static boolean runFlag = true;
        public static void main(String[] args) throws Exception {
            DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("PullConsumerGroupName");
            consumer.setNamesrvAddr("127.0.0.1:9876");
            //要消费的topic,可使用tag进行简单过滤
            consumer.subscribe("TestTopic", "*");
            //一次最大消费的条数
            consumer.setPullBatchSize(100);
            //无消息时,最大阻塞时间。默认5000 单位ms
            consumer.setPollTimeoutMillis(5000);
            consumer.start();
            while (runFlag){
                try {
                    //拉取消息,无消息时会阻塞 
                    List<MessageExt> msgs = consumer.poll();
                    if (CollUtil.isEmpty(msgs)){
                        continue;
                    }
                    //业务处理
                    msgs.forEach(msg-> Console.log(new String(msg.getBody())));
                    //同步消费位置。不执行该方法,应用重启会存在重复消费。
                    consumer.commitSync();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
            consumer.shutdown();
        }
    }
  • 相关阅读:
    一句话开启XP_CMDSHELL
    CF14B Young Photographer 题解
    sql 存储过程与函数区别
    sql索引
    分区表中毒,重装系统
    面试
    XML范例的应用(转载)
    数据结构题目
    网页加载速度的方法和技巧
    设计模式分类
  • 原文地址:https://www.cnblogs.com/enenen/p/12773099.html
Copyright © 2011-2022 走看看