  • Spark: saving and reading CSV and SequenceFile

    Preface

    Spark can read and save many file formats: JSON, CSV, Hadoop SequenceFile, HBase, and so on. This post is a short walkthrough of reading and writing files with Spark.

    Reading and writing CSV with Spark

    The examples parse CSV with the opencsv library, so first add it to your Maven configuration (a sample dependency is sketched below).
    There are two ways to read: line by line, or whole files keyed by file name; the code differs slightly between the two.
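
    A minimal Maven dependency sketch (the version below is an assumption; use whichever opencsv release fits your project):

    <dependency>
        <groupId>com.opencsv</groupId>
        <artifactId>opencsv</artifactId>
        <version>4.6</version> <!-- hypothetical version -->
    </dependency>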

    Reading line by line

    package com.learn.hadoop.spark.doc.analysis.chpater.datasave;
    
    import com.opencsv.CSVReader;
    import com.opencsv.CSVWriter;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    
    import java.io.StringReader;
    import java.io.StringWriter;
    import java.util.Arrays;
    
    public class DataSaveTest02Csv {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("DataSaveTest02");
            JavaSparkContext sc  = new JavaSparkContext(conf);
        String inputfile = "D:\\my\\code\\github\\learncode\\spark\\src\\main\\resources\\res\\sparksave\\";
        // Read a file, or all files under a directory, one record per line
            JavaRDD<String> rdd = sc.textFile(inputfile);
        // Print what was read: a list containing every line as a string
            System.out.println(rdd.collect().toString());
            //JavaRDD<String[]> csvData =rdd.map(new ParseLine2());
            JavaRDD<String[]> csvData =rdd.map(new Function<String, String[]>() {
                @Override
                public String[] call(String s) throws Exception {
                    CSVReader reader = new CSVReader(new StringReader(s));
                    return reader.readNext();
                }
            });
        // Print the parsed field array of each CSV line
            csvData.foreach(f-> System.out.println(Arrays.asList(f).toString()));
    
            //test write
        String outfile = "C:\\Users\\juncai\\Desktop\\out";
        // The JavaRDD<String[]> to write out; here we simply reuse csvData
            JavaRDD<String []> outrdd =csvData;
        // Write record by record: each String[] becomes one CSV-formatted line
            outrdd.map(new Function<String[],String>(){
                @Override
                public String call(String[] strings) throws Exception {
                    StringWriter stringWriter = new StringWriter();
                    CSVWriter csvWriter = new CSVWriter(stringWriter);
                    csvWriter.writeNext(strings);
                    return stringWriter.toString();
                }
            }).saveAsTextFile(outfile);
        }
    }
    /*
    Two files in the directory, both with the same content
        1,jack,male,29
        2,linda,female,29
    
    Output
        [1,jack,male,29, 2,linda,female,29, 1,jack,male,29, 2,linda,female,29]
        [1, jack, male, 29]
        [2, linda, female, 29]
        [1, jack, male, 29]
        [2, linda, female, 29]
     */
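
    The anonymous Function classes above can also be written as Java 8 lambdas. A minimal sketch, assuming the same sc, inputfile, and outfile (and imports) as in the class above:

    // Same per-line parse and per-record write as above, using lambdas instead of anonymous classes
    JavaRDD<String[]> parsed = sc.textFile(inputfile)
            .map(line -> new CSVReader(new StringReader(line)).readNext());
    parsed.map(fields -> {
        StringWriter sw = new StringWriter();
        new CSVWriter(sw).writeNext(fields);
        return sw.toString();
    }).saveAsTextFile(outfile);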
    
    

    Reading whole files keyed by file name

    The difference between textFile and wholeTextFiles is that wholeTextFiles reads each whole file as one record keyed by its path, so the result is a pair RDD.
    Compare the printed output below with the line-by-line version above to see the difference.
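
    A minimal sketch of the two return types, assuming the same sc and input path as in the code below:

    JavaRDD<String> lines = sc.textFile(inputfile);                     // one String record per line
    JavaPairRDD<String, String> files = sc.wholeTextFiles(inputfile);   // one (path, content) record per file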

    package com.learn.hadoop.spark.doc.analysis.chpater.datasave;
    
    import com.opencsv.CSVReader;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import scala.Tuple2;
    
    import java.io.StringReader;
    import java.util.Arrays;
    import java.util.Iterator;
    
    /**
     * Tests Spark data saving and reading.
     * Reads CSV files one whole file at a time.
     */
    public class DataSaveTest01Csv {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local").setAppName("DataSaveTest01");
            JavaSparkContext sc  = new JavaSparkContext(conf);
        // wholeTextFiles reads all files under the directory; each file is one record, not one record per line
        String inputfile = "D:\\my\\code\\github\\learncode\\spark\\src\\main\\resources\\res\\sparksave\\";
        // Read a file, or everything under a directory, with each file as a single record
        JavaPairRDD<String,String> csvData =sc.wholeTextFiles(inputfile);
        // Print the (file path, file content) key-value pairs
            System.out.println(csvData.collect());
            //JavaRDD<String []>keyedRdd =csvData.flatMap(new ParseLine());
        // Parse only the file contents; the file-path key is not used further
            JavaRDD<String []>keyedRdd =csvData.flatMap(new FlatMapFunction<Tuple2<String, String>, String[]>() {
                @Override
                public Iterator<String[]> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                    CSVReader reader = new CSVReader(new StringReader(stringStringTuple2._2));
                    return reader.readAll().iterator();
                }
            });
        //keyedRdd.foreach(x -> System.out.println(x)); would print the array object, not its contents
            keyedRdd.foreach(x -> System.out.println(Arrays.asList(x).toString()));
        }
    
    }
    /*
    Two files in the directory, both with the same content
        1,jack,male,29
        2,linda,female,29
    
    Output
        [(file:/D:/my/code/github/learncode/spark/src/main/resources/res/sparksave/datasave - 副本.csv,1,jack,male,29
        2,linda,female,29
        ), (file:/D:/my/code/github/learncode/spark/src/main/resources/res/sparksave/datasave.csv,1,jack,male,29
        2,linda,female,29
        )]
        [1, jack, male, 29]
        [2, linda, female, 29]
        [1, jack, male, 29]
        [2, linda, female, 29]
     */
    
    

    Reading and writing SequenceFiles

    A SequenceFile is a popular Hadoop format consisting of flat files of key-value pairs. SequenceFiles are also a common input and output format for Hadoop MapReduce jobs,
    so if you work with an existing Hadoop system, there is a good chance your data is already available as SequenceFiles. Because Hadoop uses its own serialization framework,
    the elements of a SequenceFile must implement Hadoop's Writable interface.
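
    For example, the plain Java types used below map to Writables like this (a sketch, not an exhaustive list):

    Text key = new Text("string one");       // String  -> org.apache.hadoop.io.Text
    IntWritable value = new IntWritable(1);  // Integer -> org.apache.hadoop.io.IntWritable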

    package com.learn.hadoop.spark.doc.analysis.chpater.datasave;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    
    import java.util.Arrays;
    
    public class DataSaveTest03Sequence {
        public static void main(String[] args) {
            SparkConf conf =new SparkConf().setMaster("local").setAppName("DataSaveTest03Sequence");
            JavaSparkContext sc = new JavaSparkContext(conf);
        // Key-value pairs to serialize
            JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(Arrays.asList(new Tuple2<String,Integer>("string one",1),
                    new Tuple2<String,Integer>("string two",2)),1);
    
        // Map to the Writable key-value types that SequenceFile supports
            JavaPairRDD<Text,IntWritable>  result = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
                @Override
                public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                    return new Tuple2<Text, IntWritable>(new Text(stringIntegerTuple2._1),new IntWritable(stringIntegerTuple2._2));
                }
            });
        // Write the key-value pairs out as a SequenceFile
        result.saveAsHadoopFile("C:\\Users\\juncai\\Desktop\\out", Text.class,
                    IntWritable.class, SequenceFileOutputFormat.class);
    
            //test read
        String filepath = "D:\\my\\code\\github\\learncode\\spark\\src\\main\\resources\\res\\saprksavesequence\\part-00000";
        // A SequenceFile is a Hadoop file of key-value pairs
        // Read the Hadoop file directly as a pair RDD of Writable types
            JavaPairRDD<Text,IntWritable> input = sc.sequenceFile(filepath,Text.class,IntWritable.class,1);
            input.foreach(f-> System.out.println(f.toString()));
        // Convert back to plain Java key-value types; mapToPair transforms one pair RDD into another
            JavaPairRDD<String ,Integer> outRdd = input.mapToPair(new PairFunction<Tuple2<Text, IntWritable>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> textIntWritableTuple2) throws Exception {
                    return new Tuple2<String, Integer>(textIntWritableTuple2._1.toString(),textIntWritableTuple2._2.get());
                }
            });
            outRdd.foreach(f-> System.out.println(f.toString()));
        }
    }
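
    The write above goes through the old org.apache.hadoop.mapred output format; the newer mapreduce API works the same way. A minimal sketch, assuming the same result pair RDD as above (the output path is hypothetical):

    // Fully qualified to avoid a clash with the old-API SequenceFileOutputFormat imported above
    result.saveAsNewAPIHadoopFile(
            "C:\\Users\\juncai\\Desktop\\out-newapi",   // hypothetical output directory
            Text.class, IntWritable.class,
            org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);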
    
  • Original post: https://www.cnblogs.com/JuncaiF/p/12460813.html