zoukankan      html  css  js  c++  java
  • Spark SQL：读取 JSON 并按 name 合并两份 JSON 数据

    Java

     1 public class Demo {
     2     private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
     3     private static JavaSparkContext jsc = new JavaSparkContext(conf);
     4     private static SparkSession session = new SparkSession(jsc.sc());
     5 
     6     public static void main(String[] args) {
     7 
     8         // 加载students.json name,score
     9         Dataset<Row> score = session.read().json("./src/main/java/cn/tele/spark_sql/json/students.json");
    10 
    11         score.createOrReplaceTempView("scoreView");
    12 
    13         // name,score
    14         JavaRDD<Row> scoreRDD = session.sql("select * from scoreView where score > 80").javaRDD();
    15 
    16         // 创建信息json name,age
    17         JavaRDD<String> infoRDD = jsc.parallelize(Arrays.asList("{"name":"Leo","age":18}",
    18                 "{"name":"Marry","age":19}", "{"name":"Jack","age":20}"));
    19 
    20         Dataset<Row> info = session.read().json(infoRDD);
    21         info.createOrReplaceTempView("infoView");
    22 
    23         // 拼接sql
    24         List<Row> scoreList = scoreRDD.collect();
    25 
    26         String sql = "select * from infoView where name in (";
    27         for (int i = 0; i < scoreList.size(); i++) {
    28             sql += "'" + scoreList.get(i).getAs("name") + "'";
    29             if (i < scoreList.size() - 1) {
    30                 sql += ",";
    31             }
    32         }
    33 
    34         sql += ")";
    35 
    36         // 查询 分数>80的学生的name,age
    37 
    38         // 转换
    39         JavaPairRDD<String, Integer> tempRDD = session.sql(sql).javaRDD()
    40                 .mapToPair(new PairFunction<Row, String, Integer>() {
    41 
    42                     private static final long serialVersionUID = 1L;
    43 
    44                     @Override
    45                     public Tuple2<String, Integer> call(Row t) throws Exception {
    46                         return new Tuple2<String, Integer>(t.getAs("name"), Integer.valueOf(t.getAs("age").toString()));
    47                     }
    48                 });
    49 
    50         JavaPairRDD<String, Integer> scoreRDD2 = scoreRDD.mapToPair(new PairFunction<Row, String, Integer>() {
    51 
    52             private static final long serialVersionUID = 1L;
    53 
    54             @Override
    55             public Tuple2<String, Integer> call(Row t) throws Exception {
    56                 return new Tuple2<String, Integer>(t.getAs("name"), Integer.valueOf(t.getAs("score").toString()));
    57             }
    58         });
    59 
    60         // join
    61         JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = tempRDD.join(scoreRDD2);
    62 
    63         // 遍历
    64         resultRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {
    65 
    66             private static final long serialVersionUID = 1L;
    67 
    68             @Override
    69             public void call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
    70                 System.out.println("name:" + t._1 + "," + "age:" + t._2._1 + ",score:" + t._2._2);
    71             }
    72         });
    73 
    74         // 保存为json格式
    75         StructType schema = DataTypes
    76                 .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
    77                         DataTypes.createStructField("age", DataTypes.IntegerType, false),
    78                         DataTypes.createStructField("score", DataTypes.IntegerType, false)));
    79 
    80         JavaRDD<Row> rowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
    81 
    82             private static final long serialVersionUID = 1L;
    83 
    84             @Override
    85             public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
    86                 return RowFactory.create(v1._1, Integer.valueOf(v1._2._1), Integer.valueOf(v1._2._2));
    87             }
    88         });
    89 
    90         Dataset<Row> resultDS = session.createDataFrame(rowRDD, schema);
    91 
    92         resultDS.write().format("json").mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/json/result");
    93 
    94         session.stop();
    95         jsc.close();
    96     }
    97 }

    Scala

     /**
      * Spark SQL JSON example: reads student scores from students.json, builds student info
      * (name, age) from in-memory JSON strings, and joins the two on name for students whose
      * score is above 80. Each joined record is printed and appended as JSON under
      * ./src/main/scala/cn/tele/spark_sql/json/result.
      */
     object Demo {
       def main(args: Array[String]): Unit = {
         val conf = new SparkConf().setAppName("demo").setMaster("local")
         val sc = new SparkContext(conf)
         val sqlContext = new SQLContext(sc)

         try {
           // Load score info (fields: name, score).
           val scoreDF = sqlContext.read.json("./src/main/scala/cn/tele/spark_sql/json/students.json")
           scoreDF.createOrReplaceTempView("scoreView")

           // Students scoring above 80; joined against below so low scores cannot leak in
           // through a duplicate name.
           val highScoreDF = sqlContext.sql("select * from scoreView where score > 80")

           // Student info (fields: name, age). Triple quotes keep the JSON double quotes literal.
           val infoRDD = sc.parallelize(Array(
             """{"name":"Leo","age":20}""",
             """{"name":"Marry","age":30}""",
             """{"name":"Jack","age":21}"""), 2)

           // NOTE(review): read.json(RDD[String]) is deprecated since Spark 2.2 in favour of
           // read.json(Dataset[String]).
           val infoDS = sqlContext.read.json(infoRDD)
           infoDS.createOrReplaceTempView("infoView")

           // An IN subquery instead of concatenating collected names into the SQL text, which
           // broke on names containing a quote and produced invalid SQL ("in ()") when no
           // student qualified.
           val sql = "select * from infoView where name in (select name from scoreView where score > 80)"

           // (name, age). JSON integers are inferred as bigint, hence getAs[Long].
           val tempRDD = sqlContext.sql(sql).rdd.map { row =>
             (row.getAs[String]("name"), row.getAs[Long]("age").toInt)
           }

           // (name, score) for the students scoring above 80.
           val tempRDD2 = highScoreDF.rdd.map { row =>
             (row.getAs[String]("name"), row.getAs[Long]("score").toInt)
           }

           // Inner join on name: (name, (age, score)).
           val resultRDD = tempRDD.join(tempRDD2)

           resultRDD.foreach { case (name, (age, score)) =>
             println("name:" + name + ",age:" + age + ",score:" + score)
           }

           val rowRDD = resultRDD.map { case (name, (age, score)) => Row(name, age, score) }

           // Save as JSON with an explicit, non-nullable (name, age, score) schema.
           val schema = DataTypes.createStructType(Array(
             StructField("name", DataTypes.StringType, false),
             StructField("age", DataTypes.IntegerType, false),
             StructField("score", DataTypes.IntegerType, false)))

           val df = sqlContext.createDataFrame(rowRDD, schema)

           df.write.format("json").mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/json/result")
         } finally {
           // Release the SparkContext even if a step above fails.
           sc.stop()
         }
       }
     }
  • 相关阅读:
    svn cleanup failed–previous operation has not finished 解决方法
    开源SNS社区系统推荐
    从网络获取图片本地保存
    MS SQL Server 数据库连接字符串
    KeepAlive
    Configure Git in debian
    sqlserver query time
    RPi Text to Speech (Speech Synthesis)
    SQL Joins with C# LINQ
    search or reseed identity columns in sqlserver 2008
  • 原文地址:https://www.cnblogs.com/tele-share/p/10391828.html
Copyright © 2011-2022 走看看