简要:开发中,常常因为需要我们要认为修改消费者实例对kafka某个主题消费的偏移量。具体如何修改?为什么可行?其实很容易,有时候只要我们换一种方式思考,如果我自己实现kafka消费者,我该如何让我们的消费者代码如何控制对某一个主题消费,以及我们该如何实现不同消费者组可以消费同一个主题的同一条消息,一个消费组下不同消费者消费同一个主题的不同消息。如果让你实现该框架该如何实现?
原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6212913.html
可接网站开发,java开发。
新浪微博:intsmaze刘洋洋哥
微信:intsmaze
创建一个kafka主题名为intsmazX,指定分区数为3.
使用kafkaspout创建该主题的消费者实例(指定元数据存放zookeeper中的路径为/kafka-offset,指定实例id为onetest),启动storm可以观察到如下信息:
INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_0 --> null //这个地方会到zookeeper中该目录下读取,看是否存储有对该分区的消费信息
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset//没有分区信息,这个时候就会直接到kafka的broker中得到该分区的最大偏移量
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop002.icccuat.com:0 from offset 0
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_1 --> null
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop003.icccuat.com:1 from offset 0
INFO storm.kafka.PartitionManager - Read partition information from: /kafka-offset/onetest/partition_2 --> null
INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
INFO storm.kafka.PartitionManager - Starting Kafka hadoop001.icccuat.com:2 from offset 0
这个时候在zookeeper的/kafka-offset下没有生成名为onetest的目录,这是因为对应的intsmazeX还没有数据产生。
我们使用kafka消费者生产3条数据,然后去查看zookeeper中对应目录下的信息:
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":1,"broker":{"host":"hadoop003.icccuat.com","port":6667},"topic":"intsmazeX"}
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":2,"broker":{"host":"hadoop001.icccuat.com","port":6667},"topic":"intsmazeX"}
{"topology":{"id":"34e94ae4-a0a0-41e9-a360-d0ab648fe196","name":"intsmaze-20161222-143121"},"offset":1,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}
30秒(kafkaspout中设置提交zookeeper消费偏移量时间为30秒)之后,可以看到,会记录该实例对每一个分区消费的偏移量为1.
杀掉该拓扑,这个时候我们再向intsmazeX主题生产6条数据,这个时候,broker中该主题每个分区的最大偏移量为3了。
然后我们修改/kafka-offset/onttest/下每一个分区的offset为3.
这个时候,我们再次部署该拓扑,可以发现拓扑没有消费刚刚产生的6条消息。再发送3条消息,拓扑就会立马消费这三条消息。
杀掉该拓扑,这个时候该拓扑消费者实例对每个分区的消费偏移量就是4了,然后我们把offset修改为6,然后启动拓扑,这个时候broker中该主题每个分区的最大偏移量为4并不是6,让我们看看,消费分区的偏移量大于主题分区当前偏移量会有什么样的情况出现。
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [6]; retrying with default start offset time from configuration. configured start offset time: [-2]
WARN storm.kafka.PartitionManager - Using new offset: 4
这个时候我们看到,消费者的分区偏移量的记录将会自动同步为每一个分区当前最大的偏移量了,kafkaspout会先用偏移量6去拉去,发现拉去不到,就到broker中获取该主题对应分区的最大偏移量。。
{"topology":{"id":"818ab9cc-d56f-454f-88b2-06dd830d54c1","name":"intsmaze-20161222-150006"},"offset":4,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}
....
把offset的偏移量设置为7000,一样在拓扑启动后,会更新为每个分区的最大偏移量。
重新部署一个拓扑消费该主题,设置该拓扑的id为twotest,这个时候启动拓扑,我们发现,并没有启动拓扑前的消息数据,这是因为,拓扑启动后,要获得偏移量,而这个偏移量只能是当前主题每个分区的最大偏移量(因为分区的偏移量是递增,且
分区的数据会定时删除的,所以无法知道当前分区当前最开始的偏移量。)
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Read partition information from: /kafka-offset/twotest/partition_0 --> null
No partition information found, using configuration to determine offset
Starting Kafka hadoop002.icccuat.com:0 from offset 7
Read partition information from: /kafka-offset/twotest/partition_1 --> null
No partition information found, using configuration to determine offset
Starting Kafka hadoop003.icccuat.com:1 from offset 7
Read partition information from: /kafka-offset/twotest/partition_2 --> null
No partition information found, using configuration to determine offset
Starting Kafka hadoop001.icccuat.com:2 from offset 7
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
Finished refreshing
Refreshing partition manager connections
Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop003.icccuat.com:6667, partition=1}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
Deleted partition managers: []
New partition managers: []
发送三条信息,查看该实例目录如下。
{"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"offset":8,"partition":0,"broker":{"host":"hadoop002.icccuat.com","port":6667},"topic":"intsmazeX"}
再启动一个拓扑,实例为twotest不变:
[INFO] Task [1/2] Refreshing partition manager connections
[INFO] Task [2/2] Refreshing partition manager connections
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Task [1/2] assigned [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
[INFO] Task [2/2] assigned [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Task [1/2] Deleted partition managers: []
[INFO] Task [2/2] Deleted partition managers: []
[INFO] Task [1/2] New partition managers: [Partition{host=hadoop002.icccuat.com:6667, partition=0}, Partition{host=hadoop001.icccuat.com:6667, partition=2}]
[INFO] Task [2/2] New partition managers: [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Read partition information from: /kafka-offset/twotest/partition_0 --> {"topic":"intsmazeX","partition":0,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop002.icccuat.com"},"offset":8}
[INFO] Read partition information from: /kafka-offset/twotest/partition_1 --> {"topic":"intsmazeX","partition":1,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop003.icccuat.com"},"offset":8}
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Starting Kafka hadoop002.icccuat.com:0 from offset 8
[INFO] Starting Kafka hadoop003.icccuat.com:1 from offset 8
[INFO] Task [2/2] Finished refreshing
[INFO] Read partition information from: /kafka-offset/twotest/partition_2 --> {"topic":"intsmazeX","partition":2,"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"broker":{"port":6667,"host":"hadoop001.icccuat.com"},"offset":8}
[INFO] Read last commit offset from zookeeper: 8; old topology_id: 3d6a5f80-357f-4591-8e5c-b3d4d2403dfe - new topology_id: 348af8da-994a-4cdb-a629-e4bf107348af
[INFO] Starting Kafka hadoop001.icccuat.com:2 from offset 8
[INFO] Task [1/2] Finished refreshing
[INFO] Task [2/2] Refreshing partition manager connections
[INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=hadoop002.icccuat.com:6667, 1=hadoop003.icccuat.com:6667, 2=hadoop001.icccuat.com:6667}}
[INFO] Task [2/2] assigned [Partition{host=hadoop003.icccuat.com:6667, partition=1}]
[INFO] Task [1/2] Refreshing partition manager connections
[INFO] Task [2/2] Deleted partition managers: []
[INFO] Task [2/2] New partition managers: []
{"topology":{"id":"3d6a5f80-357f-4591-8e5c-b3d4d2403dfe","name":"demo-20161222-152236"},"offset":8,"partition":1,"broker":{"host":"hadoop003.icccuat.com","port":6667},"topic":"intsmazeX"}
然后发送消息,我们可以看到两个拓扑都会运行的,因为两个拓扑共用一个元数据信息。
这个过程有些坑要注意:
1:在使用kafka-spout的时候,我们要指定该kafka消费者在zookeeper中存储偏移量的地址,这里是/kafka-offset。同时指定该kafka对应的实例id这里是onetest.kafkapout和kafka客户端代码不一样,它没有消费组的概念,也不能这样说吧,只能说数据的存放不一样,不同的实例代表
不同的消费组。
2:修改某一个kafkaspout实例的时候,我们一定要把该id的拓扑关闭掉,我们在项目中遇到一个大坑,就是不熟一样的kafkaspout它的id是相同的,也就是共用同一个目录,那么如果我们没有下线这些拓扑任务,而只是把
这些拓扑任务设置为不活跃状态,那么我们修改zookeeper中偏移量后,再把拓扑设置为活跃状态后,会发现修改无效,offset还是变为以前的offset了,这是因为拓扑没有杀掉,它的运行程序中也会保存当前消费的偏移量,会定时更新的。
3:我们在杀拓扑时,要设置时间,因为拓扑默认30秒向zookeeper提交一下偏移量信息。
修改偏移量有两种,一种就是在部署拓扑前,先修改zookeeper中的偏移量,或者直接删除zookeeper中的对应实例的目录。这样从新部署都会从最新的偏移量开始运行。
下面的是我当初自己学习kafka时,思考自己写kafka时,该如何解决kafka的消费者和消费组之间对数据消费时的判断。虽然框架极大简化了我们的生产力,但是作为一个有
思想的程序员,我们应该换一个角度去思考一个框架,而不应该再是这个框架有什么功能,我们用这个框架的这个功能,这样下去,我们就会一直认为这个框架好厉害,却不明白其内部实现方式。
如果自己要实现kafka功能: