一. 部署
- pull wurstmeister/zookeeper
sudo docker pull wurstmeister/zookeeper
- pull wurstmeister/kafka
sudo docker pull wurstmeister/kafka
- 启动zookeeper
sudo docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
- 启动kafka1
sudo docker run -d -t --name kafka1 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.18.166:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.166:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
- 启动kafka2
注意: -p 9093:9092 表示把宿主机端口 9093 映射到容器内端口 9092, 因此容器内 KAFKA_LISTENERS 必须监听 9092 (与 -p 冒号右边一致), 而 KAFKA_ADVERTISED_LISTENERS 要公布宿主机端口 9093 (与 -p 冒号左边一致); 不一致会导致客户端连接不到 kafka2
sudo docker run -d -t --name kafka2 -p 9093:9092 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.18.166:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.166:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
- 在两个 kafka 容器的 server.properties 中做如下修改 (num.partitions=2 表示新建 topic 默认 2 个分区):
# 两个分区
num.partitions=2
# 内部 topic (__consumer_offsets 和事务状态日志) 两个副本
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
- 重启kafka container
二. 两个group消费
- producer
import json
from kafka import KafkaProducer
BOOTSTRAP_SERVERS = ["192.168.18.166:9092", "192.168.18.166:9093"]


def _make_producer():
    """Create a KafkaProducer for the two-broker cluster that JSON-encodes values."""
    return KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )


def sendMsg(topic, msg_dict, producer=None):
    """Send ``msg_dict`` to the Kafka ``topic`` as a JSON string.

    Args:
        topic: Kafka topic name to publish to.
        msg_dict: any JSON-serializable value (the demo below sends plain
            strings). Producers from ``_make_producer`` encode it with
            ``json.dumps(...).encode('utf-8')``.
        producer: optional existing producer to reuse for many messages.
            If omitted, a short-lived producer is created for this single
            message and closed before returning. A producer passed in is
            NOT closed here; the caller owns it.

    Example:
        sendMsg("peter.test_cluser", {"key": "value"})
    """
    if producer is not None:
        producer.send(topic, msg_dict)
        return

    producer = _make_producer()
    try:
        producer.send(topic, msg_dict)
    finally:
        # Close even if send() raises so the connection is not leaked;
        # close() blocks until pending sends finish.
        producer.close()


if __name__ == '__main__':
    # One producer for the whole batch instead of a new connection per message.
    producer = _make_producer()
    try:
        for i in range(10):
            msg = str(i) + '11'
            sendMsg("peter.test_cluser", msg, producer=producer)
            # Print the value that was actually sent.
            print("over" + msg)
    finally:
        producer.close()
- 两个consumer分别通过 assign 指定分区消费; 如果不指定分区, 则每个consumer消费全部消息
#consumer 1
from kafka import KafkaConsumer
import logging
import json
import datetime
from kafka import TopicPartition
def main():
    """Consume only partition 0 of ``peter.test_cluser`` and print each message.

    The consumer uses group id ``peter_consumer_cluser1``, but its partition
    is chosen by ``assign()`` (manual assignment) instead of by subscribing
    to the topic. Record values are decoded with ``json.loads``. The
    consumer is closed when the loop ends or an exception escapes it.
    """
    # Alternative: subscribe to the whole topic and let the group pick the
    # partitions, i.e. KafkaConsumer("peter.test_cluser", group_id=..., ...)
    # and drop the assign() call below.
    consumer = KafkaConsumer(
        group_id="peter_consumer_cluser1",
        max_poll_records=5,
        max_poll_interval_ms=600000,
        # enable_auto_commit=False,  # pair with consumer.commit() below for manual commits
        bootstrap_servers=["192.168.18.166:9092", "192.168.18.166:9093"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    # Read only this topic's partition 0.
    consumer.assign([TopicPartition('peter.test_cluser', 0)])
    try:
        for message in consumer:
            print("receive label message")
            if message:
                try:
                    print("@@@@@ ---> consumer_cluser1 get new message ", str(message.value))
                    # consumer.commit()  # manual commit; see enable_auto_commit above
                except Exception as e:
                    # logging.exception records the traceback too, so no
                    # separate `traceback` import is needed.
                    logging.exception("@@----> Exception : %s", e)
    finally:
        consumer.close()


if __name__ == '__main__':
    main()
#consumer 2
from kafka import KafkaConsumer
import logging
import json
import datetime
from kafka import TopicPartition
def main():
    """Consume only partition 1 of ``peter.test_cluser`` and print each message.

    The consumer uses group id ``peter_consumer_cluser2``, but its partition
    is chosen by ``assign()`` (manual assignment) instead of by subscribing
    to the topic. Record values are decoded with ``json.loads``. The
    consumer is closed when the loop ends or an exception escapes it.
    """
    # Alternative: subscribe to the whole topic and let the group pick the
    # partitions, i.e. KafkaConsumer("peter.test_cluser", group_id=..., ...)
    # and drop the assign() call below.
    consumer = KafkaConsumer(
        group_id="peter_consumer_cluser2",
        max_poll_records=5,
        max_poll_interval_ms=600000,
        # enable_auto_commit=False,  # pair with consumer.commit() below for manual commits
        bootstrap_servers=["192.168.18.166:9092", "192.168.18.166:9093"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    # Read only this topic's partition 1.
    consumer.assign([TopicPartition('peter.test_cluser', 1)])
    try:
        for message in consumer:
            print("receive label message")
            if message:
                try:
                    # Label fixed: this is consumer 2 (was a copy of consumer 1's label).
                    print("@@@@@ ---> consumer_cluser2 get new message ", str(message.value))
                    # consumer.commit()  # manual commit; see enable_auto_commit above
                except Exception as e:
                    # logging.exception records the traceback too, so no
                    # separate `traceback` import is needed.
                    logging.exception("@@----> Exception : %s", e)
    finally:
        consumer.close()


if __name__ == '__main__':
    main()
三. 同一个组,两个consumer交替消费
- producer 同上
- 两个consumer 使用同一个group
consumer1:
from kafka import KafkaConsumer
import logging
import json
import datetime
from kafka import TopicPartition
def main():
    """Consume ``peter.test_cluser`` as a member of group ``peter_consumer_cluser3``.

    The topic is passed to the constructor (subscription), so partitions are
    assigned by the consumer group. Another process using the same group id
    shares the topic's partitions with this one. Record values are decoded
    with ``json.loads``. The consumer is closed when the loop ends or an
    exception escapes it.
    """
    # To read a fixed partition instead, construct without the topic and call
    # consumer.assign([TopicPartition('peter.test_cluser', <partition>)]).
    consumer = KafkaConsumer(
        "peter.test_cluser",
        group_id="peter_consumer_cluser3",
        max_poll_records=5,
        max_poll_interval_ms=600000,
        # enable_auto_commit=False,  # pair with consumer.commit() below for manual commits
        bootstrap_servers=["192.168.18.166:9092", "192.168.18.166:9093"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    try:
        for message in consumer:
            print("receive label message")
            if message:
                try:
                    print("@@@@@ ---> consumer_cluser1 get new message ", str(message.value))
                    # consumer.commit()  # manual commit; see enable_auto_commit above
                except Exception as e:
                    # logging.exception records the traceback too, so no
                    # separate `traceback` import is needed.
                    logging.exception("@@----> Exception : %s", e)
    finally:
        consumer.close()


if __name__ == '__main__':
    main()
consumer2:
from kafka import KafkaConsumer
import logging
import json
import datetime
from kafka import TopicPartition
def main():
    """Consume ``peter.test_cluser`` as a second member of group ``peter_consumer_cluser3``.

    The topic is passed to the constructor (subscription), so partitions are
    assigned by the consumer group. Another process using the same group id
    shares the topic's partitions with this one. Record values are decoded
    with ``json.loads``. The consumer is closed when the loop ends or an
    exception escapes it.
    """
    # To read a fixed partition instead, construct without the topic and call
    # consumer.assign([TopicPartition('peter.test_cluser', <partition>)]).
    consumer = KafkaConsumer(
        "peter.test_cluser",
        group_id="peter_consumer_cluser3",
        max_poll_records=5,
        max_poll_interval_ms=600000,
        # enable_auto_commit=False,  # pair with consumer.commit() below for manual commits
        bootstrap_servers=["192.168.18.166:9092", "192.168.18.166:9093"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    try:
        for message in consumer:
            print("receive label message")
            if message:
                try:
                    # Label fixed: this is consumer 2 (was a copy of consumer 1's label).
                    print("@@@@@ ---> consumer_cluser2 get new message ", str(message.value))
                    # consumer.commit()  # manual commit; see enable_auto_commit above
                except Exception as e:
                    # logging.exception records the traceback too, so no
                    # separate `traceback` import is needed.
                    logging.exception("@@----> Exception : %s", e)
    finally:
        consumer.close()


if __name__ == '__main__':
    main()