zoukankan      html  css  js  c++  java
  • 深入理解RocketMQ(九)---实战(代码)

    一、批量发送消息

      即多条消息放入List,一次发送,从而减少网络传输,提高效率

    DefaultMQProducer producer = new DefaultMQProducer("batch_send_producer_group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
            String topic = "batchTopic";
            List<Message> messageList = new ArrayList<>();
    
            for (int i = 0; i < 10; i++) {
                Message msg = new Message(topic,"TAG1","ORDER" + i, "Hello world".getBytes());
                messageList.add(msg);
            }
            try{
                producer.send(messageList);
            }catch (Exception e){
                e.printStackTrace();
            }
    
            producer.shutdown();
    

    二、消息发送队列自主选择

     例:可以将同一订单(不同操作,例如下单、付款、出库、订单完成等操作)发送到同一个queue中,来保证一个订单不同操作的顺序性

    DefaultMQProducer producer = new DefaultMQProducer("select_queue_producer_group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    String topic = "selectQueueTopic";

    String[] tags = new String[]{"TAG","TAG2","TAG3","TAG4","TAG5"};

    int orderId = 41;
    int orderId1 = 42;
    try{
    for (int i = 0; i < 10; i++) {
    Message msg = new Message(topic,tags[i%tags.length],"KEY" + i, ("Hello world"+i).getBytes());
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    return mqs.get((Integer) arg % mqs.size());
    }
    },orderId1);
    System.out.println(orderId1 + "=======" + sendResult);

    sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    return mqs.get((Integer) arg % mqs.size());
    }
    },orderId);
    System.out.println(orderId + "=======" + sendResult);
    }

    }catch (Exception e){
    e.printStackTrace();
    }

    producer.shutdown();

    输入如下:

     可以看到订单ID为41的消息,全部发送到queueId为1的队列中,订单ID为42的消息,全部发送到QueueId为2的队列中

    三、订单过滤

    1、TAG模式过滤(多个tag使用 || 区分)

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
    
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
            String topic = "TagFilterTopic1";
            String tagA = "TagA";
            String tagB = "TagB";
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
    
                    msg = new Message(topic,tagB,("tagA==========22222222222").getBytes(RemotingHelper.DEFAULT_CHARSET));
                    sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();

    消息消费

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFileterConcumer");
    
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.setConsumerGroup("tagFileterConcumerGroup");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer.subscribe("TagFilterTopic1", "TagA");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s TagA Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
    
            DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("tagFileterConcumer1");
    
            consumer1.setNamesrvAddr("127.0.0.1:9876");
            consumer1.setConsumerGroup("tagFileterConcumerGroup1");
            consumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer1.subscribe("TagFilterTopic1", "TagA || TagB");
            consumer1.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s TagA&TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer1.start();
            System.out.printf("Consumer Started.%n");
    
            DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("tagFileterConcumer2");
    
            consumer2.setNamesrvAddr("127.0.0.1:9876");
            consumer2.setConsumerGroup("tagFileterConcumerGroup2");
            consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer2.subscribe("TagFilterTopic1", "TagB");
            consumer2.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer2.start();
            System.out.printf("Consumer Started.%n");

    执行结果:可以看到只需要TagA的消费者,只输出了一条消息;只需要TagB的消费者,也只输出了一条消息

     2、SQL过滤

    需要开启支持sql92:在broker.conf文件中添加如下配置:enablePropertyFilter=true

    发送者:

    DefaultMQProducer producer = new DefaultMQProducer("sql_filter_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
            String topic = "SqlFilterTopic1";
            String tagA = "TagA";
            for (int i = 0; i < 1; i++) {
                try {
                    Message msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
                    msg.putUserProperty("orderStatus","1");
                    msg.putUserProperty("userName","lcl");
                    msg.putUserProperty("orderId","654646");
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
    
                    msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
                    msg.putUserProperty("orderStatus","2");
                    msg.putUserProperty("userName","lcl");
                    msg.putUserProperty("orderId","654646");
                    sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
    
                    msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
                    msg.putUserProperty("orderStatus","2");
                    msg.putUserProperty("userName","mm");
                    msg.putUserProperty("orderId","654646");
                    sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
    
                    msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
                    msg.putUserProperty("orderStatus","2");
                    msg.putUserProperty("userName","lcl");
                    msg.putUserProperty("orderId","323");
                    sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
    
    
                    msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
                    msg.putUserProperty("orderStatus","1");
                    msg.putUserProperty("userName","lcl");
                    msg.putUserProperty("orderId","68789");
                    sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();

    消费者:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFileterConcumer");
    
            consumer.setNamesrvAddr("127.0.0.1:9876");
            consumer.setConsumerGroup("tagFileterConcumerGroup");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer.subscribe("SqlFilterTopic1", MessageSelector.bySql("(orderStatus = '1' and userName = 'lcl' and orderId > 0)"));
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s TagA Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
    
            DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("tagFileterConcumer1");
    
            consumer1.setNamesrvAddr("127.0.0.1:9876");
            consumer1.setConsumerGroup("tagFileterConcumerGroup1");
            consumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer1.subscribe("TagFilterTopic1", "TagA || TagB");
            consumer1.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s TagA&TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer1.start();
            System.out.printf("Consumer Started.%n");
    
            DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("tagFileterConcumer2");
    
            consumer2.setNamesrvAddr("127.0.0.1:9876");
            consumer2.setConsumerGroup("tagFileterConcumerGroup2");
            consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer2.subscribe("TagFilterTopic1", "TagB");
            consumer2.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer2.start();
            System.out.printf("Consumer Started.%n");

    消费者输出结果:

    3、类过滤模式

  • 相关阅读:
    CF 1119 题解
    CF 582 题解
    CF 1098 题解
    CF 1129 题解
    CF 513 题解
    CF 417 D 题解
    ingress nginx遇到502错误,connect() failed (113 Host is unreachable) while connecting to upstream
    MySQL性能剖析
    MySQL的基准测试
    MySQL架构与历史
  • 原文地址:https://www.cnblogs.com/liconglong/p/12907343.html
Copyright © 2011-2022 走看看