zoukankan      html  css  js  c++  java
  • linux下kafka与zookeeper集群部署

    *********************************配置主机名,通过主机名连接机器*********************************

    比如说,已经有了三台主机

    1,在linux上设置hostname,通过hostname来访问linux虚拟机

    1.1. 修改hosts文件

    vim /etc/hosts
    
    #/etc/hosts 的内容一般有如下类似内容:
    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.202.156    node1
    192.168.202.157    node2
    192.168.202.158    node3

    node1我当时没有专门加这一行,而是直接在127.0.0.1后面,把localhost.localdomain修改为 node1

    1.2. 修改network

    修改配置文件/etc/sysconfig/network
    修改HOSTNAME=yourname
    
    NETWORKING=yes
    HOSTNAME=node1

    然后三台机器重启,reboot

    重启后,ssh node2 ,发现能通过主机名字,连上

    *********************************不同机器间,免密访问*********************************

    通过secureCRT,send commands to all sessions,可以达到一个输入,在多个linux中响应

    免密访问可以看 http://blog.chinaunix.net/uid-26284395-id-2949145.html

    1、ssh-keygen

    2、ssh-copy-id -i  /root/.ssh/id_rsa.pub node1  (更换node2、3,然后一共重复三遍,将每台机器的publickey放到三台机器中)

    最后,可以查看 cat /root/.ssh/authorized_keys 是否有node1、2、3,有的话就是可以

    通过ssh node1、2、3,可以分别连上三台机器。

    *********************************安装clustershell*********************************

    我的linux是CentOS6.5

    去下载包 clustershell-1.6-1.el6.noarch.rpm — RPM RHEL6/CentOS6/SL6

    https://github.com/cea-hpc/clustershell/downloads

    执行命令,安装:rpm -ivh clustershell-1.6-1.el6.noarch.rpm

    安装成功后,

    vim /etc/clustershell/groups
    
    在groups里面加一个组
    
    kafka: node[1-3]

    这样就把node[1-3] 加入到kafka这个组里面。

    这样,clustershell 安装成功

    clush  -g kafka -c /opt/kafka

    可以将/opt/kafka复制到集群中这个组中去

    *********************************安装zookeeper,并启动*********************************

    cd zookeeper-3.4.10
    
    cd conf/
    
    cp zoo_sample.cfg zoo.cfg
    
    vim zoo.cfg 
    加入:
    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888
    
    clush -g kafka -c zoo.cfg 
    
    clush -g kafka mkdir /tmp/zookeeper
    
    echo "1" > /tmp/zookeeper/myid
    
    [root@node1 conf]# clush -g kafka cat /tmp/zookeeper/myid 
    node3: 3
    node2: 2
    node1: 1
    
    [root@node1 zookeeper-3.4.10]# clush -g kafka "/opt/kafka/zookeeper-3.4.10/bin/zkServer.sh start /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg "
    node1: ZooKeeper JMX enabled by default
    node1: Using config: /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg
    node2: ZooKeeper JMX enabled by default
    node3: ZooKeeper JMX enabled by default
    node2: Using config: /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg
    node3: Using config: /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg
    node1: Starting zookeeper ... STARTED
    node2: Starting zookeeper ... STARTED
    node3: Starting zookeeper ... STARTED
    
    
    [root@node1 zookeeper-3.4.10]# clush -g kafka "/opt/kafka/zookeeper-3.4.10/bin/zkServer.sh status /opt/kafka/zookeeper-3.4.10/conf/zoo.cfg "
    通过看各个节点的状态,验证zookeeper集群是否启动成功
    也可以通过看 2181/2888/3888这几个端口是否都被占用来验证

    如果没有启动成功,那就可能是防火墙的问题,吧防火墙关了即可

    clush -g kafka service iptables stop

    接下来,可以看看三台机器数据是不是同步的:
    在 node1 上,用 zookeeper 的客户端工具,连接服务器
    bin/zkCli.sh -server node1:2181
    #
    #
    #
    #
    ls /
    会看到 / 下面的一些东西
    也可以创建一个节点,并给他一个值hello:
    create /test hello
    ls / 可以看一下
    然后在 node2 上,如果可以看到node1 创建的数据,说明数据是同步一致的:
    bin/zkCli.sh -server node1:2181
    get /test 可以看到刚才输入的hello
    通过quit可以退出

    *********************************安装kafka,并启动*********************************

    安装:
    修改server.properties 
    broker.id=1
    zookeeper.connect=node1:2181,node2:2181,node3:2181
    修改完成后,分发到集群中
    并单独修改broker.id=2 、3 之类
    
    在三台机器上启动:
    bin/kafka-server-start.sh -daemon config/server.properties
    启动后,查看9092端口是否被监听
    lsof -i:9092


    在node1上创建消费者,接收消息

    创建一个topic: [root@node1 kafka_2.
    10-0.10.2.1]# bin/kafka-topics.sh --zookeeper node1:2181 --topic topic1 --create --partitions 3 --replication-factor 2 Created topic "topic1". 查看这个topic [root@node1 kafka_2.10-0.10.2.1]# bin/kafka-topics.sh --zookeeper node1:2181 --topic topic1 --describe Topic:topic1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: topic1 Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: topic1 Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2 创建一个consumer,去接收生产者的消息 [root@node1 kafka_2.10-0.10.2.1]# bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic topic1 Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. 1 hello
    在node2上创建生产者,生产消息
    [root@node2 kafka_2.
    10-0.10.2.1]# bin/kafka-console-producer.sh --broker-list node2:9092 --topic topic1 1 hello
    查看已有的topic
    bin/kafka-topics.sh --list --zookeeper node1:2181
    Furthermore, ConsumerOffestChecker shows a row for each topic partition. Your topic topic5 does have some partitions.
    • Pid: partition ID
    • Offset: the latest committed offset for a partition for the corresponding consumer group
    • logSize: the number of messages stored in the partition
    • Lag: the number of not yet consumed message for a partition for the corresponding consumer group (ie, lag = logSize - offset)
    • Owner: unique ID of the running consumer thread

    [orco@node1 kafka_2.10-0.10.1.1]$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper node1 --topic topic5 --group group1
    [
    2017-07-26 11:39:16,748] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Group Topic Pid Offset logSize Lag Owner group1 topic5 0 0 0 0 none group1 topic5 1 10 10 0 none group1 topic5 2 0 0 0 none

    有点记不清,eclipse中使用java api 调用kafka服务,好像额外需要在service.properties中修改下面这个

    #listeners=PLAINTEXT://:9092

    listeners=PLAINTEXT://192.168.202.156:9092

    或者是

    listeners=PLAINTEXT://node1:9092

    不同机器,不同的node2 node3等等

    这个博客主要是javaEE相关或者不相关的记录, hadoop与spark的相关文章我写在下面地址的博客啦~ http://www.cnblogs.com/sorco
  • 相关阅读:
    php 压缩文件 zip
    php 创建返回结果配置文件 实例
    php 生成xml文件
    php 获取读取文件内容
    基于JAVA语言的多线程技术
    Java HTTP请求
    TCP与UDP
    VC6.0 调试.dll文件
    [JNI] Java 调用 C++ dll
    HTTPS与SSL
  • 原文地址:https://www.cnblogs.com/orco/p/6844757.html
Copyright © 2011-2022 走看看