zoukankan      html  css  js  c++  java
  • 解决spark streaming集成kafka时只能读topic的其中一个分区数据的问题

    1. 问题描述

    我创建了一个名称为myTest的topic,该topic有三个分区,在我的应用中spark streaming以direct方式连接kakfa,但是发现只能消费一个分区的数据,多次更换comsumer group依然如此。

    2 环境配置

    kafka集群环境,

    主机 IP 操作系统 kakfa
    node1 192.168.1.101 Centos 6.5 kafka_2.11-0.10.1.1
    node2 192.168.1.102 Centos 6.5 kafka_2.11-0.10.1.1
    node3 192.168.1.103 Centos 6.5 kafka_2.11-0.10.1.1

    应用依赖:spark版本是2.1.1、kakfa版本是0.10.1.1;
    maven依赖配置如下

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>$2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.1.1</version>
    </dependency>
    

    相关配置代码(Java)如下:

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092");
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
    Set<String> topics = new HashSet<String>(Arrays.asList("testTopic"));
    JavaInputDStream<ConsumerRecord<Object, Object>> dStream = KafkaUtils.createDirectStream(
    	jssc,
    	LocationStrategies.PreferConsistent(),
    	ConsumerStrategies.Subscribe(topics, kafkaParams));
    

    3. 解决方案

    经过查阅相关资料发现是由于Kafka 0.10.1.1的bug导致的。其实不仅仅是0.10.1.1,另外0.10.1.0和0.10.0.2也有这个问题。详细描述参考https://issues.apache.org/jira/browse/KAFKA-4547
    最后我将kafka版本降到了0.10.0.1,解决了这个问题。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    
  • 相关阅读:
    高并发秒杀系统架构设计 · 抢购、微信红包、一元夺宝
    Linux服务器集群系统(一)
    keepalived+nginx双机热备+负载均衡
    kafka的一些常用命令
    基于Keepalived实现LVS双主高可用集群
    如何生动形象、切中要点地讲解 OSI 七层模型和两主机传输过程
    MyBatis动态SQL foreach标签实现批量插入
    详解Vue生命周期
    centos 解压压缩包到指定目录
    门罗币(MONERO)钱包生成教程
  • 原文地址:https://www.cnblogs.com/leekeggs/p/10401811.html
Copyright © 2011-2022 走看看