#!/usr/bin/python import json from kafka import KafkaProducer import random import time topic='topic' #生产者写入数据格式为json格式,需要加value_serializer参数 producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'), bootstrap_servers="这里是实例ip,端口号9092,多个ip用,隔开" ) for i in range(2): a=random.randint(10000000000,99999999999) b = random.randint(7200,14400) key=str(a)+"_755WX_"+transit_depot_no actual_depart_tm = int(time.time()) - b actual_depart_tmArray = time.localtime(actual_depart_tm) newactual_depart_tm = time.strftime("%Y-%m-%d %H:%M:%S", actual_depart_tmArray) data={ "key": "160972", "actual_depart_tm": "2020-12-21 15:35:00", "pre_send_batch_dt": "2020-12-21", "plan_arrive_batch_dt": "2020-12-21", data["key"] = key data["actual_depart_tm"] = newactual_depart_tm producer.send(topic, data) producer.close()