zoukankan      html  css  js  c++  java
  • SparkStreaming+Kafka 处理实时WIFI数据

     


    觉得有用的话,点个赞啊~~~ O(∩_∩)O~~


    业务背景

    通过实时抽取华为ESIGHT系统的wifi数据,与校园的学生数据、课程数据、地理位置数据等进行关联,进行校园大数据的流数据处理与分析。

    技术选型

    • Kafka调用ESIGHT的resutful API,接入无线数据;
    • Sparkstreaming将流数据与Hive中的其他校园数据关联分析
    • 使用ES For Hadoop将分析结果导出到ES集群中

    Kafka Producer

    技术常规,使用kafka接入ESIGHT数据,只需要注意

    • 默认的分区方法是否产生数据偏移
    • 如果偏移需要自定义kafka.producer.Partitioner

    SparkStreaming 接收Kafka数据流

    用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。
    接收数据的方式有两种:

    基于Receiver接收数据

    这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
    然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

    需要注意的问题有:

    • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
    • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
    • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER

    直连方式读取kafka数据

    这种新的不基于Receiver的直接方式,是在Spark 1.3之后引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

    这种方式有如下优点:

    • 简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

    • 高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

    • 一次且仅一次(extract-once)的事务机制: 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。 基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

    Direct连接示例

    import org.apache.spark.streaming.kafka.*;
    
     JavaPairInputDStream<String, String> directKafkaStream =
         KafkaUtils.createDirectStream(streamingContext,
             [key class], [value class], [key decoder class], [value decoder class],
             [map of Kafka parameters], [set of topics to consume]);

    但Direct连接方式为了能够进行异常恢复,需要考虑如何维护KafkaOffset的问题。通常由两种方式维护

    • 使用Spark的checkpoint机制,根据需要定期checkpoint并恢复。由于项目使用SparkSQL从Hive中拉取数据,可能由于SparkSQLContext的恢复处理不当,在恢复的时候会失败;
    • 通过SparkStreaming的API在Zookeeper中维护Kafka的Offset

    使用Zookeeper维护KafkaOffset示例

    
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.KafkaCluster;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;
    
    import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig;
    
    import kafka.common.TopicAndPartition;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import scala.Predef;
    import scala.Tuple2;
    import scala.collection.JavaConversions;
    import lombok.extern.slf4j.*;
    
    @Slf4j
    public class KafkaOffsetExample {
    	private static KafkaCluster kafkaCluster = null;
    	private static HashMap<String, String> kafkaParam = new HashMap<String, String>();
    	private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null;
    	private static scala.collection.immutable.Set<String> immutableTopics = null;
    
    	/** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */
    	private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc,
    			Map<TopicAndPartition, Long> consumerOffsetsLong) {
    		KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset");
    		JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class,
    				StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(),
    				consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() {
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public String call(MessageAndMetadata<String, String> v1) throws Exception {
    						return v1.message();
    					}
    				});
    		return message;
    	}
    
    	private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) {
    		Set<String> topicSet = new HashSet<String>();
    		topicSet.add(topic);
    		scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
    		immutableTopics = mutableTopics.toSet();
    		scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster
    				.getPartitions(immutableTopics).right().get();
    		
    		// kafka direct stream 初始化时使用的offset数据
    		Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();
    		if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) {
    			KafkaOffsetExample.log.warn("没有保存offset, 各个partition offset 默认为0");
    			Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
    			for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
    				consumerOffsetsLong.put(topicAndPartition, 0L);
    			}
    		}
    		else {
    			KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset");
    			scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster
    					.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get();
    
    			Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);
    			Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
    
    			KafkaOffsetExample.log.warn("put data in consumerOffsetsLong");
    			for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
    				Long offset = (Long) consumerOffsets.get(topicAndPartition);
    				consumerOffsetsLong.put(topicAndPartition, offset);
    			}
    		}
    		return consumerOffsetsLong;
    	}
    	
    	private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message, 
    			AtomicReference<OffsetRange[]> offsetRanges) {
    		JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
    			private static final long serialVersionUID = 1L;
    			public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
    				OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    				offsetRanges.set(offsets);
    				for (int i = 0; i < offsets.length; i++)
    					KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}",
    							offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(),
    							offsets[i].untilOffset());
    				return rdd;
    			}
    		});
    		KafkaOffsetExample.log.warn("foreachRDD");
    		// output
    		javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    			private static final long serialVersionUID = 1L;
    
    			public void call(JavaRDD<String> rdd) throws Exception {
    				if (rdd.isEmpty()) {
    					KafkaOffsetExample.log.warn("Empty RDD");
    					return;
    				}
    				for (OffsetRange o : offsetRanges.get()) {
    					// 封装topic.partition 与 offset对应关系 java Map
    					TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
    					Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
    					topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());
    
    					KafkaOffsetExample.log.warn(
    							"Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset());
    
    					// 转换java map to scala immutable.map
    					scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions
    							.mapAsScalaMap(topicAndPartitionObjectMap);
    					scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap
    							.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
    								private static final long serialVersionUID = 1L;
    
    								@Override
    								public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
    									return v1;
    								}
    							});
    					// 更新offset到kafkaCluster
    					kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"),
    							scalatopicAndPartitionObjectMap);
    				}
    			}
    		});
    		return javaDStream;
    	}
    	
    	private static void initKafkaParams() {
    		kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST);
    		kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT);
    		kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET);
    		kafkaParam.put("group.id", WIFIConfig.GROUP_ID);
    	}
    	
    	private static KafkaCluster initKafkaCluster() {
    		KafkaOffsetExample.log.warn("transform java Map to scala immutable.map");
    		// transform java Map to scala immutable.map
    		scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
    		scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap
    				.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
    					private static final long serialVersionUID = 1L;
    
    					@Override
    					public Tuple2<String, String> apply(Tuple2<String, String> arg0) {
    						return arg0;
    					}
    				});
    
    		// init KafkaCluster
    		KafkaOffsetExample.log.warn("Init KafkaCluster");
    		return new KafkaCluster(scalaKafkaParam);
    	}
    	
    	public static void run() {
    		initKafkaParams();
    		kafkaCluster = initKafkaCluster();
    
    		SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer");
    		JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
    		
    		// 得到rdd各个分区对应的offset, 并保存在offsetRanges中
    		KafkaOffsetExample.log.warn("initConsumer Offset");
    		Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC);
    		kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);
    		
    		JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong);
    		final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
    		JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges);
    		
    		javaDStream.print();
    		
    		jssc.start();
    		try {
    			jssc.awaitTermination();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public static void main(String[] args) throws Exception {
    		String testPath = "E:\javaCodes\svn\SmartCampus\Trunk\smartcampus.etl.wifi\src\main\resources\WifiConfig.yaml";
    		WIFIConfig.init(testPath);
    		KafkaOffsetExample.log.warn(WIFIConfig.toStr());
    		
    		KafkaOffsetExample.run();
    	}
    }
    

      



    SparkStreaming 数据处理

    根据需要,将流式数据与Hive中的静态数据关联,结果通过Elasticsearch For Hadoop导出到ES集群中。

    如果静态数据需要定时更新,可以在创建数据流后,在foreachRDD逻辑中,根据实际情况定期更新静态数据。

    调优

    由于个人经验较少,处理的数据量不大,以下内容大多是纸上谈兵,仅供参考。

    合理的批处理时间(batchDuration)

    • 几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。
    • 如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。
    • 一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。
    • 在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,直达SparkStreaming刚刚能及时处理完上一个批处理的数据,这样就是目前情况的最优值。

    合理的Kafka拉取量(maxRatePerPartition重要)

    spark.streaming.kafka.maxRatePerPartition参数配置指定了每秒每一个topic的每一个分区获取的最大消息数。

    对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的。这个参数默认是没有上限的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time。

    缓存反复使用的Dstream(RDD)

    Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数。

    设置合理的GC

    长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

    设置年老代为并发收集。
    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

    设置合理的CPU资源数

    CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。

    但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

    设置合理的parallelism

    partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。 在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

    使用高性能的算子

    这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

    • 使用reduceByKey/aggregateByKey替代groupByKey

    • 使用mapPartitions替代普通map

    • 使用foreachPartitions替代foreach

    • 使用filter之后进行coalesce操作

    • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

    • 使用Kryo优化序列化性能 这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。 在Spark中,主要有三个地方涉及到了序列化:

    • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

    • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

    • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

    对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。
    官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

    以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

    // 创建SparkConf对象。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    // 设置序列化器为KryoSerializer。
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

    参考

  • 相关阅读:
    Windows各系统关闭更新(winXP/win2003/win7/win8/win2012/win10)
    SSH框架搭建与整合
    Servlet转发和重定向response.sendRedirecte()区别 (转)
    el 表达式用法(转)
    数组和集合(collection)调用syso输出时,结果不一致问题
    Java 接口和抽象类可以被new么?
    Dbutils
    dbutils使用---QueryRunner(query_update)、BeanListBeanHandler、MapListMapHandler、ScalarHandler
    Hadoop参数:fs.defaultFS、 dfs.name.dir 、 dfs.data.dir
    工厂设计模式(三种)详解
  • 原文地址:https://www.cnblogs.com/bigbigtree/p/6908014.html
Copyright © 2011-2022 走看看