zoukankan      html  css  js  c++  java
  • python批量向kafka塞数据

    python批量向kafka塞数据

    from kafka import KafkaClient
    from kafka.producer import SimpleProducer
    from kafka import KafkaProducer
    
    
    
    
    def send_data_2_kafka_(topic, datas):
        '''
            向kafka解析队列发送数据
        '''
        print('### 开始塞入 ###')
        PARTNUM = 10
        TOPICNAME = topic
        KAFKABROKER = kafkabroker
        client = KafkaClient(hosts=KAFKABROKER, timeout=30)
        producer = SimpleProducer(client, async_send=False)
        curcount = int(len(datas)//PARTNUM)
        l = len(datas)
        if l != PARTNUM:
            curcount = curcount + 1
        logger.info("datas: %d" % l)
        print("curcount:===", curcount) # 1
        # if curcount == 0:
        #     curdata = datas
        #     future = producer.send_messages(TOPICNAME, *curdata)
        #     print(future)
        if l:

            for i in range(curcount): # 2
                start = i*PARTNUM
                # print("i === ", i) # i = 0
                # print("start === ", start)
                if i != curcount - 1:  # 1
                    if datas:
                        end = (i+1)*PARTNUM
                        curdata = datas[start:end]
                        future = producer.send_messages(TOPICNAME, *curdata)
                        print(future)
                else:
                    if datas:
                        curdata = datas[start:]
                        future = producer.send_messages(TOPICNAME, *curdata)
                        print(future)
                
        producer.stop()
        client.close()
        print('### 结束塞入 ###')
    
    
    
    
    
    def kafka_send_date(topic, data):
        logger.info('### 开始塞入 ###')
        # kafka生产者链接
        producer = KafkaProducer(bootstrap_servers='192.168.2.134:9092')
        # future = producer.send(topic, json.dumps(date).encode())
        # future = producer.send(topic, str(date).replace("'", '"').encode('utf-8'))
        data = json.dumps(data)
        r = bytes('{}'.format(data), 'utf-8')
        future = producer.send(topic, r)
        record_metadata = future.get(timeout=10)
        print(record_metadata, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        logger.info('### 结束塞入 ###')
  • 相关阅读:
    退出窗口时出现“当”的响声
    屏幕设备环境
    修改一个完全颜色的CListCtrl类
    修改一个完全颜色的CListCtrl类
    MFC中CString.Format的详细用法
    网上阅卷系统自动识别功能代码
    mfc 子对话框数据传给父对话框
    already defined in *.obj
    Object 的使用
    this 函数执行上下文
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/11887650.html
Copyright © 2011-2022 走看看