zoukankan      html  css  js  c++  java
  • kafka1 三种模式安装

    一 搭建单节点单broker的kafka集群

    注意:请打开不同的终端分别执行以下步骤

    1.复制安装包到/usr/local目录下,解压缩,重命名(或者软链接),配置环境变量

    [root@hadoop ~]# cd /usr/local/
    [root@hadoop local]# tar xzvf kafka_2.11-2.0.0.tgz 
    [root@hadoop local]# mv kafka_2.11-2.0.0 kafka
    [root@hadoop local]# ln -s kafka_2.11-2.0.0 kafka #软链接或者重命名二选一即可;若上一步已执行 mv 重命名,则跳过此步,否则会创建失效的软链接
    [root@hadoop local]# vi /etc/profile
    添加变量 export KAFKA_HOME=/usr/local/kafka
    在PATH后添加 :$KAFKA_HOME/bin
    [root@hadoop local]# source /etc/profile
    
    [root@hadoop kafka]# echo $KAFKA_HOME #查看环境变量
    /usr/local/kafka

    2.启动服务器

    启动zookeeper

    [root@hadoop kafka]# zookeeper-server-start.sh config/zookeeper.properties
    [root@hadoop kafka]# jps #打开另一个终端查看是否启动成功
    3892 Jps
    3566 QuorumPeerMain

    启动kafka

    [root@hadoop kafka]# kafka-server-start.sh config/server.properties 

    3.创建topic

    #创建一个分区,一个副本的主题
    #副本数无法修改,只能在创建主题时指定
    [root@hadoop kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 
    Created topic "test".  
    
    [root@hadoop kafka]# kafka-topics.sh --list --zookeeper localhost:2181 #列出主题
    test

    可以通过zk的客户端观察zk的数据结构

    [root@hadoop kafka]# zkCli.sh  -server localhost:2181 #进入zk客户端
    Connecting to localhost:2181
    2018-07-31 14:27:24,876 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT
    2018-07-31 14:27:24,879 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=hadoop
    2018-07-31 14:27:24,880 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_11
    2018-07-31 14:27:24,882 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
    2018-07-31 14:27:24,882 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/java/jre
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.9.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:.:/usr/java/lib/dt.jar:/usr/java/lib/tools.jar:/usr/java/jre/lib
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/local/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.10.0-514.el7.x86_64
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
    2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/usr/local/kafka
    2018-07-31 14:27:24,888 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@67424e82
    Welcome to ZooKeeper!
    2018-07-31 14:27:25,037 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    JLine support is enabled
    2018-07-31 14:27:25,131 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/127.0.0.1:2181, initiating session
    2018-07-31 14:27:25,153 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10000ded7830002, negotiated timeout = 30000
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] ls /
    [cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
    [zk: localhost:2181(CONNECTED) 1] ls  /brokers
    [ids, topics, seqid]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
    [test]
    [zk: localhost:2181(CONNECTED) 3] get /brokers/topics/test
    {"version":1,"partitions":{"0":[0]}}
    cZxid = 0x22
    ctime = Tue Jul 31 14:22:42 CST 2018
    mZxid = 0x22
    mtime = Tue Jul 31 14:22:42 CST 2018
    pZxid = 0x24
    cversion = 1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 36
    numChildren = 1
    [zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/test 
    [partitions]
    [zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/test/partitions
    [0]
    [zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test/partitions/0
    [state]
    [zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions/0/state
    []
    [zk: localhost:2181(CONNECTED) 8] get /brokers/topics/test/partitions/0/state
    {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
    cZxid = 0x26
    ctime = Tue Jul 31 14:22:42 CST 2018
    mZxid = 0x26
    mtime = Tue Jul 31 14:22:42 CST 2018
    pZxid = 0x26
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 72
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 9] quit #退出
    Quitting...
    2018-07-31 15:01:53,761 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x10000ded7830002 closed
    2018-07-31 15:01:53,789 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@519] - EventThread shut down for session: 0x10000ded7830002
    [root@hadoop kafka]# 
    View Code

    4.发送消息

    [root@hadoop kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >hello world
    >how are you

    5.启动消费者

    [root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning 
    hello world
    how are you

     最后查看一下进程

    [root@hadoop kafka]# jps
    3905 Kafka
    7637 ConsoleConsumer
    5702 ConsoleProducer
    3566 QuorumPeerMain
    9135 Jps

    二 搭建单节点多broker的kafka集群

    首先把上述终端全部关闭,重新打开不同的终端分别执行以下步骤

    1.创建并配置多个server配置文件

    注意:以下3个broker中,必须有一个broker的端口号是9092。
    这个是我踩过的坑,我刚开始建立了3个broker端口号分别为9093,9094,9095,然后消费消息时无论如何都不显示结果,在这卡了半天。
    我猜是因为consumer.properties、producer.properties、connect-distributed.properties等文件中默认bootstrap.servers=localhost:9092,如果3个broker中没有一个端口号是9092,需要将这些配置文件中的bootstrap.servers参数值的端口号均改为其中一个(比如9093),否则消费者无法消费消息。但是我试过修改这3个配置文件,竟然没用!!!难道是我猜错了?郁闷。。。
    后来我把其中一个端口号改为9092就好了。

    [root@hadoop ~]# cd /usr/local/kafka/config/
    [root@hadoop config]# cp server.properties server0.properties 
    [root@hadoop config]# cp server.properties server1.properties 
    [root@hadoop config]# cp server.properties server2.properties 
    [root@hadoop config]# vi server0.properties 
    broker.id=0
    listeners=PLAINTEXT://:9092
    log.dirs=/tmp/kafka-logs0
    [root@hadoop config]# vi server1.properties 
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs1
    [root@hadoop config]# vi server2.properties 
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs2

    2.启动服务器

    启动zookeeper

    [root@hadoop kafka]# zookeeper-server-start.sh config/zookeeper.properties 
    ...

    分别启动3个kafka服务器(可以在每条命令之后加 & 使命令在后台运行)

    [root@hadoop kafka]# kafka-server-start.sh config/server0.properties &
    [root@hadoop kafka]# kafka-server-start.sh config/server1.properties &
    [root@hadoop kafka]# kafka-server-start.sh config/server2.properties &

    查看

    [root@hadoop config]# jps #显示启动了zookeeper和3个kafka服务器
    12161 QuorumPeerMain
    13154 Kafka
    15940 Jps
    15609 Kafka
    12828 Kafka

    3.创建主题(3个副本)

    注意:副本数必须小于等于broker数

    [root@hadoop kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test02
    Created topic "test02".

    查看主题内容

    [root@hadoop kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02
    Topic:test02    PartitionCount:1    ReplicationFactor:3    Configs:
        Topic: test02    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0

    还可以查看上一个主题内容

    [root@hadoop kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

    4.发布新消息给主题

    [root@hadoop kafka]# netstat -ano | grep 909 #查看3个kafka server的端口号(9092,9093,9094)是否存在
    tcp6       0      0 :::9092                 :::*                    LISTEN      off (0.00/0/0)
    tcp6       0      0 :::9093                 :::*                    LISTEN      off (0.00/0/0)
    tcp6       0      0 :::9094                 :::*                    LISTEN      off (0.00/0/0)
    tcp6       0      0 192.168.42.133:49292    192.168.42.133:9093     ESTABLISHED keepalive (7058.77/0/0)
    tcp6       0      0 192.168.42.133:50262    192.168.42.133:9094     ESTABLISHED keepalive (7075.16/0/0)
    tcp6       0      0 192.168.42.133:9092     192.168.42.133:54622    ESTABLISHED keepalive (7058.77/0/0)
    tcp6       0      0 192.168.42.133:9093     192.168.42.133:49292    ESTABLISHED keepalive (7058.77/0/0)
    tcp6       0      0 192.168.42.133:9094     192.168.42.133:50262    ESTABLISHED keepalive (7075.16/0/0)
    tcp6       0      0 192.168.42.133:50268    192.168.42.133:9094     ESTABLISHED keepalive (7157.08/0/0)
    tcp6       0      0 192.168.42.133:54622    192.168.42.133:9092     ESTABLISHED keepalive (7058.77/0/0)
    tcp6       0      0 192.168.42.133:9094     192.168.42.133:50264    ESTABLISHED keepalive (7075.16/0/0)
    tcp6       0      0 192.168.42.133:9094     192.168.42.133:50268    ESTABLISHED keepalive (7157.08/0/0)
    tcp6       0      0 192.168.42.133:50264    192.168.42.133:9094     ESTABLISHED keepalive (7075.16/0/0)
    
    [root@hadoop ~]# kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test02
    >hello kafka

    5.消费消息

    [root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test02 --from-beginning
    hello kafka

    注意:旧版本的kafka消费消息命令如下:

    [root@hadoop kafka]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test02 --from-beginning 
    zookeeper is not a recognized option #新版本kafka移除了这个参数

    百度后发现,从 0.9.0.0版本后,kafka的consumer配置作了很大的改变,不仅增加了很多配置,而且把原来的zookeeper相关配置统统取消了。感兴趣的朋友可以查看官方文档中新旧consumer配置的对比

    6.容错测试

    在容错性测试之前,可以先将生产者和消费者关掉。

    a) 找到并杀死server2进程(broker 2是原来3个broker中的leader,现在我们杀死leader)

    [root@hadoop kafka]# ps -ef|grep server2.properties #找到server2的进程号为13154
    root 13154 12486 1 11:02 pts/4 00:00:33 ...信息太多,忽略.../server2.properties
    root 16228 14977 0 11:56 pts/6 00:00:00 grep --color=auto server2.properties
    [root@hadoop kafka]# kill 13154 #杀死进程
    [root@hadoop kafka]# ps -ef|grep server2.properties #再次查看,进程号没有了
    root 16239 14977 0 11:57 pts/6 00:00:00 grep --color=auto server2.properties
    [root@hadoop kafka]# jps #只剩下2个kafka了
    16240 Jps
    12161 QuorumPeerMain
    15609 Kafka
    12828 Kafka

    b) 查看主题描述

    #主题还在,只是leader由原来的broker 2 变为了broker 1
    [root@hadoop kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02
    Topic:test02    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test02    Partition: 0    Leader: 1    Replicas: 2,1,0    Isr: 1,0

    c) 启动消费者消费主题消息

    [root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test02 --from-beginning
    [2018-08-03 12:05:47,143] WARN [Consumer clientId=consumer-1, groupId=console-consumer-16224] Connection to node -3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
    hello kafka

    可以发现,消费者依然可以消费原来的消息。

    当然,如果生产者重新发布新消息,消费者同样也是可以消费的。

    三 搭建多节点多broker的kafka集群

    假设现在我们有3台服务器,分别为s101,s102,s103。

    首先在3台服务器上先建立zookeeper完全分布式集群,步骤参考我以前写的博客。

    接着安装kafka。

    1.先在s101服务器上安装kafka(步骤参考 一 搭建单节点单broker的kafka集群 第1步)

    2.将 kafka + 环境变量 拷贝到s102,s103服务器上。

    3.分别修改3个服务器下的server.properties文件

    [root@hadoop config]# vi server.properties #s101修改配置如下
    broker.id=101 #注意3个broker id不冲突即可
    log.dirs=/usr/local/kafka/kafka-logs #自己设置一个目录,但注意不要放在/tmp/临时目录下
    zookeeper.connect=s101:2181,s102:2181,s103:2181 
    [root@hadoop config]# vi server.properties #s102修改配置如下
    broker.id=102 
    log.dirs=/usr/local/kafka/kafka-logs 
    zookeeper.connect=s101:2181,s102:2181,s103:2181 
    [root@hadoop config]# vi server.properties #s103修改配置如下
    broker.id=103 
    log.dirs=/usr/local/kafka/kafka-logs 
    zookeeper.connect=s101:2181,s102:2181,s103:2181 

    4.创建log目录 mkdir /usr/local/kafka/kafka-logs 

    5.分别启动3个zookeeper以及3个kafka server即可。

  • 相关阅读:
    实现不限层级的Element的NavMenu
    vue_插槽的理解和使用
    vue 动态修改路由参数
    什么是回流,什么是重绘,有什么区别?
    Vue路由获取路由参数
    【前端图表】echarts实现散点图x轴时间轴
    为什么 char 数组比 String 更适合存储密码?
    MySQL 日期时间类型怎么选?千万不要乱用!
    MySQL not exists 真的不走索引么?
    谷歌开源的代码评审规范,值得借鉴!
  • 原文地址:https://www.cnblogs.com/zhengna/p/9395999.html
Copyright © 2011-2022 走看看