zoukankan      html  css  js  c++  java
  • Spark 学习笔记

    1. Spark的前世今生

     

    1. 基础语法详解

       

    3.条件控制与循环

     

    4.函数入门

     

    5.函数入门之默认参数

     

    6.函数入门之边长参数

     

    7.函数入门之过程lazy值和异常

     

    8.数组操作之Array、ArrayBuffer以及遍历数组

     

    9.数组转换

     

    10.Map与Tuple

     

    11.面向对象编程

     

    11.1 object类

    object 相当于单个实例,通常在里面放一些静态或者method,第一次调用object的方法时,就回执行object的constructor,也就是object内部不在method中的代码;object不能定义接收参数的constructor。

    Object 的constructor在第一次被调用时执行一次,以后再次调用就不会执行了。Object 通常作为单例模式的实现,或者放class的静态成员,如工具方法。

    11.1.1定义一个简单的类

     

    11.1.2 field的getter与setter

    定义类包含,定义类的field及方法。其格式如下

    class ClassName{ // 其中类名首字母要大写

    private var name = "Hys" // private修试符说明此变量只对本类有效

    def sayHello(){ print("Hello," + name)

    def getName = name // 获取name 的值

    }

    创建类的对象,并调用其方法

    val field_name = new ClassName // 创建类对象

    field_name.sayHello() // 调用类方法

    print(field_name.getName()) // 调用类成员

     

    11.1.3 构造函数-constructor

     

    11.1.4 内部类

     

    11.2 对象

     

     

    11.3 继承

    继承的主要有点:可以减少代码量

    11.3.1 关键字extends

    *父类中用final修饰的field和method是无法被子类继承的

    示例代码:

    class Person {

             private var name = "George"

    def getName = name

    }

    class Student extends Person {

    private var score = "A"

             def getScore = score

    }

    11.3.2 子类覆盖父类field和method

        1.override关键字

            使用此关键字覆盖可以帮助发现代码里的错误

        2.父类方法被覆盖之后还可以使用super调用父类方法

        示例代码:

            class Person{

    private var name = "George"

    def getName = name

    }

    class teacher extends Person{

    private var age = 20

    def getAge = age

    override def getName = "Hello " + super.getName

    }

        3. override field

        示例代码:

    class Person{

    val name:String = "Hys"

    def age:Int = 20

    }

    class Teacher extends Person{

    override val name:String = "George"

            override val age:Int = 30

    }

        4.isInstanceOf 和 asInstanceOf

         isInstanceOf :判断指定对象是否为指定类或其子类的对象,如果对象是null则一定返回false

         asInstanceOf:将指定对象转换为指定类型

        示例代码:

            class Person

    class Teacher extends Person

     

    val p:Person = new Teacher

    val t:Teacher = null

    if(p.isInstanceOf[Teacher]) t = p.asInstanceOf[Teacher]

        5.getClass 和 classOf[类]

         p.getClass 可以获取对象被实例化的类型

         classOf[类] 可以获取精确的类

        示例代码:

        p.getClass == classOf[Person] 判断p是否是被实例化为Person类型

        p.getClass == classOf[Teacher]

    6. 模式匹配进行类型判断

         *判断对象是否和该类及其子类的对象

    p match{

    case per:Person => println("it's Person's object")

    case _ => println("Unknown type")

    }

        7.关键字protected

         *父类中使用protected 修试的field 和 method在子类中直接就可以访问。

         *当使用protected[this]时只能在"子类 当前对象"中访问父类的,其它对象不可以,包括同类对象。

        代码示例:

        class Person{

    protected var name:String = "Hys"

    }

    class Teacher extends Person{

    def sayHello() = {println("Hello " + name)}

    }

    val t = new Teacher

    t.sayHello

    8. 调用父类的构造函数constructor

    *只能在子类的主constructor中调用父类的constructor;原因是辅构造函数只能调用本类的主构造函数或者辅构造函数

    * 父类中接收的参数,子类中就不要用任何val 或者var 来修饰,否在认为是要覆盖父类的field

    示例代码:

    class Person(val name:String, val age:Int)

    class Teacher(name:String, age:Int, var score:Double) extends Person(name,age){

    def this(name:String){

    this(name,0,0)

    }

    def this(age:Int){

    this("Hys",age,0)

    }

    }

    8.调用父类的constructor

    * 每个类的辅助构造函数(constructor)只能调用本类的辅助constructor或者主constructor,所以要调用父类构造函数必须使用子类的主构造函数。

    *如果是父类中接收的参数,子类接收时就不要用var或者val 来修饰了,否则会认位是覆盖父类的field 。

    示例代码:

    Class Person(val name:String, val age:Int){

        Println("Person's name is " + name)

        Println("Person's age is " + age)

    }

    Class Teacher(name:String, age:Int, var score:Double) extends Person(name, age){

        def this(name:String){

    this(name:String)

    }

    def this(age:Int){

    this("Hys ",age,0)

    }

    Println(name + " " + age + " " + score)

    }

    val t = new Teacher("George", 20, 100)

    Person's name is George

    Person's age is 20

    George 20 100.0

    9.匿名内部类

    *匿名子类:定义的没有名字的子类,同时直接创建其对象,然后将对象的引用赋予一个变量。然后匿名子类的对象也可以传给其它函数。

    示例代码:

    class Person(protected val name:String){

    def sayHello = "Hello, I'm " + name

    }

    val p = new Person("Hys") {

    override def sayHello = "Nice to meet you " + name} //匿名子类

    def greeting(p:person{def sayHello:String}){ //把匿名子类当成参数传给函数

    println(p.sayHello)

    }

    greeting(p) //调用匿名子类实例

    10.抽象类

    *抽象方法:定义时只有方法名和方法类型的方法。

    *抽象类:包含抽象方法的类就是抽象类,即只要类中有一个抽象方法此类就为抽象类。

    *抽象类必须用abstract关键词修饰。

    *抽象类不可以被实例化。

    *在子类中覆盖抽象类的方法时不需要用override关键字。

    代码示例:

    abstract class Person(){ //定义抽象类

    def sayHello:String

    }

    class Teacher(name:String) extends Person{ //继承抽象类

    def sayHello:String = {println("Hello" + name ); name} // 覆盖父类方法

    }

    val t = new Teacher("Hys")

    t.sayHello

    11.抽象field

    *抽象field:定义field时只给出类型没有给出初值,这样的field就是抽象field。

    *Scala会根据自己的规则为var或者val类型的field生成对应的getter和setter方法,但是父类中是没有field的,就父类中的抽象field不占内存。

    *子类必须覆盖父类的抽象field。

    *子类覆盖父类的抽象field不需要override关键字。

    示例代码(子类不覆盖父类抽象field报错):

    Abstract class Person{ //定义抽象类

    Val name:String

    }

    class Teacher extends Person{} //子类不重写代码,报错

    (子类正常继承父类)

    class Teacher extends Person{

    val name:String = "Hys" // 覆盖父类抽象方法

    }

    (子类把父类的抽象field继承为抽象field操作)

    class Teacher extends Person{ val name:String } // 把父类抽象方法继承为抽象方法,如果不用abstract修饰子类会有报错

    abstract class Teacher extends Person{ val name:String} //正确的把父类抽象方法继承为抽象方法

    11.4 Trait

    11.4.1基础知识

    1.将trait作为接口使用

    *类使用关键字extends继承

    *与Java中的接口一样

    *抽象方法不需要overrid管家你

    *Scala不支持对类进行多继承,但可以多继承trait,使用with关键字

    代码示例:

    trait HelloTrait{

    def sayHello(name:String)

    }

    trait MakeFriendsTrait{

    def makeFriends(p:Person)

    }

    class Person(val name:String) extends HelloTrait with MakeFriendsTrait with Cloneable with Serializable{

    def sayHello(name:String) = println("Hello " + name)

    def makeFriends(p:Person) = println("Hello, my name is " + name + " your name is " + p.name)

    }

    2.在trait中定义具体方法

    *就像类中定义方法一样

    *不同之处在子类中调用是直接使用方法名就可以

    trait computer{

    def playVideo()={println("Play video")}

    }

    class Person extends computer{

    playVideo()

    def play()={playVideo()}

    }

    val p = new Person

    p.playVideo

    3.在trait中定义具体字段

    *与类继承不同,trait定义的具体字段直接添加到了继承它的类中

    trait Person{

    val eyeNum:Int = 2

    }

    class Teacher(val name:String, val age:Int) extends Person{

    println("Hello "+ name + ", you have " + eysNum + "eyes")

    }

    4.在trait中定义抽象字段

    *trait中可以定义抽象字段,同时trait中的具体方法可以基于抽象字段来写,只不过继承trait的类必须覆盖抽象的field,提供具体的值

    trait SayHello {

    val name:String

    def sayHello()={"Hello " + name}

    }

     

    class Person extends SayHello{

    val name:String = "Hys" //覆盖trait中抽象字段

    def makeFriends(p:Person){

    sayHello()

    println(name + ", nice to meet you")

    }

    }

    11.4.2高级知识

    1.为实例对象混入trait

    *只有本对象可以调用

    trait Loger{

    def log(msg:String){print("loger trait")}

    def logTrait(){println("loger trait")}

    }

    trait MyLoger extends Loger{

    override def log(msg:String){println("log:" + msg)}

    }

    class Person(val name:String) extends Loger{

    def sayHello{println("Hi,I'm " + name );log("sayHello is invoked!")}

    }

    val p2 = new Person("George") with MyLoger //动态混入trait

    val p1 = new Person("Hys")

    scala> p2.logTrait()

    trait

    scala> p1.log("")

    loger trait

    2.trait调用链

    *继承同一个trait的多个trait,在具有相同method的最后都执行super.method,就可以从右向左依次执行每个被继承的多个trait中相同的方法,从而实现调用链。

    代码示例:

    trait Loger{

    def vs(){}

    def kk(){}

    }

    trait PersonReady extends Loger{

    override def vs(){

    println("I am ready")

    super.vs() //执行此方法

    }

    }

    trait PersonGo extends Loger{

    override def vs(){

    println("Please go!")

    super.vs()

    }

    }

    trait PersonEnd extends Loger{

    override def vs(){

    println("To be end!")

    super.vs()

    }

    }

    class PersonVs extends PersonEnd with PersonGo with PersonReady{

    def aURD(name:String){

    println("Are you ready")

    vs() //执行调用链

    }

    }

    3.在trait中覆盖抽象方法

    *当子trait调用super.method_name 时要要给子trait的方法加上abstract override修饰

    示例代码1(无super.method_name调用):

    trait Logger{

    def log(msg:String)

    }

    trait Mylogger extends Logger{

    override def log(msg:String){}

    }

    示例代码2(有super.method_name调用):

    trait Logger{

    def log(msg:String)

    }

    trait Mylogger extends Logger{

    abstract override def log(msg:String){super.log("kk")} // 当调用super.method_name时要用abstract修饰

    }

    4.混合使用trait的具体方法和抽象方法

    *可以让trait中具体方法依赖于抽象方法,而抽象方法则放到继承trait的类中去实现,就好像是一个模板

    trait Valid{

    def getName:String //在trait中定义抽象方法

    def valid:Boolean={ //在trait中定义具体方法

    getName == "Hys"

    }

    }

    class Person(val name:String) extends Valid{

    println(valid) //此语句需要执行的函数及次序是construceto->getName->vaild

    def getName = {println("Get the name.")

    name

    }

    }

    5.trait的构造机制

    *trait中不包含在任何方法中的代码就是trait的构造代码

    *构造代码执行优先级 父类构造函数->多个trait从左到右依次执行->父trait的构造代码->子类的构造函数

    示例代码:

    class Person{print("Person's constructor")}

    trait Logger{println(" Logger constructor")}

    trait Logger{println(" Logger constructor")}

    trait TimeLogger extends Logger{println("TimeLogger")}

    class Teacher extends Person with TimeLogger with MyLogger{println("Tescher class")}

    实例化演示:

    scala> val t = new Teacher

    Person's constructor

    Logger constructor

    TimeLogger

    Mylogger constructor

    Tescher class

    6.trait字段的初始化

    *trait不可以接收带参数的构造函数,如果想使用trait初始化field,可以使用Scala的提前定义特性

    示例代码:

    trait SayHello{

    val msg:String //需要初始化的字段

    println(msg.toString) // 对初始化字段的调用

    }

    class Person

    val p = new Person with SayHello //报错

    val p = new {val msg:String = "Hys" //提前定义

    } Person with SayHello

    或者

    class Person extends { val msg:String = "Hys" //提前定义

    } with SayHello

    还有办法就是使用lazy value

    *如果不用lazy仍然会报异常

    trait SayHello {

    lazy val msg:String = null

    println(msg:toString)

    }

    class Person extends SayHello //继承的时候没有用lazy,实例化时有异常发生

    val p = new Person //有异常跳出

    class Person extends SayHello{ override lazy val msg:String = "George"} //使用lazy覆盖该field,实例化时正常

    val p = new Person

    7.让trait继承类

    *trait也可以继承自class,此时class就回成为所有继承该trait的类的父类

    示例代码:

    class Hys { println("class Hys")}

    trait George extends Hys {println("trait George")} //trait 继承类

    class Person extends George{println("class Person")} //之类继承继承了类的trait,此时trait继承的类相当于继承了该trait的父类。

    scala> val p = new Person

    class Hys

    trait George

    class Person

    由构造函数执行内容和顺序可知Hys类是Person的父类

    12.函数式编程

    12.1 将函数值赋给变量

    *将函数赋值给变量时必须在函数后面加上空格和下划线

    def sayHello(name:String){println("Hello " + name)} //定义一个函数

    val sayHelloFunc = sayHello _ //把函数赋值给变量(注意格式)

    sayHelloFunc("Hys") //调用函数

    12.2 匿名函数

    *没有定义函数名的函数称为匿名函数

    *匿名函数可以直接赋值给变量

    *匿名函数也可以直接传入其它函数中

    *匿名函数的语法规则:(fieldc_name1:type, field_name2:type, ……, field_namen:type) => 函数体 或者 (参数名:参数类型) => 函数体

    val sayHelloFunc = (name:String) => println("Hello " + name) //直接把匿名函数赋值给变量, 箭头后面的是函数体

     

    12.3 高阶函数(higer-order function)

    *定义:接收其它函数作为参数的函数,返回函数的函数

    *语法1:def higer-order-func_name(func_name:type => return type,func_field:type){func_name(func_field)}(接收其它函数作为参数)

    *语法2:def higer-order-func_name(higer-order-func-filed_name:type) = (func_field:type) => 要返回的函数的主题

    示例代码1(接收其它函数作为参数):

    val sayHelloFunc = (name:String) => println("Hello " + name) //定义一个函数

    def greeting(func:String => Unit,name:String){func(name)} //定义一个高阶函数

    greeting(sayHelloFunc,"Hys") //调用高阶函数

    示例代码2(把函数作为返回值):

    def getGreetingFunc(msg:String) = (name:String) => println(msg + "," + name)

    val greetingFunc = getGreetingFunc("Hello")

    greetingFunc("Hys")

    **一个用高阶函数及匿名函数的一个实例

    Array(1,2,3,4,5).map((num:Int) => num*num)

    res2: Array[Int] = Array(1, 4, 9, 16, 25)

    以上函数的意义为,矩阵调用了map()函数,然后map()函数又调用了匿名函数求矩阵每一个函数的乘积,并返回一个新的矩阵。矩阵的map()函数即为一个高阶函数。

    12.4 高阶函数的类型推断

    *调用高阶函数可以自动推断出参数类型,而不需要写明类型;对于只有一个参数的函数,还可以省略其小括号;如果仅有一个参数在右侧的函数体内使用一次,则还可以将接收参数省略,并将参数用"_"来代替。

    代码示例:

    def greeting(func:(String) => Unit,name:String){func(name)} //定义一个高阶函数

    greeting((name:String) => println("Hello " + name),"Hys") //不省略时的代码

    greeting((name) => println("Hello " + name),"Hys") //省略参数类型

    greeting(name => println("Hello " + name),"Hys") //函数只有一个参数,把参数类型和小括号都省略掉

    def triple(func:(Int) => Int)={func(5)} //定义一个只有一个参数的高阶函数

    triple(num => num*9) //普通调用方法

    triple(9*_) //如果高阶函数的参数只有调用函数的参数,则调用时可以只有一个匿名函数作为参数,且被调用的函数的参数可以用"_"来代替

    12.5 Scala的常用高阶函数

    12.5.1

    12.5.2

    12.6 闭包

    12.6.1

    12.6.2

    12.7 SAM转换

    12.7.1

    12.7.2

    12.8 Currying函数

    12.8.1 又

    12.8.2

    12.9 return

    12.9.1

    12.9.2

     

     

     

    20.Scala编程详解隐式转换与隐式参数

    隐式转换:即将某种类型的对象转换为其它类型的对象。Scala根据隐式转换函数的签名,在程序中使用隐式转换函数接收的参数类型定义的对象,自动将其传入隐式转换函数,转换为另一种类型的对象并返回。

    核心:定义隐式转换函数,即implicit conversion function。

    隐式转换函数命名建议:以"one2one"的形式

    20.1隐式转换

    *在程序可见的范围内定义隐式转换函数,Scala自动使用隐式转换函数。

    *与普通函数的区别:要以implicit开头,同时最好要定义一是转换返回类型。

    代码示例:

    import scala.language.implicitConversions //导入与隐式转换有关的包

    class SpecialPerson(val name:String) //定义特殊人群的类

    class Student(val name:String) //定义学生类

    class Older(val name:String) //定义老人类

    implicit def object2SpecialPerson(obj:Object):SpecialPerson = { //定义隐式转换函数

    if (obj.getClass == classOf[Student]){val stu = obj.asInstanceOf[Student];new SpecialPerson(stu.name)}

    else if (obj.getClass == classOf[Older]){val older = obj.asInstanceOf[Older]; new SpecialPerson(older.name)}

    else Nil

    }

    def buySpecialTicket(p:SpecialPerson) = {ticketNumber += 1; "T-" + ticketNumber}

    val stu1 = new Student("Hys") //声明Student

    buySpecialTicket(stu1)

    val older1 = new Older("George")

    buySpecialTicket(older1)

     

    20.2使用隐式转换加强现有类型

    *使用隐式转换可以为某个类定义 一个加强版的类,源类可以通过隐式转换直接使用加强类中的方法。

    示例代码:

    import scala.language.implicitConversions //导入与隐式转换有关的包

    class Man(val name:String)

    class SuperMan(val name:String)

    implicit def man2superman(man:Man):SuperMan = new SuperMan(man.name) //隐式转换Man为SuperMan

    val hys = new Man("Hys") //实例化一个Man类

    hys.emiLaser //直接调用SuperMan类中的方法

     

    20.3隐式转换函数作用域和导入

    *Scala 使用的默认的隐式转换位置:一种是当前程序作用域内可以用唯一标识表示的隐式转换函数;一种是源类型,或者目标类型的伴生对象内的隐式转换函数。

    *如果隐式转换函数不在上述两种情况中,就必须手动import,某个包下的隐式转换函数。

    *仅仅在需要隐式转换的地方进行隐式转换,缩小隐式转换作用域,避免不必要的隐式转换。

     

    20.4隐式转换的发生时机

    *调用某个函数,但是给函数传入的参数的类型,与函数定义的接收参数的类型不匹配。

    *使用某个类型的对象,调用某个方法,但是这个方法不存在于该类型时。

    *使用某个类型的对象,调用对象中存在的某个方法,但是给方法传入的参数参数类型与已存在的方法定义的接收参数类型不同时。

     

    20.5隐式参数

    *隐式参数:在函数或者方法中,定义一个用implicit修饰的参数,此时Scala会尝试找到一个指定类型,用implicit修饰的对象,即隐式值,并注入参数。

    *Scala查找隐式参数的范围:一是当前作用域内可见的val或者var定义的隐式变量;一种是隐式参数类型的伴生对象内的隐式值。

    示例代码:

    class SignPen{def write(content:String) = println(content)} //定义签字笔对象

    implicit val sigPen = new SignPen //定义隐式参数

    def signForExam(name:String)(implicit signPen:SignPen){signPen.write(name + "come to exam in time.")} //定义一个函数并调用隐式参数

    signForExam("Hys") //调用使用了隐式参数的函数

     

    21.Actor入门

    *Scala Actor VS Java多线程

    *两者类似

    *不同之处:Actor 尽可能避免锁和共享状态,从而避免了多线程并发时出现资源征用的情况,进而提升多线程的性能。同时Actor模型还可以避免死锁等一系列传统多线程编程问题。

    21.1Actor的创建、启动和消息收发

    *Scala 通过Actor trait 来更方便的进行多线程编程。

    *Actor trait 类似于Java中的 Thread 和 Runnable,是基础的多线程接口。

    *通过重写Actor trait 的act 方法实现自己的线程执行体,类似于Java中的run方法。

    *使用Start()方法启动Actor;使用符号"!"向actor发送消息;Actor内部使用receive 和模式匹配接收消息。

    *Scala2.12已经移除actors模块。

    代码示例:

    class HelloActor extends Actor{ //创建继承Actor trait的类

    def act(){

    while(true){

    receive{

    case name:String => println("Hello," + name)

    val h_w = new HelloWorld(name)

    }

    }

    }

    }

    val h_a = new HelloActor //实例化HelloActor

    h_a.start() //启动h_a

    h_a!"George" //传入消息"George"

    21.2收发case class 类型的消息

    *Scala Actor 天然支持线程之间的精准通信,一个actor可以给其它的actor直接发送消息。

    *在Scala中,通常建议使用样例类,即case class来作为消息进行发送。

    *actor接收消息之后,可以使用模式匹配功能来进行不同消息的处理。

    示例代码:

    case class Login(username:String, password:String) //创建case class

    case class Register(username:String, password:String)

    class UserManagerActor extends Actor{ //创建类继承Actor

    def act(){

    while(true){

    receive{

    case Login(username,password) => println("login,username is " + username + ", password is " + password) //使用模式匹配处理消息

    case Register(username,password) => println("register,username is " + username + ", password is " + password)

    }

    }

    }

    }

    val userManagerActor = new UserManagerActor //实例化userManagerActor

    userManagerActor.start() // 启动actor

    userManagerActor!Register("George", "123") //传入case class 类型的参数

    userManagerActor!Login("leo", "123")

    21.3 Actor之间互相收发消息

    *两个Actor之间可以互相收发消息。

    *建议:不同Actor之间互相发送消息时带上自己的引用,这样当对方Actor收到自己的消息时,可以直接通过引用回复消息。

    示例代码:

    case class Message(content:String,sender:Actor) //定义消息类

    class LeoTelephoneActor extends Actor{ //定义第一个Actor

    def act(){

    while(true){

    receive{

    case Message(content,sender) => {println("leo telephont:" + content); sender!"I'm leo, please call me after 10 minuters."}

    }

    }

    }

    }

     

    class JackTelephoneActor(val leoTelephoneActor:Actor) extends Actor{ //定义第二个Actor

    def act(){

    leoTelephoneActor!Message("Hello,Leo,I'm jack.", this)

    receive{

    case response:String => println("Jack telephone: " + response)

    }

    }

    }

    val lt = new LeoTelephoneActor //实例化

    al jt = new JackTelephoneActor(lt)

    lt.start() //启动

    jt.start() //启动的同时进行相互通信

    21.4 同步消息和Future

    *使用"!?" 的方式发送消息,可以强制对方立即回复,从而实现消息同步,即val reply = actor!?message

    *使用"!!" 的方式发送消息,可以实现异步发送消息,即Future语法,val future = actor !! message and val reply = future()。

    22.使用Java开发worldcount程序

    22.1配置Maven环境

     

     

    22.2如何进行本地测试

    22.2.1创建SparkConf对象

    设置Spark应用配置信息

        SparkConf conf = new SparkConf()
    .setAppName("WorkCountLocal") //设置APP名称
    .setMaster("local"); // "local"代表本地执行

     

    22.2.2 创建JavaSparkContext对象

    此对象是Spark所有功能的入口,编写不同的Spark应用程序使用不同的SparkContext

    JavaSparkContext sc = new JavaSparkContext(conf);

    22.2.3 针对不同 的输入源创建一个初始化RDD

    JavaRDD<String> lines = sc.textFile("E:\files\LearnFiles\Spark\filesCreateByMyself\worldCountTest.txt"); //本地RDD创建

    JavaRDD<String> lines = sc.textFile("hdfs://hadoop-1:9000/wordcounttest/worldCountTest.txt"); //hdfs RDD创建
    								

    22.2.4 对RDD进行transformation操作

    将读入的每一行拆分成单个的单词

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
    List<String> list = new ArrayList<String>();
    String[] sp = s.split(" ");
    for(int j = 0 ;j < sp.length; j++){
    list.add(sp[j]);
    };
    Iterator<String> ill = list.iterator();
    return ill;
    }
    });

    // 接下来将每一个单词映射为(单词,1)的格式,这是一个tuple
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
    return new Tuple2<String, Integer>(word, 1);
    }
    });

    //接下来使用reduce操作统计单词出现的次数
    //reduceByKey操作相当于把key相同的value值累加,最后以tuple的形式返回JavaRDD中的key及对应的value累加值
    final JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
    return v1 + v2;
    }
    });

    // 创建action执行程序
    wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> wordCount) throws Exception {
    System.out.println(wordCount._2 + "次"+ ": " + wordCount._1 );

    }
    });
    // 关闭sc
    sc.close();

     

     

     

    22.3如何使用Spark-submit提交到Spark集群执行(Spark-submit常用参数说明,Spark-submit类似于Hadoop的Hadoop jar命令)

    1.打包程序文件

    2.把jar包上传到集群

    3.编写spark-submit脚本

    4.执行脚本,提交Spark-APP到集群运行

    5.或者是直接命令行使用Spark-submit提交Spark-App

    案例:/usr/local/spark/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --num-executors 3 --executor-memory 200m --executor-cores 5 /root/spark-study/spark-study.ja

  • 相关阅读:
    MPI linux Ubuntu cluster 集群
    python cython c 性能对比
    TCAM CAM 说明 原理 结构 Verilog 硬件实现
    verilog 计算机网络 仿真 激励 pcap
    libtrace 安装 使用 修改
    Dream Spark ------spark on yarn ,yarn的配置
    Dream_Spark-----Spark 定制版:005~贯通Spark Streaming流计算框架的运行源码
    Dream_Spark-----Spark 定制版:003~Spark Streaming(三)
    Dream_Spark-----Spark 定制版:004~Spark Streaming事务处理彻底掌握
    Dream_Spark定制第二课
  • 原文地址:https://www.cnblogs.com/hongyunshui666/p/9602037.html
Copyright © 2011-2022 走看看