zoukankan      html  css  js  c++  java
  • python批量向kafka塞数据

    python批量向kafka塞数据

    from kafka import KafkaClient
    from kafka.producer import SimpleProducer
    from kafka import KafkaProducer
    
    
    
    
    def send_data_2_kafka_(topic, datas):
        """Send ``datas`` to a Kafka topic in batches of at most ten messages.

        A ``KafkaClient`` is opened against the module-level ``kafkabroker``
        address and a synchronous ``SimpleProducer`` sends one batch at a time.
        The producer and the client are always released, even when a send
        raises.

        Args:
            topic: Name of the Kafka topic to publish to.
            datas: Sliceable sequence of messages. Each batch is unpacked into
                ``producer.send_messages``. Presumably every item must already
                be ``bytes`` -- TODO confirm against callers.

        Returns:
            None. The response of every batch is printed to stdout.
        """
        print('### 开始塞入 ###')
        batch_size = 10
        client = KafkaClient(hosts=kafkabroker, timeout=30)
        try:
            producer = SimpleProducer(client, async_send=False)
            try:
                total = len(datas)
                # Ceiling division: the number of non-empty batches. The
                # previous "+1 unless len == 10" rule produced a trailing empty
                # batch whenever len was any other multiple of ten.
                batch_count = (total + batch_size - 1) // batch_size
                logger.info("datas: %d", total)
                print("curcount:===", batch_count)
                # Stepping by batch_size never yields an empty slice, and an
                # empty ``datas`` simply skips the loop.
                for start in range(0, total, batch_size):
                    batch = datas[start:start + batch_size]
                    response = producer.send_messages(topic, *batch)
                    print(response)
            finally:
                producer.stop()
        finally:
            client.close()
        print('### 结束塞入 ###')
    
    
    
    
    
    def kafka_send_date(topic, data, bootstrap_servers='192.168.2.134:9092'):
        """Serialize ``data`` as JSON and publish it to ``topic`` as one message.

        The call blocks for up to ten seconds waiting for the send result, then
        prints the returned record metadata with the current local time. The
        producer is always closed before returning so that its resources are
        not leaked across calls.

        Args:
            topic: Name of the Kafka topic to publish to.
            data: Any object accepted by ``json.dumps``.
            bootstrap_servers: Broker address(es) handed to ``KafkaProducer``.
                Defaults to the address that was previously hard-coded.

        Returns:
            None.

        Raises:
            TypeError: If ``data`` is not JSON serializable.
            Exception: Whatever ``future.get`` raises when the send fails or
                does not complete within the timeout.
        """
        logger.info('### 开始塞入 ###')
        # Kafka producer connection.
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        try:
            payload = json.dumps(data).encode('utf-8')
            future = producer.send(topic, payload)
            # Block until the send is resolved so failures surface here.
            record_metadata = future.get(timeout=10)
            print(record_metadata, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        finally:
            producer.close()
        logger.info('### 结束塞入 ###')
  • 相关阅读:
    jQuery each的实现与call方法的详细介绍
    转载Entity Framework 5.0(EF first)中的添加,删除,修改,查询,状态跟踪操作
    转载有个小孩跟我说LINQ(重点讲述Linq中GroupBy的原理及用法)
    luogu P3305 [SDOI2013]费用流
    bzoj 4819: [Sdoi2017]新生舞会
    bzoj4817: [Sdoi2017]树点涂色
    bzoj4816: [Sdoi2017]数字表格
    bzoj 4818: [Sdoi2017]序列计数
    [JSOI2007]重要的城市(x)
    BZOJ 1009 [HNOI2008]GT考试
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/11887650.html
Copyright © 2011-2022 走看看