  • A small Hadoop Streaming demo (Python version)

    Our big data team works on data quality evaluation. The automated quality-check and monitoring platform is built with Django, and the MR jobs are implemented in Python as well. (We later ran into an ORC compression issue that we could not solve in Python, so we are porting the jobs to Java.)

    Here is an example of writing an MR job in Python.

    To quote the docs: Hadoop Streaming is a utility shipped with Hadoop that lets you use any executable or script file as the mapper and reducer.
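
    In other words, the only contract is plain text over stdin/stdout: the mapper prints key/value lines separated by a tab, Hadoop sorts and groups them by key, and the reducer reads the grouped lines back from stdin. Below is a minimal word-count style sketch of that contract (hypothetical, not part of the job described in this post; written in Python 2 to match the rest of the code here):

    # streaming_demo.py -- a minimal sketch of the Hadoop Streaming contract (hypothetical).
    # The mapper prints "key<TAB>value" lines; Hadoop sorts them by key and pipes the
    # grouped lines into the reducer's stdin.
    import sys
    from itertools import groupby


    def mapper():
        for line in sys.stdin:
            for word in line.split():
                print '%s\t%s' % (word, 1)


    def reducer():
        pairs = (line.rstrip('\n').split('\t', 1) for line in sys.stdin)
        for word, group in groupby(pairs, key=lambda kv: kv[0]):
            print '%s\t%d' % (word, sum(int(v) for _, v in group))


    if __name__ == '__main__':
        # submit with e.g. -mapper "python streaming_demo.py map" -reducer "python streaming_demo.py reduce"
        mapper() if sys.argv[1] == 'map' else reducer()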

    1. First, a bit of background: our data is stored in Hive. The table schema (the output of desc) is shown below:

    We will parse the table metadata and merge it with the data files on HDFS to make processing easier. The partition keys used here are year/month/day.

    hive (gulfstream_ods)> desc g_order;
    OK
    col_name        data_type       comment
    order_id                bigint                  order id
    driver_id               bigint                  driver id, 0 before a driver takes the order
    driver_phone            string                  driver phone number
    passenger_id            bigint                  passenger id
    passenger_phone         string                  passenger phone number
    car_id                  int                     pickup vehicle id
    area                    int                     city id
    district                string                  city district code
    type                    int                     order type: 0 real-time, 1 reservation
    current_lng             decimal(19,6)           passenger longitude when the order was placed
    current_lat             decimal(19,6)           passenger latitude when the order was placed
    starting_name           string                  start point name
    starting_lng            decimal(19,6)           start point longitude
    starting_lat            decimal(19,6)           start point latitude
    dest_name               string                  destination name
    dest_lng                decimal(19,6)           destination longitude
    dest_lat                decimal(19,6)           destination latitude
    driver_start_distance   int                     road distance from driver to start point, in meters
    start_dest_distance     int                     road distance from start point to destination, in meters
    departure_time          string                  departure time (reservation time for reserved orders, order-placing time for real-time orders)
    strive_time             string                  time the driver successfully grabbed the order
    consult_time            string                  negotiation time
    arrive_time             string                  time the driver tapped "I have arrived"
    setoncar_time           string                  boarding time (currently unused)
    begin_charge_time       string                  time the driver tapped "Start charging"
    finish_time             string                  completion time
    year                    string
    month                   string
    day                     string

    # Partition Information
    # col_name              data_type               comment

    year                    string
    month                   string
    day                     string

    2. Parsing the metadata

    This is the metadata-parsing step. We then serialize the metadata and store it in the file desc.gulfstream_ods.g_order; this config file is uploaded to the Hadoop cluster together with the MR scripts (a sketch of the writing step follows the function below).

    import subprocess
    from subprocess import Popen


    def desc_table(db, table):
        # run "hive -e 'desc db.table'" and capture its stdout
        process = Popen('hive -e "desc %s.%s"' % (db, table),
                shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        is_column = True
        structure_list = list()
        column_list = list()
        for line in stdout.split('\n'):
            if not line or len(line.split()) < 2:
                break
            if is_column:
                # the first line holds the header: col_name, data_type, comment
                column_list = line.split()
                is_column = False
                continue
            else:
                value_list = line.split()
            structure_dict = dict(zip(column_list, value_list))
            structure_list.append(structure_dict)

        return structure_list
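
    The post does not show the small step that actually writes the desc file; here is a sketch of what it could look like, assuming the list returned by desc_table above is dumped as JSON (which is what the mapper's deserialize call expects later):

    # hypothetical helper, assumed to sit next to desc_table above; it writes
    # desc.<db>.<table> as JSON so the mapper can read it back with json.loads
    import json


    def dump_table_desc(db, table):
        structure_list = desc_table(db, table)
        with open('desc.%s.%s' % (db, table), 'w') as fw:
            fw.write(json.dumps(structure_list))
        return structure_list


    if __name__ == '__main__':
        dump_table_desc('gulfstream_ods', 'g_order')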

    3. Below is the Hadoop Streaming launch script.

    #!/bin/bash
    source /etc/profile
    source ~/.bash_profile

    # Hadoop home
    echo "HADOOP_HOME: "$HADOOP_HOME
    HADOOP="$HADOOP_HOME/bin/hadoop"

    DB=$1
    TABLE=$2
    YEAR=$3
    MONTH=$4
    DAY=$5
    echo $DB--$TABLE--$YEAR--$MONTH--$DAY

    if [ "$DB" = "gulfstream_ods" ]
    then
        DB_NAME="gulfstream"
    else
        DB_NAME=$DB
    fi
    TABLE_NAME=$TABLE

    # input path
    input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
    # success-marker file name
    input_mark="_SUCCESS"
    echo $input_path
    # output path
    output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
    output_mark="_SUCCESS"
    echo $output_path
    # throttling parameters
    capacity_mapper=500
    capacity_reducer=200
    map_num=10
    reducer_num=10
    queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
    # job name
    job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
    mapper="python mapper.py $DB $TABLE_NAME"
    reducer="python reducer.py"

    $HADOOP fs -rmr $output_path
    $HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
        -jobconf mapred.job.name="$job_name" \
        -jobconf mapred.job.queue.name=$queue_name \
        -jobconf mapred.map.tasks=$map_num \
        -jobconf mapred.reduce.tasks=$reducer_num \
        -jobconf mapred.map.capacity=$capacity_mapper \
        -jobconf mapred.reduce.capacity=$capacity_reducer \
        -input $input_path \
        -output $output_path \
        -file ./mapper.py \
        -file ./reducer.py \
        -file ./utils.py \
        -file ./"desc.${DB}.${TABLE_NAME}" \
        -mapper "$mapper" \
        -reducer "$reducer"
    if [ $? -ne 0 ]; then
        echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run failed"
    fi
    $HADOOP fs -touchz "${output_path}/$output_mark"
    rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
    $HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}

    4. This is a beefed-up version of WordCount: the first feature counts orders per area, the second counts orders per hour of the day.

    The mapper script

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import sys
    import json
    import pickle
    reload(sys)
    sys.setdefaultencoding('utf-8')
    
    
    # match each field against the table metadata and yield one dict per row
    def read_from_input(file, separator, columns):
        for line in file:
            if line is None or line == '':
                continue
            data_list = mapper_input(line, separator)
            if not data_list:
                continue
            item = None
            # the last 3 columns (year/month/day) are the partition key and are not needed
            if len(data_list) == len(columns) - 3:
                item = dict(zip(columns, data_list))
            elif len(data_list) == len(columns):
                item = dict(zip(columns, data_list))
            if not item:
                continue
            yield item
    
    
    def index_columns(db, table):
        with open('desc.%s.%s' % (db, table), 'r') as fr:
            structure_list = deserialize(fr.read())
        return [column.get('col_name') for column in structure_list]
    
    
    # map entry point
    def main(separator, columns):
        items = read_from_input(sys.stdin, separator, columns)
        mapper_result = {}
        for item in items:
            mapper_plugin_1(item, mapper_result)
            mapper_plugin_2(item, mapper_result)
    
    def mapper_plugin_1(item, mapper_result):
        # in practice the key could be a different appkey per metric; it is used to route
        # records to reducers, and records with the same route end up on the same reducer
        key = 'route1'
        area = item.get('area')
        district = item.get('district')
        order_id = item.get('order_id')
        if not area or not district or not order_id:
            return
        mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})


    def mapper_plugin_2(item, mapper_result):
        key = 'route2'
        strive_time = item.get('strive_time')
        order_id = item.get('order_id')
        if not strive_time or not order_id:
            return
        try:
            day_hour = strive_time.split(':')[0]
            mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
        except Exception, ex:
            pass


    def serialize(data, type='json'):
        if type == 'json':
            try:
                return json.dumps(data)
            except Exception, ex:
                return ''
        elif type == 'pickle':
            try:
                return pickle.dumps(data)
            except Exception, ex:
                return ''
        else:
            return ''


    def deserialize(data, type='json'):
        if type == 'json':
            try:
                return json.loads(data)
            except Exception, ex:
                return []
        elif type == 'pickle':
            try:
                return pickle.loads(data)
            except Exception, ex:
                return []
        else:
            return []


    def mapper_input(line, separator='\t'):
        try:
            return line.split(separator)
        except Exception, ex:
            return None


    def mapper_output(key, data, separator='\t'):
        key = str(key)
        data = serialize(data)
        print '%s%s%s' % (key, separator, data)
        # print >> sys.stderr, '%s%s%s' % (key, separator, data)


    if __name__ == '__main__':
        db = sys.argv[1]
        table = sys.argv[2]
        columns = index_columns(db, table)
        main('||', columns)
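
    With a tab as the key/value separator, this mapper emits one line per order for each route; the lines look roughly like this (<TAB> marks the tab separator, field values are made up for illustration):

    route1<TAB>{"area": "1", "district": "010", "order_id": "7xxxxxxxxxx", "count": 1}
    route2<TAB>{"order_id": "7xxxxxxxxxx", "strive_time": "2016-12-07 12:01:32", "count": 1, "day_hour": "2016-12-07 12"}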

    The reducer script

    #!/usr/bin/env python
    # vim: set fileencoding=utf-8
    import sys
    reload(sys)
    sys.setdefaultencoding('utf-8')
    import json
    import pickle
    from itertools import groupby
    from operator import itemgetter
    
    
    def read_from_mapper(file, separator):
        for line in file:
            yield reducer_input(line)
    
    
    def main(separator='\t'):
        reducer_result = {}
        line_list = read_from_mapper(sys.stdin, separator)
        for route_key, group in groupby(line_list, itemgetter(0)):
            if route_key is None:
                continue
            reducer_result.setdefault(route_key, {})
            if route_key == 'route1':
                reducer_plugin_1(route_key, group, reducer_result)
                reducer_output(route_key, reducer_result[route_key])
            if route_key == 'route2':
                reducer_plugin_2(route_key, group, reducer_result)
                reducer_output(route_key, reducer_result[route_key])

    def reducer_plugin_1(route_key, group, reducer_result):
        for _, data in group:
            if data is None or len(data) == 0:
                continue
            if not data.get('area') or not data.get('district') or not data.get('count'):
                continue
            key = '_'.join([data.get('area'), data.get('district')])
            reducer_result[route_key].setdefault(key, 0)
            reducer_result[route_key][key] += int(data.get('count'))
            # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


    def reducer_plugin_2(route_key, group, reducer_result):
        for _, data in group:
            if data is None or len(data) == 0:
                continue
            if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
                continue
            key = data.get('day_hour')
            reducer_result[route_key].setdefault(key, {})
            reducer_result[route_key][key].setdefault('count', 0)
            reducer_result[route_key][key].setdefault('order_list', [])
            reducer_result[route_key][key]['count'] += int(data.get('count'))
            if len(reducer_result[route_key][key]['order_list']) < 100:
                reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
            # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


    def serialize(data, type='json'):
        if type == 'json':
            try:
                return json.dumps(data)
            except Exception, ex:
                return ''
        elif type == 'pickle':
            try:
                return pickle.dumps(data)
            except Exception, ex:
                return ''
        else:
            return ''


    def deserialize(data, type='json'):
        if type == 'json':
            try:
                return json.loads(data)
            except Exception, ex:
                return []
        elif type == 'pickle':
            try:
                return pickle.loads(data)
            except Exception, ex:
                return []
        else:
            return []


    def reducer_input(data, separator='\t'):
        data_list = data.strip().split(separator, 2)
        key = data_list[0]
        data = deserialize(data_list[1])
        return [key, data]


    def reducer_output(key, data, separator='\t'):
        key = str(key)
        data = serialize(data)
        print '%s\t%s' % (key, data)
        # print >> sys.stderr, '%s\t%s' % (key, data)


    if __name__ == '__main__':
        main()
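
    Before submitting to the cluster, the mapper/reducer pair can be smoke-tested locally by piping a small sample through mapper, sort and reducer, which mimics the shuffle that the streaming framework performs. A sketch (sample_input.txt is an assumed file holding a few '||'-separated g_order rows; mapper.py, reducer.py and desc.gulfstream_ods.g_order must sit in the current directory):

    # local_test.py -- hypothetical local smoke test; "sort" stands in for Hadoop's shuffle/sort
    import subprocess

    mapper = subprocess.Popen(['python', 'mapper.py', 'gulfstream_ods', 'g_order'],
                              stdin=open('sample_input.txt'), stdout=subprocess.PIPE)
    sorter = subprocess.Popen(['sort'], stdin=mapper.stdout, stdout=subprocess.PIPE)
    reducer = subprocess.Popen(['python', 'reducer.py'], stdin=sorter.stdout)
    mapper.stdout.close()
    sorter.stdout.close()
    reducer.communicate()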

    5. The previous version suffered from a slow reduce phase, for two reasons. First, because of how the route keys are set up, every record with the same route goes to the same reducer, so a single reducer bears most of the load and performance drops. Second, the cluster runs on virtual machines, so raw performance is poor to begin with. The improved version below addresses this by pre-aggregating the data in the mapper, which relieves the computational pressure on the reducer.

    The mapper script

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import sys
    import json
    import pickle
    reload(sys)
    sys.setdefaultencoding('utf-8')
    
    
    # match each field against the table metadata and yield one dict per row
    def read_from_input(file, separator, columns):
        for line in file:
            if line is None or line == '':
                continue
            data_list = mapper_input(line, separator)
            if not data_list:
                continue
            item = None
            # the last 3 columns (year/month/day) are the partition key and are not needed
            if len(data_list) == len(columns) - 3:
                item = dict(zip(columns, data_list))
            elif len(data_list) == len(columns):
                item = dict(zip(columns, data_list))
            if not item:
                continue
            yield item
    
    
    def index_columns(db, table):
        with open('desc.%s.%s' % (db, table), 'r') as fr:
            structure_list = deserialize(fr.read())
        return [column.get('col_name') for column in structure_list]
    
    
    # map entry point
    def main(separator, columns):
        items = read_from_input(sys.stdin, separator, columns)
        mapper_result = {}
        for item in items:
            mapper_plugin_1(item, mapper_result)
            mapper_plugin_2(item, mapper_result)
    
        for route_key, route_value in mapper_result.iteritems():
            for key, value in route_value.iteritems():
                ret_dict = dict()
                ret_dict['route_key'] = route_key
                ret_dict['key'] = key
                ret_dict.update(value)
                mapper_output('route_total', ret_dict)
    
    
    def mapper_plugin_1(item, mapper_result):
        # in practice the key could be a different appkey per metric; it is used to route
        # records to reducers, and records with the same route end up on the same reducer
        key = 'route1'
        area = item.get('area')
        district = item.get('district')
        order_id = item.get('order_id')
        if not area or not district or not order_id:
            return
        try:
            # running totals per area_district
            mapper_result.setdefault(key, {})
            mapper_result[key].setdefault('_'.join([area, district]), {})
            mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
            mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
            mapper_result[key]['_'.join([area, district])]['count'] += 1
            if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:
                mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
        except Exception, ex:
            pass
    
    
    def mapper_plugin_2(item, mapper_result):
        key = 'route2'
        strive_time = item.get('strive_time')
        order_id = item.get('order_id')
        if not strive_time or not order_id:
            return
        try:
            day_hour = strive_time.split(':')[0]
            # running totals per hour
            mapper_result.setdefault(key, {})
            mapper_result[key].setdefault(day_hour, {})
            mapper_result[key][day_hour].setdefault('count', 0)
            mapper_result[key][day_hour].setdefault('order_id', [])
            mapper_result[key][day_hour]['count'] += 1
            if len(mapper_result[key][day_hour]['order_id']) < 10:
                mapper_result[key][day_hour]['order_id'].append(order_id)
        except Exception, ex:
            pass
    
    
    def serialize(data, type='json'):
        if type == 'json':
            try:
                return json.dumps(data)
            except Exception, ex:
                return ''
        elif type == 'pickle':
            try:
                return pickle.dumps(data)
            except Exception, ex:
                return ''
        else:
            return ''
    
    
    def deserialize(data, type='json'):
        if type == 'json':
            try:
                return json.loads(data)
            except Exception, ex:
                return []
        elif type == 'pickle':
            try:
                return pickle.loads(data)
            except Exception, ex:
                return []
        else:
            return []
    
    
    def mapper_input(line, separator='\t'):
        try:
            return line.split(separator)
        except Exception, ex:
            return None
    
    
    def mapper_output(key, data, separator='\t'):
        key = str(key)
        data = serialize(data)
        print '%s%s%s' % (key, separator, data)
        # print >> sys.stderr, '%s%s%s' % (key, separator, data)
    
    
    if __name__ == '__main__':
        db = sys.argv[1]
        table = sys.argv[2]
        columns = index_columns(db, table)
        main('||', columns)
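
    Instead of one line per order, this mapper flushes a single aggregated line per (route, key) pair once all input has been consumed, roughly like this (<TAB> marks the tab separator, values are made up for illustration):

    route_total<TAB>{"route_key": "route1", "key": "1_010", "count": 2572, "order_id": ["7xxxxxxxxx1", "7xxxxxxxxx2"]}
    route_total<TAB>{"route_key": "route2", "key": "2016-12-07 12", "count": 316, "order_id": ["7xxxxxxxxx3"]}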

    The reducer script

    #!/usr/bin/env python
    # vim: set fileencoding=utf-8
    import sys
    reload(sys)
    sys.setdefaultencoding('utf-8')
    import json
    import pickle
    from itertools import groupby
    from operator import itemgetter
    
    
    def read_from_mapper(file, separator):
        for line in file:
            yield reducer_input(line)
    
    
    def main(separator='\t'):
        reducer_result = {}
        line_list = read_from_mapper(sys.stdin, separator)
        for route_key, group in groupby(line_list, itemgetter(0)):
            if route_key is None:
                continue
            reducer_result.setdefault(route_key, {})
            if route_key == 'route_total':
                reducer_total(route_key, group, reducer_result)
                reducer_output(route_key, reducer_result[route_key])
    
    
    def reducer_total(route_key, group, reducer_result):
        for _, data in group:
            if data is None or len(data) == 0:
                continue
            if data.get('route_key') == 'route1':
                reducer_result[route_key].setdefault(data.get('key'), {})
                reducer_result[route_key][data.get('key')].setdefault('count', 0)
                reducer_result[route_key][data.get('key')].setdefault('order_id', [])
                reducer_result[route_key][data.get('key')]['count'] += data.get('count')
                for order_id in data.get('order_id'):
                    if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                        reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
            elif data.get('route_key') == 'route2':
                reducer_result[route_key].setdefault(data.get('key'), {})
                reducer_result[route_key][data.get('key')].setdefault('count', 0)
                reducer_result[route_key][data.get('key')].setdefault('order_id', [])
                reducer_result[route_key][data.get('key')]['count'] += data.get('count')
                for order_id in data.get('order_id'):
                    if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                        reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
            else:
                pass
    
    
    def serialize(data, type='json'):
        if type == 'json':
            try:
                return json.dumps(data)
            except Exception, ex:
                return ''
        elif type == 'pickle':
            try:
                return pickle.dumps(data)
            except Exception, ex:
                return ''
        else:
            return ''
    
    
    def deserialize(data, type='json'):
        if type == 'json':
            try:
                return json.loads(data)
            except Exception, ex:
                return []
        elif type == 'pickle':
            try:
                return pickle.loads(data)
            except Exception, ex:
                return []
        else:
            return []
    
    
    def reducer_input(data, separator='\t'):
        data_list = data.strip().split(separator, 2)
        key = data_list[0]
        data = deserialize(data_list[1])
        return [key, data]
    
    
    def reducer_output(key, data, separator='\t'):
        key = str(key)
        data = serialize(data)
        print '%s\t%s' % (key, data)
        # print >> sys.stderr, '%s\t%s' % (key, data)
    
    
    if __name__ == '__main__':
        main()

    Problems we ran into:

    1. "The DiskSpace quota of /user/bigdata/qa is exceeded"

    We hit the error above after the reducer finished: the disk quota for that HDFS path was used up. Freeing space under the path (usage and quota can be checked with hadoop fs -count -q) resolves it.

  • Original post: https://www.cnblogs.com/kangoroo/p/6151104.html