zoukankan      html  css  js  c++  java
  • Spark使用Java、Scala 读取mysql、json、csv数据以及写入操作

    一、pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>SparkSQL</groupId>
        <artifactId>com.sparksql.test</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
             <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.24</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.0</version>
            </dependency>
            <dependency>
                <groupId>net.sf.json-lib</groupId>
                <artifactId>json-lib</artifactId>
                <version>2.4</version>
                <classifier>jdk15</classifier>
            </dependency>
    
        </dependencies>
    
    </project>
    

    二、spark代码

    2.1 Java方式

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    
    import java.util.Properties;
    
    /**
     * Created by Administrator on 2017/11/6.
     */
    public class SparkMysql {
        public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);
    
        public static void main(String[] args) {
            JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
            SQLContext sqlContext = new SQLContext(sparkContext);
            //读取mysql数据
            readMySQL(sqlContext);
    
            //停止SparkContext
            sparkContext.stop();
        }
            private static void readMySQL(SQLContext sqlContext){
            //jdbc.url=jdbc:mysql://localhost:3306/database
            String url = "jdbc:mysql://localhost:3306/test";
            //查找的表名
            String table = "user_test";
            //增加数据库的用户名(user)密码(password),指定test数据库的驱动(driver)
            Properties connectionProperties = new Properties();
            connectionProperties.put("user","root");
            connectionProperties.put("password","123456");
            connectionProperties.put("driver","com.mysql.jdbc.Driver");
    
            //SparkJdbc读取Postgresql的products表内容
            System.out.println("读取test数据库中的user_test表内容");
            // 读取表中所有数据
            DataFrame jdbcDF = sqlContext.read().jdbc(url,table,connectionProperties).select("*");
            //显示数据
            jdbcDF.show();
        }
    }
    

    2.2 Scala方式

    import java.util.Properties
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object Passanger {
      def main(args: Array[String]): Unit = {
        //创建sparkSession
        val spark: SparkSession = SparkSession.builder().appName("Vehicle").master("local[4]").getOrCreate()
        //创建properties对象 设置连接mysql的信息
        val prop: Properties = new Properties()
        prop.setProperty("user", "root")
        prop.setProperty("password", "123456")
    
        //读取mysql数据
        val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/11", "12", prop)
        mysqlDF.createOrReplaceTempView("passenger")
        vehicleDF.createOrReplaceTempView(("vehicle"))
        spark.stop()
      }
    }
        
    

    三、写入数据到mysql中

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * Created by Administrator on 2017/11/6.
     */
    public class SparkMysql {
        public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);
    
        public static void main(String[] args) {
            JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
            SQLContext sqlContext = new SQLContext(sparkContext);
            //写入的数据内容
            JavaRDD<String> personData = sparkContext.parallelize(Arrays.asList("1 tom 5","2 jack 6","3 alex 7"));
            //数据库内容
            String url = "jdbc:mysql://localhost:3306/test";
            Properties connectionProperties = new Properties();
            connectionProperties.put("user","root");
            connectionProperties.put("password","123456");
            connectionProperties.put("driver","com.mysql.jdbc.Driver");
            /**
             * 第一步:在RDD的基础上创建类型为Row的RDD
             */
            //将RDD变成以Row为类型的RDD。Row可以简单理解为Table的一行数据
            JavaRDD<Row> personsRDD = personData.map(new Function<String,Row>(){
                public Row call(String line) throws Exception {
                    String[] splited = line.split(" ");
                    return RowFactory.create(Integer.valueOf(splited[0]),splited[1],Integer.valueOf(splited[2]));
                }
            });
    
            /**
             * 第二步:动态构造DataFrame的元数据。
             */
            List structFields = new ArrayList();
            structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));
            structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
            structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));
    
            //构建StructType,用于最后DataFrame元数据的描述
            StructType structType = DataTypes.createStructType(structFields);
    
            /**
             * 第三步:基于已有的元数据以及RDD<Row>来构造DataFrame
             */
            DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);
    
            /**
             * 第四步:将数据写入到person表中
             */
            personsDF.write().mode("append").jdbc(url,"person",connectionProperties);
    
            //停止SparkContext
            sparkContext.stop();
        }
     }
    

    四、DataFrameLoadTest

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SaveMode}
    
    object DataFrameLoadTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("DataFrameLoadTest").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
    
        //数据源load sparkSQL默认的文件的格式parquet(列式的文件存储格式)文件
        //sqlContext.read.load("url")
        //可以指定一下文件类型
        //sqlContext.read.format("json").load("url")
        //指定存储格式
        //sqlContext.read.load().write.json()
                                //write.jdbc()
                                //write.parquet()
                                //write.save() 使用默认
                                //write.format("json").save()
        //如果存储目录存在
        /*
        * mode(SaveMode.Append) 追加
        * mode(SaveMode.ErrorIfExists) 报错(默认)
        * mode(SaveMode.Overwrite)重写
        * mode(SaveMode.Ignore)不更新
        * */
        //sqlContext.read.load("url").write.mode(SaveMode.ErrorIfExists).format("json").save()
    
    
        //数据源之jdbc     使用mysql
        //postgresql类似于mysql关系型数据库  很多公司用他作为hive的元数据库
        sqlContext.read.format("jdbc").options(
          Map("url"->"jdbc:mysql://hadoop4:3306/sparksqltest","dbtable"->"t_1211","user"->"root","password"->"mysql")
        ).load().show()
        //spark-shell --driver-class-path /usr/local/soft/spark/mysql-connector-java-5.1.44-bin.jar
      }
    }
    

    五、读取数据库中的数据写到

    import java.util.Properties
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * 读取数据库中的数据写到
      * 1.数据库中
      * 2.文本文件中 注意:Text data source supports only a single column
      * 3.json文件中
      * 4.CSV文件中
      * 5.paruet文件中 注:工作中最长用,因为存储列的元数据,可以读取某一列数据
      *
      * Created by lym on 2019/2/11
      */
    object JDBCDataSource {
        def main(args: Array[String]): Unit = {
            // 1.创建sparkSession
            val spark: SparkSession = SparkSession.builder().appName(s"${this.getClass.getName}").master("local[*]").getOrCreate()
            // 2.读取数据库中的数据
            val is = Thread.currentThread().getContextClassLoader.getResourceAsStream("config.properties")
            var properties = new Properties()
            properties.load(is)
            val df: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3307/demo?charactorEncoding=utf-8", "bigdata", properties)
            // 3.将数据输出目标中
            df.createTempView("bigdata")
            val dfRes: DataFrame = spark.sql("select * from bigdata where num > 1000")
            // 3.1将数据写入到数据库中
    //        dfRes.write.jdbc("jdbc:mysql://localhost:3307/demo?characterEncoding=utf-8", "bigdata1", properties)
            // 3.2将数据写入到text文件中
    //        dfRes.write.text("D:/logs/test/eee")
            // 3.3将数据写入到json文件中
    //        dfRes.write.json("D:/logs/test/fff")
            // 3.4将数据写入到CSV文件中
    //        dfRes.write.csv("D:/logs/test/ggg")
            // 3.5将数据写入到paruet文件中
            dfRes.write.parquet("D:/logs/test/hhh")
    
            // 4.展示数据
            dfRes.show()
            // 5.释放资源
            spark.stop()
        }
    }
    

    六、通过jdbc方式编程

    import java.sql.DriverManager
    
    /**
      * 通过jdbc方式编程
      */
    object SparkSQLThriftserverApp {
      def main(args: Array[String]): Unit = {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        val conn = DriverManager.getConnection("jdbc:hive2://192.168.126.136:10000/lzc","root","");
        val pstmt = conn.prepareStatement("select * from user");
        val rs = pstmt.executeQuery();
        while(rs.next()) {
          println("id:" + rs.getInt("id") + " name:" + rs.getString("name"));
        }
        rs.close();
        pstmt.close();
        conn.close();
      }
    }
    

    七、spark:scala读取mysql的4种方法

    import java.sql.DriverManager
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.JdbcRDD
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SQLContext
    import java.util.Properties
     
    object SparkOnMysql {
     
      def main(args: Array[String]) {
        
        val sparkConf = new SparkConf().setMaster("spark://OPENFIRE-DEV:7080").setAppName("spark sql test");
        val sc = new SparkContext(sparkConf);
        val sqlContext = new SQLContext(sc);
        
        //1. 不指定查询条件
        //这个方式链接MySql的函数原型是:
        //我们只需要提供Driver的url,需要查询的表名,以及连接表相关属性properties。下面是具体例子:
        val url = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456";
        val prop = new Properties();
        val df = sqlContext.read.jdbc(url, "stock", prop);
        println("第一种方法输出:"+df.count());
        println("1.------------->" + df.count());
        println("1.------------->" + df.rdd.partitions.size);
        
        //2.指定数据库字段的范围
        //这种方式就是通过指定数据库中某个字段的范围,但是遗憾的是,这个字段必须是数字,来看看这个函数的函数原型:
        /* def jdbc(
        url: String,
        table: String,
        columnName: String,
        lowerBound: Long,
        upperBound: Long,
        numPartitions: Int,
        connectionProperties: Properties): DataFrame*/
        //前两个字段的含义和方法一类似。columnName就是需要分区的字段,这个字段在数据库中的类型必须是数字;
        //lowerBound就是分区的下界;upperBound就是分区的上界;numPartitions是分区的个数。同样,我们也来看看如何使用:
        val lowerBound = 1;
        val upperBound = 6;
        val numPartitions = 2;
        val url1 = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456";
        val prop1 = new Properties();
        val df1 = sqlContext.read.jdbc(url1, "stock", "id", lowerBound, upperBound, numPartitions, prop1);
        println("第二种方法输出:" + df1.rdd.partitions.size);
        df1.collect().foreach(println)
        
         /*这个方法可以将iteblog表的数据分布到RDD的几个分区中,分区的数量由numPartitions参数决定,在理想情况下,每个分区处理相同数量的数据,我们在使用的时候不建议将这个值设置的比较大,因为这可能导致数据库挂掉!但是根据前面介绍,这个函数的缺点就是只能使用整形数据字段作为分区关键字。
    这个函数在极端情况下,也就是设置将numPartitions设置为1,其含义和第一种方式一致。*/
        
        //3.根据任意字段进行分区
        //基于前面两种方法的限制, Spark 还提供了根据任意字段进行分区的方法,函数原型如下:
        /*def jdbc(
        url: String,
        table: String,
        predicates: Array[String],
        connectionProperties: Properties): DataFrame*/
        //这个函数相比第一种方式多了predicates参数,我们可以通过这个参数设置分区的依据,来看看例子:
        //这个函数相比第一种方式多了predicates参数,我们可以通过这个参数设置分区的依据,来看看例子:
        val predicates = Array[String]("id <= 2", "id >= 4 and id <= 5 ")
        val url2 = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456"
        val prop2 = new Properties()
        val df2 = sqlContext.read.jdbc(url, "stock", predicates, prop2)
        println("第三种方法输出:"+df2.rdd.partitions.size+","+predicates.length);
        df2.collect().foreach(println)
        //最后rdd的分区数量就等于predicates.length。
       
        
        //4.通过load获取
        //Spark还提供通过load的方式来读取数据。
        val url3 = "jdbc:mysql://192.168.0.101:3306/sas_vip?user=root&password=123456"
        val df3 = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "stock").load()
        println("第四种方法输出:"+df3.rdd.partitions.size);
        df.collect().foreach(println)
     
        sc.stop()
      }
    }
    

    提交作业:

    spark-submit --class com.wonhigh.liuzx.SparkOnMysql --master spark://dev-app-209-211:7080 /usr/local/wonhigh/miu-tag-spark-0.0.1-SNAPSHOT.jar
    

    八、读取csv数据插入到MySQL

    将csv的编码格式转为utf-8,否则spark读取中文乱码,转码方法见:https://jingyan.baidu.com/article/fea4511a092e53f7bb912528.html

    在这里插入图片描述

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SaveMode
    import java.util.Properties
    
    /**
     * 从USER_T.csv读取数据并插入的mysql表中
     */
    object MysqlInsertDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("MysqlInsertDemo").master("local").getOrCreate()
        val df = spark.read.option("header", "true").csv("src/main/resources/scala/USER_T.csv")
        df.show()
        val url = "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8"
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "Root-123456")
        df.write.mode(SaveMode.Append).jdbc(url, "USER_T", prop)
      }
    
    

    部分博文原文信息

    fengzhimohan

    spark:scala读取mysql的4种方法

  • 相关阅读:
    在vmware workstation10上安装ubuntu14.04,出现以下问题
    经典句
    杂文
    matlab里textread出现错误“Trouble reading floating point number from file (row 1, field 1)”
    Nginx配置杂记(转)
    mysql经典案例分析
    Git查看、删除、重命名远程分支和tag(转)
    nginx下开启pathinfo模式
    ubuntu-apache如何解决跨域资源访问
    c语言插入排序
  • 原文地址:https://www.cnblogs.com/aixing/p/13327368.html
Copyright © 2011-2022 走看看