zoukankan      html  css  js  c++  java
  • 大数据mapreduce二分法ip定位之Python实现

    ip定位数据大约12M,采用-chacheFile 分发

    文件来源https://pan.baidu.com/s/1J0pwTafHgt4T0k3vV_gC-A

    格式大致格式如下:

    0.0.0.0 0.255.255.255 NULL IANA保留地址 NULL
    1.0.0.0 1.0.0.255 亚洲 亚太地区 NULL
    1.0.1.0 1.0.1.255 亚洲 中国 福建
    1.0.2.0 1.0.3.255 亚洲 中国 福建
    1.0.4.0 1.0.7.255 大洋洲 澳大利亚 NULL
    1.0.8.0 1.0.15.255 亚洲 中国 广东
    1.0.16.0 1.0.31.255 亚洲 日本 NULL
    1.0.32.0 1.0.63.255 亚洲 中国 广东
    1.0.64.0 1.0.127.255 亚洲 日本 NULL
    1.0.128.0 1.0.255.255 亚洲 泰国 NULL

    用户数据是应该是大量的,这个用于数据输入:

    文件来源:https://pan.baidu.com/s/1l6qqr9U2YObl_pyM4r3VjA

    数据大致格式如下:

    ECEE8FBBBB      113.224.76.226
    ED38780B1D      106.36.217.145
    120BB4FB44      113.109.42.83
    9D4EC87B4B      219.153.212.31
    AF0E43C785      111.77.229.40
    4AAAEB560B      60.13.190.132
    53BAABADD8      124.167.254.130
    6C1FADFF90      60.27.188.251
    478A215D8C      101.47.19.207
    0000000000      106.114.74.120
    A7FDC270DF      112.26.70.229
    1714BE65F3      36.63.12.44
    0EFF6D7DCC      218.75.123.226
    0000000000      211.161.248.231

    map.py思路,每个计算节点都将ip定位数据加载到内存中,然后输入部分用户数据,进行分析,代码如下:

    #!/usr/bin/python
    ip_convert = lambda x:sum([256**j*int(i) for j,i in enumerate(x.split('.')[::-1])])#将ip转换为数字
    def load_ip_lib_func(ip_lib_fd):#将ip定位信息加载到内存
        ip_lib_list = []
        file_in = open(ip_lib_fd, 'r')
        for line in file_in:
            ss = line.strip().split(' ')
            if len(ss) != 5:
                continue
            start_ip = ss[0].strip()
            end_ip = ss[1].strip()
            area = ss[2].strip()
            country = ss[3].strip()
            province = ss[4].strip()
            ip_lib_list.append((ip_convert(start_ip), ip_convert(end_ip), area, country, province))
        return ip_lib_list
    
    def get_addr(ip_lib_list, ip_str):#二分查找获取地址信息
        ip_num = ip_convert(ip_str)
        low_index = 0
        mid_index = 0
        high_index = len(ip_lib_list) - 1
        while (low_index < high_index):
            mid_index = (low_index + high_index) / 2
            sss = ip_lib_list[mid_index]
            start_ip = sss[0]
            end_ip = sss[1]
            provice = sss[4].strip()
            if ip_num < start_ip:
                high_index = mid_index - 1
            elif ip_num > start_ip:
                low_index = mid_index + 1
        if ip_num < start_ip:
            provice = ip_lib_list[mid_index-1][4]
        else:
            provice = ip_lib_list[mid_index][4]
        return provice
    def mapper_func(ip_lib_fd):
        ip_lib_list = load_ip_lib_func(ip_lib_fd)
    
        for line in sys.stdin:
            ss = line.strip().split('	')
            if len(ss) != 2:
                continue
    
            cookie = ss[0].strip()
            ip_str = ss[1].strip()
    
            user_addr = get_addr(ip_lib_list, ip_str)
            print '	'.join([cookie, ip_str, user_addr])
    
    
    
    if __name__ == "__main__":
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)

    run.sh如下:

    HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
    
    INPUT_FILE_PATH_1="/mapreduce/ip/cookie_ip.txt"
    OUTPUT_PATH="/mapreduce/ip/out"
    
    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
    
    # Step 1.
    $HADOOP_CMD jar $STREAM_JAR_PATH 
        -input $INPUT_FILE_PATH_1 
        -output $OUTPUT_PATH 
        -mapper "python map.py mapper_func ABC" 
        -reducer "cat" 
        -jobconf "mapred.reduce.tasks=5" 
        -jobconf "mapred.map.tasks=5" 
        -jobconf "mapreduce.reduce.memory.mb=5000" 
        -jobconf  "mapred.job.name=ip_lib_demo" 
        -cacheFile "hdfs://master:9000/mapreduce/ip/ip.lib.txt#ABC" 
        -file "./map.py"

  • 相关阅读:
    基于深度学习的目标检测
    Redmine发送邮件
    用7次比较完成5个元素的排序
    在GEM5模拟器运行时,对Kill命令的使用
    GDB中的backtrace命令
    [译]如何定义python源文件的文件编码
    QEMU ELF_LOAER分析[基于MIPS]
    if语句的数据驱动优化(Java版)
    解决idea中Activiti的bpmn编辑器的中文乱码问题
    最简易的PHP Storm调试模式开启方式
  • 原文地址:https://www.cnblogs.com/students/p/8858320.html
Copyright © 2011-2022 走看看