zoukankan      html  css  js  c++  java
  • spark 分析sql内容再插入到sql表中

    package cn.spark.study.core.mycode_dataFrame;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    import scala.Tuple2;

    /**
     * Spark example that reads two MySQL tables through the JDBC data source,
     * joins them by student name and writes the joined rows back to MySQL.
     *
     * <p>Flow: {@code student_infos} (name, age) and {@code student_scores}
     * (name, score) are loaded as DataFrames, converted to pair RDDs keyed by
     * name, joined, turned back into a DataFrame with the schema
     * (name, age, score) and inserted into {@code good_student_infos}.
     */
    public class JDBCDataSource {

        /** JDBC URL used both for loading the source tables and for writing the result. */
        private static final String JDBC_URL = "jdbc:mysql://127.0.0.1:3306/testdb";

        /** JDBC driver class that is loaded on the executor before a connection is opened. */
        private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";

        /** Parameterized insert; values are bound, never concatenated into the SQL text. */
        private static final String INSERT_SQL = "insert into good_student_infos values(?,?,?)";

        /**
         * Entry point: builds a local Spark context, runs the join-and-insert job
         * and always stops the context afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("JDBCDataSource")
                    .setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            try {
                runJob(new SQLContext(sc));
            } finally {
                // Release the context even when the job fails.
                sc.stop();
            }
        }

        /** Loads both tables, joins them by name and writes the joined rows to MySQL. */
        private static void runJob(SQLContext sqlContext) {
            // Load the table information.
            DataFrame studentInfosDF = loadTable(sqlContext, "student_infos");
            DataFrame studentScoreDF = loadTable(sqlContext, "student_scores");

            // name -> (age, score) for every name present in both tables.
            JavaPairRDD<String, Tuple2<Integer, Integer>> joined =
                    studentInfosDF.javaRDD().mapToPair(new NameToIntColumn())
                            .join(studentScoreDF.javaRDD().mapToPair(new NameToIntColumn()));

            JavaRDD<Row> joinedRows = joined.map(
                    new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple)
                                throws Exception {
                            return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
                        }
                    });

            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
            fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
            StructType schema = DataTypes.createStructType(fields);

            DataFrame resultDF = sqlContext.createDataFrame(joinedRows, schema);
            // One connection per partition instead of one per row.
            resultDF.javaRDD().foreachPartition(new InsertPartition());
        }

        /** Loads one table of the test database as a DataFrame via the JDBC data source. */
        private static DataFrame loadTable(SQLContext sqlContext, String table) {
            Map<String, String> options = new HashMap<String, String>();
            options.put("url", JDBC_URL);
            options.put("dbtable", table);
            return sqlContext.read().format("jdbc").options(options).load();
        }

        /**
         * Maps a row to (column 0 as String, column 1 as Integer). Column 1 is
         * converted through its string form, so any column type whose text is a
         * plain integer is accepted.
         */
        private static final class NameToIntColumn implements PairFunction<Row, String, Integer> {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(
                        row.getString(0), Integer.valueOf(String.valueOf(row.get(1))));
            }
        }

        /**
         * Inserts all rows of one partition into {@code good_student_infos} with a
         * single connection, a prepared statement and one batch.
         *
         * <p>Auto-commit is disabled and the batch is committed once at the end;
         * on any failure a rollback is attempted and the exception is rethrown so
         * that the Spark task fails instead of silently dropping rows.
         */
        private static final class InsertPartition implements VoidFunction<Iterator<Row>> {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Iterator<Row> rows) throws Exception {
                if (!rows.hasNext()) {
                    // Nothing to write: do not open a connection for an empty partition.
                    return;
                }
                Class.forName(JDBC_DRIVER);
                Connection conn = DriverManager.getConnection(JDBC_URL, "", "");
                try {
                    conn.setAutoCommit(false);
                    PreparedStatement stmt = conn.prepareStatement(INSERT_SQL);
                    try {
                        while (rows.hasNext()) {
                            Row row = rows.next();
                            System.out.println("sql:" + INSERT_SQL + " <- " + row);
                            stmt.setString(1, row.getString(0));
                            // Columns 1 and 2 are declared IntegerType in the result schema.
                            stmt.setInt(2, row.getInt(1));
                            stmt.setInt(3, row.getInt(2));
                            stmt.addBatch();
                        }
                        stmt.executeBatch();
                        conn.commit();
                    } catch (Exception e) {
                        rollbackQuietly(conn);
                        throw e;
                    } finally {
                        stmt.close();
                    }
                } finally {
                    // Runs even if closing the statement failed, so the connection never leaks.
                    conn.close();
                }
            }

            /** Attempts a rollback without masking the exception that triggered it. */
            private static void rollbackQuietly(Connection conn) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackFailure) {
                    System.err.println("rollback failed: " + rollbackFailure);
                }
            }
        }
    }

    数据库准备

    -- create table student_info(name VARCHAR(20),age INTEGER);
    -- create table student_scores(name VARCHAR(20),score INTEGER);
    -- insert into student_info values("leo",18),("marry",17),("jack",19);
    -- insert into student_scores values("leo",88),("marry",99),("jack",60);
    -- ALTER TABLE student_info RENAME to student_infos;
    -- create table good_student_infos(name VARCHAR(20),age INTEGER,score INTEGER);

     最终生成的表

  • 相关阅读:
    spring boot三种方式设置跨域
    完整卸载Mysql
    【OBIEE】OBIEE集成Echarts作图
    【OBIEE】BIEE培训(一)
    【Oracle】Oracle物化视图
    【Oracle】oracle11g安装过程提示swap size 检查失败问题
    【Linux】centOS7下安装GUI图形界面
    【Nginx】Linux环境搭建nginx负载
    【oracle】Oracle创建带参数视图
    抢票:搭建github最火的12306项目
  • 原文地址:https://www.cnblogs.com/yaohaitao/p/5692183.html
Copyright © 2011-2022 走看看