zoukankan      html  css  js  c++  java
  • Spark学习笔记4

    • SparkCore-WordCount
    package com.lotuslaw.spark.core.wc
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    /**
     * Word-count examples for Spark Core: the same word frequencies computed with
     * different RDD operators (groupBy, groupByKey, reduceByKey, aggregateByKey,
     * foldByKey, combineByKey, countByKey, countByValue and a Map-merging reduce).
     *
     * @author: lotuslaw
     * @version: V1.0
     * @package: com.lotuslaw.spark.core.wc
     * @create: 2021-12-02 10:08
     * @description: Several ways to implement WordCount with Spark Core RDD operators.
     */
    object Spark01_WordCount2 {

      /** Sample input lines shared by every word-count variant. */
      private val SampleLines: List[String] = List("Hello Spark", "Hello Scala")

      /** Entry point: runs the groupBy variant locally and prints each (word, count) pair. */
      def main(args: Array[String]): Unit = {
        // Create the Spark configuration: local mode using all available cores.
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

        // Create the Spark context (the connection object).
        val sc = new SparkContext(sparkConf)
        try {
          wordCount1(sc).collect().foreach(println)
        } finally {
          // Always close the connection, even when the job throws.
          sc.stop()
        }
      }

      /** Splits the sample lines into an RDD of words, separating on a single space. */
      private def sampleWords(sc: SparkContext): RDD[String] =
        sc.makeRDD(SampleLines).flatMap(_.split(" "))

      /** Pairs every sample word with an initial count of 1. */
      private def sampleWordOnes(sc: SparkContext): RDD[(String, Int)] =
        sampleWords(sc).map(word => (word, 1))

      /** groupBy: groups identical words together and counts the size of each group. */
      def wordCount1(sc: SparkContext): RDD[(String, Int)] =
        sampleWords(sc).groupBy(word => word).mapValues(_.size)

      /** groupByKey: groups the (word, 1) pairs by word and counts the values per key. */
      def wordcount2(sc: SparkContext): RDD[(String, Int)] =
        sampleWordOnes(sc).groupByKey().mapValues(_.size)

      /** Alias of [[wordcount2]] using the same camelCase naming as the other variants. */
      def wordCount2(sc: SparkContext): RDD[(String, Int)] = wordcount2(sc)

      /** reduceByKey: sums the 1s for each word. */
      def wordCount3(sc: SparkContext): RDD[(String, Int)] =
        sampleWordOnes(sc).reduceByKey(_ + _)

      /** aggregateByKey: zero value 0, summing both within and across partitions. */
      def wordCount4(sc: SparkContext): RDD[(String, Int)] =
        sampleWordOnes(sc).aggregateByKey(0)(_ + _, _ + _)

      /** foldByKey: like aggregateByKey when the intra- and inter-partition functions match. */
      def wordCount5(sc: SparkContext): RDD[(String, Int)] =
        sampleWordOnes(sc).foldByKey(0)(_ + _)

      /** combineByKey: the first value becomes the combiner, then values/combiners are summed. */
      def wordCount6(sc: SparkContext): RDD[(String, Int)] =
        sampleWordOnes(sc).combineByKey(
          (v: Int) => v,
          (acc: Int, v: Int) => acc + v,
          (left: Int, right: Int) => left + right
        )

      /** countByKey: action that counts occurrences of each key and returns them to the driver. */
      def wordCount7(sc: SparkContext): collection.Map[String, Long] = {
        val wordCount = sampleWordOnes(sc).countByKey()
        wordCount
      }

      /** countByValue: action that counts occurrences of each word directly. */
      def wordCount8(sc: SparkContext): collection.Map[String, Long] =
        sampleWords(sc).countByValue()

      /**
       * reduce: turns every word into a one-entry mutable Map and merges the maps pairwise.
       *
       * Only the `reduce` variant is implemented here (not `aggregate` or `fold`).
       *
       * @return the merged word -> count map (the original computed it and discarded it)
       */
      def wordCount91011(sc: SparkContext): collection.Map[String, Long] = {
        // Each element gets its own freshly created map, so merging into map1 in place
        // does not touch shared state.
        val mapWord = sampleWords(sc).map(word => mutable.Map[String, Long](word -> 1L))

        mapWord.reduce { (map1, map2) =>
          // Add every count from map2 into map1 and return the updated map1.
          map2.foreach { case (word, count) =>
            map1.update(word, map1.getOrElse(word, 0L) + count)
          }
          map1
        }
      }

    }
    
    
  • 相关阅读:
    .net C# 利用Session防重复点击防重复提交
    子报表修改后需要重新导入,0.00显示.00的调整方法
    svn错误 svnserve.conf:12: Option expected解决办法
    mysql远程访问 登录ERROR 1130: is not allowed to connect to this MySQL server解决办法
    phpmyadmin新加用户登录不了,测试解决方案。
    自己封装的php Curl并发处理,欢迎提出问题优化。
    js和php计算图片自适应宽高算法实现
    jquery获取浏览器宽高
    swftools中的pdf2swf转换Error overflow ID 65535 解决办法
    php 根据ip获取城市以及网络运营商名称(利用qqwry.dat)
  • 原文地址:https://www.cnblogs.com/lotuslaw/p/15640169.html
Copyright © 2011-2022 走看看