zoukankan      html  css  js  c++  java
  • Hive Python Streaming的原理及写法

    在Hive中,须要实现Hive中的函数无法实现的功能时,就能够用Streaming来实现。

    其原理能够理解成:用HQL语句之外的语言,如Python、Shell来实现这些功能,同一时候配合HQL语句,以实现特殊的功能。

    比方,我有一张不同站点訪问的日志表,当中有两个列是url和ref。分别代表当前訪问的网址和来源地址,我想要查看用户的来源。即看用户都是从那些站点跳到这些站点上去的。这里有些站点可能域名是二级甚至三级域名,可是这些实际上是属于其顶级域名的。

    所以要找出其顶级域名。

    这个在Hive的自己定义函数中是无法实现的。所以採用Streaming的方式来实现。


    准备表和数据

    --建表
    drop table if exists view_all;
    create table if not exists view_all(
    ad STRING,ua STRING,ip STRING,cookie STRING,url STRING,ref STRING,
    action STRING,value STRING,timestamp STRING)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '	'
    STORED AS TEXTFILE;
    --导入日志数据
    load data local inpath 'view_all' overwrite into table view_all;


    Python实现获取顶级域名和过滤站内跳转

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    '''
    Created on 2014年8月6日
    @author: zhaohf
    '''
    import sys
    import urlparse
    import re
    
    topHostPostfix = (
        '.com',
        '.net',
        '.org',
        '.cn',
        '.com.cn'
    )
    #获取顶级域名
    def get_top_host(url):
        parts = urlparse.urlparse(url)
        host = parts.netloc
        extractPattern = r'[^.]+('+'|'.join([h.replace('.',r'.') for h in topHostPostfix])+')$'
        pattern = re.compile(extractPattern,re.IGNORECASE)
        m = pattern.search(host)
        return m.group() if m else host
    
    
    #排除站内跳转
    if __name__ == '__main__':
        for line in sys.stdin:
            try:
                line = line.strip()
                li = line.split('	')
                url = li[4]
                ref = li[5]
                url_top_host = get_top_host(url)
                ref_top_host = get_top_host(ref)
                if url_top_host != ref_top_host:
                    li[4] = url_top_host
                    li[5] = ref_top_host
                    newline = ''
                    for l in li:
                        newline += '	'+l
                    print newline.strip()
                else:
                    pass
            except Exception,err:
                continue


    调用python 脚本
    --排除站内跳转
    drop table if exists view_from;
    CREATE TABLE if not exists view_from like view_from;
    
    ADD FILE mapper.py;
    INSERT OVERWRITE TABLE view_from
    SELECT TRANSFORM(ad,ua,ip,cookie,url,ref,action,value,timestamp)
    USING 'python mapper.py'
    AS (ad,ua,ip,cookie,url,ref,action,value,timestamp)
    FROM view_all
    ;
    

    亦能够将hql语句保存成文件,使用hive -f query.hql 来调用。

    当中须要注意细节,比方__main__被写成了 __mian__。好痛苦!



  • 相关阅读:
    Day-10: 错误、调试和测试
    Day-9: 面对对象高级编程
    json文件解析
    sqlite3入门之sqlite3_get_table,sqlite3_free_table
    sqlite3入门之sqlite3_open,sqlite3_exec,slite3_close
    字符集编码与字符大小
    让ubuntu下的eclipse支持GBK编码
    使用virtualbox安装unbuntu开启共享文件夹时遇到的权限问题
    QT--信号与槽
    QT--初识
  • 原文地址:https://www.cnblogs.com/cxchanpin/p/6733211.html
Copyright © 2011-2022 走看看