zoukankan      html  css  js  c++  java
  • Shell 脚本并发数据到kafka topic

    shell脚本并发数据到kafka topic

    需求:

    每秒发送大量数据到kafka,验证下游系统性能,数据中的时间戳要求为当前时间,可以之间采集系统当前时间替换文件中旧的时间戳,保证每次发送的数据都为最新时间。

    利用kafka自带的脚本,将待发数据写入文件中,然后通过读取文件 方式,将数据批量发送到kafka task节点

    #在kafka master 节点用户home 目录下创建data 目录
    [root@node1 home]# mkdir data
    #进入data目录
    [root@node1 home]# cd data/
    #将准备好的数据放入data目录中
    [root@node1 data]# ls
    batch1-1000.log  batch2-1000.log  batch-send.sh
    #编写batch-send.sh 脚本
    [root@node1 data]# vi batch-send.sh
    

    shell content:

    #!/bin/bash
    
    #响应Ctrl+C中断
    trap 'onCtrlC' INT
    function onCtrlC () {
        echo 'Ctrl+C is captured'
        exit 1
    }
    
    #kafka及data所在目录
    dataPath=/home/data
    kafkaPath=/usr/local/kafka
    #broker list
    brokerlist=192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
    #kafka的topic,这里设置发送到2个kafka topic
    topic1=test-topic1
    topic2=test-topic2
    #消息总数,可根据需要配置,实际数量是*num后的数,num取决于数据文件中的数据量
    totalNum=100
    #循环计时参数
    batchNum=10
    
    #开始时间
    start=$(date +%s)
    #打印循环开始时间
    echo $(date)
    
    for ((i=1; i<=${totalNum}; i ++))
    do
    {
        #取余数
        modVal=$(( ${i} % ${batchNum} ))
    
        #如果循环计数达到设置参数,就发送一批次消息
        if [ ${modVal} = 0 ] ; then
         #在控制台显示进度,*num,取决于数据文件中的数据量     
          echo “$(( ${i}*100 )) of $(( ${totalNum}*100 )) sent”
         # 获取每次循环发送的当前时间
         current=`date "+%Y-%m-%d %H:%M:%S"`
         # 将时间转为UTC 时间,精确到秒
         timeStamp=`date -d "$current" +%s`
         #将current转换为时间戳,精确到毫秒,10#`date "+%N"是为了避免循环过程中出现error:value too great for base
         currentTimeStamp=$((timeStamp*1000+10#`date "+%N"`/1000000))
         #每次循环发送数据前,都将数据文件中的recvTs 参数改为当前时间,放在前台执行,由于是顺序执行,需要等待文件修改完后,才会下一步执行发送文件数据给kafka,每个文件中1000条数据
          sed -i  's/("recvTs":)[0-9]*(,)/1'$currentTimeStamp',/g' ${dataPath}/batch1-1000.log ${dataPath}/batch2-1000.log
    
          #批量发送消息,并且将控制台返回的提示符重定向到/dev/null,&放到后台并发执行
          cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
          cat ${dataPath}/batch1-1000.log | ${kafkaPath}/bin/kafka-console-producer.sh --broker-list ${brokerlist} --sync --topic ${topic1} | > /dev/null &
        fi
    }
    #每个for循环等待0.1秒,batchNum=10,即每秒执行一次数据发送,
    sleep 0.1
    done
    #脚本结束时间,并统计脚本执行时间,打印到控制台
    end=$(date +%s)
    echo $(date)
    echo $(( $end - $start ))
    

    脚本执行效果

    [root@node1 data]# ./batch-send.sh 
    Sat Jan 30 16:15:53 CST 2021
    “1000 of 10000 sent”
    “2000 of 10000 sent”
    “3000 of 10000 sent”
    “4000 of 10000 sent”
    “5000 of 10000 sent”
    “6000 of 10000 sent”
    “7000 of 10000 sent”
    “8000 of 10000 sent”
    “9000 of 10000 sent”
    “10000 of 10000 sent”
    Sat Jan 30 16:16:04 CST 2021
    11
    [root@node1 data]# 
    

    这可以通过修改文件中的数据量和循环计时参数,实现每秒发送10000 甚至100000 条数据到kafka。

    ***************用努力照亮现实的梦!***********************
  • 相关阅读:
    docker Dockerfile文件的编写部分命令
    docker命令总结
    docker安装笔记
    在docker容器下利用数据卷实现在删除了mysql容器或者镜像的情况下恢复数据
    在docker下运行mysql
    mysql在docker下运行,出现中文乱码
    group by问题
    python中安装requests后又提示错误
    python安装HTMLTestRunner
    python接口测试中安装whl格式的requests第三方模块
  • 原文地址:https://www.cnblogs.com/orange2016/p/14349450.html
Copyright © 2011-2022 走看看