  • Hadoop 2.6.1 on CentOS 7

    Three ways to upload (distribute) files to the compute nodes:

    1. Local files, via -file:

    The -file option ships small local files to the compute nodes along with the job; use it for small files only.

    For example:

    -file ./test

    HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

    INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
    OUTPUT_PATH="/output_file_broadcast"

    # Remove any previous output directory.
    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

    # Step 1.
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH_1 \
        -output $OUTPUT_PATH \
        -mapper "python map.py mapper_func white_list" \
        -reducer "python red.py reduer_func" \
        -jobconf "mapred.reduce.tasks=3" \
        -file ./map.py \
        -file ./red.py \
        -file ./white_list
    run.sh

    2. -cacheFile: distributes a file that is already on HDFS to the compute nodes (the file must be uploaded to HDFS first).

    For example:

    -cacheFile "hdfs://master:9000/white_list#ABC"
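
    The name after the "#" is a local alias: the distributed file shows up in each task's working directory under that name. Below is a minimal sketch, in the same Python 2 style as the scripts in this post, of a mapper reading the white list through the ABC alias; the helper name (load_white_list) and the plain argv calling convention are illustrative assumptions, not the original map.py.

    import sys

    def load_white_list(path):
        # The -cacheFile alias (here "ABC") appears as a link in the task's
        # working directory, so it can be opened like any local file.
        words = set()
        with open(path, 'r') as fd:
            for line in fd:
                words.add(line.strip())
        return words

    if __name__ == "__main__":
        white_list = load_white_list(sys.argv[1])   # e.g. called as: python map.py ABC
        for line in sys.stdin:
            for word in line.strip().split(' '):
                if word != "" and word in white_list:
                    print "%s\t%s" % (word, 1)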

    3. -cacheArchive: distributes a compressed archive that is already on HDFS to the compute nodes (the archive must be uploaded to HDFS first).

    For example:

    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz"
    In this case the streaming framework unpacks the archive for you automatically; you only need to point your code at the extracted paths, as the map.py below does.
    #!/usr/bin/python

    import os
    import sys

    def get_file_handler(f):
        file_in = open(f, 'r')
        return file_in

    def get_cachefile_handlers(f):
        # The -cacheArchive alias (e.g. WH.gz) is a local directory containing
        # the extracted files; open every file inside it.
        f_handlers_list = []
        if os.path.isdir(f):
            for fd in os.listdir(f):
                f_handlers_list.append(get_file_handler(f + '/' + fd))
        return f_handlers_list


    def read_local_file_func(f):
        word_set = set()
        for cachefile in get_cachefile_handlers(f):
            for line in cachefile:
                word = line.strip()
                word_set.add(word)
        return word_set


    def mapper_func(white_list_fd):
        word_set = read_local_file_func(white_list_fd)

        for line in sys.stdin:
            ss = line.strip().split(' ')
            for s in ss:
                word = s.strip()
                #if word != "" and (word in word_set):
                if word != "":
                    print "%s\t%s" % (s, 1)


    if __name__ == "__main__":
        # Dispatch to the function named on the command line,
        # e.g. "python map.py mapper_func WH.gz".
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)
    map.py
    
    
    #!/usr/bin/python

    import sys

    def reduer_func():
        # Streaming delivers the keys sorted, so identical words arrive adjacent;
        # accumulate counts until the word changes, then emit the total.
        current_word = None
        count_pool = []
        sum = 0

        for line in sys.stdin:
            word, val = line.strip().split('\t')

            if current_word == None:
                current_word = word

            if current_word != word:
                for count in count_pool:
                    sum += count
                print "%s\t%s" % (current_word, sum)
                current_word = word
                count_pool = []
                sum = 0

            count_pool.append(int(val))

        # Flush the counts for the last word.
        for count in count_pool:
            sum += count
        print "%s\t%s" % (current_word, str(sum))


    if __name__ == "__main__":
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)
    
    red.py
    
    HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

    INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
    OUTPUT_PATH="/output_cachearchive_broadcast"

    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

    # Step 1.
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH_1 \
        -output $OUTPUT_PATH \
        -mapper "python map.py mapper_func WH.gz" \
        -reducer "python red.py reduer_func" \
        -jobconf "mapred.reduce.tasks=10" \
        -jobconf "mapred.job.name=cachefile_demo" \
        -jobconf "mapred.compress.map.output=true" \
        -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
        -jobconf "mapred.output.compress=true" \
        -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
        -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
        -file "./map.py" \
        -file "./red.py"
    run.sh
    HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
    #!/usr/bin/env python
    # The line above lets the system find the python executable on its own.

    # Input files; several files can be listed, separated by commas, and they
    # must already be uploaded to HDFS.
    INPUT_FILE_PATH_1="/1.txt,/2.txt"

    # Output directory on HDFS.
    OUTPUT_PATH="/table1"

    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

    # Step 1. (Annotated version: strip the interleaved comment lines before running it verbatim.)
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH_1 \
        -output $OUTPUT_PATH \
        -mapper "python map.py" \
        -reducer "python red.py" \
        -file ./map.py \
        -file ./red.py \
        # Number of reduce tasks.
        -jobconf mapred.reduce.tasks=2 \

        # The next two lines turn on compression of the map-stage output and set its codec.
        -jobconf "mapred.compress.map.output=true" \
        -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \

        # The next two lines turn on compression of the final output and set its codec.
        -jobconf "mapred.output.compress=true" \
        -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \

        # Location of the archive on HDFS. The name after "#" is an alias: the scripts
        # can refer to it, and the slave nodes also use the alias for the local directory
        # they create while the job runs. (This is the third way of shipping files.)
        -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \

        # First line below: separator for the map output fields (the default is the tab character "\t").
        # Second line: the first two separated fields form the key, the rest is the value.
        # Third line: separator used inside the key itself.
        # Fourth line: after splitting the key, partition (bucket) by its first field.
        -jobconf stream.map.output.field.separator=',' \
        -jobconf stream.num.map.output.key.fields=2 \
        -jobconf map.output.key.field.separator=',' \
        -jobconf num.key.fields.for.partition=1 \
        # The line below is what you specify when you set up partitioning yourself.
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
    run.sh

    -jobconf mapred.text.key.partitioner.options=-k2,3 is an extension of
    -jobconf num.key.fields.for.partition=1: it means that columns 2 and 3 of the key
    are used for partitioning.
    When no partitioner is configured, the default amounts to the same flow:
    records are first split into buckets, and then sorted by key within each bucket.
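
    To make the bucketing concrete, here is a rough Python illustration of partitioning on columns 2 and 3 of the key. The real partitioning is done by Hadoop's KeyFieldBasedPartitioner in Java (with its own hash function), so the function below is only a hypothetical stand-in.

    # Rough stand-in for KeyFieldBasedPartitioner: records whose key agrees on the
    # chosen columns end up in the same reduce bucket.
    def partition(key, num_reducers, sep=',', start=2, end=3):
        fields = key.split(sep)
        part_key = sep.join(fields[start - 1:end])   # columns 2..3, like -k2,3
        return hash(part_key) % num_reducers

    print partition("a,x,y,1", 4)   # same bucket as the line below,
    print partition("b,x,y,9", 4)   # because columns 2 and 3 ("x,y") match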

    Additional notes:

    You can control the number of map tasks with compressed input files: a compressed (gzip) file is not splittable, so each compressed file corresponds to exactly one map task.

    This also does not affect the paths, i.e. the directory structure can be kept unchanged.
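
    As a sketch of how that can be used, the hypothetical helper below (not part of the original post) pre-splits a local input file into N gzip parts before they are uploaded to HDFS, so the streaming job ends up with exactly N map tasks:

    #!/usr/bin/python
    # Hypothetical helper: split a text file into N gzip parts; since a .gz file
    # is not splittable, uploading these to HDFS yields one map task per part.
    import gzip

    def split_to_gzip(src, num_parts, prefix='part'):
        outs = [gzip.open('%s_%d.gz' % (prefix, i), 'wb') for i in range(num_parts)]
        with open(src, 'rb') as fin:
            for i, line in enumerate(fin):
                outs[i % num_parts].write(line)
        for out in outs:
            out.close()

    if __name__ == "__main__":
        split_to_gzip("The_Man_of_Property.txt", 4)   # 4 .gz files -> 4 map tasks

    The parts can then be uploaded with hadoop fs -put and listed in -input, comma-separated, just like /1.txt,/2.txt above.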


