zoukankan      html  css  js  c++  java
  • 寒假学习进度

    spark中wordcount的实现

    (1)//aggregateByKey
    def wordcount3(sc:SparkContext)={

    val fileRDD: RDD[String] = sc.textFile("D:\\qq text\\1791028291\\FileRecv\\《飘》英文版.txt")
    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
    val wordmap: RDD[(String, Int)] = wordRDD.map((_, 1))
    val group: RDD[(String, Int)] = wordmap.aggregateByKey(0)(_ + _,_ + _)

    group.collect().foreach(println)
    }

    //foldByKey
    def wordcount4(sc:SparkContext)={

    val fileRDD: RDD[String] = sc.textFile("D:\\qq text\\1791028291\\FileRecv\\《飘》英文版.txt")
    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
    val wordmap: RDD[(String, Int)] = wordRDD.map((_, 1))
    val group: RDD[(String, Int)] = wordmap.foldByKey(0)(_ + _)

    group.collect().foreach(println)
    }
    //combineByKey
    def wordcount5(sc:SparkContext)={

    val fileRDD: RDD[String] = sc.textFile("D:\\qq text\\1791028291\\FileRecv\\《飘》英文版.txt")
    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
    val wordmap: RDD[(String, Int)] = wordRDD.map((_, 1))
    val group: RDD[(String, Int)] = wordmap.combineByKey(
    v => v,
    (x: Int, y) => x + y,
    (x: Int, y: Int) => x + y
    )

    group.collect().foreach(println)
    }

    //countByKey
    def wordcount6(sc:SparkContext)={

    val fileRDD: RDD[String] = sc.textFile("D:\\qq text\\1791028291\\FileRecv\\《飘》英文版.txt")
    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
    val wordmap: RDD[(String, Int)] = wordRDD.map((_, 1))
    val group: collection.Map[String, Long] = wordmap.countByKey()
    println(group)


    }

    //countByValue
    def wordcount7(sc:SparkContext)={

    val fileRDD: RDD[String] = sc.textFile("D:\\qq text\\1791028291\\FileRecv\\《飘》英文版.txt")
    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )

    val group: collection.Map[String, Long] = wordRDD.countByValue()
    println(group)


    }

    //reduce
    def wordcount8(sc:SparkContext)={

    val fileRDD: RDD[String] = sc.textFile("D:\\qq text\\1791028291\\FileRecv\\《飘》英文版.txt")
    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
    val wordmap: RDD[mutable.Map[String, Long]] = wordRDD.map(
    word => {
    mutable.Map[String, Long]((word, 1))
    }
    )

    val stringToLong: mutable.Map[String, Long] = wordmap.reduce(
    (map1, map2) => {
    map2.foreach{
    case (word,count)=>{
    val newCount=map1.getOrElse(word,0L)+count
    map1.update(word,newCount)
    }
    }
    map1
    }
    )
    println(stringToLong)
    }

     

  • 相关阅读:
    07.Linux-CentOS系统库文件libaudit.so.1丢失问题
    06.Linux-RedHat系统网卡服务连不上活跃连接路径变化
    05.Linux-CentOS系统普通用户SSH远程问题
    04.Linux-CentOS系统SSH连接问题
    03.Linux-CentOS系统user用户改密码问题
    解决pycharm问题:module 'pip' has no attribute 'main'
    Centos7搭建主从DNS服务器
    docker 启动镜像报 WARNING: IPv4 forwarding is disabled. Networking will not work.
    dockerfile文件命令详解
    Elasticsearch集群搭建笔记(elasticsearch-6.3.0)
  • 原文地址:https://www.cnblogs.com/chenghaixiang/p/15795733.html
Copyright © 2011-2022 走看看