zoukankan      html  css  js  c++  java
  • SparkStreaming+Kafka 处理实时WIFI数据

    转载自:http://www.cnblogs.com/bigbigtree/p/6908014.html

    业务背景

    通过实时抽取华为ESIGHT系统的wifi数据,与校园的学生数据、课程数据、地理位置数据等进行关联,进行校园大数据的流数据处理与分析。

    技术选型

    • Kafka调用ESIGHT的resutful API,接入无线数据;
    • Sparkstreaming将流数据与Hive中的其他校园数据关联分析
    • 使用ES For Hadoop将分析结果导出到ES集群中

    Kafka Producer

    技术常规,使用kafka接入ESIGHT数据,只需要注意

    • 默认的分区方法是否产生数据偏移
    • 如果偏移需要自定义kafka.producer.Partitioner

    SparkStreaming 接收Kafka数据流

    用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。
    接收数据的方式有两种:

    基于Receiver接收数据

    这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
    然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

    需要注意的问题有:

    • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
    • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
    • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER

    直连方式读取kafka数据

    这种新的不基于Receiver的直接方式,是在Spark 1.3之后引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

    这种方式有如下优点:

    • 简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

    • 高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

    • 一次且仅一次(extract-once)的事务机制: 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。 基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

    Direct连接示例

    1. import org.apache.spark.streaming.kafka.*;
    2.  
    3.  JavaPairInputDStream<String, String> directKafkaStream =
    4.      KafkaUtils.createDirectStream(streamingContext,
    5.          [key class], [value class], [key decoder class], [value decoder class],
    6.          [map of Kafka parameters], [set of topics to consume]);

    但Direct连接方式为了能够进行异常恢复,需要考虑如何维护KafkaOffset的问题。通常由两种方式维护

    • 使用Spark的checkpoint机制,根据需要定期checkpoint并恢复。由于项目使用SparkSQL从Hive中拉取数据,可能由于SparkSQLContext的恢复处理不当,在恢复的时候会失败;
    • 通过SparkStreaming的API在Zookeeper中维护Kafka的Offset

    使用Zookeeper维护KafkaOffset示例

    1. import java.util.HashMap;
    2. import java.util.HashSet;
    3. import java.util.Map;
    4. import java.util.Set;
    5. import java.util.concurrent.atomic.AtomicReference;
    6. import org.apache.spark.SparkConf;
    7. import org.apache.spark.api.java.JavaRDD;
    8. import org.apache.spark.api.java.function.Function;
    9. import org.apache.spark.api.java.function.VoidFunction;
    10. import org.apache.spark.broadcast.Broadcast;
    11. import org.apache.spark.streaming.Duration;
    12. import org.apache.spark.streaming.api.java.JavaDStream;
    13. import org.apache.spark.streaming.api.java.JavaInputDStream;
    14. import org.apache.spark.streaming.api.java.JavaStreamingContext;
    15. import org.apache.spark.streaming.kafka.HasOffsetRanges;
    16. import org.apache.spark.streaming.kafka.KafkaCluster;
    17. import org.apache.spark.streaming.kafka.KafkaUtils;
    18. import org.apache.spark.streaming.kafka.OffsetRange;
    19.  
    20. import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig;
    21.  
    22. import kafka.common.TopicAndPartition;
    23. import kafka.message.MessageAndMetadata;
    24. import kafka.serializer.StringDecoder;
    25. import scala.Predef;
    26. import scala.Tuple2;
    27. import scala.collection.JavaConversions;
    28. import lombok.extern.slf4j.*;
    29.  
    30. @Slf4j
    31. public class KafkaOffsetExample {
    32.     private static KafkaCluster kafkaCluster = null;
    33.     private static HashMap<String, String> kafkaParam = new HashMap<String, String>();
    34.     private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null;
    35.     private static scala.collection.immutable.Set<String> immutableTopics = null;
    36.  
    37.     /** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */
    38.     private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc,
    39.             Map<TopicAndPartition, Long> consumerOffsetsLong) {
    40.         KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset");
    41.         JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class,
    42.                 StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(),
    43.                 consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() {
    44.                     private static final long serialVersionUID = 1L;
    45.  
    46.                     @Override
    47.                     public String call(MessageAndMetadata<String, String> v1) throws Exception {
    48.                         return v1.message();
    49.                     }
    50.                 });
    51.         return message;
    52.     }
    53.  
    54.     private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) {
    55.         Set<String> topicSet = new HashSet<String>();
    56.         topicSet.add(topic);
    57.         scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
    58.         immutableTopics = mutableTopics.toSet();
    59.         scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster
    60.                 .getPartitions(immutableTopics).right().get();
    61.          
    62.         // kafka direct stream 初始化时使用的offset数据
    63.         Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();
    64.         if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) {
    65.             KafkaOffsetExample.log.warn("没有保存offset, 各个partition offset 默认为0");
    66.             Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
    67.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
    68.                 consumerOffsetsLong.put(topicAndPartition, 0L);
    69.             }
    70.         }
    71.         else {
    72.             KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset");
    73.             scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster
    74.                     .getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get();
    75.  
    76.             Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);
    77.             Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
    78.  
    79.             KafkaOffsetExample.log.warn("put data in consumerOffsetsLong");
    80.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
    81.                 Long offset = (Long) consumerOffsets.get(topicAndPartition);
    82.                 consumerOffsetsLong.put(topicAndPartition, offset);
    83.             }
    84.         }
    85.         return consumerOffsetsLong;
    86.     }
    87.      
    88.     private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message,
    89.             AtomicReference<OffsetRange[]> offsetRanges) {
    90.         JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    91.             private static final long serialVersionUID = 1L;
    92.             public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
    93.                 OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    94.                 offsetRanges.set(offsets);
    95.                 for (int i = 0; i < offsets.length; i++)
    96.                     KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}",
    97.                             offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(),
    98.                             offsets[i].untilOffset());
    99.                 return rdd;
    100.             }
    101.         });
    102.         KafkaOffsetExample.log.warn("foreachRDD");
    103.         // output
    104.         javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    105.             private static final long serialVersionUID = 1L;
    106.  
    107.             public void call(JavaRDD<String> rdd) throws Exception {
    108.                 if (rdd.isEmpty()) {
    109.                     KafkaOffsetExample.log.warn("Empty RDD");
    110.                     return;
    111.                 }
    112.                 for (OffsetRange o : offsetRanges.get()) {
    113.                     // 封装topic.partition 与 offset对应关系 java Map
    114.                     TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
    115.                     Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
    116.                     topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());
    117.  
    118.                     KafkaOffsetExample.log.warn(
    119.                             "Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset());
    120.  
    121.                     // 转换java map to scala immutable.map
    122.                     scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions
    123.                             .mapAsScalaMap(topicAndPartitionObjectMap);
    124.                     scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap
    125.                             .toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
    126.                                 private static final long serialVersionUID = 1L;
    127.  
    128.                                 @Override
    129.                                 public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
    130.                                     return v1;
    131.                                 }
    132.                             });
    133.                     // 更新offset到kafkaCluster
    134.                     kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"),
    135.                             scalatopicAndPartitionObjectMap);
    136.                 }
    137.             }
    138.         });
    139.         return javaDStream;
    140.     }
    141.      
    142.     private static void initKafkaParams() {
    143.         kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST);
    144.         kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT);
    145.         kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET);
    146.         kafkaParam.put("group.id", WIFIConfig.GROUP_ID);
    147.     }
    148.      
    149.     private static KafkaCluster initKafkaCluster() {
    150.         KafkaOffsetExample.log.warn("transform java Map to scala immutable.map");
    151.         // transform java Map to scala immutable.map
    152.         scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
    153.         scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap
    154.                 .toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
    155.                     private static final long serialVersionUID = 1L;
    156.  
    157.                     @Override
    158.                     public Tuple2<String, String> apply(Tuple2<String, String> arg0) {
    159.                         return arg0;
    160.                     }
    161.                 });
    162.  
    163.         // init KafkaCluster
    164.         KafkaOffsetExample.log.warn("Init KafkaCluster");
    165.         return new KafkaCluster(scalaKafkaParam);
    166.     }
    167.      
    168.     public static void run() {
    169.         initKafkaParams();
    170.         kafkaCluster = initKafkaCluster();
    171.  
    172.         SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer");
    173.         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
    174.          
    175.         // 得到rdd各个分区对应的offset, 并保存在offsetRanges中
    176.         KafkaOffsetExample.log.warn("initConsumer Offset");
    177.         Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC);
    178.         kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);
    179.          
    180.         JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong);
    181.         final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
    182.         JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges);
    183.          
    184.         javaDStream.print();
    185.          
    186.         jssc.start();
    187.         try {
    188.             jssc.awaitTermination();
    189.         } catch (InterruptedException e) {
    190.             e.printStackTrace();
    191.         }
    192.     }
    193.      
    194.     public static void main(String[] args) throws Exception {
    195.         String testPath = "E:\javaCodes\svn\SmartCampus\Trunk\smartcampus.etl.wifi\src\main\resources\WifiConfig.yaml";
    196.         WIFIConfig.init(testPath);
    197.         KafkaOffsetExample.log.warn(WIFIConfig.toStr());
    198.          
    199.         KafkaOffsetExample.run();
    200.     }
    201. }

    SparkStreaming 数据处理

    根据需要,将流式数据与Hive中的静态数据关联,结果通过Elasticsearch For Hadoop导出到ES集群中。

    如果静态数据需要定时更新,可以在创建数据流后,在foreachRDD逻辑中,根据实际情况定期更新静态数据。

    调优

    由于个人经验较少,处理的数据量不大,以下内容大多是纸上谈兵,仅供参考。

    合理的批处理时间(batchDuration)

    • 几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。
    • 如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。
    • 一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。
    • 在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,直达SparkStreaming刚刚能及时处理完上一个批处理的数据,这样就是目前情况的最优值。

    合理的Kafka拉取量(maxRatePerPartition重要)

    spark.streaming.kafka.maxRatePerPartition参数配置指定了每秒每一个topic的每一个分区获取的最大消息数。

    对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的。这个参数默认是没有上限的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time。

    缓存反复使用的Dstream(RDD)

    Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数。

    设置合理的GC

    长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

    1. 设置年老代为并发收集。
    2. --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

    设置合理的CPU资源数

    CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。

    但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

    设置合理的parallelism

    partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。 在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

    使用高性能的算子

    这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

    • 使用reduceByKey/aggregateByKey替代groupByKey

    • 使用mapPartitions替代普通map

    • 使用foreachPartitions替代foreach

    • 使用filter之后进行coalesce操作

    • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

    • 使用Kryo优化序列化性能 这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。 在Spark中,主要有三个地方涉及到了序列化:

    • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

    • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

    • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。
    官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    1. // 创建SparkConf对象。
    2. val conf = new SparkConf().setMaster(...).setAppName(...)
    3. // 设置序列化器为KryoSerializer。
    4. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    5. // 注册要序列化的自定义类型。
    6. conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    参考

  • 相关阅读:
    日常排雷:redis报错 could not get a resource from the pool
    阿里云centos服务器tomcat启动后,浏览器请求无响应
    并发生产顺序单据号测试
    json 数据 格式,请求接口,部分字段无法注入
    baomidou 动态数据源@DS 使用问题
    SpringMVC框架深入(八)--SpringMVC原理
    Spring框架深入(七)--json数据交互
    框架理论深入(六)--拦截器
    Spring框架深入(五)--文件上传和异常处理
    int和Integer的区别
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723704.html
Copyright © 2011-2022 走看看