zoukankan      html  css  js  c++  java
  • spark入门(二)RDD基础操作

    1 简述

    spark中的RDD是一个分布式的元素集合。

    在spark中,对数据的所有操作不外乎创建RDD,转化RDD以及调用RDD操作进行求值,而这些操作,spark会自动将RDD中的数据分发到集群上,并将操作并行执行。

    2 创建 RDD

    创建RDD分两种:读取外部数据集,在程序中对一个集合进行并行化。

    2.1 读取外部数据集:

    常用的方式是读取外部的数据集,比如文本文件读入为一个RDD:

    scalac版:

    val lines = sc.textFile("D:workspacescala_workspacedemo.txt")

    java版:

    JavaRDD<String> lines = sc.textFile("D:workspacejava_workspacedemo.txt");

    2.2 在程序中对集合进行并行化:

    最简单的方法是把集合传给SparkContext中的parallelize()方法:

    scalac版:

    val lines = sc.parallelize(["a", "b"])

     java版:

    JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "b"));

    3 转化操作

    RDD的转化操作是返回新的RDD的操作。

    假设有一个日文件log.txt,希望筛选出包含error的记录。使用rdd的filter()方法如下操作:

    scalac版:

    val rdd = sc.textFile("log.txt")
    val errorRDD = rdd .filter(line => line.contains("error"))

     java版:

    JavaRDD<String> rdd = sc.textFile("log.txt");
    JavaRDD<String> errorRDD = rdd.filter (
        new Function<String, Boolean>() {
            public Boolean call(String str) {
                return str.contains("errors");
            } 
        }   
    )        

    注意:filter()方法不会改变已有的rdd中的数据。

    4 向spark 传递函数

    4.1 scala

    在Scala中,我们可以传递定义的内联函数,引用方法:

     1 class SearchFunctions(val query: String) {
     2   def isMatch(s: String): Boolean = {
     3     s.contains(query)
     4   }
     5   def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
     6     rdd.map(isMatch)
     7   }
     8   def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
     9     rdd.map(x => x.split(query))
    10   }
    11   def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
    12     val query_ = this.query
    13     rdd.map(x => x.split(query_))
    14   }
    15 }

    4.2 java

     1 class Contains implements Function<String, Boolean>() {
     2     private String query;
     3 
     4     public Contains(String query) { this.query = query; }
     5 
     6     public Boolean call(String x) { return x.contains(query); }
     7 }
     8 
     9
    10 RDD<String> errors = lines.filter(new Contains("error"));

     

  • 相关阅读:
    [轉][Windows] 已啟用Win7遠端桌面,從家中連回去卻無法連線?
    [轉]False SQL Injection and Advanced Blind SQL Injection
    SQL Injection with INFORMATION_SCHEMA (Mysql)
    Exploiting hard filtered SQL Injections
    Mysql 5 以上有内置库 information_schema,存储着mysql的所有数据库和表结构信息
    12个月内自学完成4年麻省理工学院计算机科学的33门课程的scotthyoung所谓的超速学习理论&方法(费曼技巧)?
    SQLi filter evasion cheat sheet (MySQL)
    [轉]字符形注入
    [轉]渗透测试必备Firefox全套渗透装
    Phpexcel範例
  • 原文地址:https://www.cnblogs.com/zcjcsl/p/7923330.html
Copyright © 2011-2022 走看看