group=orchestrate-factory && export KAFKA_HEAP_OPTS="" && /kafka/bin/kafka-consumer-groups.sh --command-config config/consumer.properties --describe --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 --group $group | awk '{print $5}'|awk '{sum+=$1} END {print sum}'
kafka鉴权监控
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/bin/bash if [ -f /tmp/kafkachecklock ];then Ntime=`date +%s` Ftime=`date -d "$(stat /tmp/kafkachecklock|grep Modify|awk '{print $2,$3}'|awk -F '.' '{print $1}')" +%s` if [ $((Ntime-Ftime)) -gt 153 ];then rm -f /tmp/kafkachecklock fi kill 0 else dockerid=`docker ps|grep "kafka"|grep -vE "pause|side"|awk '{print $1}'` localIP=`ip addr|grep -A5 "^2:"|grep inet|awk 'NR==1{print $2}'|cut -d "/" -f1` #echo "dockerid: $dockerid" #echo "localIP: $localIP" rm -f /tmp/kafka_total if [ x"$dockerid" != x"" ];then touch /tmp/kafkachecklock Dockerpid=`docker inspect -f {{.State.Pid}} $dockerid` if [ "`nsenter -n -m -p -u -t $Dockerpid ls /lib/libjli.so`" == "" ];then nsenter -n -m -p -u -t $Dockerpid cp /usr/lib/jvm/java-1.8-openjdk/lib/amd64/jli/libjli.so /lib/libjli.so fi nsenter -n -m -p -u -t $Dockerpid /kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --command-config /kafka/config/consumer.properties --list|grep -v KMOffsetCache > /tmp/kafkaItem for i in `cat /tmp/kafkaItem`;do nsenter -n -m -p -u -t $Dockerpid /kafka/bin/kafka-consumer-groups.sh --describe --bootstrap-server kafka-1.default.svc.cluster.local:9092 --command-config /kafka/config/consumer.properties --group $i|grep -vE "^$|TOPIC" > /tmp/kafkaN # sleep 2 j=1 while [ $j -le `cat /tmp/kafkaN|sort|uniq|wc -l` ] do TOPIC=`cat /tmp/kafkaN|awk NR==$j'{print $1}'` PARTITION=`cat /tmp/kafkaN|awk NR==$j'{print $2}'` CURRENT_OFFSET=`cat /tmp/kafkaN|awk NR==$j'{print $3}'` LOG_END_OFFSET=`cat /tmp/kafkaN|awk NR==$j'{print $4}'` LAG=`cat /tmp/kafkaN|awk NR==$j'{print $5}'` CONSUMER_ID=`cat /tmp/kafkaN|awk NR==$j'{print $6}'` HOST=`cat /tmp/kafkaN|awk NR==$j'{print $7}'` CLIENT_ID=`cat /tmp/kafkaN|awk NR==$j'{print $8}'` if [ $LAG == "-" ];then LAG=0;fi if [ $CURRENT_OFFSET == "-" ];then CURRENT_OFFSET=0;fi if [ $LOG_END_OFFSET == "-" ];then LOG_END_OFFSET=0;fi echo "KAFKA_CURRENT_OFFSET{app="$i",topic="$TOPIC",partition="$PARTITION",CONSUMER_ID="$CONSUMER_ID",host="$HOST",CLIENT_ID="$CLIENT_ID"} $CURRENT_OFFSET" >>/tmp/kafka_total echo "KAFKA_LOG_END_OFFSET{app="$i",topic="$TOPIC",partition="$PARTITION",CONSUMER_ID="$CONSUMER_ID",host="$HOST",CLIENT_ID="$CLIENT_ID"} $LOG_END_OFFSET" >>/tmp/kafka_total echo "KAFKA_LAG{app="$i",topic="$TOPIC",partition="$PARTITION",CONSUMER_ID="$CONSUMER_ID",host="$HOST",CLIENT_ID="$CLIENT_ID"} $LAG" >>/tmp/kafka_total j=$((j+1)) done done cat /tmp/kafka_total|curl --data-binary @- http://$localIP:9091/metrics/job/kafka"`echo $localIP|awk -F '.' '{print $NF}'`" fi rm -f /tmp/kafkachecklock fi #add to /etc/crontab if [ `cat /etc/crontab|grep $0|wc -l` -eq 0 ];then sudo cp -f $0 /etc/ sudo chmod +x /etc/$0 sudo bash -c "echo '*/1 * * * * root /etc/$0'>>/etc/crontab" fi
kafka监控
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/bin/bash if [ -f /tmp/kafkachecklock ];then Ntime=`date +%s` Ftime=`date -d "$(stat /tmp/kafkachecklock|grep Modify|awk '{print $2,$3}'|awk -F '.' '{print $1}')" +%s` if [ $((Ntime-Ftime)) -gt 153 ];then rm -f /tmp/kafkachecklock fi kill 0 else dockerid=`docker ps|grep "kafka"|grep -v "pause"|awk '{print $1}'` kafkaNum=`docker ps|grep "kafka"|grep -v "pause"|awk '{print $NF}'|awk -F "_" '{print $2}'|cut -d "." -f1` localIP=`ip addr|grep -A5 "^2:"|grep inet|awk 'NR==1{print $2}'|cut -d "/" -f1` rm -f /tmp/kafka_total${kafkaNum} if [ x"$dockerid" != x"" ];then touch /tmp/kafkachecklock Dockerpid=`docker inspect -f {{.State.Pid}} $dockerid` if [ $(nsenter -n -m -p -u -t $Dockerpid ls /lib/libjli.so|wc -l) -eq 0 ];then nsenter -n -m -p -u -t $Dockerpid cp -f /usr/lib/jvm/java-1.8-openjdk/jre/lib/amd64/jli/libjli.so /lib/libjli.so fi nsenter -n -m -p -u -t $Dockerpid /kafka/bin/kafka-consumer-groups.sh --bootstrap-server $kafkaNum.default.svc.cluster.local:9092 --list|grep -v KMOffsetCache > /tmp/kafkaItem for i in `cat /tmp/kafkaItem`;do nsenter -n -m -p -u -t $Dockerpid /kafka/bin/kafka-consumer-groups.sh --describe --bootstrap-server $kafkaNum.default.svc.cluster.local:9092 --group $i|grep -vE "^$|TOPIC" > /tmp/kafkaN # sleep 2 j=1 while [ $j -le `cat /tmp/kafkaN|wc -l` ] do TOPIC=`cat /tmp/kafkaN|awk NR==$j'{print $1}'` PARTITION=`cat /tmp/kafkaN|awk NR==$j'{print $2}'` CURRENT_OFFSET=`cat /tmp/kafkaN|awk NR==$j'{print $3}'` LOG_END_OFFSET=`cat /tmp/kafkaN|awk NR==$j'{print $4}'` LAG=`cat /tmp/kafkaN|awk NR==$j'{print $5}'` CONSUMER_ID=`cat /tmp/kafkaN|awk NR==$j'{print $6}'` HOST=`cat /tmp/kafkaN|awk NR==$j'{print $7}'` CLIENT_ID=`cat /tmp/kafkaN|awk NR==$j'{print $8}'` if [ $LAG == "-" ];then LAG=0;fi if [ $CURRENT_OFFSET == "-" ];then CURRENT_OFFSET=0;fi if [ $LOG_END_OFFSET == "-" ];then LOG_END_OFFSET=0;fi echo "KAFKA_CURRENT_OFFSET{app="$i",topic="$TOPIC",partition="$PARTITION",CONSUMER_ID="$CONSUMER_ID",host="$HOST",CLIENT_ID="$CLIENT_ID"} $CURRENT_OFFSET" >>/tmp/kafka_total${kafkaNum} echo "KAFKA_LOG_END_OFFSET{app="$i",topic="$TOPIC",partition="$PARTITION",CONSUMER_ID="$CONSUMER_ID",host="$HOST",CLIENT_ID="$CLIENT_ID"} $LOG_END_OFFSET" >>/tmp/kafka_total${kafkaNum} echo "KAFKA_LAG{app="$i",topic="$TOPIC",partition="$PARTITION",CONSUMER_ID="$CONSUMER_ID",host="$HOST",CLIENT_ID="$CLIENT_ID"} $LAG" >>/tmp/kafka_total${kafkaNum} j=$((j+1)) done done cat /tmp/kafka_total${kafkaNum}|curl --data-binary @- http://$localIP:9091/metrics/job/kafka"`echo $localIP|awk -F '.' '{print $NF}'`" fi rm -f /tmp/kafkachecklock fi #add to /etc/crontab if [ `cat /etc/crontab|grep $0|wc -l` -eq 0 ];then sudo cp -f $0 /etc/ sudo chmod +x /etc/$0 sudo bash -c "echo '*/1 * * * * root /etc/$0'>>/etc/crontab" fi
清空普罗米修斯pushgateway
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/bin/bash Ntime=`date +%s` LOCALIP=`ip addr |grep $(ip addr|grep ^2:|awk '{print $2}'|cut -d ":" -f1)|awk 'NR==2{print $2}'|cut -d "/" -f1` curl http://$LOCALIP:9091/metrics|grep push_time_seconds|grep -v "#" > /tmp/metricsjob for i in $(cat /tmp/metricsjob|awk '{print $NF}');do JOBTIME=$(echo $i|awk '{print sprintf("%d", $0);}') if [ $((Ntime-JOBTIME)) -gt 600 ];then JOB=$(cat /tmp/metricsjob|grep $i|awk -F "job=" '{print $2}'|awk -F '"' '{print $2}') curl -X DELETE http://$LOCALIP:9091/metrics/job/$JOB echo $JOB >>/tmp/deletejobs fi done #add to /etc/crontab if [ `cat /etc/crontab|grep $0|wc -l` -eq 0 ];then sudo cp -f $0 /etc/ sudo chmod +x /etc/$0 sudo bash -c "echo '*/3 * * * * root /etc/$0'>>/etc/crontab" fi
查看list
export KAFKA_HEAP_OPTS=""
/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 --command-config /kafka/config/consumer.properties --list
查看详细信息
/kafka/bin/kafka-consumer-groups.sh --command-config config/consumer.properties --describe --bootstrap-server kafka-1-master.basis.svc.cluster.local:9092 --group baseBusiness
kafka集群部署
https://www.cnblogs.com/saneri/p/8762168.html
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/bin/bash # check zk status for i in {1..3};do kubectl exec -n basis -it `kubectl get po -n basis|grep zookeeper-$i| awk '{print $1}'` -- bash 2>&1 << EOF export JVMFLAGS="" /opt/zookeeper/bin/zkServer.sh status EOF done sleep 5 # check kafka product send kubectl exec -n basis -it `kubectl get po -n basis |grep kafka-deployment-1| awk '{print$1}'` -- bash 2>&1 << EOF export KAFKA_HEAP_OPTS="" /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-1.basis.svc.cluster.local:2181,zookeeper-2.basis.svc.cluster.local:2181,zookeeper-3.basis.svc.cluster.local:2181 --replication-factor 1 --partitions 3 --topic test /kafka/bin/kafka-producer-perf-test.sh --producer.config config/producer.properties --num-records 10000 --record-size 4096 --topic test --throughput 10000 --producer-props bootstrap.servers=kafka-1.basis.svc.cluster.local:9092,kafka-2.basis.svc.cluster.local:9092,kafka-3.basis.svc.cluster.local:9092 acks=all EOF sleep 5 # check kafka consumer kubectl -n basis exec -it `kubectl get po -n basis|grep kafka-deployment-1| awk '{print$1}'` -- bash 2>&1 << EOF export KAFKA_HEAP_OPTS="" /kafka/bin/kafka-consumer-perf-test.sh --consumer.config config/consumer.properties --topic test --broker-list kafka-1.basis.svc.cluster.local:9092,kafka-2.basis.svc.cluster.local:9092,kafka-3.basis.svc.cluster.local:9092 --group app-test-consumer --messages 10000 --threads 1 EOF