zoukankan      html  css  js  c++  java
  • python wordCount

    一、

    以下环境在pycharm中运行

    确保jdk,hadoop,spark,scala 软件在电脑端安装完毕,环境搭建

    pycharm 环境配置

     环境变量配置:

    PYTHONUNBUFFERED=1;  -u 不缓冲stdin、stdout和stderr 默认是缓冲的。同PYTHONUNBUFFERED=1 不用buffer的意思

    PYTHONPATH=C:spark-2.2.0-bin-hadoop2.7python;

    SPARK_HOME=C:spark-2.2.0-bin-hadoop2.7

    from pyspark import SparkContext,SparkConf
    
    conf = SparkConf().setMaster("local").setAppName("liujinjie")
    sc = SparkContext(conf=conf)
    textFile = sc.textFile("D:hello.txt")
    wordCount = textFile.flatMap(lambda line: line.split(",")).map(lambda word:(word,1)).reduceByKey(lambda a,b :a + b)
    wordCount.foreach(print)

     其中flatMap,Map,reduceByKey 含义讲解 link

     二、

    scala 代码

    collect: 收集一个弹性分布式数据集的所有元素到一个数组中,这样便于我们观察,毕竟分布式数据集比较抽象。Spark的collect方法,是Action类型的一个算子,会从远程集群拉取数据到driver端。最后,将大量数据
     汇集到一个driver节点上,将数据用数组存放,占用了jvm堆内存,非常用意造成内存溢出,只用作小型数据的观察。
        val arr = res.collect();

    
    import org.apache.spark.{SparkContext, SparkConf}
    object WordCount {
      def main(args: Array[String]) {
        /*
        创建Spark配置对象SparkConf,设置Spark程序运行时的配置信息,
        SetMaster 来设置程序要链接的Spark集群Master的URL,如果设定为local 则代表在本地运行,
        setAppName 设定应用程序名称,在程序运行监控界面可看到名称
        */
      val conf = new SparkConf().setMaster("local").setAppName("testRdd")
      val sc = new SparkContext(conf) // 等价与  val sc = new SparkContext("local","testRdd")
        /**
        * 创建SparkContext对象,SparkContext是所有程序功能的唯一入口,无论用Scala、Java、Python、R等都必须要有个SparkContext
        * SparkContext核心作用是初始化spark应用程序所需核心组件,包含DAGScheduler,TaskScheduler,SchedulerBackend
        * 还负责 spark 程序往master 注册程序等, 是整个应用程序最重要的一个对象, 通过传入conf  定制spark运行的具体参数跟配置信息
        */
        /**
          * 3.根据具体数据的来源(HDFS,HBase,Local,FS,DB,S3等)通过SparkContext来创建RDD;
          * RDD的创建基本有三种方式:根据外部的数据来源(例如HDFS)、根据Scala集合、由其他的RDD操作;
          * 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴;
          */
        /**
          * 4.对初始的RDD进行Transformation级别的处理,例如map,filter等高阶函数的变成,来进行具体的数据计算
          * 4.1.将每一行的字符串拆分成单个单词
          */
        //对每一行的字符串进行拆分并把所有行的拆分结果通过flat合并成一个大的集合
        val lines = sc.textFile("d://hello.txt")
        val words = lines.flatMap(_.split(","))
        val pairs = words.map{word =>(word,1)}
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.foreach(pair => println(pair._1 + ":" + pair._2))
        sc.stop()
    
    //  data.flatMap(_.split(","))//下划线是占位符,flatMap是对行操作的方法,对读入的数据进行分割
    //    .collect()//将分布式的RDD返回一个单机的scala array,在这个数组上运用scala的函数操作,并返回结果到驱动程序
    //    .foreach(println)//循环打印
      }
    }
    scala> List(1, 2, 3, 4, 5, 6) collect { case i if i % 2 == 0 => i * i }
    res0: List[Int] = List(4, 16, 36)
    
    
    scala> List(1, 2, 3, 4, 5, 6) collect { case i if i % 2 == 0 => i+1 }
    res1: List[Int] = List(3, 5, 7)
    
    
    scala> List(1, 2, 3, 4, 5, 6) collect { case i  => i+1 }
    res2: List[Int] = List(2, 3, 4, 5, 6, 7)
    
    
    scala> List(1, 2, 3, 4, 5, 6) collect { case i  =>(i, i+1 )}
    res3: List[(Int, Int)] = List((1,2), (2,3), (3,4), (4,5), (5,6), (6,7))
    
    
    scala> List(1, 2, 7, 4, 9, 6) collect { case i  =>(i, i+1 )}
    res4: List[(Int, Int)] = List((1,2), (2,3), (7,8), (4,5), (9,10), (6,7))
    
    
    scala> List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i }
    <console>:1: error: ')' expected but ';' found.
           List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i }
                                                            ^
    <console>:1: error: ';' expected but ')' found.
           List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;i)=> i * i }
                                                              ^
    
    scala> List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i }
    <console>:1: error: ')' expected but ';' found.
           List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i }
                                                            ^
    <console>:1: error: ';' expected but ')' found.
           List(1, 2, 3, 4, 5, 6) collect { case i if (i=i+1;true)=> i * i }
                                                                 ^
    
    scala> List(1, 2, 3, 4, 5, 6) collect { case i if {i=i+1;true}=> i * i }
    <console>:8: error: reassignment to val
                  List(1, 2, 3, 4, 5, 6) collect { case i if {i=i+1;true}=> i * i }
                                                               ^
    scala> List(1, 2, 3, 4, 5, 6) collect { case i if {i+1;true}=> i * i }
    res6: List[Int] = List(1, 4, 9, 16, 25, 36)
    


     

    关注公众号 海量干货等你
  • 相关阅读:
    从成本与职责谈测试的核心价值到底是什么
    浅谈测试媛职业发展
    Spotlight监控Oracle--Spotlight On Oracle安装和使用
    Jmeter-阶梯场景设置
    Jmeter-常用线程组设置及场景运行时间计算
    浮点数二分算法
    整数二分算法
    归并排序算法
    快速排序算法
    hadoop3.2+Centos7+5个节点主从模式配置
  • 原文地址:https://www.cnblogs.com/sowhat1412/p/12734187.html
Copyright © 2011-2022 走看看