zoukankan      html  css  js  c++  java
  • Spark3学习【基于Java】1. SparkSql入门程序

    spark-sql是用来处理结构化数据的模块,是入门spark的首要模块。

    技术的学习无非就是去了解它的API,但是Spark有点难,因为它的例子和网上能搜到的基本都是Scala写的。我们这里使用Java。

    入门例子


    数据处理的第一个例子通常都是word count,就是统计一个文件里每个单词出现了几次。我们也来试一下。

    > 这个例子网上有很多,即使是通过spark实现的也不少;这里面大部分都是使用Scala写的,我没有试过;少部分是通过Java写的;
    Java里面的例子有一些是使用RDD实现的,只有极个别是通过DataSet来做的。但即使这一小撮例子,我也跑不通。

    所以我自己来尝试完成这个例子,看到别人用Scala写三五行就完成了,而我尝试了一整天几无进展。在网上东拼西凑熟悉Spark的Java API,终于略有进展。
    

    还是以我们前面的例子来改:

    1. String logFile = "words";
    2. SparkSession spark = SparkSession.builder().appName("Simple Application").master("local").getOrCreate();
    3. Dataset<String> logData = spark.read().textFile(logFile).cache();
    4. System.out.println("行数:" + logData.count());

    这里我不再使用之前的README文件,自己创建了一个words文件,内容随意写了一堆单词。

    执行程序,可以正常打印出来:

    接下来我们需要把句子分割成一个个单词合在一起,然后统计每个单词出现的次数。

    > 可能有人会说,这个简单,我用Java8的流一下就处理好了:
    

    把行集合通过flatMap处理,每一行通过split(" ")分割成一个独立的单词集合,再把结果通过自身groupBy一下就拿到终止数据结构Map了。
    

    最后把map的key和value的大小拿到就好了。
    

    的确,使用Java就是这样实现。但是Spark提供了一套和Java的流API名字和效果类似的工具,区别是Spark的是分布式API
    

    我们通过Spark的flatMap先来处理一下:

    1. Dataset<String> words = logData.flatMap((FlatMapFunction<String, String>) k -> Arrays.asList(k.split("\\s")).iterator(), Encoders.STRING());
    2. System.out.println("单词数:" + words.count());
    3. words.foreach(k -> {
    4. System.out.println("W:" + k);
    5. });

    不同于Java的流,spark这个flatMap的返回值是可以直接访问结果的:

    > 可能有人留意到spark中函数式方法的参数定义和Java差距较大。他们的参数不太一样,还多了个编码器。目前来讲我还不清楚为啥这样定义,不过印象中编码器也是spark3的重要优化内容。
    

    再Java中使用Scala的方法总是有些怪异,Lambda表达式前面总是需要强制类型转换,只是为了指明参数类型,否则需要new一个匿名类。

    这个也花了我不少时间,后来找到一个网页 org.apache.spark.sql.Dataset.flatMap java code examples | Tabnine

    再往后我迷茫了:

    1. KeyValueGroupedDataset<String, String> group = words.groupByKey((Function1<String, String>) k -> k, Encoders.STRING());

    这样我已经group好了,但是返回的不是DataSet,我也不知道这个返回有啥用,怎么拿到里面的内容呢?我费了好大劲没搞定。

    比如我发现count方法会返回一个DataSet:

    看起来正是我想要的,但是当我想把它输出竟然执行报错:

    1. count.foreach(t -> {
    2.     System.out.println(t);
    3. });

    别说foreach了,就算想看看里面的数量(就像一开始我们查看了文件有几行那样)都会报错,错误内容一样

    1. count.count();

    查了很多资料,大意是说spark的计算方法都是分布式的,各个任务之间需要通信,通信时需要序列化来传递信息。所以上面我们能看文件行数因为类型是String,有序列化标志;现在生成的是元组,不能序列化。我尝试了各种方法,甚至自己创建新类模拟了计算过程还是不行


    查了好久资料,比如 Job aborted due to stage failure: Task not serializable: | Databricks Spark Knowledge Base (gitbooks.io) 依然没有解决。偶然的机会找到一个令人激动的网站 Spark Groupby Example with DataFrame — SparkByExamples 终于解决了我的问题。

    使用DataFrame

    DataFrame虽然是spark提供的重要工具,但是再Java上并没有对应的类,只是把DataSet的泛型对象改成Row而已。注意这个Row没有泛型定义,所以里面有哪些列不知道

    可以从一开始就把DataSet转成DataFrame:

    但是可以看到要从Row里面拿数据比较麻烦。所以目前我只在需要序列化的地方转:

  • 相关阅读:
    dede自定义表单增加添加时间怎么弄
    今天微信群需要人家通过吗?是微信bug吗
    6.3.28微信需群主确认才可进群&发GIF动图功能内测开始了
    聚类分析初探
    一小时了解数据挖掘⑤数据挖掘步骤&常用的聚类、决策树和CRISP-DM概念
    Bayesian optimisation for smart hyperparameter search
    【模式识别】Learning To Rank之RankBoost
    UVA 816
    设计一个算法,输出从u到v的全部最短路径(採用邻接表存储)
    禅道——測试流程
  • 原文地址:https://www.cnblogs.com/somefuture/p/15637236.html
Copyright © 2011-2022 走看看