zoukankan      html  css  js  c++  java
  • 利用SparkSQL(java版)将离线数据或实时流数据写入hive的用法及坑点

    1. 通常利用SparkSQL将离线或实时流数据的SparkRDD数据写入Hive,一般有两种方法。第一种是利用org.apache.spark.sql.types.StructType和org.apache.spark.sql.types.DataTypes来映射拆分RDD的值;第二种方法是利用rdd和Java bean来反射的机制。下面对两种方法做代码举例

    2. 利用org.apache.spark.sql.types.StructType和org.apache.spark.sql.types.DataTypes来映射拆分RDD的值

    		JavaRDD<Row> resultRdd = rdd.map(new Function<String[], Row>() {
                        @Override
                        public Row call(String[] line) throws Exception {
                            if (line != null && line.length > 0) {
                                return helper.createRow(line);
                            }
                            return null;
                        }
                    });
    
      
            StructType structType = helper.createSchame();
            Dataset<Row> dataFrame = session.createDataFrame(resultRdd, structType);
            DataFrameWriter<Row> writer = dataFrame.coalesce(1).write().format(TableHelperInter.TABLE_FORMAT_TYPE).mode(SaveMode.Append);
            String tableName = hiveDataBaseName + "." + helper.getTableName();
            writer.insertInto(tableName);
    

    这种方法的有点是写入简单,不必去考虑字段映射有误,但缺点是需要去写一个TableHelperInter,而且这种方式对字段的类型要求严格,在做字段类型和字段校验时比对时一旦字段过多会及其复杂,所以不推崇这种写法

    3. 利用rdd和Java bean来反射

    来一个完整的程序

    public class SparkSQLTest {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("yarn").setAppName("SparkSQL_test");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            String line = "1102,jason,20,male,15927384023,developer,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20";
            String line2 = "1103,jason1,21,male,15927352023,developer1,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20";
    
            List<String> list = new ArrayList<String>();
            list.add(line);
            list.add(line2);
    
            JavaRDD<String> rdd = sc.parallelize(list);
            JavaRDD<Person> rddResult = rdd.map(new Function<String, Person>() {
                @Override
                public Person call(String s) throws Exception {
                    String[] message = s.split(",");
                    Person person = new Person();
                    person.setNo(message[0]);
                    person.setName(message[1]);
                    person.setAge(message[2]);
                    person.setGender(message[3]);
                    person.setPhone(message[4]);
                    person.setJob(message[5]);
                    person.setCol7(message[6]);
                    person.setCol8(message[7]);
                    person.setCol9(message[8]);
                    person.setCol10(message[9]);
                    person.setCol11(message[10]);
                    person.setCol12(message[11]);
                    person.setCol13(message[12]);
                    person.setCol14(message[13]);
                    person.setCol15(message[14]);
                    person.setCol16(message[15]);
                    person.setCol17(message[16]);
                    person.setCol18(message[17]);
                    person.setCol19(message[18]);
                    person.setCol20(message[19]);
                    person.setCreate_time_p(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now()));
    
                    return person;
                }
            });
    
            //这行代码必须在实例SparkSession不然会出错
            SparkSession.clearDefaultSession();
            SparkSession session = SparkSession.builder()
                    .config("hive.metastore.uris", "localhost:9083")
                    .config("spark.sql.warehouse.dir", "/apps/hive/warehouse")
                    .config("hive.exec.dynamic.partition", true)
                    .config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
                    .config("hive.exec.dynamic.partition.mode", "nonstrict")
                    .enableHiveSupport()
                    .getOrCreate();
    
            Dataset dataset = session.createDataFrame(rddResult,Person.class);
            dataset.registerTempTable("person_temp_table");
            session.sql("insert into qwrenzixing.person_table20 partition (create_time_p="+DateTimeFormatter.ofPattern("yyyyMMdd")
                    .format(LocalDate.now())+") select no,name,age,gender,phone,job,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20 from person_temp_table");
        }
    }
    

    这种方法比较简洁,为了避免去做繁琐的字段比对和校验。可以将字段类型以string写入hive。同时通过SparkSession操作SQL的方法是spark2.0后的。这里是将dataset写成一张临时表,再将临时表的值查询出来insert into到hive表中。但将DataSet通过SparkSQL写成一张临时表的操作,Spark原生提供了四个关于这种操作API

    dataset.registerTempTable("temp_table");
    dataset.createGlobalTempView("temp_table");
    dataset.createOrReplaceTempView("temp_table");
    dataset.createTempView("temp_table");
    

    4. 关于这四个将DataSet写成一张临时表的作用和坑点

    1>. dataset.registerTempTable("temp_table")这个方法建议在离线,批处理中使用,在实时流式计算中会导致后续写入hive值与字段不匹配乱序的问题
    2>. dataset.createGlobalTempView("temp_table")这个方法是创建一个全局临时表,意思就是别的spark-submit也可以用,这种场景很少,而且无法用在实时流式计算中,因为创建一次表后不能再创建会包表已经存在的错误
    3>. dataset.createOrReplaceTempView("temp_table");这个其实比较好理解,如果存在就覆盖
    4>. dataset.createTempView("temp_table"); 这个方法当spark程序没有结束时不能重复创建

    这里的创建临时表在spark程序结束后临时表不存在,所以spark streaming程序要特别注意用法

    5. 关于Spark SQL的一个坑点

    在mysql中insert into有两种方式

    INSERT INTO table_name VALUES (value1, value2,....)
    
    INSERT INTO table_name (column1, column2,...) VALUES (value1, value2,....)
    

    要注意第二种写法在SparkSQL会报错,SparkSQL不支持这种写法,只支持第一种写法。这个是为什么其实也很好理解,每个人想法不一样。第一次使用要避免这个坑点

    最后附上我在利用SparkSQL将kafka数据写入hive的重要环节的代码:

    		String tableName = hiveDataBaseName + ".test_data";
            Dataset dataFrame = session.createDataFrame(resultRdd, SJGJEntity.class);
            // createOrReplaceTempView API方式将数据写入hive 不存在值与字段名错乱的问题
            dataFrame.createOrReplaceTempView("temp_table");
                 
            session.sql("insert into " + tableName + " partition(create_time_p=" + DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDate.now())
                        + ") select base_name,base_num,serviceCode,phoneno,called_phoneno,call_time,call_longth,lac,ci,xpoint,ypoint,imei,imsi,insert_time,call_address," + "source_table,mark_type,companyId,type,createKafkaTime from temp_table");
    
  • 相关阅读:
    IIS 7的 MIME设置自定义下载文件
    jsp用js写时间
    CSS3 必须要知道的10 个顶级命令
    前端必读:浏览器内部工作原理
    文件下载的后台代码
    cookies的简单使用 客户端保存临时数据和上传文件的就Query.uploadify(2.xx版本)的使用
    Myeclipse 如何解决反应慢的问题
    时间插件,validate验证的简单jsp例子
    索引的一些总结
    jQuery选择器中含有空格和没有空格的区别
  • 原文地址:https://www.cnblogs.com/jiashengmei/p/11045887.html
Copyright © 2011-2022 走看看