zoukankan      html  css  js  c++  java
  • kafka

    log.dir是数据持久化到磁盘的路径。
    kafka-topics --zookeeper localhost:2181 --create --topic test1
    
    
    安装Docker
    创建CentOS容器
    安装JDK 1.7+
    安装Zookeeper
    启动Kafka broker
    
    中间的kafka3个框应该是叫kafka的broker也叫kafka的server。生产者将数据发布到kafka的服务中,消费者从kafka的服务中获取数据。
    Kafka里面有zookeeper。既可以在虚拟机上部署kafka也可以通过docker去部署kafka。生产者和消费者都要指定topic。
    
    Docker安装和部署kafka的时候,首先安装Docker,然后创建centos容器,安装jdk7,安装zookper和kafka容器。

    kafka架构:
    生产者,消费者,Broker(接收生产者消息,给消费者订阅,收生产者发送的数据,并且将他持久化,同时提供给consume去订阅,Broker也是kafka的server,是一个服务需要单独起起来),ZooKeeper(不是kafka的一部分,只是要使用ZooKeeper)。
    
    Broker和ZooKeeper作为后台服务,生产者,消费者作为sdk,可以通过生产者的sdk发送数据给Broker,通过消费者的sdk从Broker中拿数据。
    
    生产者push数据推给Broker,消费者通过pull拉从Broker拿数据。为了防止消费者不能处理过多的数据,因此是消费者去取而不是Broker推过去。Broker不用知道消费者的存在。
    
    生产者如何感知Broker的存在,生产者如何知道将一条消息发送给那个Broker,消费者如何感知有哪些Broker,从哪个Broker取数据。
    
    生产者会如何获取整个kafka的元信息:在0.8以前生产者需要通过Zookeeper来获取整个Broker集群的信息,0.8以后在实例化一个生产者的时候可以指定多个Broker的url,比如集群有300个Broker,可以只指定3个Broker的IP和端口给消费者,生产者就可以通过这3个Broker获知整个集群的Broker信息列表,生产者会将这些元信息存在内存当中,如果有某个Broker挂了,生产者需要重新更新Broker的信息。
    
    消费者如何获取整个Broker-kafka的元信息:实例化消费者的时候指定一个获多个zookeeper的ip和端口,因为整个集群的元信息都会存进zookeeper里面去,消费者只要连接了zookeeper就只知道整个集群有多少个topic,每个topic再哪个Broker上面。
    生产者如何知道消息发送到哪个Broker上面去,消费者如何知道从哪个Broker去消费数据,这就涉及到patition和topic的概念。生产者就是通过patiton判断把消息发送给哪个Broker。
    
    
    生产者发送的所有数据都要指定该消息应该被发布到哪个topic,消费者再订阅的时候也要指定订阅哪个topic的消息,消费者可以显示指定topic也可以通过黑名单白名单来指定消费和不消费哪些topic。topic是一个逻辑概念,同一个topic的消息可以分布在一个或多个节点上面,一个topic包含一个或多个partition,每条消息仅属于一个topic。

    如图这个集群有3个Broker,同时有2个topic,topic1有6个patition,topic2有3个patition,每个topic具有的patition数量不一定相同。Broker会让patition均匀的分配,均匀分配才能让整个集群的负载均衡。

    patition是一个物理概念,一个patition只分布于一个Broker上,一个patition对应一个文件夹,一个patition包含多个Segment,Segment是透明的,用户并不感知Segment的存在,一个Segment对应一个文件。kafka会将数据持久化到硬盘上,kafka会把数据保留168小时,168小时之前的数据都删掉。或者超过10G数据就删除。
    
    
    如果一个topic有3和pation,在log指定的磁盘路径中看到topic1-0,topic1-1,topic1-2 3个文件夹,可以知道patition对应一个文件夹,patition里面有0000.index和0000.log文件,index文件是索引,log文件是真正的数据。inidex存每条消息的起始位置跟offset对应的关系,通过index文件找到log文件的位置。要找到一条消息,首先应该知道消息在哪个patition中并且这条消息在patition中的offset是多少。
    生产者发送消息的时候要决定消息发送到哪个Broker上去,
    
    
    public class HashPartitioner implements Partitioner {//patition的接口
    
      public HashPartitioner(VerifiableProperties verifiableProperties) {}
    
      @Override
      //这个方法返回值是消息应该被发送到哪个patition,因为一个patition只在一个Broker上,这个方法就是告诉这个消息应该被发送到哪个patition或者哪个Broker上去。
      public int partition(Object key, int numPartitions) {//一个参数是消息的key,二个参数是pation的总数(这条消息发送到的topic的patition的总数),
        if ((key instanceof Integer)) {
          return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
        }
        return Math.abs(key.hashCode() % numPartitions);
      }
    }
    
    
    
    同步生产者异步生产者。
    同步生产者是向Broker发送一条消息之后发送失败就会重试发送3次,3次还是失败就会抛出异常或者加入失败队列后面再发送。
    异步生产者是发送一条数据之后就把数据放入一个队列里面,有一个后台进程不停地去队列拿数据然后发送给Broker。队列满了或者阻塞超过一定时间,kafka会将新的数据丢掉。
    
    
    
    CAP理论(帽子理论):A节点更新了,B节点要跟着更新。主节点挂了次节点要能工作。
  • 相关阅读:
    postman 调试接口报“401 身份认证信息未提供”错误
    UserWarning: XXX is writable by group/others and vulnerable to attack when used with get_resource_filename.
    以root权限执行python时候脚本时候报错“ExtractionError: Can't extract file(s) to egg cache”
    django接口调试示例说明
    查看linux系统版本、内存、CPU、存储容量
    一次批量杀死多个进程
    bash:pybot未找到命令
    Swoft-Api项目部署九:前、后置中间件
    Swoft-Api项目部署八:主从数据库配置
    Swoft-Api项目部署七:验证器
  • 原文地址:https://www.cnblogs.com/yaowen/p/8318755.html
Copyright © 2011-2022 走看看