zoukankan      html  css  js  c++  java
  • spark java dataset api没有提供迭代器

    spark  java dataset api没有提供迭代器

    所以处理一些列表内部数据关联转换,而不是只处理单条数据的转换得换成javaRdd api。

    下面是一个简单例子。通过进入宿舍的时间 找到后面的第一条离开宿舍的记录。并合并成一条完整的宿舍的 进出记录。

    这种业务,直接获得列表的当前对象,不能获得整个列表是很难实现的。

     
    static final Encoder<AccessInout> inoutEncoder = Encoders.bean(AccessInout.class);

    public static void main(String[] args) {
    SparkSession spark = SparkSession
    .builder().master("local")
    .appName("Java Spark SQL data sources example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate();

    runJdbcDatasetExample(spark);

    //spark.stop();
    }



    private static void runJdbcDatasetExample(SparkSession spark) {


    Properties connectionProperties = new Properties();
    connectionProperties.put("user", "root");
    connectionProperties.put("password", "0000000");

    String dbUrl="jdbc:mysql://192.168.100.4:3306/datacenter?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8";


    //1、查询原始数据
    final Dataset<Row> allRecord = spark.read()
    .jdbc(dbUrl,
    "( select a.kid,a.outid,a.ioflag,a.OpDT as 'indate',a.OpDT as 'outdate',a.school_code ,a.faculty_code,a.major_code,a.class_code,a.sex from access_record_inout_temp2 a " +
    // " where a.outid like'1622%'" +//测试代码,只查询16届学生数据
    " ) t", connectionProperties);

    final Dataset<AccessInout> allRecordInout=allRecord.as(inoutEncoder);
    //2、将原始数据注册成视图
    allRecord.createOrReplaceTempView("view_access_record_inout_temp2");

    allRecord.printSchema();


    //3、按照学号分区。 并且分组内 按照进入宿舍的时间排序。
    Dataset<AccessInout> allRecordSort= allRecordInout.repartition(allRecordInout.col("outid"))
    .sortWithinPartitions(allRecordInout.col("indate"));

    4、由于Dataset接口没有提供Iterator,无法实现相关逻辑,这里换成rdd来实现。(scala接口是提供了迭代器的哦)
         JavaRDD<AccessInout> rdd= allRecordSort.toJavaRDD();


    JavaRDD<AccessInout> resultRdd= rdd.mapPartitions((FlatMapFunction<Iterator<AccessInout>,AccessInout>)(Iterator<AccessInout> in)->{
    List<AccessInout> result=new ArrayList<AccessInout>();
    while(in.hasNext()){
    AccessInout first= in.next();
    if("0".endsWith(first.getIoflag())){//第一条记录是:进入宿舍的记录
    if(in.hasNext()){
    AccessInout second= in.next();
    //取比入记录大的最小的一条出记录的时间,作为入记录的出时间。(排序后,后面一条就是最小记录)
    if(first.getOutid().endsWith(second.getOutid())&&("1".endsWith(second.getIoflag()))&&first.getIndate().before(second.getIndate())){
    first.setOutdate(second.getIndate());
    result.add(first);
    }
    }
    }
    }
    return result.iterator();
    });
    List<AccessInout> list= resultRdd.top(100);
    for(AccessInout temp :list){
    System.out.println(temp.getOutid()+"||"+temp.getIoflag()+"||"+temp.getIndate()+"||"+temp.getOutdate());
    }

    resultRdd.saveAsTextFile("D:\2018\AccessInout.txt");
  • 相关阅读:
    WebForm捆绑压缩js和css(WebForm Bundling and Minification)
    2017.4.15
    .NET Core + docker入门
    微软官方教程视频入口
    C# struct和class
    法线的变换的一些概念
    透视投影矩阵的一些概念
    视图矩阵ViewMatrix一些概念
    矩阵一些概念
    向量一些概念
  • 原文地址:https://www.cnblogs.com/double-kill/p/8596979.html
Copyright © 2011-2022 走看看