  • SparkSQL advanced usage


    scala> val df=spark.read.json("/tmp/pdf1json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]


    scala> df.show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+

    scala> df.filter($"age">30).select("name","age").show
    +--------+---+
    |    name|age|
    +--------+---+
    |   xiliu| 50|
    |zhangsan| 50|
    +--------+---+


    scala> df.filter($"age">30).select($"name",$"age"+1 as "age").show
    +--------+---+
    |    name|age|
    +--------+---+
    |   xiliu| 51|
    |zhangsan| 51|
    +--------+---+

    scala> df.groupBy("age").count.show
    +---+-----+
    |age|count|
    +---+-----+
    | 50|    2|
    | 23|    1|
    | 20|    1|
    +---+-----+
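
    Besides count, groupBy can compute several aggregates in one pass via agg. A minimal sketch against the same JSON DataFrame (column aliases cnt/avg_fv/max_fv are just illustrative names):

    import org.apache.spark.sql.functions.{avg, count, max}

    // group by age and compute several aggregates in a single job
    df.groupBy("age")
      .agg(count("name").as("cnt"), avg("fv").as("avg_fv"), max("fv").as("max_fv"))
      .show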


    scala> val spark=SparkSession.builder().enableHiveSupport().getOrCreate()
    18/09/26 14:23:26 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
    spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@53b85e12

    scala> spark.sql("show databases").show
    +------------+
    |databaseName|
    +------------+
    |      dbtest|
    |     default|
    |      gamedw|
    |  hive_hbase|
    |     sqoopdb|
    |      testdb|
    |      userdb|
    +------------+

    scala> spark.sql("use gamedw").show
    ++
    ||
    ++
    ++

    scala> spark.sql("show tables").show
    +--------+----------------+-----------+
    |database|       tableName|isTemporary|
    +--------+----------------+-----------+
    |  gamedw|         account|      false|
    |  gamedw|account_ix_accid|      false|
    |  gamedw|        cityinfo|      false|
    |  gamedw|            cust|      false|
    |  gamedw|          cust00|      false|
    |  gamedw|           cust1|      false|
    |  gamedw|          cust_1|      false|
    |  gamedw|       cust_copy|      false|
    |  gamedw|      cust_index|      false|
    |  gamedw|       customers|      false|
    |  gamedw|       employees|      false|
    |  gamedw|         loginfo|      false|
    |  gamedw|            mess|      false|
    |  gamedw|       roleinfor|      false|
    |  gamedw|          t_name|      false|
    |  gamedw|         t_name1|      false|
    |  gamedw|         t_name2|      false|
    |  gamedw|table_index_test|      false|
    |  gamedw|      table_test|      false|
    |  gamedw|       tb_bucket|      false|
    +--------+----------------+-----------+
    only showing top 20 rows

    scala> spark.sql("select * from gamedw.account limit 10").show
    +-----------+--------+------+--------+--------------------+
    |accountname|   accid|platid|  dateid|          createtime|
    +-----------+--------+------+--------+--------------------+
    |    1004210| 1004210|     6|20180116|2018-01-16 10:39:...|
    |   20946754|20946754|     0|20170913|2017-09-13 10:02:...|
    |   20946766|20946766|     0|20170901|2017-09-01 16:51:...|
    |   20946793|20946793|     0|20171117|2017-11-17 16:51:...|
    |   20946796|20946796|     0|20180110|2018-01-10 13:30:...|
    |   20946962|20946962|     0|20171219|2017-12-19 15:43:...|
    |   20957641|20957641|     0|20171117|2017-11-17 17:44:...|
    |   20957642|20957642|     0|20171220|2017-12-20 15:32:...|
    |   20963649|20963649|     6|20171220|2017-12-20 10:13:...|
    |   20963674|20963674|    33|20171219|2017-12-19 22:59:...|
    +-----------+--------+------+--------+--------------------+

    scala> val df=spark.sql("select * from gamedw.account")
    df: org.apache.spark.sql.DataFrame = [accountname: bigint, accid: bigint ... 3 more fields]

Save df to Hive, bucketed and sorted:

    scala> df.write.bucketBy(10,"platid").sortBy("createtime").saveAsTable("acc_tb")
    18/09/26 14:28:07 WARN hive.HiveExternalCatalog: Persisting bucketed data source table `gamedw`.`acc_tb` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

    scala> spark.sql("show tables like 'acc_tb'").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    |  gamedw|   acc_tb|      false|
    +--------+---------+-----------+

Check the table in Hive:

    hive> show create table acc_tb;
    OK
    CREATE TABLE `acc_tb`(
      `col` array<string> COMMENT 'from deserializer')
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES (
      'path'='hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_tb')
    STORED AS INPUTFORMAT
      'org.apache.hadoop.mapred.SequenceFileInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
    LOCATION
      'hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_tb'
    TBLPROPERTIES (
      'COLUMN_STATS_ACCURATE'='false',
      'numFiles'='12',
      'numRows'='-1',
      'rawDataSize'='-1',
      'spark.sql.sources.provider'='parquet',
      'spark.sql.sources.schema.bucketCol.0'='platid',
      'spark.sql.sources.schema.numBucketCols'='1',
      'spark.sql.sources.schema.numBuckets'='10',
      'spark.sql.sources.schema.numParts'='1',
      'spark.sql.sources.schema.numSortCols'='1',
      'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"accountname","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"accid","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"platid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}},{"name":"dateid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}},{"name":"createtime","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"string"}}]}',
      'spark.sql.sources.schema.sortCol.0'='createtime',
      'totalSize'='1212087',
      'transient_lastDdlTime'='1537943287')
    Time taken: 0.369 seconds, Fetched: 27 row(s)

Check HDFS:

    [root@host ~]# hdfs dfs -ls /user/hive/warehouse/gamedw.db/acc_tb
    Found 13 items
    -rw-r--r--   1 root supergroup          0 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/_SUCCESS
    -rw-r--r--   1 root supergroup      23357 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00000.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      89069 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00001.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     299566 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00002.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     272843 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00003.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      66923 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00004.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      67756 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00005.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      15025 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00006.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     138852 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00007.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      18598 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00008.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     217081 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00009.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       1378 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00001-e275c19a-1728-49e2-bd6f-8598d76fe045_00007.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       1639 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00003-e275c19a-1728-49e2-bd6f-8598d76fe045_00006.c000.snappy.parquet

Query the data in Hive:

    hive> select * from acc_tb;
    OK
    Failed with exception java.io.IOException:java.io.IOException: hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00000.c000.snappy.parquet not a SequenceFile
    Time taken: 0.405 seconds

Hive cannot query the table: as the warning during saveAsTable indicated, Spark persisted the bucketed table in its own Spark SQL format (Snappy Parquet files plus spark.sql.sources.* table properties), while the Hive metastore entry describes a SequenceFile table, so Hive fails to read the Parquet files.

Try the same query in spark-shell:
    scala> spark.sql("select * from acc_tb").show
    +-----------+--------+------+--------+--------------------+
    |accountname|   accid|platid|  dateid|          createtime|
    +-----------+--------+------+--------+--------------------+
    |   22753184|22753184|    60|20180110|2018-01-10 15:17:...|
    |   22755578|22755578|    60|20180111|2018-01-11 15:57:...|
    |   22764087|22764087|    60|20180115|2018-01-15 14:32:...|
    |   22766072|22766072|    60|20180116|2018-01-16 09:59:...|
    |   22766191|22766191|    60|20180116|2018-01-16 10:03:...|
    |   22766762|22766762|    60|20180116|2018-01-16 10:11:...|
    |   22766933|22766933|    60|20180116|2018-01-16 10:15:...|
    |   22767239|22767239|    60|20180116|2018-01-16 10:23:...|
    |   22767249|22767249|    60|20180116|2018-01-16 10:23:...|
    |   22767356|22767356|    60|20180116|2018-01-16 10:27:...|
    |   22767396|22767396|    60|20180116|2018-01-16 10:28:...|
    |   22767628|22767628|    60|20180116|2018-01-16 10:36:...|
    |   22767650|22767650|    60|20180116|2018-01-16 10:38:...|
    |   22767690|22767690|    60|20180116|2018-01-16 10:39:...|
    |   22767710|22767710|    60|20180116|2018-01-16 10:40:...|
    |   22767744|22767744|    60|20180116|2018-01-16 10:43:...|
    |   22767862|22767862|    60|20180116|2018-01-16 10:45:...|
    |   22767916|22767916|    60|20180116|2018-01-16 10:48:...|
    |   22768016|22768016|    60|20180116|2018-01-16 10:57:...|
    |   22768171|22768171|    60|20180116|2018-01-16 10:57:...|
    +-----------+--------+------+--------+--------------------+
    only showing top 20 rows

Success: Spark reads the table through the spark.sql.sources.* properties it stored in the metastore, so the data is fully queryable from spark-shell even though Hive itself cannot read it.
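
    If the data also needs to be readable from Hive, one workaround (a hedged sketch; acc_hive is a hypothetical table name, and Spark's bucketBy is dropped because Hive cannot consume Spark's bucketing) is to let Hive DDL define the storage format:

    // CTAS through Hive support creates a Parquet table that Hive itself can read
    spark.sql("create table gamedw.acc_hive stored as parquet as select * from gamedw.account")

    // alternatively, append into an already existing Hive table, keeping its storage format
    // df.write.mode("append").insertInto("gamedw.acc_hive")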

Save to a Hive table, partitioned and bucketed:

    scala> df.write.partitionBy("platid").bucketBy(5,"dateid").sortBy("createtime").saveAsTable("acc_plat")
    18/09/26 14:48:39 WARN hive.HiveExternalCatalog: Persisting bucketed data source table `gamedw`.`acc_plat` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

Check the table in Hive:

    hive> show create table acc_plat;
    OK
    CREATE TABLE `acc_plat`(
      `col` array<string> COMMENT 'from deserializer')
    PARTITIONED BY (
      `platid` int)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES (
      'path'='hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_plat')
    STORED AS INPUTFORMAT
      'org.apache.hadoop.mapred.SequenceFileInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
    LOCATION
      'hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_plat'
    TBLPROPERTIES (
      'spark.sql.partitionProvider'='catalog',
      'spark.sql.sources.provider'='parquet',
      'spark.sql.sources.schema.bucketCol.0'='dateid',
      'spark.sql.sources.schema.numBucketCols'='1',
      'spark.sql.sources.schema.numBuckets'='5',
      'spark.sql.sources.schema.numPartCols'='1',
      'spark.sql.sources.schema.numParts'='1',
      'spark.sql.sources.schema.numSortCols'='1',
      'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"accountname","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"accid","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"dateid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}},{"name":"createtime","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"string"}},{"name":"platid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}}]}',
      'spark.sql.sources.schema.partCol.0'='platid',
      'spark.sql.sources.schema.sortCol.0'='createtime',
      'transient_lastDdlTime'='1537944522')
    Time taken: 0.22 seconds, Fetched: 27 row(s)

    hive> select * from acc_plat;
    OK
    Failed with exception java.io.IOException:java.io.IOException: hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00000.c000.snappy.parquet not a SequenceFile
    Time taken: 4.071 seconds

Check HDFS:

    [root@host ~]# hdfs dfs -ls -R /user/hive/warehouse/gamedw.db/acc_plat
    -rw-r--r--   1 root supergroup          0 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/_SUCCESS
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0
    -rw-r--r--   1 root supergroup       9695 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00000.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       2701 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00001.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       8622 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00002.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       2025 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00003.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       8329 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00004.c000.snappy.parquet

    .........................................

Query in spark-shell:


    scala> spark.sql("select * from acc_plat limit 10").show
    +-----------+--------+--------+--------------------+------+
    |accountname|   accid|  dateid|          createtime|platid|
    +-----------+--------+--------+--------------------+------+
    |   22596994|22596994|20171122|2017-11-22 11:04:...|     0|
    |   22600306|22600306|20171122|2017-11-22 15:42:...|     0|
    |   21667705|21667705|20171201|2017-12-01 11:06:...|     0|
    |   22631369|22631369|20171201|2017-12-01 17:15:...|     0|
    |   22631399|22631399|20171201|2017-12-01 17:26:...|     0|
    |   22631447|22631447|20171201|2017-12-01 17:46:...|     0|
    |   21345007|21345007|20171206|2017-12-06 10:59:...|     0|
    |   22659886|22659886|20171211|2017-12-11 14:21:...|     0|
    |   22659908|22659908|20171211|2017-12-11 14:32:...|     0|
    |   22659938|22659938|20171211|2017-12-11 14:46:...|     0|
    +-----------+--------+--------+--------------------+------+

Write partitioned Parquet output to a specified directory:

    scala> df.write.partitionBy("platid").format("parquet").save("/tmp/acc_platid")
     

Check HDFS:

    [root@host ~]# hdfs dfs -ls /tmp/acc_platid
    Found 40 items
    -rw-r--r--   1 root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/_SUCCESS
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=0
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=1
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=10000
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=11
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=13
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=138
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=15
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=158
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=17
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=18
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=187
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=19
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=191
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=21
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=219
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=24294
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=247
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=27
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=277
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=286
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=287
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=288
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=289
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=291
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=295
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=3
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=33
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=35
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=36
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=38
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=4
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=46
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=50
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=6
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=60
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=83
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=89
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=9
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=93
    [root@host ~]#
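
    The partitioned output can be read back directly; a minimal sketch (paths as above) showing that Spark rebuilds the platid column from the platid=<value> directory names and prunes partitions on filter:

    // read the partitioned Parquet directory back
    val accBack = spark.read.parquet("/tmp/acc_platid")
    accBack.printSchema

    // filtering on the partition column only scans the matching platid=60 directory
    accBack.filter($"platid" === 60).show(5)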

cube / rollup

    scala> df.filter("dateid in(20180306,20180307)").cube("platid","dateid").count.orderBy("platid").show
    +------+--------+-----+
    |platid|  dateid|count|
    +------+--------+-----+
    |  null|    null|  271|
    |  null|20180306|  135|
    |  null|20180307|  136|
    |     1|20180306|   19|
    |     1|20180307|   31|
    |     1|    null|   50|
    |     3|20180306|    5|
    |     3|    null|   11|
    |     3|20180307|    6|
    |     4|    null|   41|
    |     4|20180306|   21|
    |     4|20180307|   20|
    |     6|20180306|    5|
    |     6|    null|   10|
    |     6|20180307|    5|
    |    13|    null|    3|
    |    13|20180307|    2|
    |    13|20180306|    1|
    |    15|20180307|   12|
    |    15|    null|   24|
    +------+--------+-----+
    only showing top 20 rows

    scala> df.filter("dateid in(20180306,20180307)").rollup("platid","dateid").count.orderBy("platid").show
    +------+--------+-----+
    |platid|  dateid|count|
    +------+--------+-----+
    |  null|    null|  271|
    |     1|    null|   50|
    |     1|20180306|   19|
    |     1|20180307|   31|
    |     3|20180307|    6|
    |     3|    null|   11|
    |     3|20180306|    5|
    |     4|    null|   41|
    |     4|20180307|   20|
    |     4|20180306|   21|
    |     6|20180306|    5|
    |     6|    null|   10|
    |     6|20180307|    5|
    |    13|    null|    3|
    |    13|20180307|    2|
    |    13|20180306|    1|
    |    15|20180307|   12|
    |    15|20180306|   12|
    |    15|    null|   24|
    |    18|20180307|   14|
    +------+--------+-----+
    only showing top 20 rows
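
    Both cube and rollup emit null for the columns they aggregate away, which can be confused with genuine null values in the data. A hedged sketch over the same filtered data, using grouping_id to mark the subtotal rows explicitly:

    import org.apache.spark.sql.functions.{count, grouping_id, lit}

    // gid encodes which grouping columns are rolled up in each row,
    // distinguishing subtotal nulls from real null values
    df.filter("dateid in(20180306,20180307)")
      .cube("platid", "dateid")
      .agg(count(lit(1)).as("cnt"), grouping_id().as("gid"))
      .orderBy("platid")
      .show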

    scala> val df=spark.read.json("/tmp/pdf1json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]


    scala> df.show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+

    scala> df.groupBy("age").pivot("fv").count.show
    +---+----+----+----+
    |age| 565| 866|6998|
    +---+----+----+----+
    | 50|null|   1|   1|
    | 23|   1|null|null|
    | 20|   1|null|null|
    +---+----+----+----+

    scala> val df=spark.sql("select * from gamedw.account")
    df: org.apache.spark.sql.DataFrame = [accountname: bigint, accid: bigint ... 3 more fields]


    scala> df.filter("dateid in(20180501,20180502,20180503)").groupBy("dateid").pivot("platid").count.show
    +--------+----+---+----+---+---+----+----+----+---+---+----+---+---+---+---+---+----+---+---+---+----+----+---+----+----+
    |  dateid|   0|  1|   3|  4|  6|   9|  11|  13| 15| 18|  19| 21| 27| 33| 35| 36|  38| 46| 60| 83| 138| 158|191| 277| 289|
    +--------+----+---+----+---+---+----+----+----+---+---+----+---+---+---+---+---+----+---+---+---+----+----+---+----+----+
    |20180501|null| 12|null|  9|  4|null|null|null|  6| 33|null|  1|  9|  3|  6|  1|   1|  6|  5|  7|   1|   1|  8|   5|   1|
    |20180502|null| 11|   1|  7|  4|null|   1|   2|  4| 25|   2|  1|  4|  1|  3|  3|null|  3|  5|  5|   1|null|  6|   1|null|
    |20180503|   1|  6|   1|  2|  3|   1|null|   1|  3| 19|   2|  1|  6|  6|  1|  1|null|  4|  2|  8|null|null| 19|null|   1|
    +--------+----+---+----+---+---+----+----+----+---+---+----+---+---+---+---+---+----+---+---+---+----+----+---+----+----+


    scala> df.filter("dateid in(20180501,20180502,20180503)").groupBy("platid").pivot("dateid").count.show
    +------+--------+--------+--------+
    |platid|20180501|20180502|20180503|
    +------+--------+--------+--------+
    |    27|       9|       4|       6|
    |     1|      12|      11|       6|
    |    13|    null|       2|       1|
    |     6|       4|       4|       3|
    |     3|    null|       1|       1|
    |   191|       8|       6|      19|
    |    19|    null|       2|       2|
    |    15|       6|       4|       3|
    |     9|    null|    null|       1|
    |    35|       6|       3|       1|
    |     4|       9|       7|       2|
    |   277|       5|       1|    null|
    |    38|       1|    null|    null|
    |   289|       1|    null|       1|
    |    21|       1|       1|       1|
    |    60|       5|       5|       2|
    |    33|       3|       1|       6|
    |    11|    null|       1|    null|
    |    83|       7|       5|       8|
    |   158|       1|    null|    null|
    +------+--------+--------+--------+
    only showing top 20 rows
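
    When the pivot column has many distinct values (as platid does above, producing 25 output columns), passing an explicit value list limits the output columns and skips the extra job Spark otherwise runs to collect the distinct pivot values. A sketch assuming the same account DataFrame, with a hand-picked subset of platforms:

    // pivot only over the listed platforms; other platid values are ignored
    df.filter("dateid in(20180501,20180502,20180503)")
      .groupBy("dateid")
      .pivot("platid", Seq(1, 4, 6, 18, 60))
      .count
      .show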

User-defined functions (UDFs):

    scala> val strlen=udf{(str:String)=>str.length}

    strlen: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

    scala> spark.udf.register("slen",strlen)

    res60: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

    scala> df.registerTempTable("df")
    warning: there was one deprecation warning; re-run with -deprecation for details

    scala> spark.sql("select name,slen(name) from df").show
    +--------+---------+
    |    name|UDF(name)|
    +--------+---------+
    |   xiliu|        5|
    |zhangsan|        8|
    |zhangsan|        8|
    |  llihmj|        6|
    +--------+---------+
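
    The registered name is only needed for SQL; the strlen UserDefinedFunction object can also be applied directly through the DataFrame API. A minimal sketch (name_len is just an illustrative alias), also using the non-deprecated replacement for registerTempTable:

    // createOrReplaceTempView avoids the deprecation warning shown above
    df.createOrReplaceTempView("df")

    // apply the UDF directly to a column, no SQL registration required
    df.select($"name", strlen($"name").as("name_len")).show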

  • Original article: https://www.cnblogs.com/playforever/p/9706748.html