  • Hadoop Python MapReduce

Environment: Linux + Hadoop, Python 3

Be aware of syntax differences between Python versions.

Problem to solve: word-frequency statistics over a text file.

Hadoop MapReduce processing flow:

input data -> HDFS -> data split -> map -> (shuffle & sort) -> reduce -> output (HDFS)
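To make the shuffle & sort step concrete, here is a tiny stand-alone emulation of the same flow in plain Python (illustrative only, not part of the Hadoop job; the sample lines are made up):

sample_lines = ['hello world', 'hello hadoop']

# map phase: emit one (word, 1) pair per token, as the mapper below does
pairs = [(word, 1) for line in sample_lines for word in line.split()]

# shuffle & sort: Hadoop sorts the mapper output by key before any reducer sees it
pairs.sort()

# reduce phase: total the count for each word (the reducer below does this in a
# streaming way, relying on the sorted order instead of a dict)
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one
for word, cnt in sorted(counts.items()):
    print(word, cnt)    # hadoop 1 / hello 2 / world 1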

Any text file will do as input.

Scripts needed:

    map.py reduce.py run.sh

    (base) [root@pyspark mapreduce]# cat map.py
import sys

# Mapper: emit one "word 1" record per token on every line read from stdin.
for line in sys.stdin:
    wordlist = line.strip().split(' ')
    for word in wordlist:
        print(' '.join([word.strip(), '1']))
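Note that map.py joins the word and the count with a space rather than a tab. Hadoop Streaming splits mapper output at the first tab to form the key, so here the whole "word 1" line acts as the key; for a word count that is still fine, because identical words produce identical lines and are therefore still grouped together by the sort.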

    --------------------------------------------------------------------------

    (base) [root@pyspark mapreduce]# cat reduce.py
import sys

# Reducer: the shuffle & sort step delivers the mapper output sorted by key,
# so all "word 1" records for the same word arrive consecutively.
cur_word = None
sum = 0
for line in sys.stdin:
    wordlist = line.strip().split(' ')
    if len(wordlist) != 2:
        continue
    word, cnt = wordlist

    if cur_word is None:
        cur_word = word
    if cur_word != word:
        # The word changed: emit the finished count and start over.
        print(' '.join([cur_word, str(sum)]))
        cur_word = word
        sum = 0
    sum += int(cnt)

# Flush the count for the last word.
print(' '.join([cur_word, str(sum)]))
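Because sorted input keeps each word's records together, the single cur_word buffer plus the final print after the loop is enough to emit one total per word. Even with several reduce tasks, the partitioner would still route all records for a given word to the same reducer, so the totals stay correct; the run below uses a single reducer (Launched reduce tasks=1 in the counters).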

    ----------------------------------------------------------------

    (base) [root@pyspark wordcount]# cat run.sh

#!/bin/bash

HADOOP_CMD="/root/hadoop/hadoop-2.9.2/bin/hadoop"
STREAM_JAR_PATH="/root/hadoop/hadoop-2.9.2/hadoop-streaming-2.9.2.jar"

INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"

# Remove any previous output directory; a Streaming job fails if it already exists.
# (-rmr is deprecated; newer Hadoop prefers `fs -rm -r`.)
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Submit the Streaming job; -file ships both scripts to every task node.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python maptest.py" \
    -reducer "python reducetest.py" \
    -file ./maptest.py \
    -file ./reducetest.py
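The -file option (deprecated in favor of the generic -files option, as the warning in the log below shows) copies maptest.py and reducetest.py onto every task node, and the -mapper/-reducer commands run them with whatever python is first on each node's PATH. The script names here differ from the map.py and reduce.py listed above; presumably they are copies of the same scripts.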

    -----------------------------------------------------------------

    (base) [root@pyspark wordcount]# sh run.sh
    rmr: DEPRECATED: Please use '-rm -r' instead.
    Deleted /output
    19/11/10 23:49:42 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
    packageJobJar: [./maptest.py, ./reducetest.py, /tmp/hadoop-unjar5889192628432952141/] [] /tmp/streamjob7228039301823824466.jar tmpDir=null
    19/11/10 23:49:45 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    19/11/10 23:49:45 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    19/11/10 23:49:47 INFO mapred.FileInputFormat: Total input files to process : 1
    19/11/10 23:49:48 INFO mapreduce.JobSubmitter: number of splits:2
    19/11/10 23:49:48 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
    19/11/10 23:49:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573374330013_0006
    19/11/10 23:49:49 INFO impl.YarnClientImpl: Submitted application application_1573374330013_0006
    19/11/10 23:49:49 INFO mapreduce.Job: The url to track the job: http://pyspark:8088/proxy/application_1573374330013_0006/
    19/11/10 23:49:49 INFO mapreduce.Job: Running job: job_1573374330013_0006
    19/11/10 23:50:04 INFO mapreduce.Job: Job job_1573374330013_0006 running in uber mode : false
    19/11/10 23:50:04 INFO mapreduce.Job: map 0% reduce 0%
    19/11/10 23:50:18 INFO mapreduce.Job: map 50% reduce 0%
    19/11/10 23:50:19 INFO mapreduce.Job: map 100% reduce 0%
    19/11/10 23:50:30 INFO mapreduce.Job: map 100% reduce 100%
    19/11/10 23:50:32 INFO mapreduce.Job: Job job_1573374330013_0006 completed successfully
    19/11/10 23:50:33 INFO mapreduce.Job: Counters: 50
    File System Counters
    FILE: Number of bytes read=3229845
    FILE: Number of bytes written=7065383
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=1900885
    HDFS: Number of bytes written=183609
    HDFS: Number of read operations=9
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters
    Killed map tasks=1
    Launched map tasks=2
    Launched reduce tasks=1
    Data-local map tasks=2
    Total time spent by all maps in occupied slots (ms)=24785
    Total time spent by all reduces in occupied slots (ms)=9853
    Total time spent by all map tasks (ms)=24785
    Total time spent by all reduce tasks (ms)=9853
    Total vcore-milliseconds taken by all map tasks=24785
    Total vcore-milliseconds taken by all reduce tasks=9853
    Total megabyte-milliseconds taken by all map tasks=25379840
    Total megabyte-milliseconds taken by all reduce tasks=10089472
    Map-Reduce Framework
    Map input records=8598
    Map output records=335454
    Map output bytes=2558931
    Map output materialized bytes=3229851
    Input split bytes=168
    Combine input records=0
    Combine output records=0
    Reduce input groups=16985
    Reduce shuffle bytes=3229851
    Reduce input records=335454
    Reduce output records=16984
    Spilled Records=670908
    Shuffled Maps =2
    Failed Shuffles=0
    Merged Map outputs=2
    GC time elapsed (ms)=687
    CPU time spent (ms)=10610
    Physical memory (bytes) snapshot=731275264
    Virtual memory (bytes) snapshot=6418624512
    Total committed heap usage (bytes)=504365056
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=1900717
    File Output Format Counters
    Bytes Written=183609
    19/11/10 23:50:33 INFO streaming.StreamJob: Output directory: /output

---------------------------------------------------------------------------------------------------

    (base) [root@pyspark wordcount]# hdfs dfs -ls /output
    Found 2 items
    -rw-r--r-- 1 root supergroup 0 2019-11-10 23:50 /output/_SUCCESS
    -rw-r--r-- 1 root supergroup 183609 2019-11-10 23:50 /output/part-00000
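The _SUCCESS marker is empty; the actual results live in /output/part-00000, one line per distinct word with its count as printed by reduce.py (the Reduce output records=16984 counter above says the job emitted 16,984 such lines). The file can be printed with hdfs dfs -cat /output/part-00000 or hdfs dfs -text.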

    ============================================================

Text data plus mapper.py, reducer.py, and run.sh. This second example reads daily percentage-change records for stock 300340 and counts how many days fall into each 1% bucket.

    (base) [root@pyspark stock]# head 300340.csv
    2016-12-30,-1
    2016-12-29,0
    2016-12-28,-1
    2016-12-27,-2
    2016-12-26,-4
    2016-12-23,-4
    2016-12-22,-3
    2016-12-21,1
    2016-12-20,3
    2016-12-19,-2

    --------------------------------------------------------------------------------------------------------

mapper.py

    (base) [root@pyspark stock]# cat mapper.py
#!/usr/bin/python

import sys

# Mapper: pass each "date,change" record through, normalizing change to an int.
for line in sys.stdin:
    line = line.strip()
    date, change = line.split(',')
    change = int(change)
    print('%s,%d' % (date, change))
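The mapper does no aggregation; it only validates each date,change record (forcing the change to an int) and re-emits it, leaving all of the bucket counting to the reducer.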

    -----------------------------------------------------------------------------------------------------

    (base) [root@pyspark stock]# cat reducer.py
#!/usr/bin/python

import sys

# Reducer: count how many days fall into each percentage-change bucket.
stat = {}
for line in sys.stdin:
    line = line.strip()
    date, change = line.split(',')
    change = int(change)
    if change not in stat:
        stat[change] = 0
    stat[change] += 1

# One line per bucket: positive changes prefixed with +++, negative with ---.
for k, v in sorted(stat.items()):
    if k >= 0:
        print('+++ %d%%-%d%% for %d days' % (k, k + 1, v))
    else:
        print('--- %d%%-%d%% for %d days' % (-k, -(k - 1), v))
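Since this reducer keeps its counts in an in-memory dict rather than relying on key grouping, it only gives complete totals when a single reduce task sees every record, which is the case in the run below. For negative buckets the magnitudes are printed, so a change of -3 lands in the 3%-4% down bucket; a quick check of that formatting outside Hadoop (the values are taken from the output shown further down):

k, v = -3, 15
print('--- %d%%-%d%% for %d days' % (-k, -(k - 1), v))   # --- 3%-4% for 15 days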

    --------------------------------------------------------------------------------------------------

    (base) [root@pyspark stock]# cat run.sh

#!/bin/bash

HADOOP_CMD="/root/hadoop/hadoop-2.9.2/bin/hadoop"
STREAM_JAR_PATH="/root/hadoop/hadoop-2.9.2/hadoop-streaming-2.9.2.jar"

INPUT_FILE_PATH_1="/stock_input/300340.csv"
OUTPUT_PATH="/stock_output"

# Remove any previous output directory before submitting the job.
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -file ./mapper.py \
    -file ./reducer.py

    ------------------------------------------------------------------------------------------------

    (base) [root@pyspark stock]# sh run.sh
    rmr: DEPRECATED: Please use '-rm -r' instead.
    Deleted /stock_output
    19/11/11 00:02:06 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
    packageJobJar: [./mapper.py, ./reducer.py, /tmp/hadoop-unjar8953104380979231231/] [] /tmp/streamjob1591230460326040415.jar tmpDir=null
    19/11/11 00:02:09 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    19/11/11 00:02:10 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    19/11/11 00:02:12 INFO mapred.FileInputFormat: Total input files to process : 1
    19/11/11 00:02:12 INFO mapreduce.JobSubmitter: number of splits:2
    19/11/11 00:02:13 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
    19/11/11 00:02:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573374330013_0007
    19/11/11 00:02:15 INFO impl.YarnClientImpl: Submitted application application_1573374330013_0007
    19/11/11 00:02:15 INFO mapreduce.Job: The url to track the job: http://pyspark:8088/proxy/application_1573374330013_0007/
    19/11/11 00:02:15 INFO mapreduce.Job: Running job: job_1573374330013_0007
    19/11/11 00:02:30 INFO mapreduce.Job: Job job_1573374330013_0007 running in uber mode : false
    19/11/11 00:02:30 INFO mapreduce.Job: map 0% reduce 0%
    19/11/11 00:02:43 INFO mapreduce.Job: map 100% reduce 0%
    19/11/11 00:02:55 INFO mapreduce.Job: map 100% reduce 100%
    19/11/11 00:02:57 INFO mapreduce.Job: Job job_1573374330013_0007 completed successfully
    19/11/11 00:02:57 INFO mapreduce.Job: Counters: 49
    File System Counters
    FILE: Number of bytes read=2703
    FILE: Number of bytes written=611093
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=3517
    HDFS: Number of bytes written=475
    HDFS: Number of read operations=9
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters
    Launched map tasks=2
    Launched reduce tasks=1
    Data-local map tasks=2
    Total time spent by all maps in occupied slots (ms)=21325
    Total time spent by all reduces in occupied slots (ms)=10182
    Total time spent by all map tasks (ms)=21325
    Total time spent by all reduce tasks (ms)=10182
    Total vcore-milliseconds taken by all map tasks=21325
    Total vcore-milliseconds taken by all reduce tasks=10182
    Total megabyte-milliseconds taken by all map tasks=21836800
    Total megabyte-milliseconds taken by all reduce tasks=10426368
    Map-Reduce Framework
    Map input records=162
    Map output records=162
    Map output bytes=2373
    Map output materialized bytes=2709
    Input split bytes=200
    Combine input records=0
    Combine output records=0
    Reduce input groups=162
    Reduce shuffle bytes=2709
    Reduce input records=162
    Reduce output records=21
    Spilled Records=324
    Shuffled Maps =2
    Failed Shuffles=0
    Merged Map outputs=2
    GC time elapsed (ms)=984
    CPU time spent (ms)=7240
    Physical memory (bytes) snapshot=740073472
    Virtual memory (bytes) snapshot=6417063936
    Total committed heap usage (bytes)=502267904
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=3317
    File Output Format Counters
    Bytes Written=475
    19/11/11 00:02:57 INFO streaming.StreamJob: Output directory: /stock_output

    -----------------------------------------------------------------------------------------------

    (base) [root@pyspark stock]# hdfs dfs -ls /stock_input/
    Found 1 items
    -rw-r--r-- 1 root supergroup 2211 2019-11-10 17:24 /stock_input/300340.csv

    ---------------------------------------------------------------------------------------------

    (base) [root@pyspark stock]# hdfs dfs -ls /stock_output/
    Found 2 items
    -rw-r--r-- 1 root supergroup 0 2019-11-11 00:02 /stock_output/_SUCCESS
    -rw-r--r-- 1 root supergroup 475 2019-11-11 00:02 /stock_output/part-00000

    -----------------------------------------------------------------------------------------------------

Local debugging on a single machine (running the map and reduce steps separately):

    (base) [root@pyspark stock]# cat 300340.csv |python mapper.py |sort -k1|head
    2016-01-04,-10
    2016-01-05,-9
    2016-01-06,1
    2016-01-07,-10
    2016-01-08,-1
    2016-01-11,-10
    2016-01-12,-2
    2016-01-13,-6
    2016-01-14,3
    2016-01-15,-4

    (base) [root@pyspark stock]# cat 300340.csv |python mapper.py |sort -k1|python reducer.py
    --- 10%-11% for 5 days
    --- 9%-10% for 1 days
    --- 8%-9% for 3 days
    --- 7%-8% for 3 days
    --- 6%-7% for 6 days
    --- 5%-6% for 7 days
    --- 4%-5% for 7 days
    --- 3%-4% for 15 days
    --- 2%-3% for 15 days
    --- 1%-2% for 23 days
    +++ 0%-1% for 16 days
    +++ 1%-2% for 13 days
    +++ 2%-3% for 10 days
    +++ 3%-4% for 9 days
    +++ 4%-5% for 6 days
    +++ 5%-6% for 2 days
    +++ 6%-7% for 2 days
    +++ 7%-8% for 1 days
    +++ 8%-9% for 1 days
    +++ 9%-10% for 2 days
    +++ 10%-11% for 15 days
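This pipe chain reproduces the Streaming flow on one machine: cat plays the role of the HDFS input, sort stands in for the shuffle & sort phase, and the two scripts run exactly as they would inside the job, which makes it a convenient way to debug mapper.py and reducer.py before submitting to the cluster.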

     ===========================================
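The last example joins two small files, dept.csv (department id and name) and salary.csv (employee id, name, department id, salary, judging from the mapper output below), and totals the salaries per department. The mapper.py and reducer.py for this job are not listed in the post; a sketch of what they might look like follows the local test output below.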

    (base) [root@pyspark join]# cat dept.csv salary.csv |python mapper.py |sort
    001,Tom,10,20000,,
    002,Jenny,20,15000,,
    003,Jack,10,30000,,
    004,Ray,10,25000,,
    005,Ben,30,20000,,
    006,Marry,20,1500,,
    ,,,,10,Sales
    ,,,,20,HR
    ,,,,30,IT
    (base) [root@pyspark join]# cat dept.csv salary.csv |python mapper.py |sort|python reducer.py
    Total salary of Sales department is 75000
    Total salary of HR department is 16500
    Total salary of IT department is 20000
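A minimal sketch of the join scripts, consistent with the output above (an assumption, not the author's listing; the field layouts are inferred from the padded lines the mapper prints). The mapper normalizes both inputs to one six-column layout:

# Hypothetical join mapper (mapper.py): pad both inputs to
# emp_id,name,emp_dept_id,salary,dept_id,dept_name
import sys

for line in sys.stdin:
    fields = line.strip().split(',')
    if len(fields) == 4:                      # salary.csv: emp_id,name,dept_id,salary
        print(','.join(fields + ['', '']))
    elif len(fields) == 2:                    # dept.csv: dept_id,dept_name
        print(','.join(['', '', '', ''] + fields))

The reducer then accumulates totals per department id and resolves the department name at the end:

# Hypothetical join reducer (reducer.py): sum salaries per department.
import sys

dept_name = {}    # dept_id -> department name
dept_total = {}   # dept_id -> accumulated salary

for line in sys.stdin:
    emp_id, name, emp_dept, salary, dept_id, dname = line.strip().split(',')
    if dept_id:                               # a department record
        dept_name[dept_id] = dname
    else:                                     # a salary record
        dept_total[emp_dept] = dept_total.get(emp_dept, 0) + int(salary)

for d, total in dept_total.items():
    print('Total salary of %s department is %d' % (dept_name.get(d, d), total))

The run.sh for this job is not shown either; judging from the log below it follows the same pattern as the previous ones, with dept.csv and salary.csv as inputs and /join_output as the output path.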

    -------------------------------------------------------------------------------------------------------------------------

    (base) [root@pyspark join]# sh run.sh
    rmr: DEPRECATED: Please use '-rm -r' instead.
    Deleted /join_output
    19/11/11 01:17:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
    packageJobJar: [./mapper.py, ./reducer.py, /tmp/hadoop-unjar8080219425220234999/] [] /tmp/streamjob5944376193115326916.jar tmpDir=null
    19/11/11 01:17:20 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    19/11/11 01:17:22 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    19/11/11 01:17:25 INFO mapred.FileInputFormat: Total input files to process : 2
    19/11/11 01:17:25 INFO mapreduce.JobSubmitter: number of splits:3
    19/11/11 01:17:25 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
    19/11/11 01:17:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573374330013_0008
    19/11/11 01:17:27 INFO impl.YarnClientImpl: Submitted application application_1573374330013_0008
    19/11/11 01:17:27 INFO mapreduce.Job: The url to track the job: http://pyspark:8088/proxy/application_1573374330013_0008/
    19/11/11 01:17:27 INFO mapreduce.Job: Running job: job_1573374330013_0008
    19/11/11 01:17:45 INFO mapreduce.Job: Job job_1573374330013_0008 running in uber mode : false
    19/11/11 01:17:45 INFO mapreduce.Job: map 0% reduce 0%
    19/11/11 01:18:04 INFO mapreduce.Job: map 33% reduce 0%
    19/11/11 01:18:06 INFO mapreduce.Job: map 67% reduce 0%
    19/11/11 01:18:07 INFO mapreduce.Job: map 100% reduce 0%
    19/11/11 01:18:18 INFO mapreduce.Job: map 100% reduce 100%
    19/11/11 01:18:19 INFO mapreduce.Job: Job job_1573374330013_0008 completed successfully
    19/11/11 01:18:19 INFO mapreduce.Job: Counters: 50
    File System Counters
    FILE: Number of bytes read=184
    FILE: Number of bytes written=807913
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=465
    HDFS: Number of bytes written=123
    HDFS: Number of read operations=12
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    Job Counters
    Killed map tasks=1
    Launched map tasks=3
    Launched reduce tasks=1
    Data-local map tasks=3
    Total time spent by all maps in occupied slots (ms)=55577
    Total time spent by all reduces in occupied slots (ms)=10061
    Total time spent by all map tasks (ms)=55577
    Total time spent by all reduce tasks (ms)=10061
    Total vcore-milliseconds taken by all map tasks=55577
    Total vcore-milliseconds taken by all reduce tasks=10061
    Total megabyte-milliseconds taken by all map tasks=56910848
    Total megabyte-milliseconds taken by all reduce tasks=10302464
    Map-Reduce Framework
    Map input records=9
    Map output records=9
    Map output bytes=160
    Map output materialized bytes=196
    Input split bytes=295
    Combine input records=0
    Combine output records=0
    Reduce input groups=9
    Reduce shuffle bytes=196
    Reduce input records=9
    Reduce output records=3
    Spilled Records=18
    Shuffled Maps =3
    Failed Shuffles=0
    Merged Map outputs=3
    GC time elapsed (ms)=1771
    CPU time spent (ms)=8150
    Physical memory (bytes) snapshot=1047699456
    Virtual memory (bytes) snapshot=8583450624
    Total committed heap usage (bytes)=699400192
    Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
    File Input Format Counters
    Bytes Read=170
    File Output Format Counters
    Bytes Written=123
    19/11/11 01:18:19 INFO streaming.StreamJob: Output directory: /join_output
    (base) [root@pyspark join]#

    -----------------------------------------------------------------------------------------------------

(base) [root@pyspark join]# hdfs dfs -ls /join_output
    Found 2 items
    -rw-r--r-- 1 root supergroup 0 2019-11-11 01:18 /join_output/_SUCCESS
    -rw-r--r-- 1 root supergroup 123 2019-11-11 01:18 /join_output/part-00000

    ----------------------------------------------------------------------------------------------------

    (base) [root@pyspark join]# hdfs dfs -text /join_output/part-00000
    Total salary of Sales department is 75000
    Total salary of HR department is 16500
    Total salary of IT department is 20000

  • Original post: https://www.cnblogs.com/songyuejie/p/11828592.html