  • Spark SQL advanced usage


    scala> val df=spark.read.json("/tmp/pdf1json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]


    scala> df.show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+

    scala> df.filter($"age">30).select("name","age").show
    +--------+---+
    |    name|age|
    +--------+---+
    |   xiliu| 50|
    |zhangsan| 50|
    +--------+---+


    scala> df.filter($"age">30).select($"name",$"age"+1 as "age").show
    +--------+---+
    |    name|age|
    +--------+---+
    |   xiliu| 51|
    |zhangsan| 51|
    +--------+---+

    scala> df.groupBy("age").count.show
    +---+-----+
    |age|count|
    +---+-----+
    | 50|    2|
    | 23|    1|
    | 20|    1|
    +---+-----+
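
    Beyond a plain count, groupBy accepts arbitrary aggregations through agg. A minimal sketch against the same df (the alias names are illustrative):

    import org.apache.spark.sql.functions._

    // several aggregates over the same grouping in one pass
    df.groupBy("age")
      .agg(count("fv").as("cnt"), avg("fv").as("avg_fv"), max("fv").as("max_fv"))
      .show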


    scala> val spark=SparkSession.builder().enableHiveSupport().getOrCreate()
    18/09/26 14:23:26 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
    spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@53b85e12
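
    Inside spark-shell a session named spark already exists, so the builder simply reuses it (hence the warning). In a standalone application the equivalent construction would look roughly like this (the app name is illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hive-demo")        // illustrative name
      .enableHiveSupport()         // enables access to the Hive metastore
      .getOrCreate()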

    scala> spark.sql("show databases").show
    +------------+
    |databaseName|
    +------------+
    |      dbtest|
    |     default|
    |      gamedw|
    |  hive_hbase|
    |     sqoopdb|
    |      testdb|
    |      userdb|
    +------------+

    scala> spark.sql("use gamedw").show
    ++
    ||
    ++
    ++

    scala> spark.sql("show tables").show
    +--------+----------------+-----------+
    |database|       tableName|isTemporary|
    +--------+----------------+-----------+
    |  gamedw|         account|      false|
    |  gamedw|account_ix_accid|      false|
    |  gamedw|        cityinfo|      false|
    |  gamedw|            cust|      false|
    |  gamedw|          cust00|      false|
    |  gamedw|           cust1|      false|
    |  gamedw|          cust_1|      false|
    |  gamedw|       cust_copy|      false|
    |  gamedw|      cust_index|      false|
    |  gamedw|       customers|      false|
    |  gamedw|       employees|      false|
    |  gamedw|         loginfo|      false|
    |  gamedw|            mess|      false|
    |  gamedw|       roleinfor|      false|
    |  gamedw|          t_name|      false|
    |  gamedw|         t_name1|      false|
    |  gamedw|         t_name2|      false|
    |  gamedw|table_index_test|      false|
    |  gamedw|      table_test|      false|
    |  gamedw|       tb_bucket|      false|
    +--------+----------------+-----------+
    only showing top 20 rows

    scala> spark.sql("select * from gamedw.account limit 10").show
    +-----------+--------+------+--------+--------------------+
    |accountname|   accid|platid|  dateid|          createtime|
    +-----------+--------+------+--------+--------------------+
    |    1004210| 1004210|     6|20180116|2018-01-16 10:39:...|
    |   20946754|20946754|     0|20170913|2017-09-13 10:02:...|
    |   20946766|20946766|     0|20170901|2017-09-01 16:51:...|
    |   20946793|20946793|     0|20171117|2017-11-17 16:51:...|
    |   20946796|20946796|     0|20180110|2018-01-10 13:30:...|
    |   20946962|20946962|     0|20171219|2017-12-19 15:43:...|
    |   20957641|20957641|     0|20171117|2017-11-17 17:44:...|
    |   20957642|20957642|     0|20171220|2017-12-20 15:32:...|
    |   20963649|20963649|     6|20171220|2017-12-20 10:13:...|
    |   20963674|20963674|    33|20171219|2017-12-19 22:59:...|
    +-----------+--------+------+--------+--------------------+

    scala> val df=spark.sql("select * from gamedw.account")
    df: org.apache.spark.sql.DataFrame = [accountname: bigint, accid: bigint ... 3 more fields]

    Save df to Hive as a bucketed, sorted table:

    scala> df.write.bucketBy(10,"platid").sortBy("createtime").saveAsTable("acc_tb")
    18/09/26 14:28:07 WARN hive.HiveExternalCatalog: Persisting bucketed data source table `gamedw`.`acc_tb` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

    scala> spark.sql("show tables like 'acc_tb'").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    |  gamedw|   acc_tb|      false|
    +--------+---------+-----------+

    Check the table in Hive:

    hive> show create table acc_tb;
    OK
    CREATE TABLE `acc_tb`(
      `col` array<string> COMMENT 'from deserializer')
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES (
      'path'='hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_tb')
    STORED AS INPUTFORMAT
      'org.apache.hadoop.mapred.SequenceFileInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
    LOCATION
      'hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_tb'
    TBLPROPERTIES (
      'COLUMN_STATS_ACCURATE'='false',
      'numFiles'='12',
      'numRows'='-1',
      'rawDataSize'='-1',
      'spark.sql.sources.provider'='parquet',
      'spark.sql.sources.schema.bucketCol.0'='platid',
      'spark.sql.sources.schema.numBucketCols'='1',
      'spark.sql.sources.schema.numBuckets'='10',
      'spark.sql.sources.schema.numParts'='1',
      'spark.sql.sources.schema.numSortCols'='1',
      'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"accountname","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"accid","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"platid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}},{"name":"dateid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}},{"name":"createtime","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"string"}}]}',
      'spark.sql.sources.schema.sortCol.0'='createtime',
      'totalSize'='1212087',
      'transient_lastDdlTime'='1537943287')
    Time taken: 0.369 seconds, Fetched: 27 row(s)
    hive> select * from acc_tb;
    OK
    Failed with exception java.io.IOException:java.io.IOException: hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00000.c000.snappy.parquet not a SequenceFile

    Check HDFS:

    [root@host ~]# hdfs dfs -ls /user/hive/warehouse/gamedw.db/acc_tb
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    Found 13 items
    -rw-r--r--   1 root supergroup          0 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/_SUCCESS
    -rw-r--r--   1 root supergroup      23357 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00000.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      89069 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00001.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     299566 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00002.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     272843 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00003.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      66923 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00004.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      67756 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00005.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      15025 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00006.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     138852 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00007.c000.snappy.parquet
    -rw-r--r--   1 root supergroup      18598 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00008.c000.snappy.parquet
    -rw-r--r--   1 root supergroup     217081 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00009.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       1378 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00001-e275c19a-1728-49e2-bd6f-8598d76fe045_00007.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       1639 2018-09-26 14:28 /user/hive/warehouse/gamedw.db/acc_tb/part-00003-e275c19a-1728-49e2-bd6f-8598d76fe045_00006.c000.snappy.parquet

    Query the data from Hive:

    hive> select * from acc_tb;
    OK
    Failed with exception java.io.IOException:java.io.IOException: hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_tb/part-00000-e275c19a-1728-49e2-bd6f-8598d76fe045_00000.c000.snappy.parquet not a SequenceFile
    Time taken: 0.405 seconds

    Hive cannot query the table.
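
    The failure matches the earlier warning: saveAsTable on a bucketed DataFrame persists the table in a Spark SQL specific format, so the Hive DDL advertises SequenceFile while the files on HDFS are Snappy Parquet, and Hive's reader rejects them. If the table also needs to be readable from Hive, one option is to drop bucketBy and write through the Hive serde; a hedged sketch (the table name is illustrative and behaviour depends on the Spark and Hive versions):

    // plain (non-bucketed) copy written via Hive's own serde so Hive can read it
    df.write
      .mode("overwrite")
      .format("hive")                      // Hive input/output format instead of Spark's parquet source
      .saveAsTable("gamedw.acc_tb_hive")   // illustrative table name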

    Try it from spark-shell:
    scala> spark.sql("select * from acc_tb").show
    +-----------+--------+------+--------+--------------------+
    |accountname|   accid|platid|  dateid|          createtime|
    +-----------+--------+------+--------+--------------------+
    |   22753184|22753184|    60|20180110|2018-01-10 15:17:...|
    |   22755578|22755578|    60|20180111|2018-01-11 15:57:...|
    |   22764087|22764087|    60|20180115|2018-01-15 14:32:...|
    |   22766072|22766072|    60|20180116|2018-01-16 09:59:...|
    |   22766191|22766191|    60|20180116|2018-01-16 10:03:...|
    |   22766762|22766762|    60|20180116|2018-01-16 10:11:...|
    |   22766933|22766933|    60|20180116|2018-01-16 10:15:...|
    |   22767239|22767239|    60|20180116|2018-01-16 10:23:...|
    |   22767249|22767249|    60|20180116|2018-01-16 10:23:...|
    |   22767356|22767356|    60|20180116|2018-01-16 10:27:...|
    |   22767396|22767396|    60|20180116|2018-01-16 10:28:...|
    |   22767628|22767628|    60|20180116|2018-01-16 10:36:...|
    |   22767650|22767650|    60|20180116|2018-01-16 10:38:...|
    |   22767690|22767690|    60|20180116|2018-01-16 10:39:...|
    |   22767710|22767710|    60|20180116|2018-01-16 10:40:...|
    |   22767744|22767744|    60|20180116|2018-01-16 10:43:...|
    |   22767862|22767862|    60|20180116|2018-01-16 10:45:...|
    |   22767916|22767916|    60|20180116|2018-01-16 10:48:...|
    |   22768016|22768016|    60|20180116|2018-01-16 10:57:...|
    |   22768171|22768171|    60|20180116|2018-01-16 10:57:...|
    +-----------+--------+------+--------+--------------------+
    only showing top 20 rows

    Success: the table is readable from spark-shell.

    Save to a Hive table, both partitioned and bucketed:

    scala> df.write.partitionBy("platid").bucketBy(5,"dateid").sortBy("createtime").saveAsTable("acc_plat")
    18/09/26 14:48:39 WARN hive.HiveExternalCatalog: Persisting bucketed data source table `gamedw`.`acc_plat` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

    Check the table in Hive:

    hive> show create table acc_plat;
    OK
    CREATE TABLE `acc_plat`(
      `col` array<string> COMMENT 'from deserializer')
    PARTITIONED BY (
      `platid` int)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES (
      'path'='hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_plat')
    STORED AS INPUTFORMAT
      'org.apache.hadoop.mapred.SequenceFileInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'
    LOCATION
      'hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_plat'
    TBLPROPERTIES (
      'spark.sql.partitionProvider'='catalog',
      'spark.sql.sources.provider'='parquet',
      'spark.sql.sources.schema.bucketCol.0'='dateid',
      'spark.sql.sources.schema.numBucketCols'='1',
      'spark.sql.sources.schema.numBuckets'='5',
      'spark.sql.sources.schema.numPartCols'='1',
      'spark.sql.sources.schema.numParts'='1',
      'spark.sql.sources.schema.numSortCols'='1',
      'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"accountname","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"accid","type":"long","nullable":true,"metadata":{"HIVE_TYPE_STRING":"bigint"}},{"name":"dateid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}},{"name":"createtime","type":"string","nullable":true,"metadata":{"HIVE_TYPE_STRING":"string"}},{"name":"platid","type":"integer","nullable":true,"metadata":{"HIVE_TYPE_STRING":"int"}}]}',
      'spark.sql.sources.schema.partCol.0'='platid',
      'spark.sql.sources.schema.sortCol.0'='createtime',
      'transient_lastDdlTime'='1537944522')
    Time taken: 0.22 seconds, Fetched: 27 row(s)

    hive> select * from acc_plat;
    OK
    Failed with exception java.io.IOException:java.io.IOException: hdfs://localhost:9000/user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00000.c000.snappy.parquet not a SequenceFile
    Time taken: 4.071 seconds

    Check HDFS:

    [root@host ~]# hdfs dfs -ls -R /user/hive/warehouse/gamedw.db/acc_plat
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    -rw-r--r--   1 root supergroup          0 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/_SUCCESS
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0
    -rw-r--r--   1 root supergroup       9695 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00000.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       2701 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00001.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       8622 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00002.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       2025 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00003.c000.snappy.parquet
    -rw-r--r--   1 root supergroup       8329 2018-09-26 14:48 /user/hive/warehouse/gamedw.db/acc_plat/platid=0/part-00000-ef876036-ec6e-4751-b28c-e8f77a3c5b31_00004.c000.snappy.parquet

    .........................................

    Query from spark-shell:


    scala> spark.sql("select * from acc_plat limit 10").show
    +-----------+--------+--------+--------------------+------+
    |accountname|   accid|  dateid|          createtime|platid|
    +-----------+--------+--------+--------------------+------+
    |   22596994|22596994|20171122|2017-11-22 11:04:...|     0|
    |   22600306|22600306|20171122|2017-11-22 15:42:...|     0|
    |   21667705|21667705|20171201|2017-12-01 11:06:...|     0|
    |   22631369|22631369|20171201|2017-12-01 17:15:...|     0|
    |   22631399|22631399|20171201|2017-12-01 17:26:...|     0|
    |   22631447|22631447|20171201|2017-12-01 17:46:...|     0|
    |   21345007|21345007|20171206|2017-12-06 10:59:...|     0|
    |   22659886|22659886|20171211|2017-12-11 14:21:...|     0|
    |   22659908|22659908|20171211|2017-12-11 14:32:...|     0|
    |   22659938|22659938|20171211|2017-12-11 14:46:...|     0|
    +-----------+--------+--------+--------------------+------+
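
    Since platid is a partition column of acc_plat, a filter on it only scans the matching partition directories; a quick sketch:

    // only the platid=6 subdirectory is read
    spark.sql("select count(*) from acc_plat where platid = 6").show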

    Write partitioned Parquet output to a specified directory:

    scala> df.write.partitionBy("platid").format("parquet").save("/tmp/acc_platid")
     

    Check HDFS:

    [root@host ~]# hdfs dfs -ls /tmp/acc_platid
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/root/hadoop/hadoop-2.7.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/root/hive/apache-hive-2.1.1/lib/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    Found 40 items
    -rw-r--r--   1 root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/_SUCCESS
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=0
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=1
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=10000
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=11
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=13
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=138
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=15
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=158
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=17
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=18
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=187
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=19
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=191
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=21
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=219
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=24294
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=247
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=27
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=277
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=286
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=287
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=288
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=289
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=291
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=295
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=3
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=33
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=35
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=36
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=38
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=4
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=46
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=50
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=6
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=60
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=83
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=89
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:58 /tmp/acc_platid/platid=9
    drwxr-xr-x   - root supergroup          0 2018-09-26 14:59 /tmp/acc_platid/platid=93
    [root@host ~]#
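
    The directory can be read back directly: spark.read.parquet rediscovers platid as a partition column, and a filter on it prunes to the matching subdirectory. A minimal sketch:

    import spark.implicits._

    val accDF = spark.read.parquet("/tmp/acc_platid")
    // partition pruning: only /tmp/acc_platid/platid=60 is scanned
    accDF.filter($"platid" === 60).show(5)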

    cube and rollup

    scala> df.filter("dateid in(20180306,20180307)").cube("platid","dateid").count.orderBy("platid").show
    +------+--------+-----+
    |platid|  dateid|count|
    +------+--------+-----+
    |  null|    null|  271|
    |  null|20180306|  135|
    |  null|20180307|  136|
    |     1|20180306|   19|
    |     1|20180307|   31|
    |     1|    null|   50|
    |     3|20180306|    5|
    |     3|    null|   11|
    |     3|20180307|    6|
    |     4|    null|   41|
    |     4|20180306|   21|
    |     4|20180307|   20|
    |     6|20180306|    5|
    |     6|    null|   10|
    |     6|20180307|    5|
    |    13|    null|    3|
    |    13|20180307|    2|
    |    13|20180306|    1|
    |    15|20180307|   12|
    |    15|    null|   24|
    +------+--------+-----+
    only showing top 20 rows

    scala> df.filter("dateid in(20180306,20180307)").rollup("platid","dateid").count.orderBy("platid").show
    +------+--------+-----+
    |platid|  dateid|count|
    +------+--------+-----+
    |  null|    null|  271|
    |     1|    null|   50|
    |     1|20180306|   19|
    |     1|20180307|   31|
    |     3|20180307|    6|
    |     3|    null|   11|
    |     3|20180306|    5|
    |     4|    null|   41|
    |     4|20180307|   20|
    |     4|20180306|   21|
    |     6|20180306|    5|
    |     6|    null|   10|
    |     6|20180307|    5|
    |    13|    null|    3|
    |    13|20180307|    2|
    |    13|20180306|    1|
    |    15|20180307|   12|
    |    15|20180306|   12|
    |    15|    null|   24|
    |    18|20180307|   14|
    +------+--------+-----+
    only showing top 20 rows
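
    cube produces counts for every combination of the grouping columns, including the dateid-only subtotals such as (null, 20180306), while rollup only produces the hierarchical subtotals (grand total, per platid, per platid and dateid), which is why the (null, dateid) rows are absent from the second result. The same aggregation can be expressed in SQL; a sketch against the source table:

    // SQL equivalent of the DataFrame cube above
    spark.sql("""
      select platid, dateid, count(*) as cnt
      from gamedw.account
      where dateid in (20180306, 20180307)
      group by platid, dateid with cube
    """).orderBy("platid").show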

    scala> val df=spark.read.json("/tmp/pdf1json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, fv: bigint ... 1 more field]


    scala> df.show
    +---+----+--------+
    |age|  fv|    name|
    +---+----+--------+
    | 50|6998|   xiliu|
    | 50| 866|zhangsan|
    | 20| 565|zhangsan|
    | 23| 565|  llihmj|
    +---+----+--------+

    scala> df.groupBy("age").pivot("fv").count.show
    +---+----+----+----+
    |age| 565| 866|6998|
    +---+----+----+----+
    | 50|null|   1|   1|
    | 23|   1|null|null|
    | 20|   1|null|null|
    +---+----+----+----+
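
    pivot turns the distinct values of fv into columns. Supplying the value list explicitly skips the extra job that collects the distinct values, and pivot can be combined with any aggregation; a sketch (value list copied from the data above):

    import org.apache.spark.sql.functions.count

    // explicit pivot values avoid the distinct-value scan
    df.groupBy("age")
      .pivot("fv", Seq(565, 866, 6998))
      .agg(count("fv"))
      .show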

    scala> val df=spark.sql("select * from gamedw.account")
    df: org.apache.spark.sql.DataFrame = [accountname: bigint, accid: bigint ... 3 more fields]


    scala> df.filter("dateid in(20180501,20180502,20180503)").groupBy("dateid").pivot("platid").count.show
    +--------+----+---+----+---+---+----+----+----+---+---+----+---+---+---+---+---+----+---+---+---+----+----+---+----+----+
    |  dateid|   0|  1|   3|  4|  6|   9|  11|  13| 15| 18|  19| 21| 27| 33| 35| 36|  38| 46| 60| 83| 138| 158|191| 277| 289|
    +--------+----+---+----+---+---+----+----+----+---+---+----+---+---+---+---+---+----+---+---+---+----+----+---+----+----+
    |20180501|null| 12|null|  9|  4|null|null|null|  6| 33|null|  1|  9|  3|  6|  1|   1|  6|  5|  7|   1|   1|  8|   5|   1|
    |20180502|null| 11|   1|  7|  4|null|   1|   2|  4| 25|   2|  1|  4|  1|  3|  3|null|  3|  5|  5|   1|null|  6|   1|null|
    |20180503|   1|  6|   1|  2|  3|   1|null|   1|  3| 19|   2|  1|  6|  6|  1|  1|null|  4|  2|  8|null|null| 19|null|   1|
    +--------+----+---+----+---+---+----+----+----+---+---+----+---+---+---+---+---+----+---+---+---+----+----+---+----+----+


    scala> df.filter("dateid in(20180501,20180502,20180503)").groupBy("platid").pivot("dateid").count.show
    +------+--------+--------+--------+
    |platid|20180501|20180502|20180503|
    +------+--------+--------+--------+
    |    27|       9|       4|       6|
    |     1|      12|      11|       6|
    |    13|    null|       2|       1|
    |     6|       4|       4|       3|
    |     3|    null|       1|       1|
    |   191|       8|       6|      19|
    |    19|    null|       2|       2|
    |    15|       6|       4|       3|
    |     9|    null|    null|       1|
    |    35|       6|       3|       1|
    |     4|       9|       7|       2|
    |   277|       5|       1|    null|
    |    38|       1|    null|    null|
    |   289|       1|    null|       1|
    |    21|       1|       1|       1|
    |    60|       5|       5|       2|
    |    33|       3|       1|       6|
    |    11|    null|       1|    null|
    |    83|       7|       5|       8|
    |   158|       1|    null|    null|
    +------+--------+--------+--------+
    only showing top 20 rows

    User-defined functions (UDF):

    scala> val strlen=udf{(str:String)=>str.length}

    strlen: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

    scala> spark.udf.register("slen",strlen)

    res60: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

    scala> df.registerTempTable("df")
    warning: there was one deprecation warning; re-run with -deprecation for details

    scala> spark.sql("select name,slen(name) from df").show
    +--------+---------+
    |    name|UDF(name)|
    +--------+---------+
    |   xiliu|        5|
    |zhangsan|        8|
    |zhangsan|        8|
    |  llihmj|        6|
    +--------+---------+
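
    registerTempTable is deprecated in Spark 2.x (hence the warning) in favour of createOrReplaceTempView, and the same UDF can also be applied directly through the DataFrame API without registering it at all; a minimal sketch:

    df.createOrReplaceTempView("df")      // non-deprecated replacement for registerTempTable

    // apply the UDF as a Column function; alias gives the result column a readable name
    df.select($"name", strlen($"name").as("name_len")).show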
