zoukankan      html  css  js  c++  java
  • zookeeper+kafka集群安装之二

    版权声明:本文为博主原创文章,未经博主同意不得转载。 https://blog.csdn.net/cheungmine/article/details/26683941

    zookeeper+kafka集群安装之二

    此为上一篇文章的续篇, kafka安装须要依赖zookeeper, 本文与上一篇文章都是真正分布式安装配置, 能够直接用于生产环境.

    zookeeper安装參考:

    http://blog.csdn.net/ubuntu64fan/article/details/26678877

    首先了解几个kafka中的概念:

    1. kafka是一个消息队列服务器,服务称为broker, 消息发送者称为producer, 消息接收者称为consumer;
    2. 通常我们部署多个broker以提供高可用性的消息服务集群.典型的是3个broker;
    3. 消息以topic的形式发送到broker,消费者订阅topic,实现按需取用的消费模式;
    4. 创建topic须要指定replication-factor(复制数目, 通常=broker数目);
    5. 每一个topic可能有多个分区(partition), 每一个分区的消息内容不会反复:

    假定我们有一个名称为test的topic, 分区数目为2, 当我们发送到这个test详细的消息"msg1:hello beijing"和"msg2:hello shanghai"的时候,我们怎样知道消息的发送路径呢(发往哪个分区)?
    
    msg1如果被发送到分区test.1,则肯定不会发送到test.2. 数据发送路径选择决策受kafka.producer.Partitioner的影响:
    
    interface Partitioner {
        int partition(java.lang.Object key, int numPartitions);
    }

    一个伪代码的实现例如以下:

    package org.mymibao.mq.client;
    
    import kafka.producer.Partitioner;
    
    public class DefaultKafkaPartitioner implements Partitioner {
        private final static int FIRST_PARTITION_ID = 1;
    
        public int partition(Object key, int numPartitions) {
            return FIRST_PARTITION_ID;
        }
    }

    分区API依据相关的键值以及系统中具有的代理分区的数量返回一个分区id。

    将该id用作索引,在broker_id和partition组成的经过排序的列表中为对应的生产者请求找出一个代理分区。缺省的分区策略是hash(key)%numPartitions。如果key为null,那就进行随机选择。使用partitioner.class这个配置參数可用插入自己定义的分区策略.分区文件不会跨越broker,可是多个broker上能够有某个topic的分区副本.

    kafka安装配置參考:

    1)下载KAFKA

        $ wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

    安装和配置參考上一篇文章:

    http://blog.csdn.net/ubuntu64fan/article/details/26678877

    2)配置$KAFKA_HOME/config/server.properties

    我们安装3个broker。分别在3个vm上:zk1。zk2,zk3:

    zk1:

    $ vi /etc/sysconfig/network

    NETWORKING=yes
    HOSTNAME=zk1

    $ vi $KAFKA_HOME/config/server.properties

    broker.id=0
    port=9092
    host.name=zk1
    advertised.host.name=zk1
    ...
    num.partitions=2
    ...
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

    zk2:

    $ vi /etc/sysconfig/network

    NETWORKING=yes
    HOSTNAME=zk2

    $ vi $KAFKA_HOME/config/server.properties

    broker.id=1
    port=9092
    host.name=zk2
    advertised.host.name=zk2
    ...
    num.partitions=2
    ...
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

    zk3:

    $ vi /etc/sysconfig/network

    NETWORKING=yes
    HOSTNAME=zk3

    $ vi $KAFKA_HOME/config/server.properties

    broker.id=2
    port=9092
    host.name=zk3
    advertised.host.name=zk3
    ...
    num.partitions=2
    ...
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

    3)启动zookeeper服务, 在zk1,zk2,zk3上分别执行:

    $ zkServer.sh start

    4)启动kafka服务, 在zk1,zk2,zk3上分别执行:

    $ kafka-server-start.sh $KAFKA_HOME/config/server.properties

    5) 新建一个TOPIC(replication-factor=num of brokers)

    $ kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181

    列出已有的topics:

    $ kafka-topics.sh --list --zookeeper zk1:2181

    6)如果我们在zk2上,开一个终端发送消息至kafka(zk2模拟producer)

    $ kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test

    在发送消息的终端输入:Hello Kafka

    7)如果我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)

    $ kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
    在消费消息的终端显示Hello Kafka

    8) 编程操作Producer和Consumer的样例參考:

    http://shift-alt-ctrl.iteye.com/blog/1930791




查看全文
  • 相关阅读:
    linux 查看系统负载:uptime
    centos who命令 查看当前登录系统用户信息
    centos7 管理开机启动:systemd
    Linux ethtool 命令
    Linux ifconfig 命令
    linux centos7 目录
    POJ 1169
    POJ 1163
    POJ 1154
    POJ 1149
  • 原文地址:https://www.cnblogs.com/ldxsuanfa/p/10570958.html
  • Copyright © 2011-2022 走看看