zoukankan      html  css  js  c++  java
  • 1、Kfaka 部署

    一、使用Kfaka自带的zookeeper服务。

    1、下载Kafka,下载地址:

    http://kafka.apache.org/downloads

    2、区分Kfaka版本:

      Scala 2.12 - kafka_2.12-2.2.1.tgzascsha512

    Kadka后面的 2.12是对应的scala版本,2.2.1是kafka版本。

    3、安装

    #tar zxvf kafka_2.12-2.1.0.tgz -C /data/kafka/
    #ln -s kafka_2.12-2.1.0 kafka
    #cd /data/kafka

    4、启动服务

    启动zookeeper服务
    #./bin/zookeeper-server-start.sh config/zookeeper.properties
    启动kafka服务
    #./bin/kafka-server-start.sh config/server.properties

    5、查看kafka是否启动成功,kafka默认启动在9092端口

    [root@test kafka 14:26:29]#netstat -anltp|grep 9092
    tcp        0      0 0.0.0.0:9092            0.0.0.0:*               LISTEN      113895/java         
    tcp        0      0 127.0.1.1:9092          127.0.0.1:36251         ESTABLISHED 113895/java         
    tcp        0      0 127.0.0.1:36192         127.0.1.1:9092          ESTABLISHED 115239/java         
    tcp        0      0 127.0.1.1:9092          127.0.0.1:36250         ESTABLISHED 113895/java         
    tcp        0      0 127.0.0.1:36251         127.0.1.1:9092          ESTABLISHED 117408/java         
    tcp        0      0 127.0.1.1:9092          127.0.0.1:36192         ESTABLISHED 113895/java         
    tcp        0      0 127.0.0.1:36191         127.0.1.1:9092          ESTABLISHED 114541/java         
    tcp        1      0 127.0.0.1:41400         127.0.1.1:9092          CLOSE_WAIT  113895/java         
    tcp        0      0 127.0.0.1:36250         127.0.1.1:9092          ESTABLISHED 117408/java         
    tcp        0      0 127.0.1.1:9092          127.0.0.1:36191         ESTABLISHED 113895/java

    6、创建topic主题

    1>>>  创建topic
    #./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic yjt Created topic "yjt".
    参数解释:
    --create:创建。
    --zookeeper: 指定zookeeper服务器,这里使用的是kafka自带的zookeeper服务器,所以使用localhost+端口的方式,如果是自己配置的zookeeper服务,这里需要使用主机+端口的方式。
    --replication-factor:指定副本数量,主要是为了数据的冗余。
    --partitions:指定主题的分区数量。
    --topic:指定主题的名字,这里是yjt。

    2>>> 查看当前以及存在的主题:

      #./bin/kafka-topics.sh --list --zookeeper localhost:2181    #这里使用了--list选项。列出服务器上存在的topic。

      __consumer_offsets
      connect-test
      my-topic
      test
      yjt

     3>>> 查看某一个topic的信息。

      #./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic yjt   #这里使用了--describe选项。
      Topic:yjt PartitionCount:1 ReplicationFactor:1 Configs:
      Topic: yjt Partition: 0 Leader: 0 Replicas: 0 Isr: 0

    解释:
    第一行显示的是所有分区的摘要,其次,每一行显示一个分区,这里分区为1,所有只显示了一行(既第二行):
    Topic:主题名字
    PartitionCount:当前主题所有分区数量,刚刚在创建主题的时候,只指定了一个分区,所以,这里显示1。
    ReplicationFactor:副本数量。同上。
    第二行:
    Leader:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的(注:针对于分区,也就是说,每一个分区都有一个leader,并且leader可能都不一样)。
    Replicas:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
    Isr(a set of in-sync replicas)也叫同步状态的副本集合:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。

    7、发布一些消息到yjt这个topic上

    #./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic yjt   #使用--broker-list指定kafaka服务器的地址,这里的9092是kafka默认的端口,如果在配置文件里面有改动,这里修改成对应的端口就行。
    >My name is yjt.
    >I am xxx years old.

    8、消费上面发布的消息。

    #./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yjt --from-beginning #使用--bootstrap-server指定服务器,同时使用--from-beginning选项指定从头开始读取。
    My name is yjt.
    I am xxx years old.

    9、如何在一台服务器上面配置多个kafka实例?也就是创建多个broker(broker可以理解为Kafka Server)。这里在创建两个broker。

      1)、复制配置文件,修改配置文件。

      复制配置文件:

    #cp config/server.properties config/server-1.properties
    #cp config/server.properties config/server-2.properties

      修改配置文件:

    config/server-1.properties: 
        broker.id=1         # broker Id,全局唯一,不能重复。
        listeners=PLAINTEXT://:9093 #监听地址,也可以是listeners=PLAINTEXT://主机或者IP:9093这种方式。
    log.dir=/tmp/kafka-logs-1 #日志文件
    config/server-2.properties: 
      broker.id
    =2
      listeners
    =PLAINTEXT://:9094
      log.dir=/tmp/kafka-logs-2

    10、启动多实例的broker。

    #./bin/kafka-server-start.sh config/server-1.properties &  #以后台启动,但是好像没什么卵用,控制台还是啪啪啪的输出一大堆信息,害的我开了好几个终端。
    #./bin/kafka-server-start.sh config/server-2.properties & > /dev/null 2>&1  #这种方式启动好像也是一样,不过没试nohup。

    启动多实例以后就可以在创建主题的时候指定多个分区了,当然,单个实例也可以启动多个分区。然而副本数量一定要<=broker数量。

    例如创建四个分区的topic

    #./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic yjt1
    #./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic yjt1
    Topic:yjt1    PartitionCount:4    ReplicationFactor:1    Configs:
    Topic: yjt1    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
    Topic: yjt1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
    Topic: yjt1    Partition: 2    Leader: 1    Replicas: 1    Isr: 1
    Topic: yjt1    Partition: 3    Leader: 2    Replicas: 2    Isr: 2
    上述信息具体参考上面的解释。

    11、使用Kafka Connect 来导入导出数据。

      从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。

      kafka Connect是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。

    1)、首先,创建一些种子数据用来测试。

    #pwd
    /data/kafka/kafka
    #echo -e "foo bar" > test.txt

    2)、接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。首先是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需的任何其他配置。

    #./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

    kafka附带了这些示例的配置文件,并且使用了刚才我们搭建的本地集群配置并创建了2个连接器:第一个是源连接器,从输入文件中读取并发布到Kafka主题中,第二个是接收连接器,从kafka主题读取消息输出到外部文件。

    在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始。流程如下:如下是默认值,如果想要改变,请修改(config/connect-file-source.properties config/connect-file-sink.properties这两个文件)

    1>>> 导入连接器应该读取从

    test.txt                 

    2>>> 创建和写入到topic:(默认情况下,当主题不存在的时候,可以自动创建)

    connect-test

    3>>> 从主题导导出连接器

    connect-test

    4>>> 写入到文件

    test.sink.txt

    3)查看文件信息:

    #cat test.sink.txt 
    foo
    bar

    4)消费信息

    #./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}

    二、使用外置的zookeeper服务。

    外置的zookeeper服务搭建过程就不说了,只是在搭建过程中需要注意zookeeper服务器的奇数性(数量为奇数),和myid文件的配置。

    zookeeper配置文件信息如下:

    # egrep -v "(^#|^$)" zoo.cfg 
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/data1/hadoop/data/zookeeper/data
    dataLogDir=/data1/hadoop/data/zookeeper/logs
    clientPort=2181
    server.1=master:2888:3888
    server.2=slave1:2888:3888
    server.3=slave2:2888:3888

    解释:

    tickTime:zookeeper客户端连接服务端心跳超时时间,默认是2s。
    initLimit:zookeeper  flower启动初始化,从leader同步数据的超时时间,这个时间是tickTime的倍数,既这里是2*2000
    syncLimit:在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。如果L发出心跳包在syncLimit之后,还没有从Flower那里收到响应,那么就认为这个Flower已经不在线了。注意:不要把这个参数设置得过大,否则可能会掩盖一些问题。


    配置了外置的zookeeper以后,在启动Kafka集群时,需要在server的配置文件里面添加连接信息,例如
    #sed -i "s/zookeeper.connect=localhost:2181/zookeeper.connect=master:2181,slave1:2181,slave2:2181/g" config/server.properties

    三、完全分布式部署

    所谓的完全分布式模式,就是在每一台服务器上启动一个实例。

    例如:

    我有master、slave1、slave2三台机器,这三台机器已经安装好zookeeper,免密等操作。

    在每一台服务器配置server文件

    config/server.properties: 
      broker.id=0    #必须修改这个值,全局唯一。其他的值像端口,日志存储等,都可以使用默认的配置,像上面的在一台服务器启动多个实例的时候,需要修改下面的两个配置(端口和日志路径,防止冲突),但是在完全分布式模式下,也可以不用修改。
      listeners=PLAINTEXT://:9092
      log.dir=/tmp/kafka-logs
    host.name=master #主机名,每个server都不同,需要修改。

    当然,可以自己编写服务的启动脚本,配置环境变量,编写拷贝安装包到其他节点的脚本。这里推荐使用pssh或者pdsh,当然也可以使用大型的自动化运维工具,像puppt、ansible、saltstack等工具。需要免密。

    四、配置文件参数

    1、server.properties

    //当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    broker.id=0
    //当前kafka对外提供服务的端口默认是9092
    port=9092
    //这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    host.name=master
    //这个是borker进行网络处理的线程数
    num.network.threads=3
    //这个是borker进行I/O处理的线程数
    num.io.threads=8
    //发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.send.buffer.bytes=102400
    //kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.receive.buffer.bytes=102400
    //这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    socket.request.max.bytes=104857600
    //消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,
    //如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    log.dirs=/tmp/kafka-logs
    //默认的分区数,一个topic默认1个分区数
    num.partitions=1
    //每个数据目录用来日志恢复的线程数目
    num.recovery.threads.per.data.dir=1
    //默认消息的最大持久化时间,168小时,7天
    log.retention.hours=168
    //这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.segment.bytes=1073741824
    //每隔300000毫秒去检查上面配置的log失效时间
    log.retention.check.interval.ms=300000
    //是否启用log压缩,一般不用启用,启用的话可以提高性能
    log.cleaner.enable=false
    //设置zookeeper的连接端口
    zookeeper.connect=master:2181,slave1:2181,slave2:2181
    //设置zookeeper的连接超时时间
    zookeeper.connection.timeout.ms=6000

     2、producer.properties

    metadata.broker.list=master:9092,slave1:9092,slave2:9092

    3、consumer.properties

    zookeeper.connect=master:2181,slave1:2181,slave2:2181
     

     推荐一些关于Kafka的博客:

    https://www.orchome.com/6

    https://www.cnblogs.com/qingyunzong/p/9004509.html(如果英文不好,该博客可以很好的学习kafka)

    当然,要是可以看懂英文文档,直接浏览官网学习是最好的。

  • 相关阅读:
    最近这段时间我,想在2008 的基础上,写2011 有的工具 不知道会样,这次又机会研究ploy
    Screen 可以查找屏幕pos系类的函数
    笔记1
    Ubuntu下如何安装 tar.bz2 文件
    安装ubuntu遇到“BusyBox”问题
    android luancher 如何添加快捷方式
    转 Android 源代码结构
    修改apk图标
    Linux Ubuntu 下如何安装 .SH文件
    解放你的电源键!!不用刷机不用装软件!超简单修改搜索锁屏、HOME键唤醒~~~~~
  • 原文地址:https://www.cnblogs.com/yjt1993/p/11017242.html
Copyright © 2011-2022 走看看