zoukankan      html  css  js  c++  java
  • Kafka学习笔记

    1.生产者消费者模式

    2.Kafka的设计和架构

    1.Kafka基本概念

    Producer:消息和数据的生产者,向Kafka的一个topic发布消息的进程/代码/服务

    Consumer:消息和数据的消费者,订阅数据(Topic)并且处理其发布的消息的进程/代码/服务

    Consumer Group:逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息

    Broker:物理概念,Kafka集群中的每个Kafka节点

    Topic:逻辑概念,Kafka消息的类别,对数据进行区分、隔离

    Partition:物理概念,Kafka下数据存储的基本单元。一个topic数据,会被分散存储到多个Partition,每一个Partition是有序的

    Replication(副本、备份):同一个Partition可能会有多个Replication,多个Replication之间数据是一样的

    2.Kafka概念延伸

    Replication特点:

    Replication的基本单位是Topic的Partition

    所有读和写都从Leader进,Followers只作备份

    Follower必须能够及时复制Leader的数据

    当集群中有Broker挂掉的情况,系统可以主动地使Replication提供服务

    系统默认设置每一个Topic的Replication系数为1,可以在创建Topic时单独设置

    每一个Topic被切分为多个Partitions(Partition属于消费者存储的基本单位)

    消费者数目小于或等于Partition的数目(多个消费者若消费同个Partition会出现数据错误,所有Kafka如此设计)

    Broker Group中的每一个Broker保存Topic的一个或多个Partitions

    3.Kafka的基本结构

    kafka消息结构:

    Offset: 消息的偏移量

    Length: 消息的长度

    CRC32 : 消息校验字段,校验信息的完整性

    Magic:  用于判断该消息是不是kafka消息

    attributes: 可选字段,存放当前消息的属性

    4.Kafka的特点

    分布式:多分区、多副本、多订阅者、基于Zookeeper调度

    高性能:高吞吐量、低延迟、高并发、时间复杂度为O(1)

    持久性与扩展性:数据可持久化、容错性、支持在线水平扩展、消息自动平衡

    3.Kafka应用场景及操作

    1.Kafka应用场景

    消息队列

    行为跟踪

    元信息监控

    日志收集

    流处理

    2.Kafka安装与启动(单机版)

    官方文档:http://kafka.apache.org/quickstart#quickstart_createtopic

    解压tgz包,2.11-2.0.0版本自带Zookeeper

    tar -xzf kafka_2.11-2.0.0.tgz
    cd kafka_2.11-2.0.0

    用SecureCRT克隆会话,方便查看日志,启动Zookeeper

    bin/zookeeper-server-start.sh config/zookeeper.properties

    克隆会话,启动Kafka

    bin/kafka-server-start.sh config/server.properties

    创建Topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    查看Topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181

    启动生产者

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    启动消费者

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    用jps命令查看已经启动的Java进程,查看服务是否正确

    root@ubuntu:~/kafka/kafka_2.11-2.0.0# jps
    43427 ConsoleConsumer
    42291 ConsoleProducer
    41043 QuorumPeerMain
    43719 Jps
    41356 Kafka

    在Producer输入文字,在Consumer中可以即时收到

    Producer:

     Consumer:

     

    3.Kafka安装与启动(集群版)

    环境准备

    kafka版本:kafka_2.12-2.2.0
    三台主机IP 192.168.94.140,192.168.94.139,192.168.94.138

    配置工作

    1、修改hosts
    由于zookeeper是通过主机名通信的,我们需要需改hosts.
    编辑vim /etc/hosts
    1)保留前2行,其余的127.0.0.1注释掉

    127.0.0.1 localhost
    ::1 localhost

    2)添加三台机器的host绑定。
    如:

    192.168.94.140 CentOS7One
    192.168.94.139 CentOS7Two
    192.168.94.138 CentOS7Three

    3)修改hostname:
    编辑vim /etc/sysconfig/network
    修改hostname为上述绑定中对应的名称
    如:
    HOSTNAME=CentOS7One
    三台机器都需要操作,重启后生效reboot
    2、kafka安装配置
    下载后解压kafka

    Download the 2.2.0 release and un-tar it.
    > tar -xzf kafka_2.12-2.2.0.tgz
    > cd kafka_2.12-2.2.0

    自定义目录
    首先新建kafka的日志目录和zookeeper数据目录,因为这两项默认放在tmp目录,而tmp目录中内容会随重启而丢失,所以我们自定义两个目录:
    新增/var/kafka-logs和/data/zk目录

    修改 server.properties
    进入kafka的config目录
    vim server.properties
    修改内容如下:

    broker.id=1       #保证每个broker唯一,第一台可以不修改默认为0,后面两台需要修改,如改为2和3
    num.partitions=3      #分区数量一般与broker保持一致
    listeners=PLAINTEXT://192.168.94.140:9092     #修改为本机ip
    zookeeper.connect=192.168.94.140:2181,192.168.94.139:2181,192.168.94.138:2181      #配置三台服务zookeeper连接地址
    host.name=192.168.94.140      #新增host.name值,分别设为不同的值(3台机器根据自己的ip设置)
    log.dirs=/var/kafka-logs/       #修改log.dirs目录为之前自定义的目录

    修改 zookeeper.properties
    vim zookeeper.properties
    修改内容如下:

    dataDir=/data/zk  #修改为自定义的目录
    #maxClientCnxns=0  #注释掉
    #设置连接参数,添加如下配置
    tickTime=2000
    initLimit=10
    syncLimit=5
    #设置broker Id的服务地址
    server.0=192.168.94.140:2888:3888
    server.1=192.168.94.139:2888:3888
    server.2=192.168.94.138:2888:3888

    zookeeper数据目录添加id配置
    在各台服务的zookeeper数据目录添加myid文件,写入服务broker.id属性值,如这里的目录是/data/zk
    第一台broker.id为0的服务到该目录下执行:

    echo 0 > myid

    其他两台机器分别新增myid为1,myid为2.

    3、启动Kafka

    以下操作三台机器都要做

    启动zookeeper

    sh /opt/kafka/kafka_2.12-2.2.0/bin/zookeeper-server-start.sh -daemon /opt/kafka/kafka_2.12-2.2.0/config/zookeeper.properties 

    启动Kafka

    sh /opt/kafka/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-2.2.0/config/server.properties

    创建topic

    sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test

    查看topic

    sh kafka-topics.sh --list --zookeeper localhost:2181

    查看topic的副本和分区

    ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

    在192.168.94.140启动生产者

    sh kafka-console-producer.sh --broker-list 192.168.94.140:9092 --topic test

    在192.168.94.139启动消费者

    sh kafka-console-consumer.sh --bootstrap-server 192.168.94.139:9092 --topic test --from-beginning

     

    4.Java测试连接Kafka单机版和Kafka集群的代码

    单机版测试:https://github.com/cjy513203427/Java_Advanced_Knowledge/tree/master/src/com/advance/kafka

    集群版测试:https://github.com/cjy513203427/KafkaExample

    4.Kafka高级特性

    1.Kafka消息事务

    事务保证一避免僵尸实例

    每个事务Producer分配一个transactional.id ,在进程重新启动时能 够识别相同的Producer实例

    Kafka增加了一个与transactional.id相关的epoch ,存储每个 transactional.id 内部元数据

    —旦epoch被触发,任何具有相同的transactional.id和更旧的 epocli的Producer被视为僵尸,Kafka会拒绝来自这些Procedure 的后续事务性写入

    一个消息在该偏移量被提交时,被视为"消息被消费"

    2.零拷贝

    内核空间和用户空间的交互拷贝次数为0,不是不拷贝

  • 相关阅读:
    Postman使用
    Java-性能调优-理解GC日志
    Java-性能调优实战(jps、jstack)
    Java-性能调优工具-jstat
    Java-内存模型
    生产者消费者JAVA实现
    Java-GC垃圾收集器
    JAVA-JVM垃圾回收算法
    Java线程状态流转---线程
    【洛谷5072】[Ynoi2015] 盼君勿忘(莫队)
  • 原文地址:https://www.cnblogs.com/Java-Starter/p/9984395.html
Copyright © 2011-2022 走看看