zoukankan      html  css  js  c++  java
  • SparkContext自定义扩展textFiles,支持从多个目录中输入文本文件

    需求
     
    SparkContext自定义扩展textFiles,支持从多个目录中输入文本文件
     
    扩展
     
    class SparkContext(pyspark.SparkContext):
     
        def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None):
            pyspark.SparkContext.__init__(self, master=master, appName=appName, sparkHome=sparkHome, pyFiles=pyFiles,
                                          environment=environment, batchSize=batchSize, serializer=serializer, conf=conf, gateway=gateway, jsc=jsc)
     
        def textFiles(self, dirs):
            hadoopConf = {"mapreduce.input.fileinputformat.inputdir": ",".join(
                dirs), "mapreduce.input.fileinputformat.input.dir.recursive": "true"}
     
            pair = self.hadoopRDD(inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
                                  keyClass="org.apache.hadoop.io.LongWritable", valueClass="org.apache.hadoop.io.Text", conf=hadoopConf)
     
            text = pair.map(lambda pair: pair[1])
     
            return text
     
    示例
     
    from pyspark import SparkConf
    from dip.spark import SparkContext
     
    conf = SparkConf().setAppName("spark_textFiles_test")
     
    sc = SparkContext(conf=conf)
     
    dirs = ["hdfs://dip.cdh5.dev:8020/user/yurun/dir1",
            "hdfs://dip.cdh5.dev:8020/user/yurun/dir2"]
     
     
    def printLines(lines):
        if lines:
            for line in lines:
                print line
     
    lines = sc.textFiles(dirs).collect()
     
    printLines(lines)
     
    sc.stop()
     
  • 相关阅读:
    Effective Java 的笔记(二)
    设计模式系列 装饰模式
    一道多线程题目的解决方案
    Effective Java 的笔记(一)
    Java 并发编程实践
    【转】微博技术底层架构的实现
    Head First JavaScript 笔记
    JVM 学习笔记 类的加载和执行
    背包问题
    Oracle 序列号通过定时任务重置
  • 原文地址:https://www.cnblogs.com/yurunmiao/p/4893946.html
Copyright © 2011-2022 走看看