zoukankan      html  css  js  c++  java
  • (八)map,filter,flatMap算子-Java&Python版Spark

    map、filter、flatMap 算子

    视频教程:

    1、优酷

    2、YouTube

    1、map

    map 将源 JavaRDD 中的元素逐个传入 call 方法,经过计算后逐个返回,从而生成一个新的 JavaRDD。

     

    java:

     package com.bean.spark.trans;

     import java.util.Arrays;
     import java.util.List;
     import org.apache.spark.SparkConf;
     import org.apache.spark.api.java.JavaRDD;
     import org.apache.spark.api.java.JavaSparkContext;
     import org.apache.spark.api.java.function.Function;

     /**
      * Demonstrates the {@code map} transformation: every element of the source
      * RDD is passed to {@code call} one at a time, and the returned values form
      * a new RDD.
      *
      * @author RedBean
      */
     public class TraMap {
         /**
          * Multiplies the integers 0..9 by 5 on a local Spark master and prints
          * the collected result.
          *
          * @param args unused
          */
         public static void main(String[] args) {
             SparkConf conf = new SparkConf();
             conf.setMaster("local");
             conf.setAppName("map");
             // Machine-specific path from the original tutorial; adjust it for your environment.
             System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
             JavaSparkContext sc = new JavaSparkContext(conf);
             try {
                 List<Integer> number = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
                 JavaRDD<Integer> numberRDD = sc.parallelize(number);
                 JavaRDD<Integer> results = numberRDD.map(new Function<Integer, Integer>() {
                     @Override
                     public Integer call(Integer s) throws Exception {
                         return s * 5;
                     }
                 });
                 // Prints [0, 5, 10, 15, 20, 25, 30, 35, 40, 45].
                 System.out.println(results.collect());
             } finally {
                 // Release the Spark context even if the job throws.
                 sc.stop();
             }
         }
     }

    python:

     1 # -*- coding:utf-8 -*-
     2 
     3 
     4 from __future__ import print_function
     5 from pyspark import SparkConf
     6 from pyspark import SparkContext
     7 import os
     8 
     9 if __name__ == '__main__':
    10     os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    11     conf = SparkConf().setAppName('mapTest').setMaster('local')
    12     sc = SparkContext(conf=conf)
    13     data = sc.parallelize([1,2,3,4,5,6])
    14     def myMap(l):
    15         return l * 5
    16     print(data.map(myMap).collect())

     

    2、filter

    返回一个新的数据集,由经过func函数后返回值为true的原元素组成

    java:

     package com.bean.spark.trans;

     import java.util.Arrays;
     import java.util.List;

     import org.apache.spark.SparkConf;
     import org.apache.spark.api.java.JavaRDD;
     import org.apache.spark.api.java.JavaSparkContext;
     import org.apache.spark.api.java.function.Function;

     /**
      * Demonstrates the {@code filter} transformation: the new RDD contains only
      * the source elements for which {@code call} returns {@code true}.
      */
     public class TraFilter {
         /**
          * Keeps the even numbers among 0..9 on a local Spark master and prints
          * the collected result.
          *
          * @param args unused
          */
         public static void main(String[] args) {
             SparkConf conf = new SparkConf();
             conf.setMaster("local");
             conf.setAppName("filter");
             // Machine-specific path from the original tutorial; adjust it for your environment.
             System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
             JavaSparkContext sc = new JavaSparkContext(conf);
             try {
                 List<Integer> number = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
                 JavaRDD<Integer> numberRDD = sc.parallelize(number);
                 JavaRDD<Integer> results = numberRDD.filter(new Function<Integer, Boolean>() {
                     @Override
                     public Boolean call(Integer s) throws Exception {
                         return s % 2 == 0;
                     }
                 });
                 // Prints [0, 2, 4, 6, 8].
                 System.out.println(results.collect());
             } finally {
                 // Release the Spark context even if the job throws.
                 sc.stop();
             }
         }
     }

    python:

     1 # -*- coding:utf-8 -*-
     2 
     3 
     4 from __future__ import print_function
     5 from pyspark import SparkConf
     6 from pyspark import SparkContext
     7 import os
     8 
     9 if __name__ == '__main__':
    10     os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    11     conf = SparkConf().setAppName('filterTest').setMaster('local')
    12     sc = SparkContext(conf=conf)
    13     data = sc.parallelize([1,2,3,4,5,6])
    14     def filterFun(l):
    15         return l > 2
    16     print(data.filter(filterFun).collect())

     

    3、flatMap

    flatMap 对 RDD 中的每个元素应用你定义的函数,该函数可以返回零个或多个元素,所有返回的元素会被展平后组成一个新的 RDD(即一条数据可以被分解成多条数据)。

    java:

     package com.bean.spark.trans;

     import java.util.Arrays;
     import java.util.Iterator;

     import org.apache.spark.SparkConf;
     import org.apache.spark.api.java.JavaRDD;
     import org.apache.spark.api.java.JavaSparkContext;
     import org.apache.spark.api.java.function.FlatMapFunction;

     /**
      * Demonstrates the {@code flatMap} transformation: {@code call} returns an
      * iterator of zero or more elements per input element, and all returned
      * elements are flattened into the new RDD.
      */
     public class TraFlatMap {
         /**
          * Reads a text file on a local Spark master, splits every line on single
          * spaces, and prints the collected words.
          *
          * @param args unused
          */
         public static void main(String[] args) {
             SparkConf conf = new SparkConf();
             conf.setMaster("local");
             conf.setAppName("FlatMap");
             // Machine-specific paths from the original tutorial; adjust them for your environment.
             System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");
             JavaSparkContext sc = new JavaSparkContext(conf);
             try {
                 JavaRDD<String> context = sc.textFile("D:/tools/data/flatMap/flatMap.txt");
                 JavaRDD<String> results = context.flatMap(new FlatMapFunction<String, String>() {
                     @Override
                     public Iterator<String> call(String s) throws Exception {
                         // Split the line into words. Wrapping the unsplit line
                         // (Arrays.asList(s)) would yield exactly one element per
                         // line, making flatMap behave like map.
                         return Arrays.asList(s.split(" ")).iterator();
                     }
                 });
                 System.out.println(results.collect());
             } finally {
                 // Release the Spark context even if the job throws.
                 sc.stop();
             }
         }
     }

    python:

     1 # -*- coding:utf-8 -*-
     2 
     3 
     4 from __future__ import print_function
     5 from pyspark import SparkConf
     6 from pyspark import SparkContext
     7 import os
     8 
     9 if __name__ == '__main__':
    10     os.environ['SPARK_HOME'] = 'D:/tools/spark-2.0.0-bin-hadoop2.6'
    11     conf = SparkConf().setAppName('filterTest').setMaster('local')
    12     sc = SparkContext(conf=conf)
    13     data = sc.parallelize(["Hello World","Spark Hadoop Storm","java python c"])
    14     def flatFun(l):
    15         return l.split(" ")
    16     print(data.flatMap(flatFun).collect())

     

  • 相关阅读:
    c# 门禁随笔
    DataTable到Access
    C#文件上传
    C#操作文件
    JavaScript 全局封装
    jsavascript 目录的操作(摘抄)
    12-STM32 ADC原理
    11-STM32 高级定时器TIM1/8
    10-STM32 基本定时器TIM
    9-STM32 WWDG窗口看门狗
  • 原文地址:https://www.cnblogs.com/LgyBean/p/6251413.html
Copyright © 2011-2022 走看看