zoukankan      html  css  js  c++  java
  • python批量向kafka塞数据

    python批量向kafka塞数据

    from kafka import KafkaClient
    from kafka.producer import SimpleProducer
    from kafka import KafkaProducer
    
    
    
    
    def send_data_2_kafka_(topic, datas):
        '''
            向kafka解析队列发送数据
        '''
        print('### 开始塞入 ###')
        PARTNUM = 10
        TOPICNAME = topic
        KAFKABROKER = kafkabroker
        client = KafkaClient(hosts=KAFKABROKER, timeout=30)
        producer = SimpleProducer(client, async_send=False)
        curcount = int(len(datas)//PARTNUM)
        l = len(datas)
        if l != PARTNUM:
            curcount = curcount + 1
        logger.info("datas: %d" % l)
        print("curcount:===", curcount) # 1
        # if curcount == 0:
        #     curdata = datas
        #     future = producer.send_messages(TOPICNAME, *curdata)
        #     print(future)
        if l:

            for i in range(curcount): # 2
                start = i*PARTNUM
                # print("i === ", i) # i = 0
                # print("start === ", start)
                if i != curcount - 1:  # 1
                    if datas:
                        end = (i+1)*PARTNUM
                        curdata = datas[start:end]
                        future = producer.send_messages(TOPICNAME, *curdata)
                        print(future)
                else:
                    if datas:
                        curdata = datas[start:]
                        future = producer.send_messages(TOPICNAME, *curdata)
                        print(future)
                
        producer.stop()
        client.close()
        print('### 结束塞入 ###')
    
    
    
    
    
    def kafka_send_date(topic, data):
        logger.info('### 开始塞入 ###')
        # kafka生产者链接
        producer = KafkaProducer(bootstrap_servers='192.168.2.134:9092')
        # future = producer.send(topic, json.dumps(date).encode())
        # future = producer.send(topic, str(date).replace("'", '"').encode('utf-8'))
        data = json.dumps(data)
        r = bytes('{}'.format(data), 'utf-8')
        future = producer.send(topic, r)
        record_metadata = future.get(timeout=10)
        print(record_metadata, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        logger.info('### 结束塞入 ###')
  • 相关阅读:
    OpenStack Cinder组件支持的块存储设备表
    恒天云 3.0:打造基于OpenStack的私有云新模式
    看到一位园友博客有感
    android圆角功能,非常好用,可以用在图片,视频,gif等上面
    Android studio 3.0以上版本无法引入,找不到v4,v7包方案解决
    Android工程运用阿里freeline10秒快速编译分享
    Android监听view的attached或detached状态
    Android弹出Toast工具类总结
    为何IT开发人员如此辛苦?
    android TextView如何换行?
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/11887650.html
Copyright © 2011-2022 走看看