zoukankan      html  css  js  c++  java
  • Spark 加载 MySQL 数据库表中的数据进行分析

    1. 工程 Maven 依赖包

     1  
     2 <properties>
           <!-- spark_version is shared by the spark-core, spark-sql and spark-yarn dependencies below.
                The "_2.11" artifact suffix is the Scala binary version; presumably it must match the
                Scala build of the target Spark cluster - confirm before upgrading. -->
     3     <spark_version>2.3.1</spark_version>
     4     <!-- elasticsearch-->
           <!-- NOTE(review): elasticsearch.version, fastjson.version and elasticsearch-hadoop.version are
                not referenced by any dependency in this snippet; confirm they are used elsewhere in the
                pom before keeping them. Note elasticsearch-hadoop (6.3.2) and elasticsearch-spark (5.5.2)
                are pinned to different versions. -->
     5     <elasticsearch.version>5.5.2</elasticsearch.version>
     6     <fastjson.version>1.2.28</fastjson.version>
     7     <elasticsearch-hadoop.version>6.3.2</elasticsearch-hadoop.version>
     8     <elasticsearch-spark.version>5.5.2</elasticsearch-spark.version>
     9 </properties>
    10 <dependencies>
           <!-- NOTE(review): if the job is launched with spark-submit, Spark artifacts are usually given
                provided scope so they are not bundled into the application jar - confirm for your deployment. -->
    11     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    12     <dependency>
    13         <groupId>org.apache.spark</groupId>
    14         <artifactId>spark-core_2.11</artifactId>
    15         <version>${spark_version}</version>
    16     </dependency>
    17     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
           <!-- spark-sql provides SparkSession / Dataset and the JDBC data source used by GoodsFromMySQL. -->
    18     <dependency>
    19         <groupId>org.apache.spark</groupId>
    20         <artifactId>spark-sql_2.11</artifactId>
    21         <version>${spark_version}</version>
    22     </dependency>
    23     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
    24     <dependency>
    25         <groupId>org.apache.spark</groupId>
    26         <artifactId>spark-yarn_2.11</artifactId>
    27         <version>${spark_version}</version>
    28     </dependency>
           <!-- Elasticsearch connector for Spark; not used by GoodsFromMySQL in this snippet. -->
    29     <dependency>
    30         <groupId>org.elasticsearch</groupId>
    31         <artifactId>elasticsearch-spark-20_2.11</artifactId>
    32         <version>${elasticsearch-spark.version}</version>
    33     </dependency>
           <!-- MySQL JDBC driver; GoodsFromMySQL loads it by the class name com.mysql.jdbc.Driver.
                Reading from another database means swapping this artifact for that database's driver. -->
    34     <dependency>
    35         <groupId>mysql</groupId>
    36         <artifactId>mysql-connector-java</artifactId>
    37         <version>5.1.46</version>
    38     </dependency>
    39 </dependencies>

    2. Spark 加载数据库中的数据

     1 public class GoodsFromMySQL {
     2 
     3     /**
     4      * 加载数据库数据
     5      *
     6      * @param sc           spark context
     7      * @param sparkSession spark session
     8      */
     9     public static void loadGoodsInfo(SparkContext sc, SparkSession sparkSession) {
    10         String url = "jdbc:mysql://x.x.x.x:3306/db-test";
    11 
    12         String sql = "(SELECT item_name as itemName, goods_category as goodsCategory FROM goods where dict_type='100203' and item_name " +
    13                 "is not null) as my-goods";
    14 
    15         SQLContext sqlContext = SQLContext.getOrCreate(sc);
    16         DataFrameReader reader = sqlContext.read().format("jdbc").
    17                 option("url", url).option("dbtable", sql).
    18                 option("driver", "com.mysql.jdbc.Driver").
    19                 option("user", "root").
    20                 option("password", "xxxxx");
    21 
    22 
    23         Dataset<Row> goodsDataSet = reader.load();
    24 
    25         // Looks the schema of this DataFrame.
    26         goodsDataSet.printSchema();
    27 
    28         goodsDataSet.write().mode(SaveMode.Overwrite).json("/data/app/source_new.json");
    29     }
    30 
    31 
    32     public static void main(String[] args) {
    33         SparkConf conf = new SparkConf().setAppName("my-app");
    34         SparkContext sc = new SparkContext(conf);
    35 
    36         SparkSession sparkSession = new SparkSession(sc);
    37 
    38         loadGoodsInfo(sc, sparkSession);
    39     }
    40 }

    3. Spark 支持从多种数据库加载数据,只需引入对应数据库的 JDBC 驱动包,并对代码稍作调整即可。

      以上述 Java 代码为例,只需将 `driver` 选项中的驱动类(如 `com.mysql.jdbc.Driver`)替换为目标数据库的驱动类,并相应修改 `url` 选项中的 JDBC 连接地址即可。

  • 相关阅读:
    检索 COM 类工厂中 CLSID 为 {0002450000000000C000000000000046} 的组件时失败,原因是出现以下错误: 80070005。
    行列转换(sqlserver2005 的新方法)
    今天开始要详细的记录学习sharepoint 的进度和相关的一些资料
    SQL SERVER 2005 数据库状态为“可疑”的解决方法
    弹出窗口window.open()的参数列表
    C#术语&&C#关键字
    把一个 ASP.NET 程序转换为了 Web Services
    修饰符(C# 参考)
    C# 中的常用正则表达式
    1、String.format()与String.valueOf()区别 && 2、string.split()
  • 原文地址:https://www.cnblogs.com/woodylau/p/9475709.html
Copyright © 2011-2022 走看看