zoukankan      html  css  js  c++  java
  • 【基础组件5】kafka入门(一)集群搭建+常用命令+基本原理+存储分析

    一、集群搭建

    集群搭建参考以下四篇:

    主要是第1篇即可

    https://blog.csdn.net/qq_34258346/article/details/80436599

    https://blog.csdn.net/u012943767/article/details/80899202

    https://www.cnblogs.com/ding2016/p/8282907.html

    https://blog.csdn.net/xuexi_39/article/details/83015813

    基本原理参考以下2篇:

    https://mp.weixin.qq.com/s/srOzKv12pla0xJpXTx-OcA

    https://www.cnblogs.com/cynchanpin/p/7339537.html

    前提条件:安装好java配置好环境变量、zookeeper集群(可参考:https://blog.csdn.net/qq_34258346/article/details/80423787)
    (3台服务器)
    10.1.32.85 hadoop7
    10.1.32.193 hadoop8
    10.1.32.161 hadoop9
    1、下载kafka的安装包
    cd home/hadoop/app/
    wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
    或者直接把包上传上去

    2、解压安装包
    tar -zxvf kafka_2.12-0.10.2.1.tgz

    3、添加环境变量
    在vi /etc/profile文件下增加
    export KAFKA_HOME=/home/hadoop/app/kafka_2.12-0.10.2.1
    export PATH=$KAFKA_HOME/bin:$PATH
    使用source /etc/profile使配置生效
    4、修改kafka配置文件
    cd /home/hadoop/app/kafka_2.12-0.10.2.1/config/
    用ll查看目录下的文件,发现zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群。我们主要关注server.properties 这个文件即可

    修改server.properties配置文件,配置如下:
    broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    listeners=PLAINTEXT://10.1.32.85:9092 #kafka监听地址
    num.network.threads=3 #这个是borker进行网络处理的线程数
    num.io.threads=8 #这个是borker进行I/O处理的线程数
    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    log.dirs=/tmp/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    num.partitions=1 #默认的分区数,一个topic默认1个分区数
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    zookeeper.connect=10.1.32.85:2181,10.1.32.193:2181,10.1.32.161:2181 #设置zookeeper的连接端口
    zookeeper.connection.timeout.ms=6000 #连接zookeeper的超时时间

    上面是参数的解释,实际的修改项为:
    broker.id=1 每台服务器的broker.id都不能相同
    listeners=PLAINTEXT://10.1.32.85:9092
    在log.retention.hours=168下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    zookeeper.connect=10.1.32.85:2181,10.1.32.193:2181,10.1.32.161:2181 设置zookeeper的连接端口

    以上是hadoop7(10.1.32.85)一台机器的配置,同理在另外两台(hadoop8(10.1.32.193)、hadoop9(10.1.32.161))机器做相应的配置

    5、启动Kafka集群并测试
    从后台启动Kafka集群(3台都需要启动):
    kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.12-0.10.2.1/config/server.properties
    ————————————————
    版权声明:本文为CSDN博主「path哥」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_34258346/article/details/80436599

    二、常用命令

    1.创建topic

    ./kafka-topics.sh --zookeeper 100.168.222.17:2181,100.168.222.18:2181,100.168.222.19:2181/kafka --replication-factor 2 --partitions 1 --create --topic metric

    2.查看topic

    ./kafka-topics.sh --zookeeper 100.168.222.17:2181,100.168.222.18:2181,100.168.222.19:2181/kafka --list

    3.删除topic

    ./kafka-topics.sh --zookeeper 100.168.222.17:2181,100.168.222.18:2181,100.168.222.19:2181/kafka --delete --topic metric

    4.创建生产者(插入数据)

    ./kafka-console-producer --broker-list 100.168.222.17:9092,100.168.222.18:9092,100.168.222.19:9092 --topic metric

    5.创建消费者(老消费者)

    ./kafka-console-consumer.sh --zookeeper 100.168.222.17:2181,100.168.222.18:2181,100.168.222.19:2181/kafka --topic test --from-beginning

    6.创建消费者(新消费者)

    ./kafka-console-consumer --bootstrap-server 100.168.222.17:9092,100.168.222.18:9092,100.168.222.19:9092 --topic metric

    kafka中有两个设置的地方:对于老的消费者,由--zookeeper参数设置;对于新的消费者,由--bootstrap-server参数设置

    四、kafka的原理、基本流程 

    二、Kafka基本架构

    它的架构包括以下组件:

    1、话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名;

    2、生产者(Producer):是能够发布消息到话题的任何对象;

    3、服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群;

    4、消费者(Consumer):可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息;

    上图中可以看出,生产者将数据发送到Broker代理,Broker代理有多个话题topic,消费者从Broker获取数据。

    四、Zookeeper在kafka的作用

    简单说下整个系统运行的顺序:

    (1)启动zookeeper 的 server

    (2)启动kafka 的 server

    (3)Producer 如果生产了数据,会先通过 zookeeper 找到 broker,然后将数据存放到 broker

    (4)Consumer 如果要消费数据,会先通过 zookeeper 找对应的 broker,然后消费。

    五、kafka的存储机制(拓展篇,可不看)

    https://www.cnblogs.com/cynchanpin/p/7339537.html

  • 相关阅读:
    POJ 3114 Tarjan+Dijkstra
    278. First Bad Version
    209. Minimum Size Subarray Sum
    154. Find Minimum in Rotated Sorted Array II
    153. Find Minimum in Rotated Sorted Array
    710. Random Pick with Blacklist
    767. Reorganize String
    524. Longest Word in Dictionary through Deleting
    349. Intersection of Two Arrays
    350. Intersection of Two Arrays II
  • 原文地址:https://www.cnblogs.com/Agnes1994/p/12218575.html
Copyright © 2011-2022 走看看