- The producer code is fairly simple:

    import json
    from decimal import Decimal

    from kafka import KafkaProducer


    class DecimalEncoder(json.JSONEncoder):
        """JSON encoder that converts Decimal values to float."""

        def default(self, o):
            if isinstance(o, Decimal):
                return float(o)
            # Return the parent's result so unsupported types still raise TypeError.
            return super().default(o)


    def producer():
        host_port = 'localhost:9095'
        topic = '1111'
        # Serialize exactly once, here, so the consumer receives a JSON object
        # rather than a JSON-encoded string.
        kafka_producer = KafkaProducer(
            bootstrap_servers=[host_port],
            value_serializer=lambda v: json.dumps(v, cls=DecimalEncoder).encode('utf-8'),
        )
        # Alternative sample payload; it is replaced by the Decimal payload below.
        msg_dict = {
            "sleep_time": 10,
            "db_config": {
                "database": "test_1",
                "host": "xxxx",
                "user": "root",
                "password": "root"
            },
            "table": "msg",
            "msg": "Hello World"
        }
        msg_dict = {'price_cost': Decimal('6729.0716'), 'location_id': 193, 'product_id': 842892}
        # send() is asynchronous; block on the returned future to confirm delivery.
        kafka_producer.send(topic, msg_dict, partition=0).get(timeout=10)
        print('send success')
        kafka_producer.close()


    if __name__ == '__main__':
        producer()

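
One pitfall worth illustrating when a `value_serializer` is combined with a manual `json.dumps`: if the value is dumped before `send` and the serializer dumps it again, the consumer ends up with a JSON string instead of a JSON object. The sketch below reproduces this with the standard library only; `serialize` is a hypothetical stand-in for the serializer and no broker is involved.

```python
import json
from decimal import Decimal


class DecimalEncoder(json.JSONEncoder):
    """Converts Decimal values to float so json.dumps can handle them."""

    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        return super().default(o)


def serialize(value):
    """Hypothetical value_serializer: Python object -> UTF-8 JSON bytes."""
    return json.dumps(value, cls=DecimalEncoder).encode('utf-8')


payload = {'price_cost': Decimal('6729.0716'), 'location_id': 193, 'product_id': 842892}

# Serialized once: the dict goes straight into the serializer.
once = serialize(payload)
# Serialized twice: the dict is dumped to a str first, then dumped again.
twice = serialize(json.dumps(payload, cls=DecimalEncoder))

decoded_once = json.loads(once.decode('utf-8'))    # a dict
decoded_twice = json.loads(twice.decode('utf-8'))  # a str that still needs json.loads

print(type(decoded_once).__name__, type(decoded_twice).__name__)
```

With double encoding, the consumer has to call `json.loads` twice to get the dict back, so it is usually simpler to let a single place own the serialization.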
- The consumer code involves more moving parts; personally, I find the consumer group feature quite nice:

    import json

    from pykafka import KafkaClient


    def customer2():
        hosts = 'localhost:9095,localhost:9092'
        client = KafkaClient(hosts=hosts)
        topic = client.topics['1111']
        consumer = topic.get_simple_consumer(
            consumer_group=b'123456',
            auto_commit_enable=True,
            auto_commit_interval_ms=1,
        )
        for msg in consumer:
            if msg is not None:
                # msg.value is bytes; decode it explicitly, because json.loads
                # no longer accepts an encoding argument (removed in Python 3.9).
                a = json.loads(msg.value.decode('utf-8'))
                print(a)


    if __name__ == '__main__':
        customer2()

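
Message values arrive at the consumer as raw bytes, so a single malformed message can stop the loop with an exception. As a minimal sketch (the helper name `decode_value` is hypothetical and not part of pykafka), decoding can be isolated in a function that returns `None` for anything it cannot parse:

```python
import json


def decode_value(raw):
    """Decode a raw message value (bytes) into a Python object.

    Returns None when the value is missing, is not valid UTF-8,
    or is not valid JSON.
    """
    if raw is None:
        return None
    try:
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


# Usage with sample byte strings standing in for msg.value:
good = decode_value(b'{"location_id": 193, "product_id": 842892}')
bad = decode_value(b'not json')
print(good, bad)
```

Inside the consumer loop this would be called as `decode_value(msg.value)`, skipping the message when the result is `None`.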
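Q&A:
- With Kafka, messages produced while no consumer is running are not received when the consumer starts afterwards, yet messages pushed after the consumer has started are received. Why?
On the consumer side, use pykafka rather than kafka, and read the data with the get_simple_consumer method. In practice, each time a new group is specified, the consumer receives all of the earlier historical data. The likely reason is that pykafka's simple consumer starts from the earliest available offset when the group has no committed offset, whereas the consumer in the kafka package defaults to the latest offset.
- How do I specify multiple hosts on the consumer side?
Separate the hosts with commas.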
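
For readers who would rather stay on the kafka package (kafka-python), a similar effect can probably be achieved by setting `auto_offset_reset='earliest'` on `KafkaConsumer`. This is a hedged sketch and not what the setup above uses: `consumer_settings` and `build_consumer` are hypothetical helper names, and the setting only applies when the group has no committed offset yet. The helper also shows the comma-separated host string being split into a list.

```python
def consumer_settings(hosts, group_id, from_beginning=True):
    """Build keyword arguments for kafka-python's KafkaConsumer.

    hosts is a comma-separated string such as 'localhost:9095,localhost:9092'.
    """
    return {
        'bootstrap_servers': [h.strip() for h in hosts.split(',') if h.strip()],
        'group_id': group_id,
        # Only consulted when the group has no committed offset.
        'auto_offset_reset': 'earliest' if from_beginning else 'latest',
    }


def build_consumer(topic, settings):
    """Create the consumer; requires kafka-python and a reachable broker."""
    # Imported lazily so consumer_settings can be used without kafka-python.
    from kafka import KafkaConsumer
    return KafkaConsumer(topic, **settings)


settings = consumer_settings('localhost:9095, localhost:9092', '123456')
print(settings)
```

Calling `build_consumer('1111', settings)` would then connect to the brokers listed above. A brand-new `group_id` should start from the oldest retained message, while a group that has already committed offsets resumes from where it left off.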