  • Extending SparkContext with a custom textFiles method to read text files from multiple input directories

    Requirement
     
    Extend SparkContext with a custom textFiles method so that text files can be read from multiple input directories at once.
     
    Extension
     
    import pyspark
    from pyspark.serializers import PickleSerializer

    class SparkContext(pyspark.SparkContext):

        def __init__(self, master=None, appName=None, sparkHome=None,
                     pyFiles=None, environment=None, batchSize=0,
                     serializer=PickleSerializer(), conf=None,
                     gateway=None, jsc=None):
            pyspark.SparkContext.__init__(
                self, master=master, appName=appName, sparkHome=sparkHome,
                pyFiles=pyFiles, environment=environment, batchSize=batchSize,
                serializer=serializer, conf=conf, gateway=gateway, jsc=jsc)

        def textFiles(self, dirs):
            # Join the directories into a single comma-separated input path
            # and enable recursive traversal of subdirectories.
            hadoopConf = {
                "mapreduce.input.fileinputformat.inputdir": ",".join(dirs),
                "mapreduce.input.fileinputformat.input.dir.recursive": "true",
            }

            # TextInputFormat produces (byte offset, line text) pairs.
            pairs = self.hadoopRDD(
                inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
                keyClass="org.apache.hadoop.io.LongWritable",
                valueClass="org.apache.hadoop.io.Text",
                conf=hadoopConf)

            # Keep only the line text, dropping the byte-offset key.
            return pairs.map(lambda kv: kv[1])
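The two Hadoop properties set above do all the work: the first joins the directories into one comma-separated input path, the second turns on recursive directory traversal. That configuration step can be sketched without Spark at all; the helper name `build_input_conf` below is ours, not part of the original post:

```python
def build_input_conf(dirs):
    """Build the Hadoop job configuration used by textFiles:
    a comma-separated input path plus recursive traversal."""
    return {
        "mapreduce.input.fileinputformat.inputdir": ",".join(dirs),
        "mapreduce.input.fileinputformat.input.dir.recursive": "true",
    }

conf = build_input_conf(["hdfs://nn:8020/user/yurun/dir1",
                         "hdfs://nn:8020/user/yurun/dir2"])
```

Factoring the dict out this way also makes the property names easy to unit-test independently of a cluster.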
     
    Example
     
    from pyspark import SparkConf
    from dip.spark import SparkContext
     
    conf = SparkConf().setAppName("spark_textFiles_test")
     
    sc = SparkContext(conf=conf)
     
    dirs = ["hdfs://dip.cdh5.dev:8020/user/yurun/dir1",
            "hdfs://dip.cdh5.dev:8020/user/yurun/dir2"]
     
     
    def printLines(lines):
        if lines:
            for line in lines:
                print(line)
     
    lines = sc.textFiles(dirs).collect()
     
    printLines(lines)
     
    sc.stop()
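Worth noting: the stock SparkContext.textFile already accepts a comma-separated list of paths, so when recursive traversal of subdirectories is not needed, the subclass can be skipped entirely. A minimal sketch (the sc.textFile call is commented out because it needs a live SparkContext):

```python
dirs = ["hdfs://dip.cdh5.dev:8020/user/yurun/dir1",
        "hdfs://dip.cdh5.dev:8020/user/yurun/dir2"]

# textFile accepts a comma-separated list of input paths.
path = ",".join(dirs)
# lines = sc.textFile(path).collect()  # requires a running SparkContext
```

The custom textFiles method above still earns its keep when input files are nested in subdirectories, since plain textFile does not recurse by default.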
     
  • Original post: https://www.cnblogs.com/yurunmiao/p/4893946.html