zoukankan      html  css  js  c++  java
  • RocketMQ(2)

    1. 消费端集群消费(负载均衡)

     示例代码:

    /**
     * Producer: sends 100 test messages to topic {@code TopicTest} with tag {@code Tag1}.
     *
     * <p>Message bodies are encoded explicitly as UTF-8 so that they match the
     * UTF-8 decoding performed by the consumers, regardless of the platform
     * default charset.
     */
    public class Producer {
        /**
         * Starts the producer, sends the messages one by one and shuts the producer down.
         *
         * @param args unused
         * @throws MQClientException if the producer cannot be started
         * @throws InterruptedException if the thread is interrupted while sending or
         *         while backing off after a failed send
         */
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("message_producer");
            producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            producer.start();

            try {
                for (int i = 0; i < 100; i++) {
                    try {
                        Message msg = new Message("TopicTest", // topic
                            "Tag1", // tag
                            ("Hello RocketMQ " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8) // body
                        );
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    } catch (InterruptedException e) {
                        // Interruption is a request to stop: do not treat it as a failed send.
                        throw e;
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Back off briefly before trying the next message.
                        Thread.sleep(1000);
                    }
                }
            } finally {
                // Always release client resources, even when the loop exits with an exception.
                producer.shutdown();
            }
        }
    }
    
    /**
     * Consumer: subscribes to topic {@code TopicTest} and prints every message it receives.
     *
     * <p>After printing a message the listener prints a pause marker and sleeps for
     * 60 seconds before moving on to the next message.
     */
    public class Consumer1 {

        /**
         * Creates, configures and starts a push consumer in group {@code message_consumer}.
         *
         * @throws IllegalStateException if the consumer cannot be subscribed or started;
         *         the original {@code MQClientException} is attached as the cause
         */
        public Consumer1() {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
                consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
                consumer.registerMessageListener(new Listener());
                consumer.start();
            } catch (MQClientException e) {
                // Fail loudly: otherwise main() would report "Started" for a consumer that is not running.
                throw new IllegalStateException("Failed to start Consumer1", e);
            }
        }

        /** Prints each received message, then pauses for 60 seconds. */
        class Listener implements MessageListenerConcurrently {

            /**
             * Prints topic, body (decoded as UTF-8) and tags of every message in the batch.
             *
             * @param msgs the batch of messages delivered to this listener
             * @param context the consume context supplied by the client (unused)
             * @return {@code CONSUME_SUCCESS} when the whole batch was processed,
             *         {@code RECONSUME_LATER} if processing was interrupted or failed
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);

                        System.out.println("======暂停=====");
                        Thread.sleep(60000);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning thread pool can observe it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } catch (RuntimeException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        }

        /**
         * Starts the consumer and reports that it is running.
         *
         * @param args unused
         */
        public static void main(String[] args) throws InterruptedException, MQClientException {
            new Consumer1();
            System.out.println("Consumer1 Started.");
        }
    }
    
    /**
     * Consumer: subscribes to topic {@code TopicTest} and prints every message it receives.
     */
    public class Consumer2 {

        /**
         * Creates, configures and starts a push consumer in group {@code message_consumer}.
         *
         * @throws IllegalStateException if the consumer cannot be subscribed or started;
         *         the original {@code MQClientException} is attached as the cause
         */
        public Consumer2() {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
                consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
                consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
                consumer.registerMessageListener(new Listener());
                consumer.start();
            } catch (MQClientException e) {
                // Fail loudly: otherwise main() would report "Started" for a consumer that is not running.
                throw new IllegalStateException("Failed to start Consumer2", e);
            }
        }

        /** Prints each received message. */
        class Listener implements MessageListenerConcurrently {

            /**
             * Prints topic, body (decoded as UTF-8) and tags of every message in the batch.
             *
             * @param msgs the batch of messages delivered to this listener
             * @param context the consume context supplied by the client (unused)
             * @return {@code CONSUME_SUCCESS} once the whole batch has been printed
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    // Decoding with a Charset constant cannot fail with a checked exception,
                    // so no encoding-related error path is needed here.
                    String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        }

        /**
         * Starts the consumer and reports that it is running.
         *
         * @param args unused
         */
        public static void main(String[] args) throws InterruptedException, MQClientException {
            new Consumer2();
            System.out.println("Consumer2 Started.");
        }
    }
    View Code

    一个生产者,两个消费者,注意两个消费者的组名要一样。

    先启动两个消费者(Consumer1,Consumer2),通过控制台查看如下:

    再启动生产者生成100条消息,消费情况如下:

     

    生成的100条消息被consumer1和consumer2平均地消费了。可以通过consumer.setAllocateMessageQueueStrategy去设置分配策略。

    BTW:集群消费(MessageModel.CLUSTERING)是默认的模式,可以通过consumer.setMessageModel改为广播消费(MessageModel.BROADCASTING);如果是广播消费,则每个客户端都会收到生产端的所有消息。

    2.消息未响应会重发

     代码示例:

    /**
     * Producer: sends a single test message to topic {@code TopicTest} with tag {@code Tag1}.
     *
     * <p>The message body is encoded explicitly as UTF-8 so that it matches the
     * UTF-8 decoding performed by the consumers, regardless of the platform
     * default charset.
     */
    public class Producer {
        /**
         * Starts the producer, sends the message and shuts the producer down.
         *
         * @param args unused
         * @throws MQClientException if the producer cannot be started
         * @throws InterruptedException if the thread is interrupted while sending or
         *         while backing off after a failed send
         */
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("message_producer");
            producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            producer.start();

            try {
                for (int i = 0; i < 1; i++) {
                    try {
                        Message msg = new Message("TopicTest", // topic
                            "Tag1", // tag
                            ("Hello RocketMQ " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8) // body
                        );
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    } catch (InterruptedException e) {
                        // Interruption is a request to stop: do not treat it as a failed send.
                        throw e;
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Back off briefly before continuing.
                        Thread.sleep(1000);
                    }
                }
            } finally {
                // Always release client resources, even when the loop exits with an exception.
                producer.shutdown();
            }
        }
    }
    
    
    /**
     * Consumer: subscribes to topic {@code TopicTest} and prints every message it receives.
     *
     * <p>After printing a message the listener prints a pause marker and sleeps for
     * 10 minutes, so no consume status is returned during that time.
     * NOTE(review): presumably this window exists so the process can be killed
     * before it acknowledges the message - confirm against the walkthrough.
     */
    public class Consumer1 {

        /**
         * Creates, configures and starts a push consumer in group {@code message_consumer}.
         *
         * @throws IllegalStateException if the consumer cannot be subscribed or started;
         *         the original {@code MQClientException} is attached as the cause
         */
        public Consumer1() {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
                consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
                consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
                consumer.registerMessageListener(new Listener());
                consumer.start();
            } catch (MQClientException e) {
                // Fail loudly: otherwise main() would report "Started" for a consumer that is not running.
                throw new IllegalStateException("Failed to start Consumer1", e);
            }
        }

        /** Prints each received message, then pauses for 10 minutes. */
        class Listener implements MessageListenerConcurrently {

            /**
             * Prints topic, body (decoded as UTF-8) and tags of every message in the batch.
             *
             * @param msgs the batch of messages delivered to this listener
             * @param context the consume context supplied by the client (unused)
             * @return {@code CONSUME_SUCCESS} when the whole batch was processed,
             *         {@code RECONSUME_LATER} if processing was interrupted or failed
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);

                        System.out.println("======暂停=====");
                        Thread.sleep(600000);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning thread pool can observe it.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                } catch (RuntimeException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        }

        /**
         * Starts the consumer and reports that it is running.
         *
         * @param args unused
         */
        public static void main(String[] args) throws InterruptedException, MQClientException {
            new Consumer1();
            System.out.println("Consumer1 Started.");
        }
    }
    
    
    /**
     * Consumer: subscribes to topic {@code TopicTest} and prints every message it receives.
     *
     * <p>It uses the same consumer group name, {@code message_consumer}, as
     * {@code Consumer1}.
     */
    public class Consumer2 {

        /**
         * Creates, configures and starts a push consumer in group {@code message_consumer}.
         *
         * @throws IllegalStateException if the consumer cannot be subscribed or started;
         *         the original {@code MQClientException} is attached as the cause
         */
        public Consumer2() {
            try {
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
                consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
                consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
                consumer.registerMessageListener(new Listener());
                consumer.start();
            } catch (MQClientException e) {
                // Fail loudly: otherwise main() would report "Started" for a consumer that is not running.
                throw new IllegalStateException("Failed to start Consumer2", e);
            }
        }

        /** Prints each received message and acknowledges it immediately. */
        class Listener implements MessageListenerConcurrently {

            /**
             * Prints topic, body (decoded as UTF-8) and tags of every message in the batch.
             *
             * @param msgs the batch of messages delivered to this listener
             * @param context the consume context supplied by the client (unused)
             * @return {@code CONSUME_SUCCESS} once the whole batch has been printed
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    // Decoding with a Charset constant cannot fail with a checked exception,
                    // so no encoding-related error path is needed here.
                    String msgBody = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        }

        /**
         * Starts the consumer and reports that it is running.
         *
         * @param args unused
         */
        public static void main(String[] args) throws InterruptedException, MQClientException {
            new Consumer2();
            System.out.println("Consumer2 Started.");
        }
    }
    View Code

    先启动consumer1,再启动consumer2,最后启动producer

    consumer1收到了消息,consumer2没有收到消息,这时把consumer1强制停止,也就是说consumer1不会给MQ返回响应,查看结果:

     

    consumer2也收到消息了,说明在MQ没收到消费端响应的情况下,会重发消息。

     3. 修改topic的队列数

    默认的队列数是4个,可以从执行结果中看出:queueId都是0-3

    细节可以看https://www.cnblogs.com/dyfh/p/4113677.html

    如需修改队列数,可以在生产端增加如下设置:producer.createTopic("TopicTest", "TopicTest", 8);

     
  • 相关阅读:
    Hadoop是什么
    Hadoop的安装模式
    hadoop和云计算的关系
    no server suitable for synchronization found的解决办法
    hadoop的产生背景、发展历程
    Hadoop生态系统
    学习路线
    日历+时钟
    新世界(未完善)
    简单的点击切换图
  • 原文地址:https://www.cnblogs.com/lostyears/p/8582299.html
Copyright © 2011-2022 走看看