  • kafkaConsumer (pulls data from a Kafka topic and stores it in HDFS). The program takes three arguments: the ZooKeeper connect string, the target HDFS directory, and the Kafka topic.

    import kafka.consumer.ConsumerConfig;  
    import kafka.consumer.KafkaStream;  
    import kafka.javaapi.consumer.ConsumerConnector;  
    import kafka.serializer.StringDecoder;  
    import kafka.utils.VerifiableProperties;  
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;  
    import java.util.List;  
    import java.util.Map;  
    import java.util.Properties;  
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import com.xinsight.Pool.ThreadPoolManager;
    import com.xinsight.Thread.ConsumerThread;
    public class KafkaConsumer {  
        private ConsumerConnector consumer = null;
        // shared HDFS output stream that all worker threads write to
        private static FSDataOutputStream hdfsOutStream;
        private static FSDataInputStream is;
        public static FileSystem fs;
        public static Configuration conf;
        public static int num = 0;
        private static String filePath;
        // shared lock guarding hdfsOutStream
        public static final String lock = new String("lock");
        public static void setFSDataOutputStream(String filename){
            Path path = new Path(filename);
            if(hdfsOutStream != null){
                try {
                    hdfsOutStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            synchronized (lock) {
                try {
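                    // if the file already exists, read back its contents, recreate the
                    // file and rewrite them, which emulates append on HDFS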
                    if(fs.exists(path)){
                        is = fs.open(path);
                        FileStatus stat = fs.getFileStatus(path); 
                        byte[] buffer = new byte[(int) stat.getLen()];
                        is.readFully(0, buffer);  
                        is.close(); 
                        fs.delete(path, false);
                        hdfsOutStream = fs.create(path);
                        hdfsOutStream.write(buffer);
                    }else{
                        hdfsOutStream = fs.create(path);
                        
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        public static FSDataOutputStream getFSDataOutputStream(){
            return hdfsOutStream;
        }
        public static void init(){
            try {
                conf = new Configuration();
                String url = "hdfs://Master:9000/test";
                conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
                conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
                fs = FileSystem.get(new URI(url), conf);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) throws InterruptedException {  
            
            if (args.length == 3) {
                init();
                String fileName = getFileName();
                filePath = args[1]+"/"+fileName ;
                KafkaConsumer.setFSDataOutputStream(filePath);
                /********** ZooKeeper connect string **********/
                String zkInfo = args[0];
                /********** target HDFS directory **********/
                String uri = args[1];
                /********** Kafka topic **********/
                String topic = args[2];
                KafkaConsumer consumer = new KafkaConsumer();
                consumer.setConsumer(zkInfo);
                consumer.consume(lock,topic);
            }
            
        }  
        /**
         * Load the consumer configuration and create the consumer connector.
         * @param zkInfo ZooKeeper connect string
         */
        public void setConsumer(String zkInfo) {  
            Properties props = new Properties();
            // ZooKeeper connect string
            props.put("zookeeper.connect", zkInfo);
            // consumer group id
            props.put("group.id", "jd-group");
            props.put("zookeeper.session.timeout.ms", "5000"); // ZK session timeout for the client
            props.put("zookeeper.sync.time.ms", "200"); // how far a ZK follower may lag behind the leader
            props.put("auto.commit.interval.ms", "1000"); // interval at which the consumer commits offsets to ZooKeeper
            props.put("auto.offset.reset", "smallest"); // start from the earliest available offset
            props.put("rebalance.max.retries", "5"); // when a new consumer joins the group a rebalance reassigns partitions; this is the maximum number of retries if that reassignment fails
            props.put("rebalance.backoff.ms", "1200"); // backoff between rebalance retries
            // serializer class
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            ConsumerConfig config = new ConsumerConfig(props);  
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
        }  
        /**
         * Build the file name from the current time (yyyyMMddHH).
         * @return the formatted timestamp
         */
        public static String getFileName(){
            long time = System.currentTimeMillis();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
            String date = sdf.format(new Date(time));
            return date;
        }
        
        /**
         * Consume messages from the topic and dispatch each stream to a worker thread.
         * @param lock shared lock guarding the HDFS output stream
         * @param topic Kafka topic to consume
         * @throws InterruptedException 
         */
        public void consume(String lock,String topic) throws InterruptedException {  
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
            topicCountMap.put(topic, new Integer(3));  
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
            Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);  
            List<KafkaStream<String, String>> streams = consumerMap.get(topic);
            System.out.println(streams.size());
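            // hand each partition stream to a worker thread from the pool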
            for(final KafkaStream stream : streams){
                ThreadPoolManager.dbShortSchedule(new ConsumerThread(stream,lock), 0);
            
            }
            System.out.println("finish");  
        }  
    }  
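
  • ConsumerThread (worker thread that reads one Kafka stream and writes each message to HDFS):
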
    package com.xinsight.Thread;
    import java.io.IOException;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import com.xinsight.kafkaConsumer.KafkaConsumer;
    public class ConsumerThread implements Runnable{
        private String lock;
        private KafkaStream m_stream;
        private int max_sync = 1000;
        private int current_sync = 0;
        public ConsumerThread(KafkaStream a_stream,String lock) {
            this.m_stream = a_stream;
            this.lock = lock;
        }
        @Override
        public void run() {
            ConsumerIterator<String, String> it = m_stream.iterator();
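            // hasNext() blocks until a new message arrives on this stream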
            while (it.hasNext()) {
                String message = it.next().message();
                try {
                    synchronized (lock) {
                        WriteFile(KafkaConsumer.getFSDataOutputStream(),message);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } 
        }
        /**
         * Write one message to HDFS.
         * @param hdfsOutStream the shared HDFS output stream
         * @param message the Kafka message payload
         * @throws IOException
         */
        public void WriteFile(FSDataOutputStream hdfsOutStream,String message) throws IOException {
            try{
                hdfsOutStream.write(message.getBytes());
                hdfsOutStream.write("\n".getBytes());
                current_sync++;
                // flush to HDFS every max_sync messages
                if(current_sync >= max_sync){
                    hdfsOutStream.sync();
                    current_sync = 0;
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
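
  • ThreadPoolManager is referenced above (com.xinsight.Pool.ThreadPoolManager) but not included in the post. Below is a minimal sketch of what dbShortSchedule might look like, assuming it simply runs the task on a shared pool after the given delay in seconds; only the class name and method signature come from the calls above, the rest is an assumption:

    package com.xinsight.Pool;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    public class ThreadPoolManager {
        // shared pool; three threads match the three streams requested in topicCountMap
        private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
        // assumed semantics: schedule the task on the shared pool after delaySeconds seconds
        public static void dbShortSchedule(Runnable task, int delaySeconds) {
            pool.schedule(task, delaySeconds, TimeUnit.SECONDS);
        }
    }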
  • Original post: https://www.cnblogs.com/zqzdong/p/6438962.html