zoukankan      html  css  js  c++  java
  • 创建kafak集群

    下载kafka

    下载页面:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

    # wget http://mirror.bit.edu.cn/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
    # tar xf kafka_2.11-0.11.0.0.tgz
    # cd kafka_2.11-0.11.0.0
    # cd conf
    # touch s1.
    

    在kafka的项目下创建3个节点的配置文件

    # cd kafka_2.11-0.11.0.0
    # cd conf
    # touch s1.properties s2.properties s3.properties
    # cat s1.properties
    broker.id=1
    host.name=192.168.7.203
    port=9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka-logs-1
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.7.203:2181,192.168.7.203:2182,192.168.7.203:2183
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    auto.create.topics.enable=false
    
    # cat s2.properties
    broker.id=2
    host.name=192.168.7.203
    port=9093
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka-logs-2
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.7.203:2181,192.168.7.203:2182,192.168.7.203:2183
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    auto.create.topics.enable=false
    
    # cat s3.properties 
    broker.id=3
    host.name=192.168.7.203
    port=9094
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/kafka-logs-3
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.7.203:2181,192.168.7.203:2182,192.168.7.203:2183
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    auto.create.topics.enable=false
    
    

    启动kafka

    # cd kafka_2.12-0.11.0.0
    # bin/kafka-server-start.sh -daemon config/s1.properties
    # bin/kafka-server-start.sh -daemon config/s2.properties
    # bin/kafka-server-start.sh -daemon config/s3.properties
    
    
    

    检查是否启动成功

    # jps
    131458 QuorumPeerMain
    131510 QuorumPeerMain
    131416 QuorumPeerMain
    162396 Jps
    149869 Kafka
    
    

    相关操作命令

    创建topic
    bin/kafka-topics.sh --create --zookeeper 192.168.7.203:2181 --replication-factor 1 --partitions 1 --topic consumptionsale

    查看全部topic:
    bin/kafka-topics.sh --list --zookeeper 192.168.19.98:2181

    查看topic的详细信息:
    bin/kafka-topics.sh --zookeeper 192.168.7.203:2183 --describe --topic consumptionsale

    生产信息:
    bin/kafka-console-producer.sh --broker-list 192.168.19.98:9092 --topic test

    消费信息:
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.98:9092 --topic test --from-beginning

    bin/kafka-console-consumer.sh --zookeeper 192.168.19.98:2181 --topic test --from-beginning

    后台启动kafka
    bin/kafka-server-start.sh -daemon config/server-2.properties

    kafka添加节点

    添加新节点很简单,只需要器启动一个kafka节点,其中使用同一个集群的zookeeper配置,更改ip和端口号和存储路径即可。

    kafka topic 调整

    当一个topic建立的时候,指定了分区和副本数量,kafka集群一般只会在broker故障的时候进行分区的leader切换,一般不会更改副本和分区的存储位置。当kafka集群扩容和迁移的时候必须要进行手动调整才能改变分区和副本所在的broker。

    进行分区迁移时最好先保留一个分区在原来的磁盘,这样不会影响正常的消费和生产,因为一次迁移所有的副本,无法正常消费和生产,部分迁移则可以正常消费和生产。

    查询topic的详尽信息:

    # bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic testpanjunbai
    Topic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: testpanjunbai	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
    	Topic: testpanjunbai	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 2,1
    
    

    说明这个topic 有2个分区,2个副本,在broker 1,2 上面。

    生产一个要迁移的topic文件,使用json格式:

    # cat  topics-to-move.json 
    {
      "topics": [
        {"topic": "testpanjunbai"}
      ],
      "version":1
    }
    
    

    使用命令生产一个kafka执行的迁移表,吧topic迁移到1,2,3,4,5这些broker上面

    # /root/kafka_2.12-0.11.0.3/bin/kafka-reassign-partitions.sh --zookeeper 192.168.226.100:2183 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5" -generate
    # cat expand-cluster-reassignment.json 
    
    {"version":1,
      "partitions":[{"topic":"testpanjunbai","partition":0,"replicas":[1,3]},
     		{"topic":"testpanjunbai","partition":1,"replicas":[2,3]}]
    }
    
    
    
    

    生产的json文件详细描述了迁移后的topic分区和副本存在的broker节点,也可以在这里手动进行调整。调整如下:

    # cat expand-cluster-reassignment.json
    
    {"version":1,
      "partitions":[{"topic":"testpanjunbai","partition":0,"replicas":[3,4]},
     		{"topic":"testpanjunbai","partition":1,"replicas":[4,5]}]
    }
    
    

    使用命令执行迁移:

    # /root/kafka_2.12-0.11.0.3/bin/kafka-reassign-partitions.sh --zookeeper 192.168.226.100:2183 --reassignment-json-file expand-cluster-reassignment.json -execute
    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"testpanjunbai","partition":1,"replicas":[1,2]},{"topic":"testpanjunbai","partition":0,"replicas":[2,1]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.
    
    

    检查是否完成迁移:

    # /root/kafka_2.12-0.11.0.3/bin/kafka-reassign-partitions.sh --zookeeper 192.168.226.100:2183 --reassignment-json-file expand-cluster-reassignment.json --verify
    Status of partition reassignment: 
    Reassignment of partition [testpanjunbai,0] completed successfully
    Reassignment of partition [testpanjunbai,1] completed successfully
    

    查看topic状态:

    # /root/kafka_2.12-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.226.42:2181 --describe --topic testpanjunbiai 
    Topic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: testpanjunbai	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
    	Topic: testpanjunbai	Partition: 1	Leader: 4	Replicas: 4,5	Isr: 4,5
    
    

    当然也可以使用kafka-manager工具进行远程迁移操作。
    当一个节点停止后,在重启,他将不会是分区的leader,会被其他节点的副本取代,但是在集群中他仍是Preferred Leader,需要使用命令来让他在成为节点。

    # /root/kafka_2.12-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.226.42:2181 --describe --topic testpanjunbiai 
    Topic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: testpanjunbai	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
    	Topic: testpanjunbai	Partition: 1	Leader: 5	Replicas: 4,5	Isr: 4,5
    
    # /root/kafka_2.12-0.11.0.3/bin/kafka-preferred-replica-election.sh --zookeeper 192.168.226.42:2181 
    Created preferred replica election path with {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":34},{"topic":"__consumer_offsets","partition":36},{"topic":"testpanjunbai","partition":1},{"topic":"__consumer_offsets","partition":27},{"topic":"__consumer_offsets","partition":1},{"topic":"__consumer_offsets","partition":20},{"topic":"__consumer_offsets","partition":7},{"topic":"__consumer_offsets","partition":42},{"topic":"__consumer_offsets","partition":49},{"topic":"__consumer_offsets","partition":4},{"topic":"__consumer_offsets","partition":33},{"topic":"__consumer_offsets","partition":14},{"topic":"__consumer_offsets","partition":46},{"topic":"testpanjunbai","partition":0},{"topic":"__consumer_offsets","partition":24},{"topic":"__consumer_offsets","partition":28},{"topic":"__consumer_offsets","partition":6},{"topic":"__consumer_offsets","partition":37},{"topic":"__consumer_offsets","partition":43},{"topic":"__consumer_offsets","partition":21},{"topic":"__consumer_offsets","partition":15},{"topic":"__consumer_offsets","partition":11},{"topic":"__consumer_offsets","partition":30},{"topic":"__consumer_offsets","partition":2},{"topic":"test123","partition":1},{"topic":"__consumer_offsets","partition":47},{"topic":"__consumer_offsets","partition":25},{"topic":"__consumer_offsets","partition":29},{"topic":"__consumer_offsets","partition":8},{"topic":"__consumer_offsets","partition":23},{"topic":"__consumer_offsets","partition":40},{"topic":"__consumer_offsets","partition":31},{"topic":"__consumer_offsets","partition":19},{"topic":"__consumer_offsets","partition":16},{"topic":"__consumer_offsets","partition":38},{"topic":"test123","partition":0},{"topic":"__consumer_offsets","partition":44},{"topic":"__consumer_offsets","partition":10},{"topic":"__consumer_offsets","partition":3},{"topic":"__consumer_offsets","partition":35},{"topic":"__consumer_offsets","partition":26},{"topic":"__consumer_offsets","partition":39},{"topic":"__consumer_offsets","partition":13},{"topic":"__consumer_offsets","partition":17},{"topic":"__consumer_offsets","partition":22},{"topic":"__consumer_offsets","partition":9},{"topic":"__consumer_offsets","partition":0},{"topic":"__consumer_offsets","partition":41},{"topic":"__consumer_offsets","partition":48},{"topic":"__consumer_offsets","partition":18},{"topic":"__consumer_offsets","partition":32},{"topic":"__consumer_offsets","partition":12},{"topic":"__consumer_offsets","partition":45},{"topic":"__consumer_offsets","partition":5}]}
    Successfully started preferred replica election for partitions Set([__consumer_offsets,32], [__consumer_offsets,16], [__consumer_offsets,49], [__consumer_offsets,44], [__consumer_offsets,28], [test123,0], [__consumer_offsets,17], [__consumer_offsets,23], [__consumer_offsets,7], [testpanjunbai,0], [__consumer_offsets,4], [__consumer_offsets,29], [__consumer_offsets,35], [__consumer_offsets,3], [__consumer_offsets,24], [__consumer_offsets,41], [__consumer_offsets,0], [__consumer_offsets,38], [__consumer_offsets,13], [__consumer_offsets,8], [__consumer_offsets,5], [__consumer_offsets,39], [__consumer_offsets,36], [__consumer_offsets,40], [__consumer_offsets,45], [__consumer_offsets,15], [__consumer_offsets,33], [__consumer_offsets,37], [__consumer_offsets,21], [testpanjunbai,1], [__consumer_offsets,6], [test123,1], [__consumer_offsets,11], [__consumer_offsets,20], [__consumer_offsets,47], [__consumer_offsets,2], [__consumer_offsets,27], [__consumer_offsets,34], [__consumer_offsets,9], [__consumer_offsets,22], [__consumer_offsets,42], [__consumer_offsets,14], [__consumer_offsets,25], [__consumer_offsets,10], [__consumer_offsets,48], [__consumer_offsets,31], [__consumer_offsets,18], [__consumer_offsets,19], [__consumer_offsets,12], [__consumer_offsets,46], [__consumer_offsets,43], [__consumer_offsets,1], [__consumer_offsets,26], [__consumer_offsets,30])
    
    

    再次查看topic状态:

    # /root/kafka_2.12-0.11.0.3/bin/kafka-topics.sh --zookeeper 192.168.226.42:2181 --describe --topic testpanjunbaiTopic:testpanjunbai	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: testpanjunbai	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
    	Topic: testpanjunbai	Partition: 1	Leader: 4	Replicas: 4,5	Isr: 5,4
    

    修改分区的 Preferred Leader ,同上面迁移过程一样,只需要调整expand-cluster-reassignment.json中分区的副本顺序,在执行即可,执行完毕后会发现新指定的 Preferred Leader 变成leader了。

    参考文档:

    https://blog.csdn.net/u013214151/article/details/53495045
    https://blog.csdn.net/u013573133/article/details/48142677
    http://orchome.com/
    http://kafka.apache.org/quickstart
    https://tech.meituan.com/kafka-fs-design-theory.html
    http://1028826685.iteye.com/blog/2326601
    https://www.iteblog.com/archives/1611.html
    http://blog.csdn.net/huanggang028/article/details/47830529
    https://www.w3cschool.cn/apache_kafka/apache_kafka_introduction.html
    http://www.jasongj.com/2015/04/24/KafkaColumn2/
    http://blog.csdn.net/qiaochao911/article/details/40920645
    http://abloz.com/tech/2017/07/15/kafka-error-test/
    http://blog.csdn.net/lizhitao/article/details/25667831

  • 相关阅读:
    Ubuntu 14.04 卸载通过源码安装的库
    Ubuntu 14.04 indigo 相关依赖
    Ubuntu 14.04 indigo 安装 cartographer 1.0.0
    Ubuntu 14.04 改变文件或者文件夹的拥有者
    安装cartographer遇到Unrecognized syntax identifier "proto3". This parser only recognizes "proto2"问题
    Unrecognized syntax identifier "proto3". This parser only recognizes "proto2". ”问题解决方法
    查看所有用户组,用户名
    1卸载ROS
    Ubuntu14.04 软件安装卸载
    Ubuntu14.04系统显示器不自动休眠修改
  • 原文地址:https://www.cnblogs.com/panjunbai/p/7651512.html
Copyright © 2011-2022 走看看