zoukankan      html  css  js  c++  java
  • RocketMQ消费者示例程序

      转载请注明出处:http://www.cnblogs.com/xiaodf/

      本博客实现了一个简单的RocketMQ消费者的示例,MQ里存储的是经过Avro序列化的消息数据,程序读取数据并反序列化后,将消息从控制台打印出来。

      程序通过stdin.xml配置文件获取主要参数值,stdin.xml文件内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <operator>
    	<parameters>
    		<parameter>
    			<key>rocketmq.nameserver.list</key>
    			<value>172.16.8.106:9876</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.group.id</key>
    			<value>test006</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.topic</key>
    			<value>TopicTest2</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.tags</key>
    			<value>*</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.message.key</key>
    			<value>OrderID0034</value>
    		</parameter>
    		<parameter>
    			<key>schemaStr</key>
    			<value>col1:string,col2:double</value>
    		</parameter>
    		<parameter>
    			<key>filePath</key>
    			<value>/home/test/rocketmq/input.txt</value>
    		</parameter>
    	</parameters>
    </operator>
    

      

    消费者示例程序如下:

    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.scistor.datavision.operator.common.OperatorConfiguration;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.hive.hcatalog.common.HCatException;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    
    import java.io.IOException;
    import java.util.List;
    
    
    public class RocketConsumer {
        private static Schema schema = null;
        // parameters
        private String nameserver;
        private String groupID;
        private String rocketmqTopic;
        private String tags;
        private String schemaStr;
    
        public RocketConsumer configure(OperatorConfiguration conf) {
            this.nameserver = conf.get("rocketmq.nameserver.list");
            this.groupID = conf.get("rocketmq.group.id");
            this.rocketmqTopic = conf.get("rocketmq.topic");
            this.tags = conf.get("rocketmq.tags");
            this.schemaStr = conf.get("schemaStr");
            return this;
        }
    
        /**
         * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
         * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
         */
        public void run() throws MQClientException {
            /**
             * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
             * 注意:ConsumerGroupName需要由应用来保证唯一
             */
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
            consumer.setNamesrvAddr(nameserver);
            consumer.setInstanceName(groupID+"Consumber");
            consumer.setConsumerGroup(groupID);
    
            try {
                consumer.subscribe(rocketmqTopic, tags);
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
            HCatSchema hcatSchema = null;
            SchemaUtil schemaUtil = new SchemaUtil();
            try {
                hcatSchema = schemaUtil.createHCatSchema(schemaStr);
                schema = schemaUtil.createSchema("com.scistor.rocketmq.producer", rocketmqTopic, hcatSchema);
            } catch (HCatException e) {
                e.printStackTrace();
            }
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                /**
                 * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
                 */
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println(Thread.currentThread().getName()
                            + " Receive New Messages: " + msgs.size());
    
                    for (MessageExt msg : msgs) {
                        if (msg.getTopic().equals(rocketmqTopic)) {
                            byte[] bytes = msg.getBody();
                            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
                            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
                            GenericRecord record1 = null;
                            try {
                                record1 = reader.read(null, decoder);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            int fieldSize = record1.getSchema().getFields().size();
                            for (int i = 0; i < fieldSize; i++) {
                                System.out.print(record1.get(i) + "====");
                            }
                            System.out.println();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
             */
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    
        public static void main(String[] args) throws MQClientException {
            if (args.length < 1) {
                System.err.println("需要: 参数配置文件<stdin.xml>所在的hdfs目录");
                System.exit(-1);
            }
            OperatorConfiguration conf = new OperatorConfiguration(args[0]);
            RocketConsumer trainer = new RocketConsumer();
            trainer.configure(conf).run();
        }
    }
    

    程序运行输出打印到控制台:

    [root@m106 rocketmq]# ./consumer.sh 
    Consumer Started.
    ConsumeMessageThread_5 Receive New Messages: 1
    ConsumeMessageThread_1 Receive New Messages: 1
    ConsumeMessageThread_9 Receive New Messages: 1
    ConsumeMessageThread_8 Receive New Messages: 1
    ConsumeMessageThread_7 Receive New Messages: 1
    ConsumeMessageThread_4 Receive New Messages: 1
    ConsumeMessageThread_2 Receive New Messages: 1
    ConsumeMessageThread_10 Receive New Messages: 1
    ConsumeMessageThread_3 Receive New Messages: 1
    ConsumeMessageThread_6 Receive New Messages: 1
    /home/xdf/workflow/file-web/file/1.html====hdfs:///user/xdf/streaming/file-web/file/3.html====1.0====hdfs:///user/xdf/streaming/file-web/file/2.html====hdfs:///user/xdf/streaming/file-web/file/4.html====/home/xdf/workflow/file-web/file/3.html====hdfs:///user/xdf/streaming/file-web/file/1.html====/home/xdf/workflow/file-web/file====1.0====
    hdfs:///user/xdf/streaming/file-web/file====/home/xdf/workflow/file-web/file/2.html====/home/xdf/workflow/file-web/file/4.html====2.0====
    1.0====
    1.0====
    3.0====
    4.0====
    2.0====
    3.0====
    4.0====

      

  • 相关阅读:
    alpha版本发布前的进度
    1.26~1.27
    1.23~1.25
    1月21日~1月22日工作情况
    1月17日工作情况
    1月16日小组开会
    1月15日工作进度
    1月12日~1月14日工作进度
    linux下的动态链接库管理
    小组第一次小组讨论
  • 原文地址:https://www.cnblogs.com/xiaodf/p/5999756.html
Copyright © 2011-2022 走看看