zoukankan      html  css  js  c++  java
  • kafka命令

    # Print the total lag of one consumer group by summing column 5 (LAG) of the
    # --describe output. Header lines and "-" cells are not numeric and are skipped.
    group=orchestrate-factory
    # NOTE(review): KAFKA_HEAP_OPTS is cleared presumably so the CLI JVM does not
    # inherit the broker's heap settings - confirm against the container image.
    export KAFKA_HEAP_OPTS=""
    # Absolute --command-config path so the command works from any directory.
    /kafka/bin/kafka-consumer-groups.sh \
        --command-config /kafka/config/consumer.properties \
        --describe \
        --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 \
        --group "$group" \
        | awk '$5 ~ /^-?[0-9]+$/ {sum += $5} END {print sum + 0}'
    

      

    kafka鉴权监控

     

    #!/bin/bash
    # Kafka consumer-group monitor (variant that passes --command-config to the
    # Kafka CLI). Collects per-partition offsets and lag for every consumer group
    # from inside the local Kafka container and pushes them to the Pushgateway on
    # this host's port 9091. It also registers itself in /etc/crontab.
    #
    # Working files: /tmp/kafkachecklock (run lock), /tmp/kafkaItem (group list),
    # /tmp/kafkaN (describe output of one group), /tmp/kafka_total (metrics).
    lock_file=/tmp/kafkachecklock
    metrics_file=/tmp/kafka_total
    bootstrap_server=kafka-1.default.svc.cluster.local:9092
    command_config=/kafka/config/consumer.properties

    if [ -f "$lock_file" ]; then
        # Another run holds the lock. Remove it when it is older than 153 seconds
        # so a crashed run cannot block monitoring forever, then leave.
        now=$(date +%s)
        # stat -c %Y prints the mtime as epoch seconds, independent of the locale.
        locked_at=$(stat -c %Y "$lock_file" 2>/dev/null)
        if [ $((now - ${locked_at:-$now})) -gt 153 ]; then
            rm -f "$lock_file"
        fi
        # Plain exit: `kill 0` would signal every process in the process group.
        exit 0
    fi

    # First running container whose `docker ps` line mentions kafka, ignoring
    # pause and sidecar containers.
    container_id=$(docker ps | grep "kafka" | grep -vE "pause|side" | awk 'NR==1{print $1}')
    # First inet address listed for the interface with index 2.
    localIP=$(ip addr | grep -A5 "^2:" | grep inet | awk 'NR==1{print $2}' | cut -d "/" -f1)
    rm -f "$metrics_file"

    if [ -n "$container_id" ]; then
        touch "$lock_file"
        # Release the lock even if the script dies half-way through.
        trap 'rm -f "$lock_file"' EXIT

        container_pid=$(docker inspect -f '{{.State.Pid}}' "$container_id")
        # Command prefix that runs a program inside the container's namespaces.
        in_container=(nsenter -n -m -p -u -t "$container_pid")

        # NOTE(review): libjli.so is copied to /lib presumably so the JVM started
        # through nsenter can resolve it - confirm against the image.
        if [ -z "$("${in_container[@]}" ls /lib/libjli.so)" ]; then
            "${in_container[@]}" cp /usr/lib/jvm/java-1.8-openjdk/lib/amd64/jli/libjli.so /lib/libjli.so
        fi

        consumer_groups=("${in_container[@]}" /kafka/bin/kafka-consumer-groups.sh
            --bootstrap-server "$bootstrap_server" --command-config "$command_config")

        "${consumer_groups[@]}" --list | grep -v KMOffsetCache > /tmp/kafkaItem
        mapfile -t groups < /tmp/kafkaItem

        for group in "${groups[@]}"; do
            [ -n "$group" ] || continue
            "${consumer_groups[@]}" --describe --group "$group" | grep -vE "^$|TOPIC" > /tmp/kafkaN

            # Columns: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
            while read -r topic partition current_offset log_end_offset lag consumer_id host client_id _; do
                # The CLI prints "-" when a value is unknown; report it as 0.
                if [ "$lag" = "-" ]; then lag=0; fi
                if [ "$current_offset" = "-" ]; then current_offset=0; fi
                if [ "$log_end_offset" = "-" ]; then log_end_offset=0; fi

                # A non-numeric sample would make the Pushgateway reject the whole
                # push, so rows that are not real data lines are skipped.
                [[ $current_offset =~ ^-?[0-9]+$ && $log_end_offset =~ ^-?[0-9]+$ && $lag =~ ^-?[0-9]+$ ]] || continue

                # Label values must be double-quoted in the Prometheus text format.
                labels="app=\"$group\",topic=\"$topic\",partition=\"$partition\",CONSUMER_ID=\"$consumer_id\",host=\"$host\",CLIENT_ID=\"$client_id\""
                {
                    printf 'KAFKA_CURRENT_OFFSET{%s} %s\n' "$labels" "$current_offset"
                    printf 'KAFKA_LOG_END_OFFSET{%s} %s\n' "$labels" "$log_end_offset"
                    printf 'KAFKA_LAG{%s} %s\n' "$labels" "$lag"
                } >> "$metrics_file"
            done < /tmp/kafkaN
        done

        # The job name is "kafka" followed by the last octet of the local IP.
        if [ -s "$metrics_file" ]; then
            curl --data-binary @- "http://$localIP:9091/metrics/job/kafka${localIP##*.}" < "$metrics_file"
        fi
    fi
    rm -f "$lock_file"

    #add to /etc/crontab
    # Install under /etc using the bare file name, so the entry is valid no matter
    # how the script was invoked (./name, relative path or absolute path).
    self_name=${0##*/}
    if ! grep -qF -- "$self_name" /etc/crontab; then
        sudo cp -f -- "$0" /etc/
        sudo chmod +x "/etc/$self_name"
        printf '%s\n' "*/1 * * * * root /etc/$self_name" | sudo tee -a /etc/crontab > /dev/null
    fi
    View Code

    kafka监控

     
    #!/bin/bash
    # Kafka consumer-group monitor. Collects per-partition offsets and lag for
    # every consumer group from inside the local Kafka container and pushes them
    # to the Pushgateway on this host's port 9091. It also registers itself in
    # /etc/crontab.
    #
    # Working files: /tmp/kafkachecklock (run lock), /tmp/kafkaItem (group list),
    # /tmp/kafkaN (describe output of one group), /tmp/kafka_total<name> (metrics).
    lock_file=/tmp/kafkachecklock

    if [ -f "$lock_file" ]; then
        # Another run holds the lock. Remove it when it is older than 153 seconds
        # so a crashed run cannot block monitoring forever, then leave.
        now=$(date +%s)
        # stat -c %Y prints the mtime as epoch seconds, independent of the locale.
        locked_at=$(stat -c %Y "$lock_file" 2>/dev/null)
        if [ $((now - ${locked_at:-$now})) -gt 153 ]; then
            rm -f "$lock_file"
        fi
        # Plain exit: `kill 0` would signal every process in the process group.
        exit 0
    fi

    # Id and name of the first running container whose `docker ps` line mentions
    # kafka (pause containers ignored); both values come from the same row.
    read -r container_id container_name < <(docker ps | grep "kafka" | grep -v "pause" | awk 'NR==1{print $1, $NF}')
    # Second "_"-separated part of the container name, cut at the first dot; it
    # is used below as the host part of the bootstrap server address.
    kafkaNum=$(printf '%s\n' "$container_name" | awk -F "_" '{print $2}' | cut -d "." -f1)
    # First inet address listed for the interface with index 2.
    localIP=$(ip addr | grep -A5 "^2:" | grep inet | awk 'NR==1{print $2}' | cut -d "/" -f1)
    metrics_file="/tmp/kafka_total${kafkaNum}"
    bootstrap_server="$kafkaNum.default.svc.cluster.local:9092"
    rm -f "$metrics_file"

    if [ -n "$container_id" ]; then
        touch "$lock_file"
        # Release the lock even if the script dies half-way through.
        trap 'rm -f "$lock_file"' EXIT

        container_pid=$(docker inspect -f '{{.State.Pid}}' "$container_id")
        # Command prefix that runs a program inside the container's namespaces.
        in_container=(nsenter -n -m -p -u -t "$container_pid")

        # NOTE(review): libjli.so is copied to /lib presumably so the JVM started
        # through nsenter can resolve it - confirm against the image.
        if [ "$("${in_container[@]}" ls /lib/libjli.so | wc -l)" -eq 0 ]; then
            "${in_container[@]}" cp -f /usr/lib/jvm/java-1.8-openjdk/jre/lib/amd64/jli/libjli.so /lib/libjli.so
        fi

        consumer_groups=("${in_container[@]}" /kafka/bin/kafka-consumer-groups.sh
            --bootstrap-server "$bootstrap_server")

        "${consumer_groups[@]}" --list | grep -v KMOffsetCache > /tmp/kafkaItem
        mapfile -t groups < /tmp/kafkaItem

        for group in "${groups[@]}"; do
            [ -n "$group" ] || continue
            "${consumer_groups[@]}" --describe --group "$group" | grep -vE "^$|TOPIC" > /tmp/kafkaN

            # Columns: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
            while read -r topic partition current_offset log_end_offset lag consumer_id host client_id _; do
                # The CLI prints "-" when a value is unknown; report it as 0.
                if [ "$lag" = "-" ]; then lag=0; fi
                if [ "$current_offset" = "-" ]; then current_offset=0; fi
                if [ "$log_end_offset" = "-" ]; then log_end_offset=0; fi

                # A non-numeric sample would make the Pushgateway reject the whole
                # push, so rows that are not real data lines are skipped.
                [[ $current_offset =~ ^-?[0-9]+$ && $log_end_offset =~ ^-?[0-9]+$ && $lag =~ ^-?[0-9]+$ ]] || continue

                # Label values must be double-quoted in the Prometheus text format.
                labels="app=\"$group\",topic=\"$topic\",partition=\"$partition\",CONSUMER_ID=\"$consumer_id\",host=\"$host\",CLIENT_ID=\"$client_id\""
                {
                    printf 'KAFKA_CURRENT_OFFSET{%s} %s\n' "$labels" "$current_offset"
                    printf 'KAFKA_LOG_END_OFFSET{%s} %s\n' "$labels" "$log_end_offset"
                    printf 'KAFKA_LAG{%s} %s\n' "$labels" "$lag"
                } >> "$metrics_file"
            done < /tmp/kafkaN
        done

        # The job name is "kafka" followed by the last octet of the local IP.
        if [ -s "$metrics_file" ]; then
            curl --data-binary @- "http://$localIP:9091/metrics/job/kafka${localIP##*.}" < "$metrics_file"
        fi
    fi
    rm -f "$lock_file"

    #add to /etc/crontab
    # Install under /etc using the bare file name, so the entry is valid no matter
    # how the script was invoked (./name, relative path or absolute path).
    self_name=${0##*/}
    if ! grep -qF -- "$self_name" /etc/crontab; then
        sudo cp -f -- "$0" /etc/
        sudo chmod +x "/etc/$self_name"
        printf '%s\n' "*/1 * * * * root /etc/$self_name" | sudo tee -a /etc/crontab > /dev/null
    fi
    View Code

    清空普罗米修斯pushgateway

    #!/bin/bash
    # Pushgateway cleanup: deletes every job whose push_time_seconds is more than
    # 600 seconds old and appends the deleted job names to /tmp/deletejobs.
    # It also registers itself in /etc/crontab (every 3 minutes, as root).
    #
    # Working file: /tmp/metricsjob (the push_time_seconds lines of /metrics).
    now=$(date +%s)
    # Name of the interface with index 2, then the address on the second
    # `ip addr` line that mentions that interface (normally its inet line).
    iface=$(ip addr | grep "^2:" | awk '{print $2}' | cut -d ":" -f1)
    LOCALIP=$(ip addr | grep -- "$iface" | awk 'NR==2{print $2}' | cut -d "/" -f1)

    curl -sS "http://$LOCALIP:9091/metrics" | grep push_time_seconds | grep -v "#" > /tmp/metricsjob

    # Extract "<push time as integer> <job>" from each line in a single pass.
    # Job and timestamp are taken from the same line, so two jobs that share a
    # push time can no longer be mixed up. The job is the first double-quoted
    # value after "job=".
    awk '{
        n = split($0, after_job, "job=")
        if (n < 2) next
        split(after_job[2], quoted, "\"")
        if (quoted[2] == "") next
        printf "%d %s\n", $NF, quoted[2]
    }' /tmp/metricsjob | sort -u |
    while read -r pushed_at job; do
        if [ $((now - pushed_at)) -gt 600 ]; then
            curl -sS -X DELETE "http://$LOCALIP:9091/metrics/job/$job"
            echo "$job" >> /tmp/deletejobs
        fi
    done

    #add to /etc/crontab
    # Install under /etc using the bare file name, so the entry is valid no matter
    # how the script was invoked (./name, relative path or absolute path).
    self_name=${0##*/}
    if ! grep -qF -- "$self_name" /etc/crontab; then
        sudo cp -f -- "$0" /etc/
        sudo chmod +x "/etc/$self_name"
        printf '%s\n' "*/3 * * * * root /etc/$self_name" | sudo tee -a /etc/crontab > /dev/null
    fi
    View Code

    查看消费组列表(--list)

    # List all consumer groups known to the broker.
    # KAFKA_HEAP_OPTS is exported empty before the CLI tool is started.
    export KAFKA_HEAP_OPTS=""
    /kafka/bin/kafka-consumer-groups.sh \
        --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 \
        --command-config /kafka/config/consumer.properties \
        --list

    查看指定消费组的详细信息(--describe)

    # Show per-partition offsets and lag of the consumer group "baseBusiness".
    # Absolute --command-config path so the command works from any directory.
    /kafka/bin/kafka-consumer-groups.sh \
        --command-config /kafka/config/consumer.properties \
        --describe \
        --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 \
        --group baseBusiness

     kafka集群部署

    https://www.cnblogs.com/saneri/p/8762168.html
    
    #!/bin/bash
    # Smoke test for the ZooKeeper/Kafka deployment in the "basis" namespace:
    #   1. print the status of zookeeper-1..3,
    #   2. create the topic "test" and run the producer perf test against it,
    #   3. run the consumer perf test against the same topic.
    namespace=basis

    # Print the name of the first pod whose `kubectl get po` line matches $1.
    find_pod() {
        kubectl get po -n "$namespace" | grep -- "$1" | awk 'NR==1{print $1}'
    }

    # Run the shell snippet $2 with bash inside the first pod matching $1.
    # stderr is merged into stdout. The snippet is passed with `bash -c`, not on
    # stdin, so no TTY or here-document is needed.
    run_in_pod() {
        local pod
        pod=$(find_pod "$1")
        if [ -z "$pod" ]; then
            echo "no pod matching '$1' found in namespace $namespace" >&2
            return 1
        fi
        kubectl exec -n "$namespace" "$pod" -- bash -c "$2" 2>&1
    }

    # check zk status
    for i in {1..3}; do
        run_in_pod "zookeeper-$i" '
            export JVMFLAGS=""
            /opt/zookeeper/bin/zkServer.sh  status
        '
    done

    sleep 5
    # check kafka product send
    # --if-not-exists keeps the topic creation from failing on repeated runs.
    run_in_pod "kafka-deployment-1" '
        export KAFKA_HEAP_OPTS=""
        /kafka/bin/kafka-topics.sh --create --if-not-exists \
            --zookeeper zookeeper-1.basis.svc.cluster.local:2181,zookeeper-2.basis.svc.cluster.local:2181,zookeeper-3.basis.svc.cluster.local:2181 \
            --replication-factor 1 --partitions 3 --topic test
        /kafka/bin/kafka-producer-perf-test.sh --producer.config config/producer.properties \
            --num-records 10000 --record-size 4096 --topic test --throughput 10000 \
            --producer-props bootstrap.servers=kafka-1.basis.svc.cluster.local:9092,kafka-2.basis.svc.cluster.local:9092,kafka-3.basis.svc.cluster.local:9092 acks=all
    '

    sleep 5
    # check kafka consumer
    run_in_pod "kafka-deployment-1" '
        export KAFKA_HEAP_OPTS=""
        /kafka/bin/kafka-consumer-perf-test.sh --consumer.config config/consumer.properties \
            --topic test \
            --broker-list kafka-1.basis.svc.cluster.local:9092,kafka-2.basis.svc.cluster.local:9092,kafka-3.basis.svc.cluster.local:9092 \
            --group app-test-consumer --messages 10000  --threads 1
    '
    check_zk_kafka_acl.sh
  • 相关阅读:
    Sql Server 中的 @@ERROR
    MVC 自定义HtmlHelper帮助类型之分页
    CI(CodeIgniter)框架中的增删改查操作
    WMAP 启动报错 PHP- 提示缺少 msvcr110.dll 的问题
    Bat命令学习 (转载)
    C# 实现刻录光盘功能
    几个操作、名词的说明
    CheckStateChanged(复选框选中状态更改事件)和 CheckedChanged(单选按钮选中状态更改事件)二者区别?
    关于设置Visaul Studio 2010 代码编辑界面背景的方法
    checkBox1_CheckedChanged(object sender, EventArgs e)和checkBox1_CheckStateChanged(object sender, EventArgs e)不同
  • 原文地址:https://www.cnblogs.com/hanwei666/p/11743390.html
Copyright © 2011-2022 走看看