zoukankan      html  css  js  c++  java
  • kafka 学习之初体验


    学习问题:

    1.kafka是否需要zookeeper?
    2.kafka是什么?
    3.kafka包含哪些概念?
    4.如何模拟客户端发送、接受消息初步测试?(kafka安装步骤)
    5.kafka cluster怎么同zookeeper交互的?

    1.kafka是否需要zoopkeeper

    kafka应用需要zookeeper,可以使用kafka安装包提供的zookeeper,也可以单独下载zookeeper

    2.kafka是什么.

    kafka是一个分布式消息系统。Kafka是一个 分布式的、可分区的、可复制的消息系统

    3.kafka常见的概念:

    product:生产者,向kafka发送的消息的程序,称为生成者

    consumer:消费者,从kafka订阅消息的程序,称为消费者

    topic:kafka以是topic为单位进行归纳消息的,或者就是一组收集好的消息

    broker:kafka的服务。kafka是怎么运行的,是集群形式运行,而集群可以由一个或者的多个服务组成,其中每个服务就称为一个broker

    partition:分区。一个top由一个或者多个分区组成。分区可以将一个topitc分移到多个地方存储,用于提高并行处理能力。

    replication:副本。一个分区由一个副本或者多个副本组成。副本用于分区的备份。


    4.安装步骤
    (1)下载kafka_2.10-0.9.0.0.tgz包,放在/usr/local目录下
       tar zxvf  kafka_2.10-0.9.0.0.tgz
       ln -sv kafka_2.10-0.9.0.0 kafka
    (2)配置java运行环境,kafka启动需要zookeeper,且zookeeper启动需要java运行环境。
       安装好好jdk 配置环境变量:JAVA_HOME 、PATH、CLASSPATH
       显示下面的即可
       [root@kafka bin]# echo $JAVA_HOME
       /usr/local/java/
       [root@kafka bin]# echo $CLASSPATH
       .:/usr/local/java//lib:/usr/local/java//jre/lib
       [root@kafka ~]# echo $PATH
       /usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/java//bin:/root/bin
    (3)进入到kafka目录,找到kafka配置目录下zookeeper配置文件,启动zookeeper 可以使用这里zookeeper.properties默认配置
    cd /usr/local/kafka/conf
    [root@kafka config]# ls zookeeper.properties
    zookeeper.properties
      
    [root@kafka config]# egrep -v '^#|^$' zookeeper.properties
    dataDir=/tmp/zookeeper    
    clientPort=2181
    maxClientCnxns=0
    (4)用kafka自带的脚本启动zookeeper,注意用脚本启动的时候要带上配置文件。可以从上面默认的配置文件看出zookeeper
    默认监听的端口是2181,用于提供消费者。consumer, 指定的Socket(localhost+2181),说明消费者的消息来自zookeeper(协调转发)
    启动zookeeper:
    [root@kafka kafka]# ./bin/zookeeper-server-start.sh  config/zookeeper.properties 

    [2016-07-08 21:52:14,446] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2016-07-08 21:52:14,449] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
    [2016-07-08 21:52:14,449] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
    [2016-07-08 21:52:14,449] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
    [2016-07-08 21:52:14,449] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
    [2016-07-08 21:52:14,475] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2016-07-08 21:52:14,475] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
    [2016-07-08 21:52:14,490] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:host.name=kafka (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:java.version=1.7.0_71 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:java.home=/usr/local/java/jre (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:java.class.path=.:/usr/local/java//lib:/usr/local/java//jre/lib:/usr/local/kafka/bin/../libs/jetty-security-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-scaladoc.jar:/usr/local/kafka/bin/../libs/zookeeper-3.4.6.jar:/usr/local/kafka/bin/../libs/slf4j-log4j12-1.7.6.jar:/usr/local/kafka/bin/../libs/kafka-clients-0.9.0.0.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-javadoc.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-sources.jar:/usr/local/kafka/bin/../libs/lz4-1.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-0.9.0.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-0.9.0.0.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.1.7.jar:/usr/local/kafka/bin/../libs/kafka-tools-0.9.0.0.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.6.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.1.jar:/usr/local/kafka/bin/../libs/javax.annotation-api-1.2.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.5.0.jar:/usr/local/kafka/bin/../libs/jetty-io-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0.jar:/usr/local/kafka/bin/../libs/jersey-common-2.22.1.jar:/usr/local/kafka/bin/../libs/jersey-media-jaxb-2.22.1.jar:/usr/local/kafka/bin/../libs/log4j-1.2.17.jar:/usr/local/kafka/bin/../libs/jersey-client-2.22.1.jar:/usr/local/kafka/bin/../libs/jetty-http-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/connect-file-0.9.0.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-3.2.jar:/usr/local/kafka/bin/../libs/zkclient-0.7.jar:/usr/local/kafka/bin/../libs/javax.inject-1.jar:/usr/local/kafka/bin/../libs/jetty-util-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/usr/local/kafka/bin/../libs/scala-library-2.10.5.jar:/usr/local/kafka/bin/../libs/connect-json-0.9.0.0.jar:/usr/local/kafka/bin/../libs/connect-api-0.9.0.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/javassist-3.18.1-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/jetty-server-9.2.12.v20150709.jar:/usr/local/kafka/bin/../libs/argparse4j-0.5.0.jar:/usr/local/kafka/bin/../libs/jackson-core-2.5.4.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.5.4.jar:/usr/local/kafka/bin/../libs/jersey-server-2.22.1.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.0.1.jar:/usr/local/kafka/bin/../libs/jersey-guava-2.22.1.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.5.4.jar:/usr/local/kafka/bin/../libs/validation-api-1.1.0.Final.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.22.1.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.22.1.jar:/usr/local/kafka/bin/../libs/javax.inject-2.4.0-b31.jar:/usr/local/kafka/bin/../libs/kafka_2.10-0.9.0.0-test.jar (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:os.version=2.6.32-431.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,490] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,491] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,491] INFO Server environment:user.dir=/usr/local/kafka_2.10-0.9.0.0 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,500] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,500] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,500] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
    [2016-07-08 21:52:14,511] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    提示信息说明zookeeper已经启动 可以用命令jps、netstat验证进程或者端口
    [root@kafka ~]# jps |grep -vi jps
    21380 QuorumPeerMain         #zookeeper进程
    [root@kafka ~]# netstat -tlnp|grep 2181
    tcp        0      0 :::2181                     :::*                        LISTEN      21380/java    #zookeeper服务端口

    (3)用kafka自带脚本启动kafka,带上kafka配置文件
    [root@kafka config]# egrep -v '^$|^#'  server.properties
    broker.id=0                 #服务id
    listeners=PLAINTEXT://:9092
    port=9092                   #kafka默认监听端口
    host.name=127.0.0.1         #主机名
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs    #kafka的日志文件
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=127.0.0.1:2181      #zookeeper集群
    zookeeper.connection.timeout.ms=6000  #zookeeper连接超时时间
    启动kafka:
    [root@kafka kafka]# ./bin/kafka-server-start.sh  config/server.properties
    启动之后这里有一串信息。
    同样可以通过jps命令或者netstat命令验证kafka启动情况。
    [root@kafka ~]# jps |grep -i kafka
    21646 Kafka
    [root@kafka ~]# netstat -tlnp|grep 9092
    tcp        0      0 :::9092                     :::*                        LISTEN      21646/java
    这样(3)、(4)就启动zookeeper和kafka应用了。
    (4)模拟客户端发送、接受消息初步测试。先必须创建一个topic
    启动创建topic脚本:./kafka-topics.sh --create --zookeeper localhost:9092 --partitions 1 --replication-factor 1  --topic  kafka_test02
    解释下,kafka-topics.sh是kafka安装包自动的topic脚本,其用法可以--help
    --create #创建
    --zookeeper localhost:9092  #指定--zookeeper 为consumer提供服务端口。
    --partitions 1    #创建一个分区,因为topic必须要有一个分区或者多个分区,这里创建一个。
    --replication-factor  #副本,为分区的副本
    --replication-factor  #指定新创建的topic名
    执行后。
    [root@kafka kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test02
    Created topic "kafka-test02".  #显示topic kafka-test02成功。
    另外可以通过命令查看现有的

    [root@kafka kafka]#  bin/kafka-topics.sh --list  --zookeeper localhost:2181
    --topic
    kafak-test
    kafka-test02
    my_first_topic
    (5)用kafka自带一个kafka-console-producer.sh 创建一个product的xshell,用
    .bin/kafka-console-producer.sh   --broker-list localhost:9092  --topic  kafka-test02
    #  kafka-test02是刚刚创建的topic,producer,指定的Socket(localhost+9092),说明生产者的消息要发往kafka,也即是broker。
    [root@kafka kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092  --topic  kafka-test02
    hello world       #在product的shell发送hello world


    bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic kafka-test02 --from-beginning
    #consumer,  指定的Socket(localhost+2181),说明消费者的消息来自zookeeper(协调转发)


    [root@kafka kafka]# bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic kafka-test02 --from-beginning

    hello world       #在consumer的shell收到hello world 信息

    (5)jps命令查看producer、consumer
    [root@kafka ~]# jps
    22380 ConsoleProducer
    22468 ConsoleConsumer
    22575 Jps
    21646 Kafka
    21380 QuorumPeerMain
    这里实验的是建立的一个服务broker上面的。


    5。单节点的多个broker集群
    启动多个broker服务,此时的kafka配置文件需要重新修改。
    [root@kafka config]# egrep -v '^$|^#'  server.properties
    broker.id=0                 #服务id
    listeners=PLAINTEXT://:9092
    port=9092                   #kafka默认监听端口
    host.name=127.0.0.1         #主机名
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs    #kafka的日志文件
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=127.0.0.1:2181      #zookeeper集群
    zookeeper.connection.timeout.ms=6000  #zookeeper连接超时时间

    参考博客:http://www.aboutyun.com/thread-12847-1-1.html

  • 相关阅读:
    五个知识体系之-Linux常用命令学习
    测试职业生涯中,五个知识体系
    英语:真正有效的英语学习心得,把英语当母语学习!(转载)
    侧滑面板(对viewGroup的自定义)
    安卓程序员要拿到5000和1w的薪资,分别需要掌握哪些技术?
    轻巧级记事本的开发
    web.xml 中的listener、 filter、servlet 加载顺序及其详解
    如何向android studio中导入第三方类库
    【NPR】卡通渲染
    线程池原理及其实现
  • 原文地址:https://www.cnblogs.com/the-study-of-linux/p/5654867.html
Copyright © 2011-2022 走看看