  • Spark data source operations

    Data sources for a Spark application:

    1) A collection in the driver program (parallelize, parallelizePairs); see the sketch after this list

    2) Data from local storage (file:///d:/test) or network storage such as HDFS (hdfs://localhost:7777)

        textFile, wholeTextFiles

    3) Streaming sources: sockets (socketTextStream)
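    A minimal sketch of sources 1) and 3), assuming a local master; the socket host and port below are hypothetical:

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    import scala.Tuple2;

    public class SparkSourcesSketch {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkSources");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // 1) A collection held in the driver, distributed with parallelize / parallelizePairs
            JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4));
            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
                    Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
            System.out.println(nums.count() + " " + pairs.count());

            // 3) A socket streaming source (host/port are placeholders)
            JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(5));
            JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
            lines.print();
            ssc.start();
            ssc.awaitTermination();
        }
    }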

    I. Formats Spark supports out of the box:

    1. Plain text files

    2. JSON

    3. CSV

    If none of the fields in the CSV data contain newlines, you can read and parse the data with textFile(). If newlines are embedded in fields, you need wholeTextFiles() to read each file in full and then parse the individual records.

    Because CSV does not write the field names with every record, you need to define a mapping to keep the output consistent. A simple approach is to write a function that converts the fields into an array in a fixed order.

    4. SequenceFile: a binary format of key-value pairs

    5. Object files: JDK serialization (this looks like a thin wrapper over SequenceFile; it lets you store an RDD containing only values. Unlike SequenceFile, an object file is written with Java serialization, so the class of the stored objects must not change between write and read: reading the output depends on the class definition.)

    Plain text files

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkIO_File {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
            JavaSparkContext sc = new JavaSparkContext(conf);
            sc.setLogLevel("WARN");
            fileTest(sc);
            sc.stop();
            sc.close();
        }

        static void fileTest(JavaSparkContext sc){
            // textFile: each line of the file becomes one element of the RDD
    //      JavaRDD<String> rdd = sc.textFile("file:///E:/codes2016/workspace/Spark1/src/spark1106_StreamSpark/UpdateStateByKeyDemo.java");
            // wholeTextFiles returns key-value pairs: key = full file path, value = whole file content
            JavaPairRDD<String, String> rdd = sc.wholeTextFiles("file:///E:/codes2016/workspace/Spark1/src/spark1106_StreamSpark");
            System.out.println("Number of partitions: " + rdd.getNumPartitions());   // 2 partitions here
            rdd.foreach(x -> {
                System.out.println("Current element: " + x);
            });
            System.out.println(rdd.count());
            rdd.saveAsTextFile("file:///d:/jsontext/filewholetext");
        }
    }
    

    JSON files

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    
    public class SparkIO_JSON {
        public static void main(String[] args) {
        	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		sc.setLogLevel("WARN");
    		writeJsonTest(sc);
    		sc.stop();
    		sc.close();
    	}
    
    // Read JSON
    static void readJsonTest(JavaSparkContext sc){
        // With textFile, a JSON record that is split across multiple lines cannot be parsed;
        // records that are not split are still read and shown.
        JavaRDD<String> input = sc.textFile("file:///d:/jsontext/jsonsong.json");
        // wholeTextFiles avoids the broken-line problem because it reads each file as a whole
        // (note it returns a JavaPairRDD<String, String> of path -> content, so the parsing would need adjusting):
    //  JavaPairRDD<String, String> whole = sc.wholeTextFiles("file:///d:/jsontext/jsonsong.json");
    //  JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson());
        	JavaRDD<Mp3Info> result = input.map(x->{
        		ObjectMapper mapper=new ObjectMapper();
        		return mapper.readValue(x, Mp3Info.class);
        	});
        	result.foreach(x->System.out.println(x));
        }
    // Write JSON
        static void writeJsonTest(JavaSparkContext sc){
        	JavaRDD<String> input = sc.textFile("file:///d:/jsontext/jsonsong.json");
        	JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson()).
        			                      filter(
        			                          x->x.getAlbum().equals("怀旧专辑")
        			                      );
    //    	JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
        	JavaRDD<String> formatted = result.map(x->{
        		ObjectMapper mapper=new ObjectMapper();
        		return mapper.writeValueAsString(x);
        	});
        	result.foreach(x->System.out.println(x));
        	formatted.saveAsTextFile("file:///d:/jsontext/jsonsongout");
        }
    }
    
    class ParseJson implements FlatMapFunction<Iterator<String>, Mp3Info>, Serializable {
    	public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
    		ArrayList<Mp3Info> people = new ArrayList<Mp3Info>();
    		ObjectMapper mapper = new ObjectMapper();
    		while (lines.hasNext()) {
    			String line = lines.next();
    			try {
    				people.add(mapper.readValue(line, Mp3Info.class));
    			} catch (Exception e) {
    			    e.printStackTrace();
    			}
    		}
    		return people.iterator();
    	}
    }
    
    class WriteJson implements FlatMapFunction<Iterator<Mp3Info>, String> {
    	public Iterator<String> call(Iterator<Mp3Info> song) throws Exception {
    		ArrayList<String> text = new ArrayList<String>();
    		ObjectMapper mapper = new ObjectMapper();
    		while (song.hasNext()) {
    			Mp3Info person = song.next();
    		    text.add(mapper.writeValueAsString(person));
    		}
    		return text.iterator();
    	}
    }
    
    class Mp3Info implements Serializable{
    	private String name;
        private String album;
        private String path;
        private String singer;
    
        public String getSinger() {
    		return singer;
    	}
    	public void setSinger(String singer) {
    		this.singer = singer;
    	}    
        public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	public String getAlbum() {
    		return album;
    	}
    	public void setAlbum(String album) {
    		this.album = album;
    	}
    	public String getPath() {
    		return path;
    	}
    	public void setPath(String path) {
    		this.path = path;
    	}
        @Override
    	public String toString() {
    		return "Mp3Info [name=" + name + ", album=" 
    	             + album + ", path=" + path + ", singer=" + singer + "]";
    	}
    }
    
    /*
    {"name":"上海滩","singer":"叶丽仪","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"}
    {"name":"一生何求","singer":"陈百强","album":"香港电视剧主题歌","path":"mp3/shanghaitan.mp3"}
    {"name":"红日","singer":"李克勤","album":"怀旧专辑","path":"mp3/shanghaitan.mp3"}
    {"name":"爱如潮水","singer":"张信哲","album":"怀旧专辑","path":"mp3/airucaoshun.mp3"}
    {"name":"红茶馆","singer":"陈惠嫻","album":"怀旧专辑","path":"mp3/redteabar.mp3"}
     */

    CSV files

    import java.io.StringReader;
    import java.io.StringWriter;
    import java.util.Arrays;
    import java.util.Iterator;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    
    import scala.Tuple2;
    import au.com.bytecode.opencsv.CSVReader;
    import au.com.bytecode.opencsv.CSVWriter;
    
    public class SparkIO_CSV {
        public static void main(String[] args) {
        	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		sc.setLogLevel("WARN");
    		readCsv2(sc);
    		sc.stop();
    		sc.close();
    	}
    
        static void readCsv1(JavaSparkContext sc) {
        	JavaRDD<String> csvFile1 = sc.textFile("file:///d:/jsontext/csvsong.csv");
    //    	csvFile1.foreach(x->System.out.println(x));
        	JavaRDD<String[]> csvData = csvFile1.map(new ParseLine());
        	csvData.foreach(x->{
    	    	                  for(String s : x){
    	    	                      System.out.println(s);
    	    	                  }
        	                   }
        	               );
        }
    
    static void writeCsv1(JavaSparkContext sc) {
        JavaRDD<String> csvFile1 = sc.textFile("file:///d:/jsontext/csvsong.csv");
        JavaRDD<String[]> parsedData = csvFile1.map(new ParseLine());
        // Filter; saving to a file at this point would write String[] objects (their toString), not CSV text
        parsedData = parsedData.filter(x -> x[2].equals("怀旧专辑"));
        parsedData.foreach(x -> {
            long id = Thread.currentThread().getId();
            System.out.println("Printing current element in thread " + id + ":");
            for (String s : x) {
                System.out.print(s + " ");
            }
            System.out.println();
        });
        parsedData.map(x -> {
            StringWriter stringWriter = new StringWriter();
            CSVWriter csvWriter = new CSVWriter(stringWriter);
            csvWriter.writeNext(x);  // convert the array into one CSV-formatted line
            csvWriter.close();
            return stringWriter.toString();
        }).saveAsTextFile("file:///d:/jsontext/csvout");
    }
    
        
    public static class ParseLine implements Function<String, String[]> {
        public String[] call(String line) throws Exception {
            CSVReader reader = new CSVReader(new StringReader(line));
            String[] lineData = reader.readNext();
            reader.close();    // release the stream
    //      String[] lineData = line.split(","); // a plain split would still mis-handle quoted fields
            return lineData;
        }
    }
    
    static void readCsv2(JavaSparkContext sc){
        // Even if records span multiple lines, wholeTextFiles works because it reads each file as a whole
        JavaPairRDD<String, String> csvData = sc.wholeTextFiles("file:///d:/jsontext/csvsong.csv");
        JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLineWhole());
        	   keyedRDD.foreach(x->
    				    	       {
    					               for(String s : x){
    					                   System.out.println(s);
    					               }
    				               }
                               );
        }
        public static class ParseLineWhole implements FlatMapFunction<Tuple2<String, String>, String[]> {
    	    public Iterator<String[]> call(Tuple2<String, String> file) throws Exception {
    		    CSVReader reader = new CSVReader(new StringReader(file._2()));
    		    Iterator<String[]> data = reader.readAll().iterator();
    		    reader.close();
    		    return data;
            }
        }
    }
    
    /*
    "上海滩","叶丽仪","香港电视剧主题歌","mp3/shanghaitan.mp3"
    "一生何求","陈百强","香港电视剧主题歌","mp3/shanghaitan.mp3"
    "红日","李克勤","怀旧专辑","mp3/shanghaitan.mp3"
    "爱如潮水","张信哲","怀旧专辑","mp3/airucaoshun.mp3"
    "红茶馆","陈惠嫻","怀旧专辑","mp3/redteabar.mp3"   
     */

    SequenceFile (binary)

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.serializer.KryoSerializer;
    
    import scala.Tuple2;
    
    public class SparkIO_SeqFile {
        public static void main(String[] args) {
        // local[2]: run with two worker threads
        	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO")
        			.set("spark.testing.memory", "2147480000")
        			.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		sc.setLogLevel("WARN");
        // A SequenceFile stores key-value pairs; it is a flat file of serialized objects (objects written in binary form)
    		writeSeqFile(sc);
    		readSeqFile(sc);
    		sc.stop();
    		sc.close();
    	}
        
    	private static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
    	    public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
    	        return new Tuple2<String, Integer>(record._1.toString(), record._2.get());
    	    }
    	}
        private static void writeSeqFile(JavaSparkContext sc) {
            List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
            data.add(new Tuple2<String, Integer>("ABC", 1));
            data.add(new Tuple2<String, Integer>("DEF", 3));
            data.add(new Tuple2<String, Integer>("GHI", 2));
            data.add(new Tuple2<String, Integer>("JKL", 4));
            data.add(new Tuple2<String, Integer>("ABC", 1));

    //      JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(new Tuple2<>("d", 1)), 1);

            // Set the number of partitions; each partition produces one output file
            JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data, 1);
            String dir = "file:///D:/jsontext/sequenceFile";
            // Use mapToPair to convert the native pairs into Hadoop Writable key-value types
            JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
            // Four arguments: output path, key class, value class, output format (saveAsNewAPIHadoopFile is the new-API call)
            result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
        }
    
    	static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
    		public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
    		    return new Tuple2<Text, IntWritable>(new Text(record._1), new IntWritable(record._2));
    	    }
        }
        private static void readSeqFile(JavaSparkContext sc) {
            // Read the SequenceFile into a pair RDD; three arguments: path, key class, value class
            JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(
                                                       "file:///D:/jsontext/sequenceFile",
                                                       Text.class,
                                                       IntWritable.class);
    //      input.foreach(System.out::println);
            // Use mapToPair to convert the Writable pairs back to native String/Integer pairs, then print them
            JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
            result.foreach(x -> System.out.println(x));
        }
    
    	
    
    }
    

    Object files

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    public class SparkIO_ObjFile {
        public static void main(String[] args) {
        	SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkIO").set("spark.testing.memory", "2147480000");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		sc.setLogLevel("WARN");		
    		writeObjFile(sc);
    		// The file stores Person objects and they are read back as Person objects, so if the Person class is missing, the foreach in readObjFile will fail
    		readObjFile(sc);
    		sc.stop();
    		sc.close();
    	}
    
        private static void readObjFile(JavaSparkContext sc) {
            // Read the binary object file back into an RDD
            JavaRDD<Object> input = sc.objectFile("file:///D:/jsontext/objFile");

            // Printing deserializes the stored Person objects; if the Person class is not available, this fails and the job aborts
            input.foreach(x -> System.out.println(x));
        }
        
    	private static void writeObjFile(JavaSparkContext sc) {
    	    List<Person> data = new ArrayList<Person>();
    	    data.add(new Person("ABC", 1));
    	    data.add(new Person("DEF", 3));
    	    data.add(new Person("GHI", 2));
    	    data.add(new Person("JKL", 4));
    	    data.add(new Person("ABC", 1));
    
            // Set the number of partitions; each partition produces one output file
            JavaRDD<Person> rdd = sc.parallelize(data, 2);
            // saveAsTextFile writes plain text; the visible text is each object's toString()
            String dir = "file:///D:/jsontext/textFile";
            rdd.saveAsTextFile(dir);

            // saveAsObjectFile writes a binary file that stores the objects' type and values
            // (an object file stores an RDD that contains only values, no keys)
            String dir1 = "file:///D:/jsontext/objFile";
            rdd.saveAsObjectFile(dir1);
    	}
    	
    	static class Person implements Serializable{
    		public Person(String name, int id) {
    			super();
    			this.name = name;
    			this.id = id;
    		}
    		@Override
    		public String toString() {
    			return "Person [name=" + name + ", id=" + id + "]";
    		}
    		String name;
    		int id;
    	}
    }

    II. Hadoop-supported formats

        1. For example, KeyValueTextInputFormat is one of the simplest Hadoop input formats; it reads key-value pair data from text files. Each line is processed independently, with the key and value separated by a tab.

    newAPIHadoopFile/saveAsNewAPIHadoopFile
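    A minimal sketch of reading tab-separated key-value text through the new Hadoop API (newAPIHadoopFile); the input path is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class HadoopKVSketch {
        static void readKV(JavaSparkContext sc) {
            // Each line becomes (key, value), split on the first tab
            JavaPairRDD<Text, Text> kv = sc.newAPIHadoopFile(
                    "file:///d:/jsontext/kv.txt",          // hypothetical tab-separated file
                    KeyValueTextInputFormat.class,
                    Text.class, Text.class,
                    new Configuration());
            kv.foreach(x -> System.out.println(x._1 + " -> " + x._2));
        }
    }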

        2. Non-filesystem data sources (HBase / MongoDB)

    Use newAPIHadoopRDD / saveAsNewAPIHadoopDataset
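    A hedged sketch of reading an HBase table through newAPIHadoopRDD; the table name is hypothetical, and a real cluster would also need ZooKeeper connection settings in the configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class HBaseReadSketch {
        static void readTable(JavaSparkContext sc) {
            Configuration hbaseConf = HBaseConfiguration.create();
            hbaseConf.set(TableInputFormat.INPUT_TABLE, "songs");   // hypothetical table name
            // Each element is (row key, row result)
            JavaPairRDD<ImmutableBytesWritable, Result> rows = sc.newAPIHadoopRDD(
                    hbaseConf,
                    TableInputFormat.class,
                    ImmutableBytesWritable.class,
                    Result.class);
            System.out.println("rows: " + rows.count());
        }
    }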

        3. Protocol Buffers (PB for short, https://github.com/google/protobuf)

    III. File compression
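    Spark reads common compressed formats such as .gz transparently through textFile, and saveAsTextFile accepts a Hadoop compression codec class for output. A minimal sketch with hypothetical paths:

    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CompressionSketch {
        static void compressedIO(JavaSparkContext sc) {
            // Gzipped input is decompressed automatically (but a single .gz file is not splittable)
            JavaRDD<String> lines = sc.textFile("file:///d:/jsontext/songs.txt.gz");
            // Write the output compressed with gzip
            lines.saveAsTextFile("file:///d:/jsontext/songs_gz_out", GzipCodec.class);
        }
    }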

    IV. File systems

    1. Local file system

    file:///D:/sequenceFile

    file:///home/sequenceFile

    Spark can read from the local file system, but it requires that the file be present at the same path on every node of the cluster.

    Some network file systems, such as NFS, AFS, and MapR's NFS layer, expose files to users as a local file system. If your data is already in one of these systems, you only need to specify the input as a file:// path; as long as the file system is mounted at the same path on every node, Spark handles it automatically. If the file is not yet available on every node of the cluster, you can read it locally in the driver program without involving the cluster and then call parallelize to distribute the contents to the workers (as sketched below). This can be slow, however, so the recommended approach is to put the file on a shared file system such as HDFS, NFS, or S3 first.
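    A minimal sketch of that fallback, assuming a hypothetical file that exists only on the driver machine:

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DriverLocalFileSketch {
        static JavaRDD<String> readDriverLocal(JavaSparkContext sc) throws Exception {
            // Read the file on the driver only, then distribute its lines to the workers
            List<String> lines = Files.readAllLines(Paths.get("d:/jsontext/driver_only.txt"));
            return sc.parallelize(lines);
        }
    }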

    2. Network file systems

    hdfs://localhost:7088/sequenceFile

    V. Databases

    1. JDBC (see the JdbcRDD sketch after this list)

    2. Cassandra

    3. HBase

    4. Elasticsearch
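    A hedged JdbcRDD sketch for the JDBC item above; the driver URL, credentials, table, and bounds are hypothetical, and the SQL must contain exactly two "?" placeholders for the partition bounds:

    import java.sql.DriverManager;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.rdd.JdbcRDD;

    public class JdbcReadSketch {
        static JavaRDD<String> readSongs(JavaSparkContext sc) {
            return JdbcRDD.create(
                    sc,
                    // ConnectionFactory lambda; URL, user, and password are placeholders
                    () -> DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pass"),
                    "SELECT name, id FROM songs WHERE id >= ? AND id <= ?",
                    1, 1000, 2,                                  // lower bound, upper bound, number of partitions
                    rs -> rs.getString(1) + "," + rs.getInt(2)); // map each ResultSet row to a String
        }
    }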
