zoukankan      html  css  js  c++  java
  • 详谈kafka的深入浅出

    第一:kafka的介绍,kafka官网:http://kafka.apache.org/

    http://www.jasongj.com/2015/03/10/KafkaColumn1/

    kafka的简单介绍:

    1. kafka是一个流平台,所谓流平台:

    1. 允许发布和订阅记录流。在这方面类似消息队列和企业级的消息系统。
    2. 允许以容错的方式存储记录流。
    3. 允许以流的形式处理记录。

    2.kafka相关必须明白一下几个概念: 

      1.producer:
        生产者,发布消息到 kafka 集群中的服务或程序。

        Producer负责决定将数据发送到Topic的那个分区上。这可以通过简单的循环方式来平衡负载,或则可以根据某些语义来决定分区(例如基于数据中一些关键字)。
      2.broker:
        kafka 集群中包含的服务器,一个broker代表一台服务器。
      3.topic:
        每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。

        对于每个主题,Kafka会会维护一个如下所示的分区日志:

      

      每个分区是一个有序的,以不可变的记录顺序追加的Commit Log。分区中的每个记录都有一个连续的ID,称为Offset,唯一标识分区内的记录。

      Kafka集群使用记录保存时间的配置来保存所有已发布的记录(无论他们是否被消费)。例如,配置策略为两天,那么在一条记录发布两天内,这条记录是可以被消费的,之后将被丢弃以腾出空间。Kafka的性能和数据量无关,所以存储长时间的数据并不会成为问题。

      4.partition:
        partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
      5.consumer:
        从 kafka 集群中消费消息的程序或服务。

      

        实际上唯一需要保存的元数据是消费者的消费进度,即消费日志的偏移量(Offset)。这个Offset是由Consumer控制的:通常消费者会在读取记录时以线性方式提升Offset,但是事实上,由于Offset由Consumer控制,因此它可以以任何顺序消费记录。例如一个Consumer可以通过重置Offset来处理过去的数据或者跳过部分数据。

        这个特征意味着Kafka的Consumer可以消费“过去”和“将来”的数据而不对集群和其他Consumer不造成太大的影响。例如,可以使用命令行工具tail来获取Topic尾部的内容而不对已经在消费Consumer造成影响。

      6.Consumer group:
        总括:high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。

        详解:Consumer使用一个group name来标识自己的身份,每条被发送到一个Topic的消息都将被分发到属于同一个group的Consumer的一个实例中(group name相同的Consumer属于一个组,一个Topic的一条消息会被这个组中的一个Consumer实例消费)。Consumer实例可以在单独的进程中或者单独的机器上。

          如果所有的Consumer实例都是属于一个group的,那么所有的消息将被均衡的分发给每个实例。

          如果所有的Consumer都属于不同的group,那么每条消息将被广播给所有的Consumer。

          

        

    上图介绍:一个包含两个Server的Kafka集群,拥有四个分区(P0-P3),有两个Consumer group:Group A和Group B。Group有C1、C2两个Consumer,GroupB有C3、C4、C5、C6四个Consumer。

    更常见的是,Topic有少量的Consumer group,每一个都是“一个逻辑上的订阅者”。每个group包含多个Consumer实例,为了可伸缩性和容错性。这就是一个发布-订阅模式,只是订阅方是一个集群。

        Kafka中消费的实现方式是“公平”的将分区分配给Consumer,每一个时刻分区都拥有它唯一的消费者。Consumer成员关系有Kafka程度动态维护。如果新的Consumer加入了分区,那么它会从这个分区其他的Consumer中分配走一部分分区;如果部分Consumer实例宕机,它的分区会被其他Consumer实例接管。

        Kafka只保证同一个分区内记录的顺序,而不是同一个Topic的不同分区间数据的顺序。每个分区顺序结合按Key分配分区的能力,能满足大多数程序的需求。如果需要全局的顺序,可以使用只有一个分区的Topic,这意味着每个group只能有一个Consumer实例(因为一个分区同一时刻只能被一份Consumer消费——多加的Consumer只能用于容错)

      7.replica:
        partition 的副本,保障 partition 的高可用。
      8.leader:
        replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
      9.follower:
        replica 中的一个角色,从 leader 中复制数据。
      10.controller:
        kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。
      12.zookeeper:
        kafka 通过 zookeeper 来存储集群的 meta 信息。

      13.Kafka as a Messaging System(消息系统)

        消息传统上有两种模式:队列和发布-订阅。

          在队列中,一群Consumer从一个Server读取数据,每条消息被其中一个Consumer读取。

          在发布-订阅中,消息被广播给所有的Consumer。这两种模式有各自的优缺点。

                         模式区别:队列模式的优点是你可以在多个消费者实例上分配数据处理,从而允许你对程序进行“伸缩”。确定是队列不是多用户的,一旦消息被一个Consumer读取就不会再给其他Consumer。发布订阅模式允许广播数据到多个Consumer,那么就没办法对单个Consumer进行伸缩。

        Kafka的Consumer group包含两个概念。与队列一样,消费组允许通过一些进程来划分处理(每个进程处理一部分)。与发布订阅一样,Kafka允许广播消息到不同的Consumer group。

        Kafka模式的优势是每个Topic都拥有队列和发布-订阅两种模式。

        Kafka比传统的消息系统有更强的顺序保证。

        传统的消息系统在服务器上按顺序保存消息,如果多个Consumer从队列中消费消息,服务器按照存储的顺序输出消息。然后服务器虽然按照顺序输出消息,但是消息将被异步的传递给Consumer,所以他们将以不确定的顺序到达Consumer。这意味着在并行消费中将丢失消息顺序。传统消息系统通常采用“唯一消费者”的概念只让一个Consumer进行消费,但这就丢失了并行处理的能力。  

        Kafka做的更好一些。通过提供分区的概念,Kafka能提供消费集群顺序和负载的平衡。这是通过将分区分配个一个Consumer group中唯一的一个Consumer而实现的,一个分区只会被一个分组中的一个Consumer进行消费。通过这么实现,能让一个Consumer消费一个分区并按照顺序处理消息。因为存在多个分区,所有可以在多个Consumer实例上实现负载均衡。注意,一个分组内的Consumer实例数不能超过分区数。

      14. Kafka as a Storage System(存储系统)

        任何将发送消息和消费结构的消息队列都有效的用作一个消息的存储系统。不同的是Kafka是一个更好的存储系统。

        被写入到Kafka的数据将被写入磁盘并复制以保证容错。Kafka允许Producer等待确定,以保证Producer可以确认消息被成功持久化并复制完成。

        Kafka使用的存储结构,使其提供相同的能力,无论是存储50KB或者50TB持久化数据。

        因为允许客户端控制读取的位置,可以将Kafka视为高性能,低延迟的日志存储、复制、传播的分布式系统。

      15:Kafka for Stream Processing

    仅仅是读写和存储流数据是不够的,Kafka的目标是对流失数据的实时处理。
    
    在Kafka中,Stream Producer从输入的Topic中读取数据,执行一些操作,生成输出流到输出的Topic中。
    
    例如,零售的应用程序将收到销售和出货的输入流,并输出根据该数据计算的重排序和价格调整后的数据流。
    
    可以使用Producer和Consumer实现简单的处理。对于更复杂的转换,Kafka提供的完成的Stream API,允许构建将流中数据聚合或将流连接到一起的应用。
    
    这用于解决以下的一些困难:处理无需的数据,执行有状态的计算等。
    
    Stream API基于Kafka的核心函数古剑:使用Producer和Consumer API用于输入,使用Kafka作为有状态的存储,使用group机制来实现Stream处理器的容错。
    View Code

      16:Putting the Pieces Together

    消息、存储和流处理这种组合看是不寻常,但是Kafka作为流式平台这是必须的。
    
    类似HDFS的分布式文件系统存储静态的文件用于批处理。这种的系统允许存储和处理历史数据。
    
    传统的企业消息系统允许处理在你订阅之后的未来的数据。以这种方式构建的应用程序在未来数据到达时进行处理。
    
    Kafka组合这些能力,并且组合这些对Kafka作为流应用平台和流数据通道至关重要。
    
    通过组合存储和低延迟的订阅,流应用程序能以相同的方式处理过去和未来的数据。一个单一的程序可以处理过去的历史数据,并且不会在达到一个位置时停止,而是能继续处理将来到达的数据。这是一个广泛的流处理的概念,其中包含批处理和消息驱动的应用程序。
    
    同样,对于数据流通道,组合订阅机制和实时事件使Kafka成为非常低延迟的管道;数据的存储能力使其能和可能会进行停机维护的周期性处理数据的离线系统集成,或用于必须保证数据被确认交付的场景。流处理程序可以在数据到达后进行处理。
    View Code

    3.kafka的应用场景

    消息队列(MQ)

    KafKa可以代替传统的消息队列软件(阿里的队列软件RocketMQ就是基于KafKa实现的),在队列软件的选择上KafKa已经成了不二之选,使用KafKa来实现队列有如下优点

    • KafKa的append来实现消息的追加,保证消息都是有序的有先来后到的顺序,
    • 稳定性强队列在使用中最怕丢失数据,KafKa能做到理论上的写成功不丢失
    • 分布式容灾好
    • 容量大相对于内存队列,KafKa的容量受硬盘影响
    • 数据量不会影响到KafKa的速度

    就以上几点和笔者之前使用的Redis来承载队列服务要优秀的多,在后续文章的比较中会一一说明

    分布式日志系统(Log)

    在很多时候我们需要对一些庞大的数据进行存留,一些业务型公司可能永不上应为基本可以依靠数据库解决日志的问题,但是服务型公司比如jpush,云监控此类服务,日志存储这块会遇到巨大的问题,日志不能丢,日志存文件不好找,定位一条消息成本高(遍历当天日志文件),实时显示给用户难,这几类问题KafKa都能游刃有余

    • KafKa的集群备份机制能做到n/2的可用,当n/2以下的机器宕机时存储的日志不会丢失
    • KafKa可以对消息进行分组分片,并且通过offset可以做到获取中间莫一条消息(通过算法很容易的到莫个时段的日志)
    • KafKa非常容易做到实时日志查询,可以从日志尾部获取需要显示给用户查询的资料即可
    数据通道(Messaging)

    kafka特有的offset机制能够保证消息至少被获取一次,当程序在获取途中死亡这条消息会被认定为未被消费,下次会继续消费这条消息,此特性使得kafka可以作为一个保障数据传输的通道来使用,但是kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;所以kafka只能使用作为"常规"的消息系统

    第二:kafka的简单搭建机器使用

    声明:采用三台服务器安装kafka集群
    第一步:安装前准备,就不详细的说了
      首先保准三台机器的主机名可以ping通(配置主机名)
      安装java环境

      sudo add-apt-repository ppa:openjdk-r/ppa
      sudo apt-get update
      sudo apt-get install openjdk-8-jdk

      每台机器要安装上supervisor(防止挂掉)
      apt-get install supervisor -y

    第二步:安装

    下载kafka安装包,并将安装包(kafka_2.11-0.10.2.0.tgz)拷贝到三台机器,解压缩,移动解压缩后的文件夹到/usr/local/kafka(可以随意指定)目录
    校验和 051e5e16050c85ebdc40f3bbbc188317 kafka_2.11-0.10.2.0.tgz

    tar xvf kafka_2.11-0.10.2.0.tgz
    mv kafka_2.11-0.10.2.0 /usr/local/kafka

    第三步:修改zk配置(配置详细解释见下文)

    2. 修改/usr/local/kafka/config/zookeeper.properties。其中server.1,server.2,server.3根据部署的集群数量和 ip地址可以做调整的。(一下是需要修改的参数)

    tickTime=2000      #这个时间是Zookeeper服务集群之间的相互检查或客户端连接服务器之间的检查,也就是每个 tickTime 时间就会发送一个心跳。
    initLimit=10       #这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
    syncLimit=5        #这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是2*2000=4 秒
    # the directory where the snapshot is stored.
    dataDir=/data/zookeeper    # zk的数据目录
    dataLogDir=/data/log/zookeeper   #zk的日志目录
    # the port at which the clients will connect
    clientPort=2181        #客户端连接的端口,也就是zk监听的端口
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0        #0或者不设置,则每个ip连接zookeeper时的连接数没有限制。如果设置maxClientCnxns的值时需要把kafka server的连接数考虑进去,因为启动kafka server时,kafka server也会连接zookeeper的
    server.1=ip1:2888:3888  
    server.2=ip2:2888:3888
    server.3=ip3:2888:3888

      #注解:
      数字:第几号服务器,
      ip:该服务器的ip地址
      2888:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口
      3888:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口

    配置id号:

    1,2,3必须要对应zookeeper里的sever1的ip
    echo '1' > /var/zookeeper/myid  (ip1服务器上)
    echo '2' > /var/zookeeper/myid  (ip2服务器上)
    echo '3' > /var/zookeeper/myid  (ip3服务器上)

    第四步: 修改/usr/local/kafka/config/server.properties。其中broker.id配置ip1上面为0,ip2上面为1,ip3上面为2,listeners的ip替换成机器的ip。zookeeper.connect配置为三个节点的内网ip,端口号都保持默认即可(配置详细解释见下文)

    broker.id=0  #broker的标识,并且集群中不得重复
    delete.topic.enable=true  #打开这个参数是可以有选择的删除topic的,如果不开启,删除的动作是不会执行(根据自己的需求是否需要打开)
    listeners=PLAINTEXT://ip1:9093  # listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用java发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的
    log.dirs=/data/kafka/kafka-logs  #kafka的数据目录文件
    zookeeper.connect=ip1:2181,ip2:2181,ip3:2181 #连接zk的地址,这里可以写多个,也可以写一个

    第五步:配置supervisor,添加两个配置

    [program:zookeeper]
    command=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
    user=root
    autostart=true
    autorestart=true
    startsecs=3
    
    [program:kafka-server]
    command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    user=root
    autostart=true
    autorestart=true
    startsecs=3

    把三台机器都配置完成后,先启动三台机器的zk,然后启动kafka

    第六步:创建topic

    创建topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic br0

    查看创建是否成功:

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic br0
    输出如下,注意Replicas有0,1,2,Irs有0,1,2,对应三个节点的broker.id
    Topic:br0    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic:br0    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

    第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区就只加了一行
    *Leader:负责处理消息的读和写,leader是从所有节点中随机选择的
    *Replicas:列出了所哟副本的节点,不管节点是否在服务中
    *Isr:正在服务中的节点

     检查通信:

    在ip2上面启动“消费者”,localhost换成ip2,回车之后会挂住,有消息来的时候会打印出来

    bin/kafka-console-consumer.sh --bootstrap-server ip2:9092 --from-beginning --topic br0

    在ip3上面启动“生产者”,localhost换成ip3,回车之后输入一些文字,敲回车

    bin/kafka-console-producer.sh --broker-list ip3 --topic br0
    test1

    观察ip2机器的“消费者”终端将会有消息打印

    到此部署结束

    第三:kafka的配置详解

    http://blog.csdn.net/u013035314/article/details/46741377

  • 相关阅读:
    How To Configure Server Side Transparent Application Failover [ID 460982.1]
    10g & 11g Configuration of TAF(Transparent Application Failover) and Load Balancing [ID 453293.1]
    AIX 系统介绍
    10g & 11g Configuration of TAF(Transparent Application Failover) and Load Balancing [ID 453293.1]
    Parameter DIRECT: Conventional Path Export Versus Direct Path Export [ID 155477.1]
    Linux下 数据文件 效验问题
    open_links_per_instance 和 open_links 参数说明
    Oracle 外部表
    Export/Import DataPump Parameter ACCESS_METHOD How to Enforce a Method of Loading and Unloading Data ? [ID 552424.1]
    Linux下 数据文件 效验问题
  • 原文地址:https://www.cnblogs.com/ylqh/p/7717861.html
Copyright © 2011-2022 走看看