zoukankan      html  css  js  c++  java
  • Spark常用算子-value数据类型的算子

    package com.test;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    
    /**
     * Value数据类型的Transformation算子
     * @author FengZhen
     *
     */
    public class SparkValue {
    	
    	public static void main(String[] args){
    		//SparkConf conf = new SparkConf().setAppName(SparkValue.class.getName()).setMaster("local[2]");
    	    SparkConf conf = new SparkConf().setAppName(SparkValueTest.class.getName());
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		// 数据
    		JavaRDD<String> ds = sc.textFile("hdfs://bjqt/data/labeldata/datalabel.csv");
    		/**
    		 * 一、输入分区与输出分区一对一型
    		 * 1、map算子
    		 * 2、flatMap算子
    		 * 3、mapPartitions算子
    		 * 4、glom算子
    		 */
    		
    		/**
    		 * 1.map算子
    		 * 将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。
    		 * 源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。
    		 */
    		JavaRDD<String> nameRDD = ds.map(new Function<String, String>() {
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public String call(String v1) throws Exception {
    				String[] values = v1.split(",");
    				return values[0];
    			}
    		});
    		List<String> nameList = nameRDD.collect();
    		System.out.println(nameList);
    		
    		/**
    		 * 2.flatMap算子
    		 * 将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,
    		 * 内部创建 FlatMappedRDD(this,sc.clean(f))。
    		 */
    		JavaRDD<String> flatMapRDD = ds.flatMap(new FlatMapFunction<String, String>() {
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Iterator<String> call(String t) throws Exception {
    				t = t.replace("‘", "");
    				String[] values = t.split(",");
    				List<String> result = Arrays.asList(values);
    				return result.iterator();
    			}
    		});
    		List<String> flatMapList = flatMapRDD.collect();
    		System.out.println(flatMapList);
    		
    		/**
    		 * 3.mapPartitions算子
    		 * mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 行 操 作。 
    		 * 内 部 实 现 是 生 成MapPartitionsRDD
    		 * 做过滤
    		 */
    		JavaRDD<String> mapPartitionsRDD = nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Iterator<String> call(Iterator<String> t) throws Exception {
    				List<String> nameList = new ArrayList<String>();
    				while (t.hasNext()) {
    					String name = (String) t.next();
    					if (name.startsWith("丁")) {
    						nameList.add(name);
    					}
    				}
    				return nameList.iterator();
    			}
    		});
    		List<String> mapPartitionsList = mapPartitionsRDD.collect();
    		System.out.println(mapPartitionsList);
    		
    		/**
    		 * 4.glom算子
    		 * glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。
    		 */
    		JavaRDD<List<String>> glomRDD = nameRDD.glom();
    		List<List<String>> glomList = glomRDD.collect();
    		System.out.println(glomList);
    		
    		/**
    		 * 二、输入分区与输出分区多对一型 
    		 * 5.union算子
    		 * 6.cartesian算子
    		 */
    		/**
    		 * 5.union算子
    		 *  使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,
    		 *  并不进行去重操作,保存所有元素。如果想去重可以使用 distinct()。
    		 *  同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作。
    		 */
    		JavaRDD<String> mapPartitionsRDD1 = nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Iterator<String> call(Iterator<String> t) throws Exception {
    				List<String> nameList = new ArrayList<String>();
    				while (t.hasNext()) {
    					String name = (String) t.next();
    					if (name.startsWith("王")) {
    						nameList.add(name);
    					}
    				}
    				return nameList.iterator();
    			}
    		});
    		JavaRDD<String> unionRDD = mapPartitionsRDD.union(mapPartitionsRDD1);
    		List<String> unionList = unionRDD.collect();
    		System.out.println(unionList);
    		
    		/**
    		 * 6.cartesian算子
    		 * 对两个RDD内 的 所 有 元 素 进 行 笛 卡 尔 积 操作。 操 作 后, 内 部 实 现 返 回CartesianRDD。
    		 */
    		 JavaPairRDD<String, String> cartesianRDD = mapPartitionsRDD.cartesian(mapPartitionsRDD1);
    		 Map<String, String> cartesianMap = cartesianRDD.collectAsMap();
    		 System.out.println(cartesianMap);
    		 
    		 
    		 /**
    		  * 三、输入分区与输出分区多对多型
    		  * 7、grouBy算子
    		  */
    		 /**
    		  * 7.grouBy算子
    		  * groupBy :将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
      			函数实现如下:
      				1)将用户函数预处理:
      				val cleanF = sc.clean(f)
      				2)对数据 map 进行函数操作,最后再进行 groupByKey 分组操作。
         			this.map(t => (cleanF(t), t)).groupByKey(p)
      				其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。
    		  */
    		 JavaPairRDD<String, Iterable<String>> groupRDD = nameRDD.groupBy(new Function<String, String>() {
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public String call(String v1) throws Exception {
    				return v1.substring(0, 1);
    			}
    		 });
    		 Map<String, Iterable<String>> groupMap = groupRDD.collectAsMap();
    		 System.out.println(groupMap);
    		 
    		 /**
    		  * 四、输出分区为输入分区子集型
    		  * 8、filter算子
    		  * 9、distinct算子
    		  * 10、subtract算子
    		  * 11、sample算子
    		  * 12、takeSample算子
    		  */
    		 
    		 /**
    		  * 8.filter算子
    		  * filter 函数功能是对元素进行过滤,对每个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤掉。 
    		  * 内 部 实 现 相 当 于 生 成 FilteredRDD(this,sc.clean(f))。
    		  * 下面代码为函数的本质实现:
    		  * deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
    		  */
    		 JavaRDD<String> filterRDD = nameRDD.filter(new Function<String, Boolean>() {
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Boolean call(String v1) throws Exception {
    				if (v1.contains("二")) {
    					return true;
    				}
    				return false;
    			}
    		 });
    		 List<String> filterMap = filterRDD.collect();
    		 System.out.println(filterMap);
    		 
    		 /**
    		  * 9.distinct算子
    		  * distinct将RDD中的元素进行去重操作
    		  */
    		 JavaRDD<String> repeatRDD = filterRDD.union(filterRDD);
    		 List<String> repeatMap = repeatRDD.collect();
    		 System.out.println(repeatMap);
    		 JavaRDD<String> distinctRDD = repeatRDD.distinct();
    		 List<String> distinctMap = distinctRDD.collect();
    		 System.out.println(distinctMap);
    		 
    		 /**
    		  * 10.subtract算子
    		  * subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素
    		  */
    		 JavaRDD<String> subRDD1 = nameRDD.filter(new Function<String, Boolean>() {
    			private static final long serialVersionUID = 1L;
    
    			@Override
    			public Boolean call(String v1) throws Exception {
    				if (v1.contains("丁") || v1.contains("齐")) {
    					return true;
    				}
    				return false;
    			}
    		 });
    		 JavaRDD<String> subRDD2 = nameRDD.filter(new Function<String, Boolean>() {
    			private static final long serialVersionUID = 1L;
    
    				@Override
    				public Boolean call(String v1) throws Exception {
    					if (v1.contains("丁")) {
    						return true;
    					}
    					return false;
    				}
    			 });
    		 JavaRDD<String> subtractRDD = subRDD1.subtract(subRDD2);
    		 List<String> subtractList = subtractRDD.collect();
    		 System.out.println(subtractList);
    		 
    		 /**
    		  * 11.sample算子
    		  *  sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。
    		  *  用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。
    		  *  内部实现是生成 SampledRDD(withReplacement, fraction, seed)。
    		  *  函数参数设置:
    		  *  ‰   withReplacement=true,表示有放回的抽样。
    		  *  ‰   withReplacement=false,表示无放回的抽样。
    		  */
    		 JavaRDD<String> sampleRDD = nameRDD.sample(false, 0.01, 5);
    		 List<String> sampleList = sampleRDD.collect();
    		 System.out.println(sampleList);
    		 
    		 /**
    		  * 12.takeSample算子
    		  * takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,
    		  * 而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行Collect(),
    		  * 返回结果的集合为单机的数组。
    		  */
    		 List<String> takeSampleList = nameRDD.takeSample(false, 1, 5);
    		 System.out.println(takeSampleList);
    		
    		 sc.close();
    	}
    }
    

      

  • 相关阅读:
    java将汉语转换为拼音工具类
    Maven 参数说明
    tcp socket的backlog参数
    Java 运行中jar包冲突,定位使用哪个jar包
    Java CMS GC
    数据仓库
    compareTo
    java程序性能分析之thread dump和heap dump
    npm 与 package.json 快速入门教程
    基本 Java Bean
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/9254566.html
Copyright © 2011-2022 走看看