一、从 Kafka Broker 获取 offset
/**
 * Reads the latest offset of every partition of a topic directly from the Kafka brokers, using
 * the legacy {@code SimpleConsumer} API (offset request with {@code LatestTime}).
 * Kafka must be running before this is executed.
 *
 * @author root
 */
public class GetTopicOffsetFromKafkaBroker {

    /** Socket timeout (ms) passed to each {@code SimpleConsumer}; value kept from the original code (640 s). */
    private static final int SO_TIMEOUT_MS = 64 * 10000;

    /** Buffer size (bytes) passed to each {@code SimpleConsumer}; value kept from the original code. */
    private static final int BUFFER_SIZE = 1024;

    /** Client id sent to the brokers with every request. */
    private static final String CLIENT_ID = "consumer";

    /** Prints the latest offset of every partition of {@code mytopic} on the sample cluster. */
    public static void main(String[] args) {
        Map<TopicAndPartition, Long> topicOffsets =
                getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");
        for (Entry<TopicAndPartition, Long> entry : topicOffsets.entrySet()) {
            TopicAndPartition topicAndPartition = entry.getKey();
            System.out.println("topic = " + topicAndPartition.topic()
                    + ",partition = " + topicAndPartition.partition()
                    + ",offset = " + entry.getValue());
        }
    }

    /**
     * Returns, for each partition of {@code topic}, the latest offset reported by the Kafka cluster,
     * i.e. the position up to which producers have written messages.
     *
     * <p>Every broker in {@code KafkaBrokerServer} is asked for the topic's metadata. For each
     * partition that has a leader and is not yet in the result, an offset request is sent to that
     * same broker. Responses that carry an error are ignored. A broker that is not the partition's
     * leader is expected to answer with such an error, so the first successful answer is kept.
     * Each broker connection is closed before moving on, even when a request fails. Any exception
     * raised while talking to a broker propagates to the caller.
     *
     * @param KafkaBrokerServer comma-separated list of {@code host:port} broker addresses
     * @param topic topic whose partition offsets are wanted
     * @return partition to latest offset; partitions without a leader, or for which no listed broker
     *     answered successfully, are absent
     * @throws IllegalArgumentException if a broker entry is not of the form {@code host:port}
     * @throws NumberFormatException if a port is not an integer
     */
    public static Map<TopicAndPartition, Long> getTopicOffsets(String KafkaBrokerServer, String topic) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        for (String broker : KafkaBrokerServer.split(",")) {
            String[] hostAndPort = broker.trim().split(":");
            if (hostAndPort.length != 2) {
                throw new IllegalArgumentException("Expected host:port but got '" + broker + "'");
            }
            SimpleConsumer simpleConsumer = new SimpleConsumer(
                    hostAndPort[0], Integer.parseInt(hostAndPort[1]), SO_TIMEOUT_MS, BUFFER_SIZE, CLIENT_ID);
            try {
                collectLatestOffsets(simpleConsumer, topic, retVals);
            } finally {
                // Always release the broker connection, even if a request threw.
                simpleConsumer.close();
            }
        }
        return retVals;
    }

    /**
     * Queries the broker behind {@code simpleConsumer} for the latest offset of each partition of
     * {@code topic} that has a leader and is not already in {@code retVals}, and stores every
     * successful answer in {@code retVals}.
     */
    private static void collectLatestOffsets(SimpleConsumer simpleConsumer, String topic,
            Map<TopicAndPartition, Long> retVals) {
        TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
        TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
        for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
            for (PartitionMetadata part : metadata.partitionsMetadata()) {
                if (part.leader() == null) {
                    continue; // partition reports no leader: skipped
                }
                TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId());
                if (retVals.containsKey(topicAndPartition)) {
                    continue; // already answered by an earlier broker; avoid a redundant request
                }
                // Only offsets[0] is used below, so a single offset is requested.
                PartitionOffsetRequestInfo partitionOffsetRequestInfo =
                        new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1);
                OffsetRequest offsetRequest = new OffsetRequest(
                        ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
                        kafka.api.OffsetRequest.CurrentVersion(),
                        simpleConsumer.clientId());
                OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
                if (!offsetResponse.hasError()) {
                    long[] offsets = offsetResponse.offsets(topic, part.partitionId());
                    if (offsets.length > 0) {
                        retVals.put(topicAndPartition, offsets[0]);
                    }
                }
            }
        }
    }
}
二、从 ZooKeeper 获取 offset
/**
 * Reads the offsets that a consumer group has committed to ZooKeeper for one topic, from the
 * nodes {@code /consumers/<groupID>/offsets/<topic>/<partition>}.
 */
public class GetTopicOffsetFromZookeeper {

    /**
     * Returns the committed offset of every partition of {@code topic} for consumer group
     * {@code groupID}.
     *
     * <p>Each partition node's data is parsed as a JSON number. Partition nodes with no data or a
     * JSON {@code null} value are skipped, so one bad node no longer prevents the others from being
     * read. Any other exception is printed and swallowed, and the offsets read up to that point are
     * returned. NOTE(review): a caller that treats an empty result as "first consumption" will
     * silently fall back to other offsets when ZooKeeper is unreachable; confirm this is intended.
     * The ZooKeeper client is always closed before returning.
     *
     * @param zkServers ZooKeeper connect string, e.g. {@code host1:2181,host2:2181}
     * @param groupID consumer group id
     * @param topic topic name
     * @return partition to committed offset; empty if the offsets node does not exist
     */
    public static Map<TopicAndPartition, Long> getConsumerOffsets(String zkServers, String groupID, String topic) {
        Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
        ObjectMapper objectMapper = new ObjectMapper();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(zkServers)
                .connectionTimeoutMs(1000)
                .sessionTimeoutMs(10000)
                .retryPolicy(new RetryUntilElapsed(1000, 1000))
                .build();
        curatorFramework.start();
        try {
            String nodePath = "/consumers/" + groupID + "/offsets/" + topic;
            if (curatorFramework.checkExists().forPath(nodePath) != null) {
                List<String> partitions = curatorFramework.getChildren().forPath(nodePath);
                for (String partition : partitions) {
                    int partitionId = Integer.parseInt(partition);
                    byte[] data = curatorFramework.getData().forPath(nodePath + "/" + partition);
                    if (data == null || data.length == 0) {
                        continue; // no offset stored for this partition; keep reading the others
                    }
                    Long offset = objectMapper.readValue(data, Long.class);
                    if (offset != null) {
                        retVals.put(new TopicAndPartition(topic, partitionId), offset);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the ZooKeeper connection even on unexpected failures.
            curatorFramework.close();
        }
        return retVals;
    }

    /** Prints the committed offsets of group {@code zhy} for {@code mytopic} on the sample cluster. */
    public static void main(String[] args) {
        Map<TopicAndPartition, Long> consumerOffsets =
                getConsumerOffsets("node3:2181,node4:2181,node5:2181", "zhy", "mytopic");
        for (Entry<TopicAndPartition, Long> entry : consumerOffsets.entrySet()) {
            TopicAndPartition topicAndPartition = entry.getKey();
            System.out.println("topic = " + topicAndPartition.topic()
                    + ",partition = " + topicAndPartition.partition()
                    + ",offset = " + entry.getValue());
        }
    }
}
三、使用 ZooKeeper 管理 offset
/**
 * Entry point that decides where a Spark Streaming job should start reading {@code mytopic}.
 * It uses offsets committed to ZooKeeper by group {@code zhy} when they exist, and otherwise the
 * latest offsets reported by the Kafka brokers.
 */
public class UseZookeeperManageOffset {

    /** Log4j logger for this class. */
    static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);

    public static void main(String[] args) {
        // Load the log4j configuration first so that the log line below is emitted with it.
        ProjectUtil.LoadLogConfig();
        logger.info("project is starting...");

        // Latest offset of every partition, as reported by the brokers.
        Map<TopicAndPartition, Long> startOffsets =
                GetTopicOffsetFromKafkaBroker.getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");

        // Offsets committed to ZooKeeper by consumer group "zhy" for the same topic.
        Map<TopicAndPartition, Long> committedOffsets =
                GetTopicOffsetFromZookeeper.getConsumerOffsets("node3:2181,node4:2181,node5:2181", "zhy", "mytopic");

        // A committed offset overrides the broker's latest offset for that partition. With no
        // committed offsets, the group is treated as consuming the topic for the first time and
        // starts from the latest offsets.
        // NOTE(review): a committed offset that is no longer within the broker's available range
        // is not validated here; confirm how the streaming job handles that case.
        boolean hasCommittedOffsets = committedOffsets != null && !committedOffsets.isEmpty();
        if (hasCommittedOffsets) {
            startOffsets.putAll(committedOffsets);
        }
        // To replay the topic from the beginning instead, set every value in startOffsets to 0 here.

        // Build the streaming job for group "zhy" starting from the chosen offsets, then run it.
        JavaStreamingContext streamingContext = SparkStreamingDirect.getStreamingContext(startOffsets, "zhy");
        streamingContext.start();
        streamingContext.awaitTermination();
        streamingContext.close();
    }
}