zoukankan      html  css  js  c++  java
  • java使用spark/spark-sql处理schema数据

    1、spark是什么?

    Spark是基于内存计算的大数据并行计算框架。

    1.1 Spark基于内存计算

    相比于MapReduce基于IO计算,提高了在大数据环境下数据处理的实时性。

    1.2 高容错性和高可伸缩性

    与mapreduce框架相同,允许用户将Spark部署在大量廉价硬件之上,形成集群。

    2、spark编程

    每一个spark应用程序都包含一个驱动程序(driver program ),他会运行用户的main函数,并在集群上执行各种并行操作(parallel operations)

    spark提供的最主要的抽象概念有两种: 
    弹性分布式数据集(resilient distributed dataset)简称RDD ,他是一个元素集合,被分区地分布到集群的不同节点上,可以被并行操作,RDDS可以从hdfs(或者任意其他的支持Hadoop的文件系统)上的一个文件开始创建,或者通过转换驱动程序中已经存在的Scala集合得到,用户也可以让spark将一个RDD持久化到内存中,使其能再并行操作中被有效地重复使用,最后RDD能自动从节点故障中恢复

    spark的第二个抽象概念是共享变量(shared variables),它可以在并行操作中使用,在默认情况下,当spark将一个函数以任务集的形式在不同的节点上并行运行时,会将该函数所使用的每个变量拷贝传递给每一个任务中,有时候,一个变量需要在任务之间,或者驱动程序之间进行共享,spark支持两种共享变量: 
    广播变量(broadcast variables),它可以在所有节点的内存中缓存一个值。 
    累加器(accumulators):只能用于做加法的变量,例如计算器或求和器

    3、spark-sql

    spark-sql是将hive sql跑在spark引擎上的一种方式,提供了基于schema处理数据的方式。

    4、代码详解

    java spark和spark-sql依赖。

    pom.xml

    <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>1.6.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.10</artifactId>
                <version>1.6.0</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>1.6.0</version>
                <scope>provided</scope>
            </dependency>

    基于spark1.6创建HiveContext客户端。在spark2.1已经开始使用sparksession了。请注意。

    package com.xiaoju.dqa.fireman.driver;
    import com.xiaoju.dqa.fireman.exception.SparkInitException;
    import com.xiaoju.dqa.fireman.utils.PropertiesUtil;
    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.hive.HiveContext;
    
    import java.io.IOException;
    import java.util.Properties;
    
    public class SparkClient {
        private SparkConf sparkConf;
        private JavaSparkContext javaSparkContext;
    
        public SparkClient() {
            initSparkConf();
            javaSparkContext = new JavaSparkContext(sparkConf);
        }
    
        public SQLContext getSQLContext() throws SparkInitException {
            return new SQLContext(javaSparkContext);
        }
    
        public HiveContext getHiveContext() throws SparkInitException {
            return new HiveContext(javaSparkContext);
        }
    
        private void initSparkConf() {
            try {
                PropertiesUtil propUtil = new PropertiesUtil("fireman.properties");
                Properties prop = propUtil.getProperties();
                String warehouseLocation = System.getProperty("user.dir");
                sparkConf = new SparkConf()
                        .setAppName(prop.getProperty("spark.appname"))
                        .set("spark.sql.warehouse.dir", warehouseLocation)
                        .setMaster(prop.getProperty("spark.master"));
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    
    }

    驱动程序driver

    1、这里要实现可序列化接口,否则spark并不会识别这个类。

    2、这里在通过spark-sql读取到row数据之后,将schema解析出来,并且映射为hashmap。

    public class FiremanDriver implements Serializable {
        private String db;
        private String table;
    private HiveContext hiveContext;public FiremanDriver(String db, String table) {
            try {
                this.db = db;
                this.table = table;
                SparkClient sparkClient = new SparkClient();
                hiveContext = sparkClient.getHiveContext();
            } catch (SparkInitException ex) {
                ex.printStackTrace();
            }
        }
      
    public void check() { HashMap<String, Object> result = null; try { String query = String.format("select * from %s.%s", db ,table); System.out.println(query); DataFrame rows = hiveContext.sql(query); JavaRDD<Row> rdd = rows.toJavaRDD(); result = rdd.map(new Function<Row, HashMap<String, Object>>() { @Override public HashMap<String, Object> call(Row row) throws Exception { HashMap<String, Object> fuseResult = new HashMap<String, Object>(); HashMap<String, Object> rowMap = formatRowMap(row); // 实际map过程 return mapResult; } }).reduce(new Function2<HashMap<String, Object>, HashMap<String, Object>, HashMap<String, Object>>() { @Override public HashMap<String, Object> call(HashMap<String, Object> map1, HashMap<String, Object> map2) throws Exception { // reduce merge过程
                return mergeResult; } }); } catch (Exception ex) { ex.printStackTrace(); } }   // 读取shema,这里在通过spark-sql读取到row数据之后,将schema解析出来,并且映射为hashmap private HashMap<String, Object> formatRowMap(Row row){ HashMap<String, Object> rowMap = new HashMap<String, Object>(); try {         for (int i=0; i<row.schema().fields().length; i++) { String colName = row.schema().fields()[i].name(); Object colValue = row.get(i); rowMap.put(colName, colValue); }catch (Exception ex) { ex.printStackTrace(); } return rowMap; } public static void main(String[] args) { String db = args[0]; String table = args[1]; FiremanDriver firemanDriver = new FiremanDriver(db, table); firemanDriver.check(); } }
  • 相关阅读:
    转载:Cgroups 与 Systemd
    转载:linux cgroups 简介
    深入剖析Linux IO原理
    Tomcat zabbix监控、jmx监控、zabbix_java_gateway
    Tomcat 打开jmx
    vsftp、ftps 搭建
    Tomcat 调优
    Tomcat 部署及配置
    SVN 搭建
    Nginx 编译安装
  • 原文地址:https://www.cnblogs.com/kangoroo/p/6891540.html
Copyright © 2011-2022 走看看