Code example:
package cn.com.kong.streaming;

import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import java.io.IOException;
import java.util.*;

/**
 * Requires the offsets table to exist in HBase:
 * create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
 */
public class KafkaOffsetToHBase {

    public static void main(String[] args) throws InterruptedException, IOException {
        // if (args.length < 6) {
        //     System.err.println("Usage: KafkaOffsetToHBase <batch-duration-in-seconds> <kafka-bootstrap-servers> " +
        //             "<kafka-topics> <kafka-consumer-group-id> <hbase-table-name> <kafka-zookeeper-quorum>");
        //     System.exit(1);
        // }
        long batchDuration = 10L;
        String bootstrapServers = "172.30.xx.xx:9092,172.30.x.xx:9092";
        String consumerGroupID = "group01";
        String topic = "topic02";
        String hbaseTableName = "stream_kafka_offsets";
        String zkQuorum = "172.30.x.xx:2181,172.30.x.xx:2181,172.30.x.xxx:2181";
        String zkKafkaRootDir = "brokers";
        int zkSessionTimeOut = 10000;
        int zkConnectionTimeOut = 10000;

        HashSet<String> topicsSet = new HashSet<>(Arrays.asList(topic.split(",")));

        // Kafka consumer configuration; auto-commit is disabled because offsets are managed in HBase
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        SparkConf sparkConf = new SparkConf().setAppName("KafkaOffsetToHBase").setMaster("local[3]")
                .set("spark.io.compression.codec", "snappy");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
        jssc.sparkContext().setLogLevel("WARN");

        // Read the offsets committed by the previous run from HBase
        Map<TopicPartition, Long> fromOffsetsMap = getLastCommittedOffsets(topic, consumerGroupID,
                hbaseTableName, zkQuorum, zkKafkaRootDir, zkSessionTimeOut, zkConnectionTimeOut);

        JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, fromOffsetsMap));

        inputDStream.foreachRDD((javaRDD, time) -> {
            if (!javaRDD.isEmpty()) {
                OffsetRange[] offsetRanges = ((HasOffsetRanges) javaRDD.rdd()).offsetRanges();
                for (OffsetRange offset : offsetRanges) {
                    System.out.println("topic:" + offset.topic() + ",partition:" + offset.partition()
                            + ",fromOffset:" + offset.fromOffset() + ",untilOffset:" + offset.untilOffset());
                }

                // Business logic
                JavaRDD<String> newRDD = javaRDD.map(record -> record.value());
                long count = newRDD.count();

                // Save the offsets to HBase.
                // Note: because the computed results and the offsets are stored separately, the results may
                // be saved successfully while saving the offsets fails. On the next start part of the data
                // would then be consumed again, so the processing must be idempotent.
                saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, time);
                System.out.println("Number of messages processed: " + count);
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }

    /**
     * Save offsets to HBase.
     *
     * @param topic_name
     * @param group_id
     * @param offsetRanges
     * @param hbaseTableName
     * @param batchTime
     */
    private static void saveOffsets(String topic_name, String group_id, OffsetRange[] offsetRanges,
                                    String hbaseTableName, Time batchTime) throws IOException {
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");
        hbaseConf.set("hbase.zookeeper.quorum", "172.30.x.xx,172.30.x.xxx,172.30.x.xx");
        Connection connection = ConnectionFactory.createConnection(hbaseConf);
        Table table = connection.getTable(TableName.valueOf(hbaseTableName));

        // Row key: topic:group:batchTime; one column per partition holding its untilOffset
        String rowKey = topic_name + ":" + group_id + ":" + batchTime.milliseconds();
        Put put = new Put(rowKey.getBytes());
        for (OffsetRange offset : offsetRanges) {
            put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(String.valueOf(offset.partition())),
                    Bytes.toBytes(String.valueOf(offset.untilOffset())));
        }
        table.put(put);
        connection.close();
    }

    /**
     * Retrieve the offsets committed by the last run.
     *
     * @param topic_name
     * @param group_id
     * @param hbaseTableName
     * @param zkQuorum
     * @param zkRootDir
     * @param sessionTimeOut
     * @param connectionTimeOut
     * @return the fromOffsets map passed to ConsumerStrategies.Subscribe
     * @throws IOException
     */
    private static Map<TopicPartition, Long> getLastCommittedOffsets(String topic_name, String group_id,
            String hbaseTableName, String zkQuorum, String zkRootDir, int sessionTimeOut,
            int connectionTimeOut) throws IOException {
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");
        hbaseConf.set("hbase.zookeeper.quorum", "172.30.x.xx,172.30.x.xx,172.30.x.xx");

        // Ask ZooKeeper how many partitions the topic currently has
        String zkUrl = zkQuorum + "/" + zkRootDir;
        Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,
                sessionTimeOut, connectionTimeOut);
        ZkUtils zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false);
        Seq<String> seq_topic = JavaConverters.asScalaIteratorConverter(
                Collections.singletonList(topic_name).iterator()).asScala().toSeq();
        int zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(seq_topic).get(topic_name)
                .toList().head().size();

        // Connect to HBase to retrieve the last committed offsets
        Connection conn = ConnectionFactory.createConnection(hbaseConf);
        Table table = conn.getTable(TableName.valueOf(hbaseTableName));
        String startRow = topic_name + ":" + group_id + ":" + System.currentTimeMillis();
        String stopRow = topic_name + ":" + group_id + ":" + 0;
        Scan scan = new Scan();
        // Reversed scan: the first row returned is the most recently saved batch
        ResultScanner scanner = table.getScanner(scan.setStartRow(startRow.getBytes())
                .setStopRow(stopRow.getBytes()).setReversed(true));
        Result result = scanner.next();

        // Number of partitions recorded in HBase; 0 means nothing has been saved yet
        int hbaseNumberOfPartitionsForTopic = 0;
        if (result != null) {
            // One cell per partition, so the cell count is the partition count at save time
            hbaseNumberOfPartitionsForTopic = result.listCells().size();
        }

        Map<TopicPartition, Long> fromOffsets = new HashMap<>();
        if (hbaseNumberOfPartitionsForTopic == 0) {
            // First run: initialize every partition to offset 0
            for (int partition = 0; partition < zKNumberOfPartitionsForTopic; partition++) {
                fromOffsets.put(new TopicPartition(topic_name, partition), 0L);
            }
        } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
            // New partitions have been added to the topic: ZooKeeper now reports more partitions
            // than were recorded in HBase
            for (int partition = 0; partition < hbaseNumberOfPartitionsForTopic; partition++) {
                // Resume each previously known partition from the offset recorded in HBase
                String offset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
                        Bytes.toBytes(String.valueOf(partition))));
                fromOffsets.put(new TopicPartition(topic_name, partition), Long.valueOf(offset));
            }
            for (int partition = hbaseNumberOfPartitionsForTopic; partition < zKNumberOfPartitionsForTopic; partition++) {
                // Newly added partitions start from 0
                fromOffsets.put(new TopicPartition(topic_name, partition), 0L);
            }
        } else {
            // No new partitions: after a restart, resume from the offsets recorded by the last run
            for (int partition = 0; partition < hbaseNumberOfPartitionsForTopic; partition++) {
                String offset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
                        Bytes.toBytes(String.valueOf(partition))));
                fromOffsets.put(new TopicPartition(topic_name, partition), Long.valueOf(offset));
                System.out.println("Resuming from last committed offset -- topic_name:" + topic_name
                        + ",partition:" + partition + ",fromOffset:" + offset);
            }
        }
        scanner.close();
        conn.close();
        return fromOffsets;
    }
}
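As the comment in foreachRDD notes, storing results and offsets separately leaves a window where the results land but the offset write fails, so processing must be idempotent. One way to shrink that window, if the results also live in HBase, is to write the result and the offsets into the same row, since single-row Puts in HBase are atomic. The sketch below is hypothetical: the 'results' column family and the saveResultAndOffsets method are not part of the original code, and the table created above would need that extra family.

    // Hypothetical variant of saveOffsets: result and offsets go into one row so they
    // either both persist or neither does. Assumes a 'results' column family exists.
    private static void saveResultAndOffsets(Table table, String topic, String groupId,
                                             long batchTimeMs, long count,
                                             OffsetRange[] offsetRanges) throws IOException {
        Put put = new Put(Bytes.toBytes(topic + ":" + groupId + ":" + batchTimeMs));
        put.addColumn(Bytes.toBytes("results"), Bytes.toBytes("count"),
                Bytes.toBytes(String.valueOf(count)));
        for (OffsetRange offset : offsetRanges) {
            put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(String.valueOf(offset.partition())),
                    Bytes.toBytes(String.valueOf(offset.untilOffset())));
        }
        table.put(put);  // a single-row Put is atomic in HBase
    }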
Official documentation: http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets
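For comparison, the page above also documents the simpler option of committing offsets back to Kafka itself with commitAsync after the output has completed. A rough Java sketch along the lines of that documentation, reusing the inputDStream variable from the code above (at-least-once only, since the commit is asynchronous):

    inputDStream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // ... process rdd, then commit the offsets back to Kafka
        ((CanCommitOffsets) inputDStream.inputDStream()).commitAsync(offsetRanges);
    });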