zoukankan      html  css  js  c++  java
  • RocketMQ生产者示例程序

      转载请注明出处:http://www.cnblogs.com/xiaodf/

      本示例展示了一个RocketMQ producer的简单实现,通过解析文本文件获取输入数据,将数据经过Avro序列化后发送到RocketMQ。

      程序通过stdin.xml配置文件获取主要参数值,stdin.xml文件内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <operator>
    	<parameters>
    		<parameter>
    			<key>rocketmq.nameserver.list</key>
    			<value>172.16.8.106:9876</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.group.id</key>
    			<value>test006</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.topic</key>
    			<value>TopicTest2</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.tags</key>
    			<value>*</value>
    		</parameter>
    		<parameter>
    			<key>rocketmq.message.key</key>
    			<value>OrderID0034</value>
    		</parameter>
    		<parameter>
    			<key>schemaStr</key>
    			<value>col1:string,col2:double</value>
    		</parameter>
    		<parameter>
    			<key>filePath</key>
    			<value>/home/test/rocketmq/input.txt</value>
    		</parameter>
    	</parameters>
    </operator>
    

      

    生产者示例程序如下:

    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.scistor.datavision.operator.common.AvroUtils;
    import com.scistor.datavision.operator.common.OperatorConfiguration;
    import org.apache.avro.Schema;
    import org.apache.hive.hcatalog.common.HCatException;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    
    public class RocketProducer {
    
        // parameters
        private String nameserver;
        private String rocketmqTopic;
        private String tags;
        private String key;
        private String schemaStr;
        private String filePath;
    
        public RocketProducer configure(OperatorConfiguration conf) {
            this.nameserver = conf.get("rocketmq.nameserver.list");
            this.rocketmqTopic = conf.get("rocketmq.topic");
            this.tags = conf.get("rocketmq.tags");
            this.key = conf.get("rocketmq.message.key");
            this.schemaStr = conf.get("schemaStr");
            this.filePath = conf.get("filePath");
            return this;
        }
    
        public int run() {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            producer.setNamesrvAddr(nameserver);
            producer.setInstanceName("RocketProducer");
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
            HCatSchema hcatSchema = null;
            Schema schema = null;
            SchemaUtil schemaUtil = new SchemaUtil();
            try {
                hcatSchema = schemaUtil.createHCatSchema(schemaStr);
                schema = schemaUtil.createSchema("com.scistor.rocketmq.producer", rocketmqTopic, hcatSchema);
            } catch (HCatException e) {
                e.printStackTrace();
            }
    
            List<String> content = RocketProducer.readFileByLines(filePath);
    
            /**
             * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
             * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
             * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
             * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
             */
            for (int i = 0; i < content.size(); i++) {
                try {
                    {
                        String[] fields = content.get(i).split(",");
                        Object[] record = AvroUtils.convert(schema, fields);
                        byte[] bytes = AvroUtils.serialize(schema, record);
                        Message msg = new Message(rocketmqTopic,// topic
                                tags,// tag
                                key,// key
                                bytes);// body
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //TimeUnit.MILLISECONDS.sleep(10);
            }
    
            /**
             * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
             * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
             */
            producer.shutdown();
            return 0;
        }
    
        public static List<String> readFileByLines(String fileName) {
            List<String> list = new ArrayList<String>();
            File file = new File(fileName);
            BufferedReader reader = null;
            try {
                System.out.println("以行为单位读取文件内容,一次读一整行:");
                reader = new BufferedReader(new FileReader(file));
                String tempString = null;
                int line = 1;
                // 一次读入一行,直到读入null为文件结束
                while ((tempString = reader.readLine()) != null) {
                    // 显示行号
                    list.add(tempString);
                    System.out.println("line " + line + ": " + tempString);
                    line++;
                }
                reader.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e1) {
                    }
                }
            }
            return list;
        }
    
        public static void main(String[] args) {
            if (args.length < 1) {
                System.err.println("需要: 参数配置文件<stdin.xml>所在的hdfs目录");
                System.exit(-1);
            }
            OperatorConfiguration conf = new OperatorConfiguration(args[0]);
            RocketProducer trainer = new RocketProducer();
            System.exit(trainer.configure(conf).run());
        }
    }
    

      

    程序运行输出打印到控制台:

    [root@m106 rocketmq]# ./produce.sh 
    
    以行为单位读取文件内容,一次读一整行: line 1: hdfs:///user/xdf/streaming/file-web/file/1.html,1 line 2: hdfs:///user/xdf/streaming/file-web/file/2.html,2 line 3: hdfs:///user/xdf/streaming/file-web/file/3.html,3 line 4: hdfs:///user/xdf/streaming/file-web/file/4.html,4 line 5: hdfs:///user/xdf/streaming/file-web/file,1 line 6: /home/xdf/workflow/file-web/file/1.html,1 line 7: /home/xdf/workflow/file-web/file/2.html,2 line 8: /home/xdf/workflow/file-web/file/3.html,3 line 9: /home/xdf/workflow/file-web/file/4.html,4 line 10: /home/xdf/workflow/file-web/file,1 SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00A36, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18710] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00AED, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18700] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00BA4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2], queueOffset=18668] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00C5B, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3], queueOffset=18663] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197504, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=0], queueOffset=18649] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E1975B4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=1], queueOffset=18633] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197663, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=2], queueOffset=18629] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197712, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=3], queueOffset=18626] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00D12, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18711] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00DC1, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18701]

      

  • 相关阅读:
    Java for LeetCode 229 Majority Element II
    Java for LeetCode 228 Summary Ranges
    Java for LeetCode 227 Basic Calculator II
    Java for LintCode 颜色分类
    Java for LintCode 链表插入排序
    Java for LintCode 颠倒整数
    Java for LintCode 验证二叉查找树
    Java for LeetCode 226 Invert Binary Tree
    Java for LeetCode 225 Implement Stack using Queues
    Java for LeetCode 224 Basic Calculator
  • 原文地址:https://www.cnblogs.com/xiaodf/p/5999727.html
Copyright © 2011-2022 走看看