zoukankan      html  css  js  c++  java
  • kafka的使用经验

    kafka参数说明(参考):
    https://www.cnblogs.com/weixiuli/p/6413109.html


    kafka时间戳字段原因(过期清理,日志切分,流式处理),0.10版本开始才有时间戳概念
    https://www.cnblogs.com/huxi2b/p/6050778.html

    kafka消息是存放在磁盘上,发送一次,累积到一定数量或者时间间隔就落盘一次,消费一次就读一次磁盘

    topic划分为若干分区,分区对一个目录,分区划分为segment,一个segment对应三个二进制文件(后缀分别是index,log,timeindex),类似mysql存储机制
    消息数据存放在log文件里面,对应的位置存放在index里面,时间戳存放在timeindex里面


    分区有副本概念,如果一个topic有10个分区,分3个节点,那么可能是3/3/4的存放方式,比如kjLog-1,kjLog-4,kjLog-7这样的存放方式。
    但是其他kjLog-*目录也会有,但是目录下的index,log,timeindex文件一定是空的,没有数据。


    副本选举机制:
    如果有多个副本,会有一个选举机制,假设有1个分区有5个副本,共6份,如果分区1挂了,其他5份有3份及时和分区1同步了,就会进入ISR,这个东西存放在zk里面
    当kafka发现有个分区挂了后,就从ISR找到每个可用的副本所在节点ID,下发通知,这些副本里面3份及时的分区对应的节点就会同时向zk注册,zk机制是只有一个节点能注册成功,这样先注册的就选举成功,成为新的分区,其他分区都要跟它保持同步。


    ===================
    kafka配置:
    cd /usr/local/src
    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
    tar -zxvf kafka_2.11-0.10.1.1.tgz -C /home/dig/service/
    cd /home/dig/service/
    ln -s kafka_2.11-0.10.1.0 kafka

    vim /home/dig/service/kafka/config/consumer.properties
    zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
    zookeeper.connection.timeout.ms=6000
    group.id=test-consumer-group


    vim /home/dig/service/kafka/config/producer.properties
    metadata.broker.list=bdp-001:9092,bdp-002:9092,bdp-003:9092
    compression.codec=none

    producer.type=sync
    serializer.class=kafka.serializer.DefaultEncoder
    batch.num.messages=200


    vim /home/dig/service/kafka/config/server.properties
    broker.id=0
    host.name=bdp-001
    port=9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/home/dig/service/kafka/data
    num.partitions=6
    num.recovery.threads.per.data.dir=1

    #消息保留时间
    log.retention.hours=168

    #消息最多保留的字节数
    log.segment.bytes=1073741824

    #每隔多久检查上面两个参数是否达到阀值
    log.retention.check.interval.ms=300000

    log.cleaner.enable=false

    zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
    zookeeper.connection.timeout.ms=6000
    auto.create.topics.enable=false
    delete.topic.enable=true


    另外server.properties中,不要启用自动创建topic:
    auto.create.topics.enable=true
    否则producer发送消息时,提示分区异常。


    拷贝到另外两个节点:
    scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-002:/home/dig/service/
    scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-003:/home/dig/service/


    注意每个节点id不一样:
    cat /home/dig/service/kafka/config/server.properties | grep broker.id


    临时启动kafka:
    rm -rf /tmp/bdp-logs
    /home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties


    在3台机器上分别执行:
    rm -rf /home/dig/service/zookeeper/data/*

    ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print $2}' |xargs -t -i kill -9 {}
    rm -rf /tmp/bdp-logs && ll /tmp/bdp-logs || jps


    所有节点关闭完后先启动zk:
    /home/dig/service/zookeeper/bin/zkServer.sh start

    再启动kafka:
    /home/dig/service/kafka/bin/kafka-server-start.sh -daemon /home/dig/service/kafka/config/server.properties
    jps
    sleep 1
    ll /tmp/bdp-logs


    查看kafka启动状态:
    ls /brokers/ids
    输入3个kafka实例的id即集群启动成功:
    [1, 2, 3]


    apache版本的kafka自带关闭命令无效:
    /home/dig/service/kafka/bin/kafka-server-stop.sh /home/dig/service/kafka/config/server.properties

    /home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties

    有效关闭脚本:
    ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print $2}' |xargs -t -i kill -9 {}


    ===========================================
    测试kafak是否部署成功:


    创建Topic
    /home/dig/service/kafka/bin/kafka-topics.sh
    --zookeeper etl1:2181,etl2:2181,etl3:2181
    --replication-factor 2
    --partitions 3
    --create
    --topic kjLog


    或者这么删除(只删除zk的元数据,分区文件没有删除):
    kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic mytest1


    删除话题队列:
    kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytest1


    查看存在的话题队列:
    kafka-topics.sh --zookeeper localhost:2181 --list


    向话题队列mytest1 发送消息,运行后直接输入:
    kafka-console-producer.sh --broker-list kafka1:9092 --topic mytest1

    消费者从话题队列mytest3 中取消息(运行后,直接可以看到输出结果,老版本是连zk,新版本连bootstrap及broker):
    kafka-console-consumer.sh
    --bootstrap-server etl3:9092
    --topic kjLog
    --consumer-property group.id=kjEtlGroup

    # 从头开始消费
    --from-beginning


    消费组是在消费时自动生成的,默认是console-consumer-31553,后面的数组是随机的,
    也可以消费时指定具体消费组名字


    查看有哪些消费组:
    kafka-consumer-groups.sh --zookeeper localhost:2181 --list

    等价于登录zk,查看有哪些消费组:
    ls /consumers/


    ===========================================
    查看消息数量offset:
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic kjLog --partitions 0
    输出topic的容量大小
    kjLog:2:116060276
    kjLog:1:70158992
    kjLog:0:15411674


    查看指定topic的详细信息:
    kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:12181 --topic kjLog --group kjEtlGroup
    输出结果:
    消费者组 话题id 分区id 当前已消费的条数 总条数 未消费的条数 属主
    Group Topic Pid Offset logSize Lag Owner
    kjEtlGroup kjLog 0 15484439 15484445 6 none
    kjEtlGroup kjLog 1 70655159 70655189 30 none
    kjEtlGroup kjLog 2 116860888 116860904 16 none


    查看分区详细信息:
    kafka-topics.sh --zookeeper localhost:2181 --describe
    输出结果:
    Topic:mytest4 PartitionCount:6 ReplicationFactor:2 Configs:
    Topic: mytest4 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
    Topic: mytest4 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1

    表示Topic名称是mytest4,一共6个分区,并且每个分区都有两份完全一样的。
    分区0的leader节点是在id为2的kafka实例上,复制节点是2和3两个实例,正在服务的节点也是2和3
    分区1的leader节点是在id为3的kafka实例上,复制节点是3和1两个实例,正在服务的节点也是3和1


    ===========================================

    迁移消息:
    vim topics-to-move.json
    {"topics":
    [{"topic": "mytest1"}],
    "version":1
    }


    生成迁移计划:
    bdp-reassign-partitions.sh
    --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181
    --broker-list "4"
    --topics-to-move-json-file topics-to-move.json
    --generate


    输出如下结果:
    Current partition replica assignment

    {"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[1]},{"topic":"mytest1","partition":1,"replicas":[3]},{"topic":"mytest1","partition":0,"replicas":[2]}]}
    Proposed partition reassignment configuration

    {"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]},{"topic":"mytest1","partition":1,"replicas":[4]},{"topic":"mytest1","partition":0,"replicas":[4]}]}


    将 Proposed partition 对应json内容写入到文件reassignment-node.json中

    执行topic迁移:
    bdp-reassign-partitions.sh
    --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181
    --reassignment-json-file reassignment-node.json
    --execute


    执行迁移后,会在zk上注册一个节点:
    登录zk查看: get /admin/reassign_partitions
    结果如下:
    {"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]}]}


    查看迁移结果:
    bdp-reassign-partitions.sh
    --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181
    --reassignment-json-file reassignment-node.json
    --verify


    上面的方式并没有增加新分区,而是对原有分区做了一个副本,增加一个topic的分区方法是:
    kafka-topics.sh --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 --alter --partitions 5 --topic mytest1
    这个命令的效果是topic为mytest1的分区数量增加到5个,而不是在原有分区数之上再增加5个分区。

    kafka分区只能增加,不能通过自带的命令删除(但是直接删除文件夹可以删除),否则报错:
    Error while executing topic command The number of partitions for a topic can only be increased
    kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased


    增加了分区后,迁移分区的方法就简单了,
    编写如下json,其中12,13,14是分区id, 101,102,103,104,105,106是kafka实例id
    vim
    {
    "partitions": [{
    "topic": "mytest1",
    "partition": 12,
    "replicas": [101,102]
    },
    {
    "topic": "mytest1",
    "partition": 13,
    "replicas": [103,104]
    },
    {
    "topic": "mytest1",
    "partition": 14,
    "replicas": [105,106]
    }],
    "version": 1
    }


    然后执行分区重分布命令:
    bdp-reassign-partitions.sh
    --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181
    --reassignment-json-file partitions-extension-push-token-topic.json
    --execute

    这样topic为mytest1的分区就改变了,存放到新的物理服务器的节点上了。


    =============================================================
    官方文档:
    http://kafka.apache.org/documentation.html


    kafka开发文档:
    http://www.aboutyun.com/thread-9906-1-1.html


    官方文档翻译:
    http://www.cnblogs.com/lzqhss/p/4434901.html


    参考翻译:
    http://blog.csdn.net/suifeng3051/article/details/48053965


    =============================================================
    问题1:
    http://www.oschina.net/question/196281_248633?sort=time

    这是Kafka消息分区造成的,你可以去了解一下Kafka是如何分区的,就知道原因了。

    问题原因可能是:你的所有消息的Key都是一样的,使用默认的Partitioner: hash(key)%numPartitions,这样每次的partion num都是一样的,所以数据都落到一个分区了。

    而同一consumer grop并行消息,也是按照分区来分配的,因为只有一个分区上有数据,所以有一个consumer始终拿不到消息。

    解决办法:1.自定义分区函数。2.消息散列为不同的key

    问题2:
    [2016-08-01 14:13:19,609] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
    [2016-08-01 14:13:19,615] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
    [2016-08-01 14:13:27,784] ERROR [test-consumer-group_bdp-003-1470075194621-ae0d9636], error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
    kafka.common.ConsumerRebalanceFailedException: test-consumer-group_bdp-003-1470075194621-ae0d9636 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)


    解决这是一个bug:
    There is a bug in kafka.tools.ConsumerOffsetChecker. If the a particular Zookeeper node holding consumed offset information doesn't exit, the tool exits throwing the execption.

    For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in Znode: /consumers/mygroup/offsets/topictest/2.

    If there is no entry for partition 2 of topic topictest in Znode, then consumer offsetchecker tool will exit while checking offset for partition 2. Basically, it will fail while checking the first partition "n" for which the Znode /consumers/mygroup/offsets/topictest/n is missing on Zookeeper.


    问题3:
    zookeeper启用日志文件,修改conf下面的log4j.properties,但无效。

    再修改bin下的zkEnv.sh才能生效:
    if [ "x${ZOO_LOG_DIR}" = "x" ]
    then
    ZOO_LOG_DIR="/home/dig/service/zookeeper/log"
    fi

    if [ "x${ZOO_LOG4J_PROP}" = "x" ]
    then
    ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
    fi


    问题4:
    kafka消息存放多久?
    这个取决于kafka消息清理策略。
    默认kafka消息的清理策略是删除,但是没有明确写入server.propertiesp文件中:
    log.cleanup.policy=delete

    默认消息保存一周时间,默认不限制消息总量的长度,检查间隔是1ms。
    log.retention.hours=168
    log.retention.bytes=-1
    log.retention.check.interval.ms=1

    问题5:
    logserver报错:
    [ERROR][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,129][LogServiceImpl.java:onCompletion:220]:Fail to send record to Kafka. Key: appupgrade_alive_user, Value Length: 190
    org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

    原因是,kafka集群宕机或者重启后,topic partition重新选举leader,并且不是原来的leader后,而生产者还是使用原来的leader来生产消息,报错。
    解决:重启生产者所在服务器。

    问题6:
    logserver报错:
    [WARN][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,234][Sender.java:completeBatch:298]:Got error produce response with correlation id 23026690 on topic-partition kjLog-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION

    原因同上,都是因为leader重新选举导致。

    问题7:
    logserver报错:
    536116 [ERROR][kafka-producer-network-thread | producer-1][2017-04-04 09:26:41,952][LogServiceImpl.java:onCompletion:102]:Fail to send record to Kafka. Key: konkasysteminfo_open_p age, Value Length: 462
    536117 org.apache.kafka.common.errors.TimeoutException: Batch containing 46 record(s) expired due to timeout while requesting metadata from brokers for kjLog-2

    原因不确定,初步怀疑也是分区选举leader变更导致。

    ===========================================
    1.kafka为什么要在topic里加入分区的概念?
    topic是逻辑的概念,partition是物理的概念,对用户来说是透明的。producer只需要关心消息发往哪个topic,而consumer只关心自己订阅哪个topic,并不关心每条消息存于整个集群的哪个broker。
    logsize是写入分区的消息条数,offset是已经消费的条数,这两个只都是从0开始,每次累加(i++),单位是条,不是字节
    lag表示没有消费的,已经缓存的条数,lag = logsize - offsize

    为了性能考虑,如果topic内的消息只存于一个broker,那这个broker会成为瓶颈,无法做到水平扩展。所以把topic内的数据分布到整个集群就是一个自然而然的设计方式。Partition的引入就是解决水平扩展问题的一个方案。

    每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展
    里所讲,每个partition可以被认为是一个无限长度的数组,新数据顺序追加进这个数组。物理上,每个partition对应于一个文件夹。一个broker上可以存放多个partition。这样,producer可以将数据发送给多个broker上的多个partition,consumer也可以并行从多个broker上的不同paritition上读数据,实现了水平扩展。


    2.如果没有分区,topic中的segment消息写满后,直接给订阅者不是也可以吗?
    “segment消息写满后”,consume消费数据并不需要等到segment写满,只要有一条数据被commit,就可以立马被消费

    segment对应一个文件(实现上对应2个文件,一个数据文件,一个索引文件),一个partition对应一个文件夹,一个partition里理论上可以包含任意多个segment。所以partition可以认为是在segment上做了一层包装。
    这个问题换个角度问可能更好,“为什么有了partition还需要segment”。
    如果不引入segment,一个partition直接对应一个文件(应该说两个文件,一个数据文件,一个索引文件),那这个文件会一直增大。同时,在做data purge时,需要把文件的前面部分给删除,不符合kafka对文件的顺序写优化设计方案。引入segment后,每次做data purge,只需要把旧的segment整个文件删除即可,保证了每个segment的顺序写。


    3、kafka的消息生产者使用的包是import kafka.javaapi.producer.Producer,不是kafka.producer.Producer
    import kafka.producer.KeyedMessage;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;

    private final Producer<Integer, String> producer;
    props.put("metadata.broker.list", KafkaProperties.kafkaConnect);
    producer = new Producer<Integer, String>(new ProducerConfig(props));


    4、kafka消息发送字节流,扩展序列化类 参考链接:
    http://grepcode.com/file/repo1.maven.org/maven2/org.apache.twill/twill-core/0.1.0-incubating/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java


    5、kafka消息部分代码解析:
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

    //topicCountMap是设置每个topic开多少线程,每个线程处理执行多个task,
    //每个task会从每个topic的一个分区中消费数据,如果有3个分区,每个task会同时从3个topic的分区中获取数据,即每个task会从3个分区中获取数据。
    //这里的参数new Integer(1)表示kafka消费服务器开多少个线程,通常最好是线程数量小于等于对应topic的分区数量
    topicCountMap.put(topic, new Integer(2));

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    //这里的get(0)表示从第一个分区中提取消息
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    -------------------------
    技术不在于多么高超先进巧妙,而在于要有现实价值!!!
  • 相关阅读:
    VGA实验 :逻辑分析仪
    VGA实验:点亮屏幕
    ASP.NET 4‎.0 生成 eurl.axd Http异常错误的处理方法 (汗IIS 怎这么多莫名其妙的问题)
    腾讯检测 IP 省市的接口
    hql 中cast 方法的使用
    asp.net 防止外部提交数据(转)
    为 SQL Server 启用缓存通知
    net中前台 javascript与后台c#函数相互调用
    const 与 readonle 的异同
    获取页面执行时间的几种方法(asp.net转)
  • 原文地址:https://www.cnblogs.com/geektcp/p/9609342.html
Copyright © 2011-2022 走看看