package com.xinsight.kafkaConsumer;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.xinsight.Pool.ThreadPoolManager;
import com.xinsight.Thread.ConsumerThread;

public class KafkaConsumer {

    private ConsumerConnector consumer = null;
    private static FSDataOutputStream hdfsOutStream;
    private static FSDataInputStream is;
    public static FileSystem fs;
    public static Configuration conf;
    private static String filePath;

    // Deliberately a new String instance so the lock is an object unique to this class.
    public static final String lock = new String("lock");

    /**
     * (Re)open the HDFS output stream for the given file. Older HDFS versions do not
     * support append, so an existing file is read back, deleted, recreated, and its
     * previous contents rewritten before new messages are appended.
     */
    public static void setFSDataOutputStream(String filename) {
        Path path = new Path(filename);
        synchronized (lock) {
            try {
                // Close the previous stream under the lock so no writer thread is mid-write.
                if (hdfsOutStream != null) {
                    hdfsOutStream.close();
                }
                if (fs.exists(path)) {
                    is = fs.open(path);
                    FileStatus stat = fs.getFileStatus(path);
                    byte[] buffer = new byte[(int) stat.getLen()];
                    is.readFully(0, buffer);
                    is.close();
                    fs.delete(path, false);
                    hdfsOutStream = fs.create(path);
                    hdfsOutStream.write(buffer);
                } else {
                    hdfsOutStream = fs.create(path);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static FSDataOutputStream getFSDataOutputStream() {
        return hdfsOutStream;
    }

    /**
     * Initialize the HDFS FileSystem handle. The NameNode URI is hardcoded here;
     * the target directory itself comes from args[1] in main().
     */
    public static void init() {
        try {
            conf = new Configuration();
            String url = "hdfs://Master:9000/test";
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            fs = FileSystem.get(new URI(url), conf);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 3) {
            System.err.println("Usage: KafkaConsumer <zookeeper-connect> <hdfs-dir> <topic>");
            return;
        }
        String zkInfo = args[0];  // ZooKeeper connect string
        String hdfsDir = args[1]; // HDFS directory the messages are written into
        String topic = args[2];   // Kafka topic to consume

        init();
        filePath = hdfsDir + "/" + getFileName();
        KafkaConsumer.setFSDataOutputStream(filePath);

        KafkaConsumer consumer = new KafkaConsumer();
        consumer.setConsumer(zkInfo);
        consumer.consume(lock, topic);
    }

    /**
     * Build the consumer configuration and create the connector.
     * @param zkInfo ZooKeeper connect string
     */
    public void setConsumer(String zkInfo) {
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", zkInfo);
        // Consumer group this instance belongs to
        props.put("group.id", "jd-group");
        // Timeout for the client's connection to the ZooKeeper server
        props.put("zookeeper.session.timeout.ms", "5000");
        // How far a ZooKeeper follower may lag behind the leader
        props.put("zookeeper.sync.time.ms", "200");
        // Interval at which the consumer commits offsets to ZooKeeper
        props.put("auto.commit.interval.ms", "1000");
        // Start from the earliest available offset (read old data)
        props.put("auto.offset.reset", "smallest");
        // When a new consumer joins the group a rebalance reassigns partitions to consumers;
        // if the reassignment fails it is retried up to this many times
        props.put("rebalance.max.retries", "5");
        // Backoff between rebalance retries
        props.put("rebalance.backoff.ms", "1200");
        // Serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Build the output file name from the current hour.
     * @return a name in the form yyyyMMddHH
     */
    public static String getFileName() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHH");
        return sdf.format(new Date(System.currentTimeMillis()));
    }

    /**
     * Consume the topic: open three streams and hand each one to a worker thread.
     * @param lock  shared lock guarding the HDFS output stream
     * @param topic Kafka topic to read
     */
    public void consume(String lock, String topic) throws InterruptedException {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(3));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        List<KafkaStream<String, String>> streams = consumerMap.get(topic);
        System.out.println("streams: " + streams.size());
        for (final KafkaStream<String, String> stream : streams) {
            ThreadPoolManager.dbShortSchedule(new ConsumerThread(stream, lock), 0);
        }
        System.out.println("finish");
    }
}
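ThreadPoolManager (com.xinsight.Pool) is imported above but its source is not shown. A minimal sketch of what dbShortSchedule(Runnable, delay) might look like, assuming it simply submits the task to a shared scheduled pool after the given delay in seconds; the pool size and the time unit are assumptions, not taken from the original:

package com.xinsight.Pool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: runs a task on a shared scheduled pool after a delay (seconds).
public class ThreadPoolManager {

    // Shared pool; the size of 8 is an arbitrary assumption.
    private static final ScheduledExecutorService POOL =
            Executors.newScheduledThreadPool(8);

    // Matches the call site in KafkaConsumer.consume(): dbShortSchedule(runnable, 0)
    public static void dbShortSchedule(Runnable task, long delaySeconds) {
        POOL.schedule(task, delaySeconds, TimeUnit.SECONDS);
    }
}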
package com.xinsight.Thread;

import java.io.IOException;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import com.xinsight.kafkaConsumer.KafkaConsumer;

public class ConsumerThread implements Runnable {

    private final String lock;
    private final KafkaStream<String, String> m_stream;
    private final int max_sync = 1000;
    private int current_sync = 0;

    public ConsumerThread(KafkaStream<String, String> a_stream, String lock) {
        this.m_stream = a_stream;
        this.lock = lock;
    }

    @Override
    public void run() {
        ConsumerIterator<String, String> it = m_stream.iterator();
        while (it.hasNext()) {
            String message = it.next().message();
            try {
                // All worker threads share one HDFS output stream, so writes are serialized on the lock.
                synchronized (lock) {
                    writeFile(KafkaConsumer.getFSDataOutputStream(), message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Write one message to HDFS, flushing every max_sync messages.
     * @param hdfsOutStream shared HDFS output stream
     * @param message       message body to write
     */
    public void writeFile(FSDataOutputStream hdfsOutStream, String message) throws IOException {
        try {
            hdfsOutStream.write(message.getBytes());
            hdfsOutStream.write(" ".getBytes());
            current_sync++;
            if (current_sync >= max_sync) {
                // sync() pushes buffered data to the datanodes (hflush() in newer Hadoop versions)
                hdfsOutStream.sync();
                current_sync = 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
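getFileName() produces an hourly name (yyyyMMddHH), but main() only opens the output file once at startup. A minimal sketch, assuming the hourly name is meant to drive file roll-over, of a helper that re-opens the stream when the hour changes; the HourlyRoller class, its hdfsDir parameter (args[1] from main), and the one-minute check interval are all assumptions, not part of the original:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.xinsight.kafkaConsumer.KafkaConsumer;

// Hypothetical hourly roll-over of the HDFS output file.
public class HourlyRoller {

    public static void start(final String hdfsDir) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(new Runnable() {
            private String currentName = KafkaConsumer.getFileName();

            @Override
            public void run() {
                String name = KafkaConsumer.getFileName();
                if (!name.equals(currentName)) {
                    currentName = name;
                    // setFSDataOutputStream() re-opens the stream under KafkaConsumer.lock,
                    // so in-flight writes from ConsumerThread are not interrupted.
                    KafkaConsumer.setFSDataOutputStream(hdfsDir + "/" + name);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
}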