Spark Dataset groupBy/agg/join/broadcast hashjoin/SQL broadcast hashjoin examples (Java API)

    Dataset groupBy/agg example:

    Dataset<Row> resultDs = dsParsed
    .groupBy("enodeb_id", "ecell_id")
    .agg(
        functions.first("scan_start_time").alias("scan_start_time1"),
        functions.first("insert_time").alias("insert_time1"),
        functions.first("mr_type").alias("mr_type1"),
        functions.first("mr_ltescphr").alias("mr_ltescphr1"),
        functions.first("mr_ltescpuschprbnum").alias("mr_ltescpuschprbnum1"),
        functions.count("enodeb_id").alias("rows1"))
    .selectExpr(
        "ecell_id", 
        "enodeb_id",
        "scan_start_time1 as scan_start_time",
        "insert_time1 as insert_time",
        "mr_type1 as mr_type",
        "mr_ltescphr1 as mr_ltescphr",
        "mr_ltescpuschprbnum1 as mr_ltescpuschprbnum",
        "rows1 as rows");
            

    Dataset join example:

            Dataset<Row> ncRes = sparkSession.read().option("delimiter", "|").option("header", true).csv("/user/csv");
            Dataset<Row> mro = sparkSession.sql("..."); // SQL text elided in the original
    
            Dataset<Row> ncJoinMro = ncRes
                    .join(mro, mro.col("id").equalTo(ncRes.col("id")).and(mro.col("calid").equalTo(ncRes.col("calid"))), "left_outer")
                    .select(ncRes.col("id").as("int_id"), 
                            mro.col("vendor_id"),
                            ... // remaining columns elided
    );

    Another way to write the join condition:

    leftDfWithWatermark.join(rightDfWithWatermark, 
      expr(""" leftDfId = rightDfId AND leftDfTime >= rightDfTime AND leftDfTime <= rightDfTime + interval 1 hour"""),
      joinType = "leftOuter" )

    BroadcastHashJoin example:

    package com.dx.testbroadcast;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;
    
    import java.io.*;
    
    public class Test {
        public static void main(String[] args) {
            String personPath = "E:\\person.csv";
            String personOrderPath = "E:\\personOrder.csv";
            //writeToPerson(personPath);
            //writeToPersonOrder(personOrderPath);
    
            SparkConf conf = new SparkConf();
            SparkSession sparkSession = SparkSession.builder().config(conf).appName("test-broadcast-app").master("local[*]").getOrCreate();
    
            Dataset<Row> person = sparkSession.read()
                    .option("header", "true")
                    .option("inferSchema", "true") //是否自动推到内容的类型
                    .option("delimiter", ",").csv(personPath).as("person");
            person.printSchema();
    
            Dataset<Row> personOrder = sparkSession.read()
                    .option("header", "true")
                    .option("inferSchema", "true") //是否自动推到内容的类型
                    .option("delimiter", ",").csv(personOrderPath).as("personOrder");
            personOrder.printSchema();
    
            // Default `inner`. Must be one of:`inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`,`right`, `right_outer`, `left_semi`, `left_anti`.
            Dataset<Row> resultDs = personOrder.join(functions.broadcast(person), personOrder.col("personid").equalTo(person.col("id")),"left");
            resultDs.explain();
            resultDs.show(10);
        }

        private static void writeToPerson(String personPath) {
            BufferedWriter personWriter = null;
            try {
                personWriter = new BufferedWriter(new FileWriter(personPath));
                personWriter.write("id,name,age,address\n");
                for (int i = 0; i < 10000; i++) {
                    personWriter.write("" + i + ",person-" + i + "," + i
                            + ",address-address-address-address-address-address-address" + i + "\n");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (personWriter != null) {
                    try {
                        personWriter.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        private static void writeToPersonOrder(String personOrderPath) {
            BufferedWriter personWriter = null;
            try {
                personWriter = new BufferedWriter(new FileWriter(personOrderPath));
                personWriter.write("personid,name,age,address\n");
                for (int i = 0; i < 1000000; i++) {
                    personWriter.write("" + i + ",person-" + i + "," + i
                            + ",address-address-address-address-address-address-address" + i + "\n");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (personWriter != null) {
                    try {
                        personWriter.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    Output (note the BroadcastHashJoin operator in the plan, with BuildRight marking the broadcast(person) side as the build side):

    == Physical Plan ==
    *(2) BroadcastHashJoin [personid#28], [id#10], LeftOuter, BuildRight
    :- *(2) FileScan csv [personid#28,name#29,age#30,address#31] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<personid:int,name:string,age:int,address:string>
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
       +- *(1) Project [id#10, name#11, age#12, address#13]
          +- *(1) Filter isnotnull(id#10)
             +- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string>
    
    +--------+--------+---+--------------------+---+--------+---+--------------------+
    |personid|    name|age|             address| id|    name|age|             address|
    +--------+--------+---+--------------------+---+--------+---+--------------------+
    |       0|person-0|  0|address-address-a...|  0|person-0|  0|address-address-a...|
    |       1|person-1|  1|address-address-a...|  1|person-1|  1|address-address-a...|
    |       2|person-2|  2|address-address-a...|  2|person-2|  2|address-address-a...|
    |       3|person-3|  3|address-address-a...|  3|person-3|  3|address-address-a...|
    |       4|person-4|  4|address-address-a...|  4|person-4|  4|address-address-a...|
    |       5|person-5|  5|address-address-a...|  5|person-5|  5|address-address-a...|
    |       6|person-6|  6|address-address-a...|  6|person-6|  6|address-address-a...|
    |       7|person-7|  7|address-address-a...|  7|person-7|  7|address-address-a...|
    |       8|person-8|  8|address-address-a...|  8|person-8|  8|address-address-a...|
    |       9|person-9|  9|address-address-a...|  9|person-9|  9|address-address-a...|
    +--------+--------+---+--------------------+---+--------+---+--------------------+
    only showing top 10 rows
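
    functions.broadcast(person) forces the broadcast regardless of table size. Without the explicit call, Spark only chooses BroadcastHashJoin on its own when the estimated size of one side is below spark.sql.autoBroadcastJoinThreshold (10485760 bytes = 10 MB by default). A sketch of tuning the threshold when building the session; the 50 MB value is just illustrative:

    SparkSession sparkSession = SparkSession.builder()
            .config(conf)
            .appName("test-broadcast-app")
            .master("local[*]")
            // raise the auto-broadcast threshold to 50 MB; set it to -1
            // to disable automatic broadcasting entirely
            .config("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)
            .getOrCreate();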

    SparkSQL Broadcast HashJoin example:

            person.createOrReplaceTempView("temp_person");
            personOrder.createOrReplaceTempView("temp_person_order");
    
            Dataset<Row> sqlResult = sparkSession.sql(
                    " SELECT /*+ BROADCAST (t11) */" +
                    " t11.id,t11.name,t11.age,t11.address," +
                    " t10.personid as person_id,t10.name as persion_order_name" +
                    " FROM temp_person_order as t10 " +
                    " inner join temp_person as t11" +
                    " on t11.id = t10.personid ");
            sqlResult.show(10);
            sqlResult.explain();

    Output log:

    +---+--------+---+--------------------+---------+-----------------+
    | id|    name|age|             address|person_id|person_order_name|
    +---+--------+---+--------------------+---------+-----------------+
    |  0|person-0|  0|address-address-a...|        0|         person-0|
    |  1|person-1|  1|address-address-a...|        1|         person-1|
    |  2|person-2|  2|address-address-a...|        2|         person-2|
    |  3|person-3|  3|address-address-a...|        3|         person-3|
    |  4|person-4|  4|address-address-a...|        4|         person-4|
    |  5|person-5|  5|address-address-a...|        5|         person-5|
    |  6|person-6|  6|address-address-a...|        6|         person-6|
    |  7|person-7|  7|address-address-a...|        7|         person-7|
    |  8|person-8|  8|address-address-a...|        8|         person-8|
    |  9|person-9|  9|address-address-a...|        9|         person-9|
    +---+--------+---+--------------------+---------+-----------------+
    only showing top 10 rows
    
    19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with: 
    19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(personid#28)
    19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<personid: int, name: string>
    19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(personid)
    19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with: 
    19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(id#10)
    19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<id: int, name: string, age: int, address: string ... 2 more fields>
    19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(id)
    == Physical Plan ==
    *(2) Project [id#10, name#11, age#12, address#13, personid#28 AS person_id#94, name#29 AS persion_order_name#95]
    +- *(2) BroadcastHashJoin [personid#28], [id#10], Inner, BuildRight
       :- *(2) Project [personid#28, name#29]
       :  +- *(2) Filter isnotnull(personid#28)
       :     +- *(2) FileScan csv [personid#28,name#29] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [IsNotNull(personid)], ReadSchema: struct<personid:int,name:string>
       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
          +- *(1) Project [id#10, name#11, age#12, address#13]
             +- *(1) Filter isnotnull(id#10)
                +- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string>
    19/06/24 09:35:50 INFO SparkContext: Invoking stop() from shutdown hook
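
    The hint name BROADCAST is also recognized under the aliases BROADCASTJOIN and MAPJOIN (Spark 2.2+), so the following is equivalent:

    Dataset<Row> sqlResult2 = sparkSession.sql(
            " SELECT /*+ MAPJOIN(t11) */ t11.id, t10.personid" +
            " FROM temp_person_order as t10" +
            " inner join temp_person as t11" +
            " on t11.id = t10.personid ");
    sqlResult2.explain(); // the plan again shows BroadcastHashJoin ... BuildRight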