zoukankan      html  css  js  c++  java
  • 大数据-kafka

    kafka

    kafka是一个高吞吐,低延迟的分布式消息队列。

    使用场景:

    • 流式处理:spark streaming和storm处理

    • 日志收集:Kafka各种服务的log,Kafka以接口服务统一开放给各种consumer应用(hadoop、Hbase、Solr)

    • 消息系统:解耦conusmer与producer

    • 用户活动跟踪:记录用户活动数据到对应的topic中,订阅topic数据,做实时的监控分析,存到数仓中离线分析和挖掘。

    核心概念

    基础模型

    kafka由多个broker节点组件,向broker插入消息的应用为producer(消息生产者),从broker读取消息的应用为consumer(消息消费者)

    不同的消息分为不同的topic,每个topic划分了多个partition分区,partition分布在不同的broker节点上,分区的分布存在负载均衡策略。

    每个partition由多个有序message消息构成,分区对其中的消息设置了offset偏移量(递增的序号),每个分区各自维护一套消息的偏移量

    kafka的数据以partition为单位进行数据备份。

    消息处理策略

    消息生产producer

    • producer选择topic向其中插入message,

    • kafka根据根据分区策略将数据分为到某个分区

    • 消息被插入到分区的队尾

    • 消息存储后默认7天磁盘删除,消息消费次数无限制。

    • 分区策略,有key使用hash取余分配对应分区,无key随机均衡分配

    消息消费consumer

    • consumer消费者向broker请求,指定topic、分区、分区的偏移量来获取消息数据。

    • consumer需要自己管理偏移量,下次读取是以前一次记录的偏移量基础上继续向后读取,读取过程中更新偏移量。consumer修改自己的偏移量,能够读取一个分区上的不同位置的消息。

    • consumer管理各自的偏移量数据,多消费者相互无影响,线程安全,能够高并发。

    消费者组

    • 每个consumer对自己标记consumer group消费者组。

    • 偏移量实际属于消费者组。消费者组之间相互独立。

    • 一条消息在一个组内只能消费一次,组中的多个consumer不能重复读取这条消息

    • 消费者组能够同时消费多个分区,但同一时间内每个分区只能有一个consumer在访问

    • 通过消费者组实现多个consumer线程高并发消费数据,且线程相对安全。

    • 消费者组内的comsumer数量需要与topic的分区数量保持对应关系,保证数据读取效率

    集群部署与使用

    • 解压 tar -zxvf kafka_2.11-0.8.2.1.tgz

    • 修改config/server.properties配置文件 vim config/server.properties

      broker的标识:broker.id=0 其余节点依次往后编写1,2,3...

      zk的连接路径:zookeeper.connect=node1:2181,node2:2181,node3:2181

    • 复制到各集群,并修改配置信息,本节点的id

      scp -r kibana-4.6.2-linux-x86_64 node2:/opt/sxt/kafka_2.11-0.8.2.1

    启动集群

    • 启动zookeeper集群

      • 启动 zkServer.sh start

      • 查看 zkServer.sh status

      • 关闭 zkServer.sh stop

    • 启动Kafka

      • cd /opt/sxt/kafka_2.11-0.8.2.1

      • bin/kafka-server-start.sh config/server.properties

    集群命令

    使用kafka的bin目录下的脚本执行命令

    创建与查看

    • 通过用zk创建topic,名称 test,3个分区,每个分区2份数据(leader+follower)

      ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --replication-factor 2 --partitions 3 --topic test

    • 通过用zk查看topic状态

      ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test

      查看结果为:

      Topic:test  PartitionCount:3    ReplicationFactor:2 Configs:
      Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
      Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
      Topic: test Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
      #列举了topic的名称,分片数
      #列举每个分片的信息,分片序号,Leader所在节点,副本所在节点,Isr中同步的副本

    topic使用

    • 查看已有topic

      ./kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list

    • 创建生产者(通过broker连接)

      ./kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test

      然后键盘输入向kafka插入数据

    • 创建消费者

      ./kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --from-beginning --topic test

      锁定页面接收生产者的数据 参数指定了偏移量从起始位置开始

    • ctrl+c退出

    备份策略

    分区备份

    • 每个partition设置1个leader+n个follower,随机分布在集群上。

    • leader处理读写请求,follower不对外服务,只在leader宕机后顶替运行。

    • follower主动向leader拉取数据执行数据同步。

    ISR同步

    leader动态维护一个ISR列表,ISR列表中记录与leader同步良好的follower。

    ISR作用:leader下线后,从ISR列表中的follower中选举新的leader

    动态根据指定参数判断follower记入或移出列表。

    • follower的fech拉取请求间隔时间(10s)

      replica.lag.time.max.ms=10000

    • leader与follower相差记录数(4000)

      replica.lag.max.messages=4000

    API

    本版本的官方API:https://kafka.apache.org/082/documentation.html,其他版本可在上级目录中查找。各种参数可以从中直接检索获取。

    producer生产者

    创建配置对象;获取producer对象,创建message对象,通过producer发送message。自动创建topic

    //创建配置文件,其中置入各种参数
    Properties conf=new Properties();
    //指定broker节点
    conf.put("metadata.broker.list", "node01:9092,node02:9092,node03:9092");
    //指定序列化器,用于将数据转为字节进行收发
    conf.put("serializer.class", StringEncoder.class.getName())
    //指定ask参数,接收响应,0 无响应发送,1 单节点接收后响应,all 所有节点接收后响应
    conf.put("acks",0);  
    //客户端缓存数据,批量发送,15k
    conf.put("batch.size", 16384);
    //传入配置,获取生产者对象
    Producer<String, String> producerForKafka = new Producer<>(new ProducerConfig(conf));
    //准备数据,topic与消息数据
    String topic="test";
    String value = "shsxt"
    String key = "123"
    //获取消息对象,分配分区时,根据均衡策略随机分配
    KeyedMessage<String, String> meg1 = new KeyedMessage<>(topic,value);
    //指定key的消息对象,分配分区时,对key进行hash取余,根据结果分配分区
    KeyedMessage<String, String> meg2 = new KeyedMessage<>(topic,key,value);
    //通过producer对象发数据到kafka
    producerForKafka.send(meg1);
    producerForKafka.send(meg2);

    consumer消费者

    High level consumer API (偏移量由zk控制,操作简单,灵活度低)

    Simple level consumer API (客户端自己控制存储偏移量,操作复杂,灵活度高)

    本次使用High模式API

    • 创建配置对象ConsumerConnector

      Properties props = new Properties();
      //配置zk的地址
      props.put("zookeeper.connect","node1:2181,node2:2181,node3:2181");
      //配置comsumer组
      props.put("group.id", "comsumer1");
      //ZK超时时间
      props.put("zookeeper.session.timeout.ms", "400");
      //消费者自动提交偏移量的时间间隔
      props.put("auto.commit.interval.ms", "10000");
      //第一次消费消息时的偏移量,从最小偏移量读取,但默认largest
      props.put("auto.offset.reset","smallest");
      //自动提交偏移量,默认是true,允许自动提交
      //为了数据安全,一般设置false,需要在代码中手动提交偏移量
      props.put("auto.commit.enable","true");
      //指定配置文件,创建consumer连接
      ConsumerConnector consumer=Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    • 指定线程读取数据

      //指定多组topic与对应的线程数
      /*注意读取线程数与topic1的分区的数量的对应关系*/
      Map<String, Integer> topics_lines = new HashMap<String, Integer>();
      topics_lines.put("topic1",1);//一个线程
      topics_lines.put("topic2",3);//三个线程
      //创建数据流,返回容器,二维数组结构(topic数*线程数)
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

      /*使用以下方式获取每个流*/
      //从容器中获取每个topic的所有线程的流
      List<KafkaStream<byte[], byte[]>> list = consumerMap.get("topic1");
      //通过线程所在topic的流容器,获取每个流
      KafkaStream stream1=list.get(0);
    • 从流中读取数据

      //获取数据流的迭代器
      ConsumerIterator<byte[], byte[]> it1 = stream1.iterator();
      //通过迭代器获取数据
      while (it.hasNext()){
      // 获取一条消息对象
         MessageAndMetadata<byte[], byte[]> value = it.next();
         //所在分区
         int partition = value.partition();
         //偏移量
         long offset = value.offset();
         //实际消息数据
         String data = new String(value.message());
      }

    关于手动提交偏移量

    • 在配置consumer连接时,关闭自动提交

    • props.put("auto.commit.enable","false");

    • 业务代码执行完毕提交偏移量(每写出一条消息,就要执行一次偏移量修改代码)

    • consumer.commitOffsets();

    缺陷处理

    消息丢失(生产者)

    原因1:kafka数据先存储在内存中,一段时间后溢写到硬盘中。那么节点宕机,在内存中的消息未持久化,随着内存一起丢失。

    原因2:分区主从备份,leader分区宕机,从分区未及时拉取同步,导致数据丢失

    处理方式1(不建议):修改持久化触发参数:log.flush.interval.messages(间隔数据数),log.flush.interval.ms(间隔时间)

    处理方式2(建议):生产者api中指定修改ack参数,1为默认

    • 0:无响应上传数据,速度快,数据丢失可能大

    • 1:leader接收到数据后向producer响应 均衡速度与数据安全

    • all:leader接收到数据并follower备份之后再响应 数据安全,速度较慢

    消息丢失(消费者)

    原因:在High level模式下,在消息消费之前自动提交偏移量:consumer自动向zk提交了偏移量,在消息处理时consumer下线,导致偏移量之前的部分数据没处理完毕。consumer再次上线,从zk获取自动提交的偏移量并向后读取,最终导致消息的丢失。

    解决:客户端每条消息处理完,代码中手动提交偏移量,关闭偏移量自动提交。

    重复消费(消费者)

    原因:在High level模式下,在消息消费完之后一段时间自动提交偏移量:自动提交偏移量的时间间隔较长,consumer会预先执行前一个偏移量之后的消息,此时consumer宕机,上线后载入偏移量,重复执行了前一个偏移量之后的部分消息。

    解决:代码中手动提交数偏移量

    高吞吐

    高吞量

    • pagecache(页缓存),基于系统内存的数据接收

    • 磁盘顺序写,相对磁盘随机存效率极大提高,对于磁盘存储尤为明显。

    高吐量

    • 零拷贝计数

      pagecache(页缓存,系统缓存) —— 网卡bufffer(数据)+ socket(描述符)—— comsumer

      传统拷贝流程:comsumer向kafka进程请求数据,进程向pagecache查询,若不存在则到磁盘中读取数据存入系统,系统内存将数据复制到用户缓存,用户缓存再将数据拷贝到socket缓存中,socket缓存再将数据载入网卡缓存,网卡通过网络响应数据。

      多次的数据copy与上下文环境切换将消耗较多时间

    高吞吐

    • producer消息存入速度与consumer读取数据速度维持均衡,保证在数据flush到磁盘前读取数据,实现只在pagecache内存层面的高速吞吐。

    消息持久化

    持久化目录

    • 数据持久化的目录默认在/tmp/kafka-logs中

    • 总目录下包含多个topic目录(test1),每个topic目录包含若干分区目录(test1-0)

    • 每个分区目录下包含若干组segment文件,segment文件包括记录偏移量和实际位置偏移的index文件与存储真实数据的log文件

      000000100.index 000000100.log 000000200.index 000000200.log

    • segment的两个文件以前一segment文件最大偏移量命名,查找指定偏移量时,通过二分法快速定位到数据文件。

    kafka参数

    broker全局参数

    • 数据传输限制

      • message.max.bytes:broker能接收消息的最大字节数,默认1000

      • replica.fetch.max.bytes:单条数据被拷贝最大字节数,默认1MB,大于broker最大数据接收量

    • 关于新建log文件,满足条件将消息数据写入一个新的log和index中

      • log.segment.bytes:单个log文件最大大小,默认1G

        • segment.bytes: 这个一个单topic生效参数,单个log文件最大大小

      • log.roll.hours:新建log文件的间隔时间,默认7天

    Consumer端参数

    fetch.message.max.bytes:消费者拉取单条数据的最大容量,一般大于上述message.max.bytes

    整合flume

    flume作为数据源向kafka存入消息

    1. flume的配置文件(conf/fk.conf)配置代码在下面

      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1

      # 配置数据源
      # 监听AVRO端口,接受外部AVRO客户端的事件流
      a1.sources.r1.type = avro
      # 绑定数据源的主机及端口
      a1.sources.r1.bind = node1
      a1.sources.r1.port = 41414

      # 配置sink
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      # kafka的topic
      a1.sinks.k1.topic = testflume
      # kafka的节点和端口
      a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
      # 配置数据插入的ack等级(响应)
      a1.sinks.k1.requiredAcks = 1
      # 配置数据批量插入量
      a1.sinks.k1.batchSize = 20
      a1.sinks.k1.channel = c1


      # 配置channel,类型为内存(事件缓存)
      a1.channels.c1.type = memory
      # 配置缓存容量
      a1.channels.c1.capacity = 1000000
      # 配置数据发送容量
      a1.channels.c1.transactionCapacity = 10000

      # 绑定管道
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
    2. 在kafka目录下启动kafka:bin/kafka-server-start.sh config/server.properties

    3. 在flume目录下启动flume:指定配置文件路径conf/fk.conf

      bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

    4. 使用API向flume导入数据

      参考:https://blog.csdn.net/u011244682/article/details/79097700

      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.api.NettyAvroRpcClient;
      import org.apache.flume.api.RpcClient;
      import org.apache.flume.api.RpcClientFactory;
      import org.apache.flume.event.EventBuilder;
      import java.nio.charset.Charset;
      import java.util.Properties;


      //创建配置
      Properties p= new Properties();
      p.put("client.type","default");//客户端类型
      //p.put("hosts","h1");
      //p.put("hosts.h1","0.0.0.0:41414");
      p.put("batch-size",100);//批处理
      p.put("connect-timeout",20000);//连接超时
      p.put("request-timeout",20000);//请求超时
      //获取rpc连接
      RpcClient client =RpcClientFactory.getInstance(p);
      //设置目标ip和端口
      client.init("node1", 41414);

      //不传入配置方式获取客户端 this.client = RpcClientFactory.getInstance(hostname, port);
         
      //准备数据  
      String data="Hello Flume!";
      // 创建事件对象封装数据
      Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
      // 发送事件(事件)
      client.append(event);

      //关闭连接
      client.close();

      }

       

    5. kafka连接flume

    6. 最终中kafka中查看数据是否导入

    7.  

     

     

  • 相关阅读:
    数据结构和算法(Golang实现)(14)常见数据结构-栈和队列
    数据结构和算法(Golang实现)(20)排序算法-选择排序
    数据结构和算法(Golang实现)(18)排序算法-前言
    数据结构和算法(Golang实现)(22)排序算法-希尔排序
    数据结构和算法(Golang实现)(21)排序算法-插入排序
    数据结构和算法(Golang实现)(27)查找算法-二叉查找树
    关于SpringMVC映射模型视图的几点小事
    关于spring中事务管理的几件小事
    关于spring中AOP的几件小事
    关于spring中bean配置的几件小事
  • 原文地址:https://www.cnblogs.com/javaxiaobu/p/11703016.html
Copyright © 2011-2022 走看看