zoukankan      html  css  js  c++  java
  • python批量向kafka塞数据

    python批量向kafka塞数据

    from kafka import KafkaClient
    from kafka.producer import SimpleProducer
    from kafka import KafkaProducer
    
    
    
    
    def send_data_2_kafka_(topic, datas):
        """Send ``datas`` to a Kafka topic in batches of at most 10 messages.

        Args:
            topic: Name of the Kafka topic to publish to.
            datas: Sliceable sequence (e.g. a list) of message payloads. Each
                batch is unpacked into a single ``producer.send_messages`` call.

        The broker address is read from the name ``kafkabroker`` and logging
        goes through ``logger``; both are expected to be defined elsewhere in
        the module (they are not defined in this function).

        The producer and client are always stopped/closed, even when a send
        raises; the exception itself is propagated to the caller.
        """
        print('### 开始塞入 ###')
        batch_size = 10
        total = len(datas)
        # Ceiling division: number of batches actually needed. The previous
        # "count + 1 unless len == batch_size" rule produced an extra, empty
        # trailing batch whenever len(datas) was a larger multiple of 10.
        batch_count = (total + batch_size - 1) // batch_size

        client = KafkaClient(hosts=kafkabroker, timeout=30)
        try:
            producer = SimpleProducer(client, async_send=False)
            try:
                logger.info("datas: %d", total)
                print("curcount:===", batch_count)
                # Stepping by batch_size yields no iterations for empty input
                # and never yields an empty slice.
                for start in range(0, total, batch_size):
                    batch = datas[start:start + batch_size]
                    future = producer.send_messages(topic, *batch)
                    print(future)
            finally:
                producer.stop()
        finally:
            client.close()
        print('### 结束塞入 ###')
    
    
    
    
    
    def kafka_send_date(topic, data, bootstrap_servers='192.168.2.134:9092'):
        """Serialize ``data`` as JSON and publish it to ``topic`` as one message.

        Args:
            topic: Name of the Kafka topic to publish to.
            data: Any object accepted by ``json.dumps``.
            bootstrap_servers: Broker address passed to ``KafkaProducer``.
                Defaults to the address that was previously hard-coded, so
                existing two-argument callers behave as before.

        Blocks for up to 10 seconds waiting for the broker to acknowledge the
        message; whatever ``future.get`` raises on failure or timeout is
        propagated. The producer is closed in all cases so that repeated calls
        do not accumulate open connections.
        """
        logger.info('### 开始塞入 ###')
        # Kafka producer connection.
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        try:
            payload = json.dumps(data).encode('utf-8')
            future = producer.send(topic, payload)
            # Wait for the send to complete so failures surface here.
            record_metadata = future.get(timeout=10)
        finally:
            producer.close()
        print(record_metadata, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        logger.info('### 结束塞入 ###')
  • 相关阅读:
    如何选择合适的开源消息中间件
    使用Rest访问Redis中的数据
    论消息队列在分布式系统的重要性
    grub-install: warning: this GPT partition label contains no BIOS Boot Partition; embedding won’t be possible Ubuntu使用BIOS启动时, GPT分区表下安装grub2报错 的解决办法
    Linux Ubuntu 16.04 启动后 桌面崩溃
    Linux Ubuntu 1604 grub2 rescue mod 启动
    EF自动探测更改
    C# 使用OracleParameter传递参数提示缺少表达式
    Gitlab安装后 500 错误 PostGre数据库无法启动
    DevExpress GridControl GridView多选状态下,代码赋值FocusedRowHandle,样式无变化
  • 原文地址:https://www.cnblogs.com/xiao-xue-di/p/11887650.html
Copyright © 2011-2022 走看看