  • [Middleware] Kafka Learning 01

    Kafka

    Blog tutorial 1
    Blog tutorial 2

    Kafka introduction

    Kafka origins

    Kafka is a distributed messaging system developed and open-sourced by LinkedIn and donated to the Apache Foundation in 2012. It is written in Scala and runs on the JVM. The latest version is 2.0.1 (download address below).

    Kafka design goals

    Kafka is a distributed, publish/subscribe-based messaging system.
    Design goals:

    • Provide message persistence with O(1) time complexity, guaranteeing constant-time access even for TB-scale data;
    • High throughput: support transmitting 100,000+ messages per second even on low-end machines;
    • Support message partitioning across Kafka servers and distributed consumption, while guaranteeing in-order delivery within each Partition;
    • Support both offline and real-time data processing;
    • Scale out: support online horizontal scaling, so machines can be added without downtime.

    Benefits of using a messaging system

    Decoupling, redundancy, scalability, flexibility & peak-load handling, recoverability, ordering guarantees, buffering, asynchronous communication.

    Comparison of common message middleware: ActiveMQ, RabbitMQ, Kafka

    • Producer fault tolerance (can data be lost?)
      RabbitMQ: has both an ack model and a transaction model, so at a minimum no data is lost; the ack model may deliver duplicate messages, while the transaction model guarantees exact consistency.
      Kafka: in batch mode data may be lost; in non-batch mode, (1) synchronous sending may produce duplicate data, (2) asynchronous sending may lose data.
    • Consumer fault tolerance (can data be lost?)
      RabbitMQ: has an ack model, so data is not lost, but duplicates are possible.
      Kafka: in batch mode data may be lost; in non-batch mode, data may be processed more than once (offsets are written to ZK asynchronously).
    • Architecture model
      ActiveMQ: based on the JMS protocol.
      RabbitMQ: based on the AMQP model; mature, but updated slowly. The broker consists of Exchange, Binding and Queue, where exchange and binding determine the message routing key; a Producer client communicates with the server over a channel, and a Consumer fetches messages from a queue (long connection: messages in the queue are pushed to the consumer, which reads from the input stream in a loop). RabbitMQ is broker-centric and has a message acknowledgement mechanism.
      Kafka: producer, broker, consumer, with the consumer at the center. Consumption state is kept on the consumer side, and the consumer pulls data from the broker in batches according to its consumption offset. There is no message acknowledgement mechanism.
    • Throughput
      RabbitMQ: slightly lower throughput than Kafka; the two have different starting points. RabbitMQ supports reliable message delivery and transactions but not batch operations; for storage-based reliability, either memory or disk can be used.
      Kafka: high throughput; it batches messages internally, uses the zero-copy mechanism, and stores and fetches data as sequential batch operations on local disk with O(1) complexity, so message handling is very efficient.
    • Availability
      RabbitMQ: supports mirrored queues; when the master queue fails, a mirror queue takes over.
      Kafka: brokers support a primary/backup (replica) mode.
    • Cluster load balancing
      RabbitMQ: load balancing requires a separate load balancer.
      Kafka: uses ZooKeeper to manage the brokers and consumers in the cluster, and topics can be registered in ZooKeeper. Through ZooKeeper coordination, the producer keeps the broker information for each topic and can send to brokers randomly or round-robin; the producer can also choose a partition semantically, so that a message is sent to a specific partition on a broker.

    Kafka architecture

    Kafka terminology

    • Topic
      A logical concept used to group Messages; a Topic can be spread across multiple Brokers.
    • Partition
      The basis for horizontal scaling and all parallelism in Kafka; a physical concept. Every Topic is split into at least one Partition.
    • Offset
      The sequence number of a message within a Partition; the numbering does not span Partitions.
    • Consumer
      Fetches/consumes Messages from Brokers.
    • Consumer Group
      Each Consumer belongs to a specific Consumer Group (a group name can be specified for each Consumer; if none is given, the Consumer belongs to the default group).
    • Producer
      Sends/produces Messages to Brokers.
    • Replication
      Kafka supports redundant backup of Messages at the Partition level; each Partition can be configured with at least one Replication (with only one Replication, that replica is the Partition itself).
    • Leader
      Each Partition's set of Replicas elects a unique Leader, which handles all read and write requests. The other Replicas sync data updates from the Leader to their local copies, much like Binlog replication in MySQL.
    • Broker
      A Kafka cluster consists of one or more servers, called brokers. Brokers accept requests from Producers and Consumers and persist Messages to local disk. Each cluster elects one Broker as the Controller, which handles Partition Leader election, coordinates Partition migration, and so on.
    • ISR (In-Sync Replica)
      A subset of the Replicas: the Replicas that are currently alive and able to "catch up" with the Leader. Since reads and writes go to the Leader first, Replicas that pull data from the Leader through the sync mechanism generally lag behind it somewhat (measured in both elapsed time and number of messages); exceeding either threshold removes the Replica from the ISR. Each Partition has its own independent ISR. An example of inspecting these per-partition roles follows this list.
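
    To make these terms concrete, the sketch below creates a topic and then describes it; the --describe output lists, for every partition, the Leader, the Replicas and the Isr discussed above. The topic name "demo", a single local broker and ZooKeeper at localhost:2181 are assumptions for illustration only.

    # hypothetical example: create a topic with 3 partitions and 1 replica
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic demo
    # show per-partition Leader / Replicas / Isr
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo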

    Kafka single-node download and installation

    Download

    Kafka download address:
    http://mirrors.hust.edu.cn/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz

    # download with wget
    wget http://mirrors.hust.edu.cn/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
    

    Extract

    tar zxf kafka_2.12-2.0.1.tgz -C /aikq/kafka
    

    Edit the configuration file config/server.properties

    For an explanation of the configuration options, see the reference link.

    cd /aikq/kafka/kafka_2.12-2.0.1
    # edit the configuration file config/server.properties
    vim config/server.properties
    
    # start Kafka in the background
    bin/kafka-server-start.sh config/server.properties &
    
    # new xshell session 1: producer
    bin/kafka-console-producer.sh --broker-list ip(server IP):9092 --topic test
    # new xshell session 2: consumer
    bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test --from-beginning
    # consume command for older Kafka versions
    bin/kafka-console-consumer.sh --zookeeper ip:2181 --topic test --from-beginning
    
    # the producer publishes messages and the consumer consumes them
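
    As an optional follow-up, the consumer group created by the console consumer can be inspected with kafka-consumer-groups.sh. The group name is auto-generated by kafka-console-consumer.sh (something like console-consumer-XXXXX), so the name below is a placeholder; replace it with the one printed by --list.

    # list the consumer groups known to the broker
    bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --list
    # show current offset, log-end offset and lag per partition for one group (group name is a placeholder)
    bin/kafka-consumer-groups.sh --bootstrap-server ip:9092 --describe --group console-consumer-12345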
    
    

    Kafka configuration file explained in detail

    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://192.168.0.24:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    advertised.listeners=PLAINTEXT://192.168.0.24:9092
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma separated list of directories under which to store log files
    log.dirs=/opt/data/kafka/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:3181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    
    ############################# Group Coordinator Settings #############################
    
    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0
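
    The flush and retention settings above are broker-wide defaults; as the comments note, they can also be overridden per topic. Below is a sketch of such an override with kafka-configs.sh, using the ZooKeeper address from this configuration; the topic name "demo" and the retention value are illustrative only.

    # keep messages of topic "demo" for 24 hours instead of the broker-wide default
    bin/kafka-configs.sh --zookeeper localhost:3181 --alter --entity-type topics --entity-name demo --add-config retention.ms=86400000
    # inspect the per-topic overrides currently in effect
    bin/kafka-configs.sh --zookeeper localhost:3181 --describe --entity-type topics --entity-name demo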
    

    Kafka pseudo-cluster mode (multiple brokers on one machine)

    broker-0:
    vim config/server-0.properties

    broker.id=0
    listeners=PLAINTEXT://:9092
    port=9092
    #host.name=192.168.1.177
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs-0
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests =500
    log.cleanup.policy = delete
    

    broker-1:
    vim config/server-1.properties

    broker.id=1
    listeners=PLAINTEXT://:9093
    port=9093
    #host.name=192.168.1.177
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs-1
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests =500
    log.cleanup.policy = delete
    

    broker-2:
    vim config/server-2.properties

    broker.id=2
    listeners=PLAINTEXT://:9094
    port=9094
    #host.name=192.168.1.177
    num.network.threads=4
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/tmp/kafka-logs-2
    num.partitions=5
    num.recovery.threads.per.data.dir=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    log.cleaner.enable=false
    zookeeper.connect=localhost:2181
    zookeeper.connection.timeout.ms=6000
    queued.max.requests =500
    log.cleanup.policy = delete
    

    Start the three brokers separately (note that each broker must use its own log.dirs, as configured above):

    bin/kafka-server-start.sh config/server-0.properties &   # start broker-0
    bin/kafka-server-start.sh config/server-1.properties &   # start broker-1
    bin/kafka-server-start.sh config/server-2.properties &   # start broker-2
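
    Once all three brokers are up, a replicated topic can be created to exercise the Replication / Leader / ISR concepts from the terminology section. This is a sketch: the topic name matches the one used by the producer below, and ZooKeeper is assumed at localhost:2181 as in the broker configurations.

    # create a topic replicated across the three brokers
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic topic_1
    # each partition should report a Leader plus Replicas/Isr spread over broker ids 0, 1 and 2
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_1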
    

    Producer and consumer in cluster mode

    bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.177:9092,192.168.1.177:9093,192.168.1.177:9094
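
    Only the producer is shown above; a matching console consumer can read the same topic by bootstrapping from any one of the brokers (the address below simply reuses the first broker from the producer command):

    bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.177:9092 --topic topic_1 --from-beginning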
    

    Kafka cluster mode

    kafka-manager visual management

    # Linux environment
    ./sbt clean dist
    # Windows environment
    sbt clean dist
    
    • Installation
    # unzip the package
    unzip *.zip
    # in the conf directory, edit the configuration file application.conf
    kafka-manager.zkhosts="localhost:2181"
    # in the bin directory, start kafka-manager
    ./kafka-manager -Dconfig.file=../conf/application.conf -Dhttp.port=9000
    

    -Dconfig.file  path to the configuration file
    -java-home     specify the Java environment
    -Dhttp.port    specify the HTTP port
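
    Putting the flags together, a start command might look like the sketch below; the Java home path is a placeholder for whatever JDK is installed on the machine:

    # run kafka-manager on port 9000 with an explicit config file and Java home (path is illustrative)
    ./kafka-manager -java-home /usr/local/jdk1.8.0_181 -Dconfig.file=../conf/application.conf -Dhttp.port=9000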
