zoukankan      html  css  js  c++  java
  • spark 快速入门 java API

    1.1 transform

    l  map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

    l  filter(func) : 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD

    l  flatMap(func):和map差不多,但是flatMap生成的是多个结果

    l  mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition

    l  mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

    l  sample(withReplacement,faction,seed):抽样

    l  union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

    l  distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element

    l  groupByKey(numTasks):返回(K,Seq[V]),也就是Hadoopreduce函数接受的key-valuelist

    l  reduceByKey(func,[numTasks]):就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

    l  sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型

    1.2 action

    l  reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

    l  collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

    l  count():返回的是dataset中的element的个数

    l  first():返回的是dataset中的第一个元素

    l  take(n):返回前n个elements

    l  takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed

    l  saveAsTextFile(path):把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

    l  saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

    l  countByKey():返回的是key对应的个数的一个map,作用于一个RDD

    l  foreach(func):对dataset中的每个元素都使用func

    以下是案例:

    package com.leoao;
    import org.apache.spark.SparkConf;
    /**
    * Created by chengtao on 16/12/27.
    */
    public class Test2 {
    public static void main( String[] args ) {
    SparkConf conf = new SparkConf().setAppName("App").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // JavaRDD<String> rdd = sc.textFile("/Users/chengtao/downloads/worldcount/ctTest.txt");

    String C = "c 3";
    String D = "d 4";
    String E = "e 5";
    ArrayList<String> listA = new ArrayList<String>();
    listA.add("a 1");
    listA.add("b 2");
    listA.add(C);
    listA.add(D);
    listA.add(E);
    JavaRDD<String> rdd = sc.parallelize(listA);
    System.out.println("listA ----> " + listA); // listA ----> [a 1, b 2, c 3, d 4, e 5]
    List list = rdd.collect();
    for (int i = 0; i < list.size(); i++) {
    System.out.println("rdd ----> " + list.get(i));
    }
    // rdd ----> a 1
    // rdd ----> b 2
    // rdd ----> c 3
    // rdd ----> d 4
    // rdd ----> e 5

    ArrayList<String> listb = new ArrayList<String>();
    listb.add("aa 11");
    listb.add("bb 22");
    listb.add(C);
    listb.add(D);
    listb.add(E);
    JavaRDD<String> rdd2 = sc.parallelize(listb);

    // -------transform
    testSparkCoreApiMap(rdd);
    testSparkCoreApiFilter(rdd);
    testSparkCoreApiFlatMap(rdd);
    testSparkCoreApiUnion(rdd,rdd2);
    testSparkCoreApiDistinct(rdd,rdd2);
    testSparkCoreApiMaptoPair(rdd);
    testSparkCoreApiGroupByKey(rdd,rdd2);
    testSparkCoreApiReduceByKey(rdd);
    // -------action
    testSparkCoreApiReduce(rdd);
    }

    //Map主要是对数据进行处理,不进行数据集的增减:本案例实现,打印所有数据,并在结束加上"test"
    private static void testSparkCoreApiMap(JavaRDD<String> rdd){
    JavaRDD<String> logData1=rdd.map(new Function<String,String>(){
    public String call(String s){
    return s + " test";
    }
    });
    List list = logData1.collect();
    for (int i = 0; i < list.size(); i++) {
    System.out.println(list.get(i));
    }
    }
    /*方法输出:
    a 1 test
    b 2 test
    c 3 test
    d 4 test
    e 5 test
    */

    //filter主要是过滤数据的功能,本案例实现:过滤含有a的那行数据
    private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
    JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
    public Boolean call(String s){
    if(!(s.contains("a"))){
    return true;
    }
    //return (s.split(" "))[0].equals("a");
    return false;
    }
    });
    List list = logData1.collect();
    for (int i = 0; i < list.size(); i++) {
    System.out.println(list.get(i));
    }
    }
    /*方法输出:
    b 2
    c 3
    d 4
    e 5
    */

    //flatMap 用户行转列,本案例实现:打印所有的字符
    private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
    JavaRDD<String> words=rdd.flatMap(
    new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) throws Exception {
    return Arrays.asList(s.split(" "));
    }
    }
    );
    List list = words.collect();
    for (int i = 0; i < list.size(); i++) {
    System.out.println(list.get(i));
    }
    }
    /*方法输出:
    a
    1
    b
    2
    c
    3
    d
    4
    e
    5
    */

    //合并两个RDD
    private static void testSparkCoreApiUnion(JavaRDD<String> rdd,JavaRDD<String> rdd2){
    JavaRDD<String> unionRdd=rdd.union(rdd2);
    unionRdd.foreach(new VoidFunction<String>(){
    public void call(String lines){
    System.out.println(lines);
    }
    });
    }
    /*方法输出:
    a 1
    b 2
    c 3
    d 4
    e 5
    aa 11
    bb 22
    c 3
    d 4
    e 5
    */


    //对RDD去重
    private static void testSparkCoreApiDistinct(JavaRDD<String> rdd,JavaRDD<String> rdd2){
    JavaRDD<String> unionRdd=rdd.union(rdd2).distinct();
    unionRdd.foreach(new VoidFunction<String>(){
    public void call(String lines){
    System.out.println(lines);
    }
    });
    }
    /*方法输出:
    e 5
    d 4
    c 3
    aa 11
    a 1
    bb 22
    b 2
    */

    //把RDD映射为键值对类型的数据
    private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){
    JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
    @Override
    public Tuple2<String, Integer> call(String t) throws Exception {
    String[] st=t.split(" ");
    return new Tuple2(st[0], st[1]);
    }

    });

    pairRdd.foreach(new VoidFunction<Tuple2<String, Integer>>(){
    @Override
    public void call(Tuple2<String, Integer> t) throws Exception {
    System.out.println(t._2());
    }
    });
    }
    /*方法输出:
    1
    3
    2
    4
    5
    */


    // 对键值对类型的数据进行按键值合并
    private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd,JavaRDD<String> rdd1){
    JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
    @Override
    public Tuple2<String, Integer> call(String t) throws Exception {
    String[] st=t.split(" ");
    return new Tuple2(st[0], Integer.valueOf(st[1]));
    }
    });

    JavaPairRDD<String, Integer> pairRdd1=rdd1.mapToPair(new PairFunction<String,String,Integer>(){
    @Override
    public Tuple2<String, Integer> call(String t) throws Exception {
    String[] st=t.split(" ");
    return new Tuple2(st[0], Integer.valueOf(st[1]));
    }
    });

    JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd1).groupByKey();
    pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
    @Override
    public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
    Iterable<Integer> iter = t._2();
    for (Integer integer : iter) {
    System.out.println(integer);
    }
    }
    });
    }

    /*方法输出:
    5
    5
    1
    4
    4
    11
    22
    2
    3
    3
    */


    //对键值对进行按键相同的对值进行操作
    private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){
    JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
    @Override
    public Tuple2<String, Integer> call(String t) throws Exception {
    String[] st=t.split(" ");
    return new Tuple2(st[0], Integer.valueOf(st[1]));
    }
    });

    JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
    new Function2<Integer,Integer,Integer>(){
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
    return v1+v2;
    }
    }
    ).sortByKey() ;
    pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
    @Override
    public void call(Tuple2<String, Integer> t) throws Exception {
    System.out.println(t._2());
    }
    });
    }
    /*方法输出:
    2
    4
    6
    10
    8
    */

    // 对RDD进行递归调用
    private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
    //由于原数据是String,需要转为Integer才能进行reduce递归
    JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){
    @Override
    public Integer call(String v1) throws Exception {
    return Integer.valueOf(v1.split(" ")[1]);
    }
    });

    Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
    @Override
    public Integer call(Integer v1,Integer v2) throws Exception {
    return v1+v2;
    }
    });
    System.out.println("a ----> " + a);
    }
    /*方法输出:
    a ----> 15
    */
    }










  • 相关阅读:
    HTML5
    带参数
    类的无参方法
    类和对象
    Java新帮派——数组
    神竜出击 合三为一!
    校园欺凌——四位学生的乱伦之战!!!
    GC常见算法
    jstat
    SpringBoot2
  • 原文地址:https://www.cnblogs.com/ctaixw/p/6226187.html
Copyright © 2011-2022 走看看