zoukankan      html  css  js  c++  java
  • RocketMq(二、生产者、消费者demo)

    在实际环境中,应先启动消费者,去订阅完服务后,再启动生产者。

    生产者 Producer

    可靠同步发送

    package com.wk.test.rocketmqTest;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
        public static void main(String[] args) {
            //定义生产者名称
            DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
            //连接rocketMQ的namesrv地址(这里是集群)
            producer.setNamesrvAddr("10.32.16.195:9876;10.32.16.196:9876");
            try {
                producer.start();
                for(int i = 0;i<100;i++){
                    //1.主题,一般在服务器设置好,不能从代码中新建。2.标签。3.发送内容。
                    Message message = new Message("TopicQuickStart","TagA",("Hello RocketMQ" + i).getBytes());
                    //发送设置超时时间10秒
                    SendResult sendResult = producer.send(message,10000);
                    System.out.println(sendResult);
                }
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            }finally {
                producer.shutdown();
            }
        }
    }

    可靠异步发送

    package com.wk.test.rocketmqTest;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
        public static void main(String[] args) {
            //定义生产者名称
            DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
            //连接rocketMQ的namesrv地址(这里是集群)
            producer.setNamesrvAddr("10.32.16.179:9876");
            //发送失败重试3次
            //producer.setRetryTimesWhenSendFailed(3000);
            try {
                producer.start();
                //1.主题,一般在服务器设置好,不能从代码中新建。2.标签。3.发送内容。
                Message message = new Message("TopicQuickStart","Tag1",("生产者重试").getBytes());
                //发送设置超时时间10秒
                producer.send(message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }
    
                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                },10000);
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            }finally {
                //此处的代码必须注销,否则会报No route info of this topic错误
                //producer.shutdown();
            }
        }
    }

    单向发送

    耗时非常短,但数据不可靠

    package com.wk.test.rocketmqTest;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
        public static void main(String[] args) {
            //定义生产者名称
            DefaultMQProducer producer = new DefaultMQProducer("quickstart_product");
            //连接rocketMQ的namesrv地址(这里是集群)
            producer.setNamesrvAddr("10.32.16.179:9876");
            //发送失败重试3次
            producer.setSendMsgTimeout(5000);
            try {
                producer.start();
                //1.主题,一般在服务器设置好,不能从代码中新建。2.标签。3.发送内容。
                Message message = new Message("TopicQuickStart","Tag1",("生产者重试").getBytes());
                producer.sendOneway(message);
            } catch (MQClientException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RemotingException e) {
                e.printStackTrace();
            }finally {
                producer.shutdown();
            }
        }
    }

    消费者 Consumer

    集群消费

    默认或者设置

    consumer.setMessageModel(MessageModel.CLUSTERING);
    package com.wk.test.rocketmqTest;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.io.UnsupportedEncodingException;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //定义消费者名称,MQ往消费者推送
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            //连接rocketMQ的namesrv地址(此次为集群)
            consumer.setNamesrvAddr("10.32.16.179:9876");
            //新订阅组第一次启动,从头消费到尾,后续从上次的消费进度继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅的主题和标签(*代表所有标签)
            consumer.subscribe("TopicQuickStart","*");
            //消费者监听
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                for(MessageExt msg:msgs){
                    try {
                        String topic = msg.getTopic();
                        String msgbody = new String(msg.getBody(),"UTF-8");
                        String tag = msg.getTags();
                        System.out.println("topic:"+topic+" msgbody:"+msgbody+" tag:"+tag);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        //MQ发送失败重试机制,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                //消息处理成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        }
    }

     广播消费

    设置

    consumer.setMessageModel(MessageModel.BROADCASTING);
    package com.wk.test.rocketmqTest;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //定义消费者名称,MQ往消费者推送
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            //连接rocketMQ的namesrv地址(此次为集群)
            consumer.setNamesrvAddr("10.32.16.179:9876");
            //新订阅组第一次启动,从头消费到尾,后续从上次的消费进度继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //广播模式
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //订阅的主题和标签(*代表所有标签)
            consumer.subscribe("TopicQuickStart", "Tag1 || Tag2");
            //消费者监听
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                MessageExt msg = msgs.get(0);
                try {
                    String topic = msg.getTopic();
                    String msgbody = new String(msg.getBody(), "UTF-8");
                    String tag = msg.getTags();
                    System.out.println("topic:" + topic + " msgbody:" + msgbody + " tag:" + tag);
                    //dosomething...业务处理
                } catch (Exception e) {
                    e.printStackTrace();
                    //重试3次扔不成功则不继续重试
                    if(msg.getReconsumeTimes() == 3){
                        //记录日志或进行持久化操作。
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    //MQ发送失败重试机制,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消息处理成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        }
    }
  • 相关阅读:
    自定义控件-控件关联
    DELPHI INSERT INTO 语句的语法错误 解决方法
    Delphi控件开发
    Delphi控件复合控件
    vcl学习备忘网址
    Delphi单元文件Unit详解
    aowner , nil 和 self 的区别
    Delphi 自定义事件的例子
    PHP中Heredoc
    What is HTTP_USER_AGENT?
  • 原文地址:https://www.cnblogs.com/Unlimited-Blade-Works/p/12395632.html
Copyright © 2011-2022 走看看