zoukankan      html  css  js  c++  java
  • Spark 1.6.2：Java 实现读取 txt 文件并插入 MySQL 数据库的代码

    package com.gosun.spark1;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;


    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.types.DataType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.DateType;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StringType;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class MySqlWriter {

        public static void main(String[] args) throws ClassNotFoundException {
            
            long current = System.currentTimeMillis();
            //String master = "spark://192.168.31.34:7077";
            String localmaster = "local[5]";
            SparkConf sparkConf = new SparkConf().setAppName("mysqlJdbc").setMaster(localmaster);
            // spark应用的上下对象
            JavaSparkContext sc = new JavaSparkContext(sparkConf);
            SQLContext sqlContext = new SQLContext(sc);
            String url = "jdbc:mysql://192.168.31.16:3306/db?useUnicode=true&characterEncoding=utf-8";
            String table = "tb_user";
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "root");
            connectionProperties.put("password", "mysql");
            connectionProperties.put("driver", "com.mysql.jdbc.Driver");
            // 加载数据库中的数据
            JavaRDD<String> lines = sc.textFile( "F:/BaiduYunDownload/data/class9/user1.txt" );  
            JavaRDD<Row> personRDD  = lines.map(new Function<String, Row>() {  
                  
                private static final long serialVersionUID = 1L;  
     
                public Row call( String line )  
                    throws Exception {               
                    String[] split = line.split(" ");  
                    return RowFactory.create(String.valueOf(split[0]),String.valueOf(split[1]));  
                }  
            });  
            List<StructField> structFields = new ArrayList<StructField>();
            structFields.add(DataTypes.createStructField( "id", DataTypes.StringType, false ));
            structFields.add(DataTypes.createStructField( "name", DataTypes.StringType, true ));
            StructType structType = DataTypes.createStructType( structFields );  
            DataFrame usersDf = sqlContext.createDataFrame( personRDD, structType);  
            usersDf.write().mode(SaveMode.Append).mode(SaveMode.Overwrite).jdbc(url, table, connectionProperties);
            System.out.println((System.currentTimeMillis() - current) / 1000 + "s");
        }

        
    }

  • 相关阅读:
    seajs快速了解
    lazyload.js详解
    iScroll-js—“smooth scrolling for the web”
    Backbone学习笔记一Backbone中的MVC
    JMH基准测试框架
    idea 下运行安卓项目
    安卓
    C++
    看完
    四叉树的js实现
  • 原文地址:https://www.cnblogs.com/jingblogs/p/5714587.html
Copyright © 2011-2022 走看看