python批量向kafka塞数据
from kafka import KafkaClient from kafka.producer import SimpleProducer from kafka import KafkaProducer
def send_data_2_kafka_(topic, datas, batch_size=10):
    """Send messages to a Kafka topic in fixed-size batches.

    Args:
        topic: Name of the Kafka topic to publish to.
        datas: Sequence of message payloads; it must support ``len()`` and
            slicing. Each element is passed to
            ``SimpleProducer.send_messages`` as one message.
        batch_size: Maximum number of messages per ``send_messages`` call.
            Defaults to 10, the batch size this function previously
            hard-coded.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.

    The broker address is read from the module-level name ``kafkabroker``.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    print('### 开始塞入 ###')
    client = KafkaClient(hosts=kafkabroker, timeout=30)
    try:
        producer = SimpleProducer(client, async_send=False)
        try:
            total = len(datas)
            # Ceiling division. The previous "floor + 1 unless len == 10"
            # rule over-counted whenever len was another multiple of the
            # batch size, which caused a trailing send with no messages.
            batch_count = (total + batch_size - 1) // batch_size
            logger.info("datas: %d", total)
            print("curcount:===", batch_count)

            # Stepping by batch_size yields exactly the non-empty slices,
            # including a shorter final one; nothing is sent for empty input.
            for start in range(0, total, batch_size):
                batch = datas[start:start + batch_size]
                result = producer.send_messages(topic, *batch)
                print(result)
        finally:
            # Release the producer even if a send fails part-way through.
            producer.stop()
    finally:
        client.close()
    print('### 结束塞入 ###')
def kafka_send_date(topic, data, bootstrap_servers='192.168.2.134:9092'):
    """Serialize ``data`` as JSON and publish it to a Kafka topic as one message.

    Args:
        topic: Name of the Kafka topic to publish to.
        data: Object to send; it is serialized with ``json.dumps`` and then
            encoded as UTF-8 bytes.
        bootstrap_servers: Broker address(es) handed to ``KafkaProducer``.
            Defaults to the address this function previously hard-coded.

    The call waits on ``future.get(timeout=10)`` for the send result, so any
    exception raised by that call propagates to the caller.
    """
    logger.info('### 开始塞入 ###')
    # A new producer is created per call, so it must be closed before
    # returning; otherwise its connections are left open on every call.
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        payload = json.dumps(data).encode('utf-8')
        future = producer.send(topic, payload)
        record_metadata = future.get(timeout=10)
        print(record_metadata, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    finally:
        producer.close()
    logger.info('### 结束塞入 ###')