zoukankan      html  css  js  c++  java
  • hive自定义函数——hive streaming

    Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,Streaming方式是基于Unix系统的标准输入输出来进行MapReduce Job的运行。

    任何支持标准输入输出特性的编程语言都可以使用Streaming方式来实现MapReduce Job,基本原理就是输入从Unix系统标准输入,输出使用Unix系统的标准输出。

    Streaming的实现需要TRANSFORM()函数和USING关键字,TRANSFORM()的参数是表的列名,USING关键字用于指定脚本

    注意:

    先将脚本add file 进来

    比如WordCount功能:

    1、使用Python实现Mapper,代码文件为word_count_mapper.py,代码如下所示:

    1    #!/usr/bin/env python
    2     
    3    import sys
    4     
    5    for line in sys.stdin:
    6        line = line.strip()
    7        words = filter(lambda word: word, line.split())
    8        for word in words:
    9            print '%s\t%s' % (word, 1)
     

    2、使用Python实现Reducer,代码文件为word_count_reducer.py,代码如下所示:

        #!/usr/bin/env python
         
        import sys
        from operator import itemgetter
         
        wc_dict = {}
         
        for line in sys.stdin:
            line = line.strip()
            word, count = line.split()
            try:
                count = int(count)
                wc_dict[word] = wc_dict.get(word, 0) + count
            except ValueError:
                pass
         
        sorted_dict = sorted(wc_dict.items(), key=itemgetter(0))
        for word, count in sorted_dict:
            print '%s\t%s' % (word, count)
     
     

     3、输出统计:

     

    add file /home/hadoop/test928/wc_map.py /home/hadoop/test928/wc_reduce.py;
    select transform(wc.word,wc.count) using 'python wc_reduce.py' as word ,count from (select transform(line) using 'python wc_map.py' as word, count from docs) wc;

    (转)hive streaming 使用的时候的一些心得


    hive streaming 报错的解决方案:
    1、把使用到hive streaming 的sql 分解,例如:select transform a,b,c,d using 'python cc.py' as (e,f) from table,分解成:select a,b,c,d from table ,然后执行:  hive -e "select a,b,c,d from table" | python cc.py,这样如果是语法有问题的话就会检查出来。
    2、查看是否是编码问题:如果你的sql中要使用:using “cc.py” 那么如果python脚本有编码问题的话,就会无法执行并报错。监测是否有问题的方法是:chmod 777 cc.py,然后用./cc.py执行脚本,如果报错,那么sql中肯定也报错,解决方法就是用 using “python cc.py”
    3、如果你的集群配置了日志收集,那么查看问题就方便多了,直接去执行sql的hdfs 上面的application目录下面查看就可以。
    4、在python 脚本中加上 import traceback ; except Exception,e: print traceback.format_exc()




  • 相关阅读:
    Cents 7 Kubernetes
    Docker registry
    centos 7 安装 docker
    ToList()所带来的性能影响
    C#之Linq、where()、FindAll()的区别
    2.2 数据库高速缓冲区
    ORACLE之autotrace使用
    spring.net简介
    初识批处理
    TIBCO Rendezvous — 技术介绍
  • 原文地址:https://www.cnblogs.com/qiuhong10/p/7598523.html
Copyright © 2011-2022 走看看