zoukankan      html  css  js  c++  java
  • 简单理解Rocket---简单代码实例

    生产者producer,用于发送消息

     
    package com.xxx.rocketMQ;
     
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    /** 
      * @author  : xxx
      * @date : 2017年10月19日 下午3:57:22   
    */
    public class Producer {
        private static final String GROUP_NAME="producerGroup";
        //集群地址分号隔开
        private static final String NAME_SERVER="11.111.1111.1111:1111";
     
        public static void main(String[] args) throws MQClientException {
            //producerGroup必须唯一
            DefaultMQProducer producer = new DefaultMQProducer(GROUP_NAME);
            //设置name servver地址
            producer.setNamesrvAddr(NAME_SERVER);
            //在发送消息前,必须调用start方法启动producer
            producer.start();
            //topic:消息主题    tag:对消息进行再分类,方便consumer指定过滤条件在MQ服务器中过滤
            //sss:任何二进制形式的数据,需要producer和consumer协商好一致的序列化和反序列化方式
            Message msg = new Message("Topic","Tag","sss".getBytes());
            try {
                //发送消息
                producer.send(msg);
            } catch (RemotingException e) {
                e.printStackTrace();
            } catch (MQBrokerException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //应用退出前,可以销毁producer对象
            producer.shutdown();
        }
    }
     
    消费者consumer,用于接收和处理消息
     
    package com.xxx.rocketMQ;
     
    import java.util.List;
     
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
     
    /** 
      * @author  : xxx
      * @date : 2017年10月19日 下午3:57:27   
    */
    public class Consumer {
        private static final String GROUP_NAME="consumerGroup";
        private static final String NAME_SERVER="11.111.1111.1111:1111";
     
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME);
            //第一次启动从何处开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setNamesrvAddr(NAME_SERVER);
            //订阅topic *表示任何标签  多个标签  tag||tag||tag
            consumer.subscribe("Topic", "*");
            //监听消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
     
     
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }
     

        1,通过consumer.setNamesrvAddr(NAME_SERVER)以及producer.setNamesrvAddr(NAME_SERVER)来设置name server的地址,该地址应与服务器启动时指定的地址一致。

        2,consumeFromWhere()设置第一次启动何处消费。枚举类型ConsumeFromWhere如下:
        CONSUME_FROM_LAST_OFFSET:表示一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费
        CONSUME_FROM_FIRST_OFFSET:表示一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费
        CONSUME_FROM_TIMESTAMP:表示一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,时间点设置参见DefaultMQPushConsumer.consumeTimestamp
        3,consumer.subscribe("topic","*")来订阅感兴趣的topic和tag,当tag为*表示对任意标签都感兴趣,如果对几个标签感兴趣可以设置为“a||b||c”
     
    <<分布式系统常用技术及案例分析>>
     
     
  • 相关阅读:
    Delphi Help
    RAD 10 新控件 TSearchBox TSplitView
    滚动条
    c++builder Active Form
    chart左侧
    RAD 10 蓝牙
    浏览器插件 火狐插件
    c++builder delphi 调用dll dll编写
    模拟键盘 键盘虚拟代码
    oracle怎么把一个用户下的表复制给另一个用户?(授予表权限)
  • 原文地址:https://www.cnblogs.com/shangdongbin/p/7699243.html
Copyright © 2011-2022 走看看