zoukankan      html  css  js  c++  java
  • spark利用sparkSQL将数据写入hive两种通用方式实现及比较

    1.写在前面

    在利用spark计算引擎将kafka或其他源数据组件的数据入hive形成数仓的过程中有两种方式,一种方式是利用spark Rdd的API将数据写入hdfs形成hdfs文件,之后再将文件和hdfs文件和hive表做加载映射。第二种方式是利用sparkSQL将获取的数据Rdd转换成dataFrame,再将dataFrame写成缓存表,最后利用sparkSQL直接插入hive表中。这两种方式各有各自的优点。但大多数开发者更倾向于后者一次编码一步到位的方式。而对于利用sparkSQL写hive表官方有两种常见的API,第一种是利用JavaBean做映射,第二种是利用StructType创建Schema做映射,下面根据代码来分析这两种API 。

    2.样例数据

    原始数据:
    tom,1
    jim,2
    lily,3
    lucy,4
    写入hive数据
    字段 : word num
    值 : tom 1
    jim 2
    lily 3
    lucy 4

    3.JavaBean做映射方式

    	    String hiveTableColumns = "word,num";
                dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception {
                        JavaRDD<TestBean> beanJavaRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, TestBean>() {
                            @Override
                            public Iterable<TestBean> call(Iterator<String> iterator) throws Exception {
                                List<TestBean> beans = new ArrayList<>();
                                while (iterator.hasNext()){
                                    String message = iterator.next().toString();
                                    TestBean bean = new TestBean();
                                    bean.setWord(message.split(",",-1)[0]);
                                    bean.setNum(message.split(",",-1)[1]);
                                    beans.add(bean);
                                }
                                return beans;
                            }
                        });
    
                        DataFrame dataFrame = session.createDataFrame(beanJavaRDD, TestBean.class);
                        dataFrame.registerTempTable("temp_test");
    
                        session.sql("insert into test partition(create_time_p=" + new SimpleDateFormat("yyyyMMdd").format(new Date())
                                + ") select " + hiveTableColumns + " from temp_test");
                    }
                });
    
    public class TestBean implements Serializable {
        private String word;
        private String num;
    
        public String getWord() {
            return word;
        }
    
        public void setWord(String word) {
            this.word = word;
        }
    
        public String getNum() {
            return num;
        }
    
        public void setNum(String num) {
            this.num = num;
        }
    }
    

    3.利用StructType创建Schema做映射方式

    	    String hiveTableColumns = "word,num";
                dStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception {
                        JavaRDD<Row> rowJavaRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, Row>() {
                            @Override
                            public Iterable<Row> call(Iterator<String> iterator) throws Exception {
                                List<Row> rowList = new ArrayList<>();
                                while (iterator.hasNext()){
                                    String message = iterator.next().toString();
                                    rowList.add(RowFactory.create(message.split(",", -1)));
    
                                }
                                return rowList;
                            }
                        });
    
                        DataFrame dataFrame = session.createDataFrame(rowJavaRDD, getSchema(hiveTableColumns.split(",",-1)));
                        dataFrame.registerTempTable("temp_test");
    
                        session.sql("insert into " + databaseAndTableName + " partition(create_time_p=" + new SimpleDateFormat("yyyyMMdd").format(new Date())
                                + ") select " + hiveTableColumns + " from temp_test");
                    }
                });
    
          public static StructType getSchema(String[] columns) {
            List<StructField> schemaFields = new ArrayList<>();
            for (int i = 0; i < columns.length - 1; i++) {
                schemaFields.add(DataTypes.createStructField(columns[i], DataTypes.StringType, true));
            }
            return DataTypes.createStructType(schemaFields);
        }
    

    4.对比这两种方式

    这两种方式实现方式都相对简单,也比较简洁。对于很多大数据初学者可能首先会想到第一种方式。但是第一种方式不具备通用性,也就是新增一种类型数据。又需要新建bean,然后这里JavaRDD<TestBean> beanJavaRDD需要动态,这里DataFrame dataFrame = session.createDataFrame(beanJavaRDD, TestBean.class);也比较麻烦。最后发现根本无法通用多种类型的数据,如果数据有几百种类,这种方式就不够通用,每一类数据都需要对应的程序。而第二种方式就可以通用了,只需要将数据的字段抽取配置,一个类是可以兼容无论多少种数据的。所以在开发过程中还是推荐第二种方式。但是第一种方式也有自己的优点,不会出现字段与值对应错乱的问题。而第二种方式可能稍不小心会出现字段与值错乱的问题。

  • 相关阅读:
    windows下wamp多域名的配置
    数据库设计
    面向接口编程
    面向对象的设计原则
    javascript设计模式——适配器模式
    javascript设计模式——状态模式
    javascript设计模式——装饰者模式
    javascript设计模式——中介者模式
    javascript设计模式——职责链模式
    javascript设计模式——享元模式
  • 原文地址:https://www.cnblogs.com/jiashengmei/p/12859309.html
Copyright © 2011-2022 走看看