zoukankan      html  css  js  c++  java
  • 使用spark与MySQL进行数据交互的方法

    在项目中,遇到一个场景是,需要从Hive数据仓库中拉取数据,进行过滤、裁剪或者聚合之后生成中间结果导入MySQL。

    对于这样一个极其普通的离线计算场景,有多种技术选型可以实现。例如,sqoop,MR,HSQL。

    我们这里使用的spark,优点来说是两个:一是灵活性高,二是代码简洁。

    1)灵活性高

    相比sqoop和HSQL,spark可以更灵活的控制过滤和裁剪逻辑,甚至你可以通过外部的配置或者参数,来动态的调整spark的计算行为,提供定制化。

    2)代码简洁

    相比MR来说,代码量上少了很多。也无需实现MySQL客户端。

    我抽象了一下需求,做了如下一个demo。

    涉及的数据源有两个:Hive&MySQL;计算引擎:spark&spark-sql。我们的demo中分为两个步骤:

    1)从Hive中读取数据,交给spark计算,最终输出到MySQL;

    2)从MySQL中读取数据,交给spark计算,最终再输出到MySQL另一张表。

    1、 数据准备

    创建了Hive外部分区表

    关于分区和外部表这里不说了。

    CREATE EXTERNAL TABLE `gulfstream_test.accounts`(
      `id` string COMMENT '用户id', 
      `order_id` string COMMENT '订单id', 
      `status` bigint COMMENT '用户状态', 
      `count` decimal(16,9) COMMENT '订单数')
    COMMENT '用户信息'
    PARTITIONED BY ( 
      `year` string, 
      `month` string, 
      `day` string)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY '	' 
    STORED AS INPUTFORMAT 
      'org.autonavi.udf.CustomInputFormat' 
    OUTPUTFORMAT 
      'org.autonavi.udf.CustomHiveOutputFormat'
    LOCATION
      'hdfs://mycluster-tj/***/acounts'
    TBLPROPERTIES (
      'LEVEL'='1', 
      'TTL'='60', 
      'last_modified_by'='yangfan', 
      'last_modified_time'='2017-10-23', 
      'transient_lastDdlTime'='1508746808')

    建立分区,并指定分区路径

    这里分区使用的年月日三级分区。通过下面的命令将year=2017/month=10/day=23这个Hive分区的数据指向了location=hdfs://mycluster-tj/***/acounts/2017/10/23

    hive> alter table gulfstream_test.accounts add partition(year='2017', month='10', day='23') location 'hdfs://mycluster-tj/***/acounts/2017/10/23';

    查询一下分区是否建立成功

    可以看到分区已经有了。

    show partitions gulfstream_test.accounts;
    OK
    partition
    year=2017/month=10/day=23

    上传本地测试数据到hdfs

    hadoop fs -put a.txt  hdfs://mycluster-tj/***/acounts/2017/10/23

    看一下数据,取了前10行,原谅我数据比较假。

    [data_monitor@bigdata-arch-client10 target]$ hadoop fs -cat hdfs://mycluster-tj/***/acounts/2017/10/23/a | head -10
    0       0       0       0
    1       1       1       1
    2       2       2       2
    3       3       3       3
    4       4       4       4
    5       5       5       5
    6       6       6       6
    7       7       7       7
    8       8       8       8
    9       9       9       9

    在Hive中,也查一下前10条,是一样的。只是多了分区字段。

    hive (default)> select * from gulfstream_test.accounts where year=2017 and month=10 and day=23 limit 10;
    OK
    accounts.id     accounts.order_id       accounts.status accounts.count  accounts.year   accounts.month  accounts.day
    0       0       0       0       2017    10      23
    1       1       1       1       2017    10      23
    2       2       2       2       2017    10      23
    3       3       3       3       2017    10      23
    4       4       4       4       2017    10      23
    5       5       5       5       2017    10      23
    6       6       6       6       2017    10      23
    7       7       7       7       2017    10      23
    8       8       8       8       2017    10      23
    9       9       9       9       2017    10      23
    Time taken: 1.38 seconds, Fetched: 10 row(s)

    至此,测试数据准备好了。一共1000000条,1百万。

    2、代码

    1)POM依赖

    可以通过pom依赖来看一下笔者使用的组件版本。

    这里就不赘述了。

    <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>1.6.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.10</artifactId>
                <version>1.6.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>1.6.0</version>
                <scope>provided</scope>
            </dependency>

    打包方式

    <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <archive>
                            <manifest>
                                <!--这里要替换成jar包main方法所在类 -->
                                <mainClass>com.kangaroo.studio.algorithms.filter.LoadDB</mainClass>
    
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id> <!-- this is used for inheritance merges -->
                            <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>

    2)java spark代码

    先贴上代码,再说明

    package com.kangaroo.studio.algorithms.filter;
    
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.hive.HiveContext;
    
    import java.io.Serializable;
    import java.util.Properties;
    
    
    public class LoadDB implements Serializable {
    
        private SparkConf sparkConf;
        private JavaSparkContext javaSparkContext;
        private HiveContext hiveContext;
        private SQLContext sqlContext;
    
        /*
        *   初始化Load
        *   创建sparkContext, sqlContext, hiveContext
        * */
        public LoadDB() {
            initSparckContext();
            initSQLContext();
            initHiveContext();
        }
    
        /*
        *   创建sparkContext
        * */
        private void initSparckContext() {
            String warehouseLocation = System.getProperty("user.dir");
            sparkConf = new SparkConf()
                    .setAppName("from-to-mysql")
                    .set("spark.sql.warehouse.dir", warehouseLocation)
                    .setMaster("yarn-client");
            javaSparkContext = new JavaSparkContext(sparkConf);
        }
    
        /*
        *   创建hiveContext
        *   用于读取Hive中的数据
        * */
        private void initHiveContext() {
            hiveContext = new HiveContext(javaSparkContext);
        }
    
        /*
        *   创建sqlContext
        *   用于读写MySQL中的数据
        * */
        private void initSQLContext() {
            sqlContext = new SQLContext(javaSparkContext);
        }
    
        /*
        *   使用spark-sql从hive中读取数据, 然后写入mysql对应表.
        * */
        public void hive2db() {
            String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
            String table = "accounts";
            Properties props = new Properties();
            props.put("user", "root");
            props.put("password", "1234");
            String query = "select * from gulfstream_test.accounts where year=2017 and month=10 and day=23";
            DataFrame rows = hiveContext.sql(query).select("id", "order_id", "status", "count");;
            rows.write().mode(SaveMode.Append).jdbc(url, table, props);
        }
    
        /*
        *   使用spark-sql从db中读取数据, 处理后再回写到db
        * */
        public void db2db() {
            String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
            String fromTable = "accounts";
            String toTable = "accountsPart";
            Properties props = new Properties();
            props.put("user", "root");
            props.put("password", "1234");
            DataFrame rows = sqlContext.read().jdbc(url, fromTable, props).where("count < 1000");
            rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
        }
    
    
        public static void main(String[] args) {
            LoadDB loadDB = new LoadDB();
            System.out.println(" ---------------------- start hive2db ------------------------");
            loadDB.hive2db();
            System.out.println(" ---------------------- finish hive2db ------------------------");
            System.out.println(" ---------------------- start db2db ------------------------");
            loadDB.db2db();
            System.out.println(" ---------------------- finish db2db ------------------------");
        }
    }

    说明:

    • hive2db

    核心动作是使用hiveContext.sql(query)执行了hiveSQL,过滤出Hive表中year=2017/month=10/day=23分钟的数据,返回一个DataFrame对象。

    DataFrame是spark-sql数据处理的核心。对DataFrame的操作推荐这样一篇博客。你可以去使用这些方法,实现复杂的逻辑。

    对DataFrame对象,我们使用了select裁剪了其中4列数据(id, order_id, status, count)出来,不过不裁剪的话,会有7列(加上分区的year,month,day)。

    然后将数据以SaveMode.Append的方式,写入了mysql中的accounts表。

    SaveMode.Append方式,数据会追加,而不会覆盖。如果想覆盖,还有一个常用的SaveMode.Overwrite。推荐这样一篇博客

    最终accounts中的数据有1000000条,百万。

    • db2db

    db2db从刚刚生成的MySQL表accounts中读取出数据,也是返回了一个dataframe对象,通过执行where过滤除了其中id<1000的数据,这里正好是1000条。

    然后写入了accountsPart。最终accountsPart数据应该有1000条。

    3)编译和执行

     编译完成后,生成jar包from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

    使用默认参数提交到yarn队列。

    spark-submit --queue=root.zhiliangbu_prod_datamonitor from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

     片刻之后,观察输出。已经全部finish了。

    4)查看一下结果

    我们到mysql中瞅一瞅。

    accounts表

    有没有注意到,其实不用建立mysql表!这个过程会自动给你创建,相当于if not exists。

    细心的你可能已经注意到了,hive里的string类型,到了MySQL中变成了Text。有个兄弟说,如果你手动创建了表,并且字段设置为String会报错,我没有试,只是记录了一下。

    CREATE TABLE `accounts` (
      `id` text,
      `order_id` text,
      `status` bigint(20) DEFAULT NULL,
      `count` decimal(16,9) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    简单看一下里面有多少数据。1百万

    MariaDB [big_data]> select count(1) from accounts ;    
    +----------+
    | count(1) |
    +----------+
    |  1000000 |
    +----------+
    1 row in set (0.32 sec)

    acountsPart表

     CREATE TABLE `accountsPart` (
      `id` text,
      `order_id` text,
      `status` bigint(20) DEFAULT NULL,
      `count` decimal(16,9) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

    查看有多少数据,1000条,果然是没有问题的

    MariaDB [big_data]> select count(1) from accountsPart;
    +----------+
    | count(1) |
    +----------+
    |     1000 |
    +----------+
    1 row in set (0.00 sec)

    到此为止。

  • 相关阅读:
    Js注释和对象
    卸载Oracle
    数据库设计三大范式
    Oracle用户管理
    Oracle权限管理详解
    Linux 性能监控之CPU&内存&I/O监控Shell脚本2
    Linux 性能监控之CPU&内存&I/O监控Shell脚本1
    Windows Win7建立wifi热点,手机共享WIFI上网
    Mariadb MySQL逻辑条件判断相关语句、函数使用举例介绍
    Mariadb MySQL、Mariadb中GROUP_CONCAT函数使用介绍
  • 原文地址:https://www.cnblogs.com/kangoroo/p/7728636.html
Copyright © 2011-2022 走看看