zoukankan      html  css  js  c++  java
  • RocketMQ生产者示例程序

      转载请注明出处:http://www.cnblogs.com/xiaodf/

      本示例展示了一个RocketMQ producer的简单实现,通过解析文本文件获取输入数据,将数据经过Avro序列化后发送到RocketMQ。

      程序通过stdin.xml配置文件获取主要参数值(注意:下面的示例程序并未读取其中的 rocketmq.group.id,生产者组名在代码中固定为 ProducerGroupName),stdin.xml文件内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Parameter file for the RocketProducer sample shown below; each <parameter> is a key/value pair. -->
    <operator>
    	<parameters>
    		<!-- Name server address; the program passes it to DefaultMQProducer.setNamesrvAddr(). -->
    		<parameter>
    			<key>rocketmq.nameserver.list</key>
    			<value>172.16.8.106:9876</value>
    		</parameter>
    		<!-- NOTE(review): RocketProducer.configure() does not read this key; the producer group is hard-coded in the program. -->
    		<parameter>
    			<key>rocketmq.group.id</key>
    			<value>test006</value>
    		</parameter>
    		<!-- Topic of every message sent; also passed as the name argument when the Avro schema is created. -->
    		<parameter>
    			<key>rocketmq.topic</key>
    			<value>TopicTest2</value>
    		</parameter>
    		<!-- Tag set on every message sent. -->
    		<parameter>
    			<key>rocketmq.tags</key>
    			<value>*</value>
    		</parameter>
    		<!-- Key set on every message sent. -->
    		<parameter>
    			<key>rocketmq.message.key</key>
    			<value>OrderID0034</value>
    		</parameter>
    		<!-- Column definitions handed to SchemaUtil.createHCatSchema(); presumably "name:type" pairs separated by commas. -->
    		<parameter>
    			<key>schemaStr</key>
    			<value>col1:string,col2:double</value>
    		</parameter>
    		<!-- Input text file; the program reads it line by line and splits each line on ",". -->
    		<parameter>
    			<key>filePath</key>
    			<value>/home/test/rocketmq/input.txt</value>
    		</parameter>
    	</parameters>
    </operator>
    

      

    生产者示例程序如下:

    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.scistor.datavision.operator.common.AvroUtils;
    import com.scistor.datavision.operator.common.OperatorConfiguration;
    import org.apache.avro.Schema;
    import org.apache.hive.hcatalog.common.HCatException;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    
    /**
     * Sample RocketMQ producer: reads a text file line by line, splits each line on
     * {@code ","}, serializes the fields with Avro and sends one message per line.
     *
     * <p>All settings come from an {@link OperatorConfiguration}; see
     * {@link #configure(OperatorConfiguration)} for the keys that are read.
     */
    public class RocketProducer {

        /** Producer group name passed to {@link DefaultMQProducer}. */
        private static final String PRODUCER_GROUP = "ProducerGroupName";

        /** Instance name set on the producer. */
        private static final String INSTANCE_NAME = "RocketProducer";

        /** Namespace passed to {@code SchemaUtil.createSchema}. */
        private static final String SCHEMA_NAMESPACE = "com.scistor.rocketmq.producer";

        /** Exit code returned by {@link #run()} when everything was sent. */
        private static final int EXIT_OK = 0;

        /** Exit code returned by {@link #run()} when setup or at least one send failed. */
        private static final int EXIT_FAILURE = 1;

        // parameters
        private String nameserver;
        private String rocketmqTopic;
        private String tags;
        private String key;
        private String schemaStr;
        private String filePath;

        /**
         * Copies the producer settings out of the given configuration.
         *
         * <p>Keys read: {@code rocketmq.nameserver.list}, {@code rocketmq.topic},
         * {@code rocketmq.tags}, {@code rocketmq.message.key}, {@code schemaStr}
         * and {@code filePath}.
         *
         * @param conf configuration to read from
         * @return this instance, so calls can be chained
         */
        public RocketProducer configure(OperatorConfiguration conf) {
            this.nameserver = conf.get("rocketmq.nameserver.list");
            this.rocketmqTopic = conf.get("rocketmq.topic");
            this.tags = conf.get("rocketmq.tags");
            this.key = conf.get("rocketmq.message.key");
            this.schemaStr = conf.get("schemaStr");
            this.filePath = conf.get("filePath");
            return this;
        }

        /**
         * Builds the Avro schema, starts the producer, sends one message per input
         * line and shuts the producer down again.
         *
         * @return {@code 0} when every line was sent; {@code 1} when the schema could
         *         not be built, the producer could not be started, or at least one
         *         line failed to convert or send
         */
        public int run() {
            // Build the schema before touching the network: without it no record can
            // be serialized, so there is no point in starting the producer.
            Schema schema;
            SchemaUtil schemaUtil = new SchemaUtil();
            try {
                HCatSchema hcatSchema = schemaUtil.createHCatSchema(schemaStr);
                schema = schemaUtil.createSchema(SCHEMA_NAMESPACE, rocketmqTopic, hcatSchema);
            } catch (HCatException e) {
                e.printStackTrace();
                return EXIT_FAILURE;
            }

            DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
            producer.setNamesrvAddr(nameserver);
            producer.setInstanceName(INSTANCE_NAME);
            // The producer must be started exactly once before use; never call
            // start() for every message.
            try {
                producer.start();
            } catch (MQClientException e) {
                // An unstarted producer cannot send anything, so give up here.
                e.printStackTrace();
                return EXIT_FAILURE;
            }

            int failures = 0;
            try {
                List<String> content = RocketProducer.readFileByLines(filePath);

                // A single producer can send messages for several topics and tags.
                // send() is synchronous: no exception means the send succeeded, but
                // success can still have several states (for example written to the
                // master but not to the slave). Applications with very high
                // reliability requirements must handle those cases, and retrying
                // failed sends is left to the application.
                for (String line : content) {
                    try {
                        String[] fields = line.split(",");
                        Object[] record = AvroUtils.convert(schema, fields);
                        byte[] bytes = AvroUtils.serialize(schema, record);
                        Message msg = new Message(rocketmqTopic, // topic
                                tags, // tag
                                key, // key
                                bytes); // body
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    } catch (Exception e) {
                        failures++;
                        e.printStackTrace();
                        if (e instanceof InterruptedException) {
                            // Restore the interrupt flag and stop sending.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            } finally {
                // Always release resources, close network connections and deregister
                // from the server on exit. In a container such as JBoss or Tomcat,
                // call shutdown() from the container's shutdown hook.
                producer.shutdown();
            }
            return failures == 0 ? EXIT_OK : EXIT_FAILURE;
        }

        /**
         * Reads a text file line by line, echoing every line to standard output with
         * its 1-based line number.
         *
         * <p>I/O errors are printed to standard error and are not rethrown; in that
         * case the lines read up to the failure (possibly none) are returned.
         *
         * @param fileName path of the file to read
         * @return the lines of the file in order, never {@code null}
         */
        public static List<String> readFileByLines(String fileName) {
            List<String> list = new ArrayList<String>();
            BufferedReader reader = null;
            try {
                System.out.println("以行为单位读取文件内容,一次读一整行:");
                reader = new BufferedReader(new FileReader(new File(fileName)));
                String tempString;
                int line = 1;
                // readLine() returns null at end of file.
                while ((tempString = reader.readLine()) != null) {
                    list.add(tempString);
                    System.out.println("line " + line + ": " + tempString);
                    line++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Single close point; the reader is closed on success and on failure.
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException ignored) {
                        // Nothing useful can be done if close fails after reading.
                    }
                }
            }
            return list;
        }

        /**
         * Command-line entry point.
         *
         * @param args {@code args[0]} is the location handed to the
         *             {@link OperatorConfiguration} constructor
         */
        public static void main(String[] args) {
            if (args.length < 1) {
                System.err.println("需要: 参数配置文件<stdin.xml>所在的hdfs目录");
                System.exit(-1);
            }
            OperatorConfiguration conf = new OperatorConfiguration(args[0]);
            RocketProducer rocketProducer = new RocketProducer();
            System.exit(rocketProducer.configure(conf).run());
        }
    }
    

      

    程序运行输出打印到控制台:

    [root@m106 rocketmq]# ./produce.sh 
    
    以行为单位读取文件内容,一次读一整行: line 1: hdfs:///user/xdf/streaming/file-web/file/1.html,1 line 2: hdfs:///user/xdf/streaming/file-web/file/2.html,2 line 3: hdfs:///user/xdf/streaming/file-web/file/3.html,3 line 4: hdfs:///user/xdf/streaming/file-web/file/4.html,4 line 5: hdfs:///user/xdf/streaming/file-web/file,1 line 6: /home/xdf/workflow/file-web/file/1.html,1 line 7: /home/xdf/workflow/file-web/file/2.html,2 line 8: /home/xdf/workflow/file-web/file/3.html,3 line 9: /home/xdf/workflow/file-web/file/4.html,4 line 10: /home/xdf/workflow/file-web/file,1 SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00A36, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18710] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00AED, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18700] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00BA4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2], queueOffset=18668] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00C5B, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3], queueOffset=18663] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197504, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=0], queueOffset=18649] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E1975B4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=1], queueOffset=18633] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197663, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=2], queueOffset=18629] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197712, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=3], queueOffset=18626] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00D12, 
messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18711] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00DC1, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18701]

      

  • 相关阅读:
    黑马程序员——指针的应用
    黑马程序员——C语言基础常量、运算符、函数
    黑马程序员——数组
    黑马程序员——循环结构for,while,do..while
    webView去掉右侧导航条
    使用Eclipse构建Maven的SpringMVC项目
    win7 自动登录
    eclipse 自动提示
    apache+php+mysql 环境配置
    KMP子串查找算法
  • 原文地址:https://www.cnblogs.com/xiaodf/p/5999727.html
Copyright © 2011-2022 走看看