zoukankan      html  css  js  c++  java
  • 大数据mapreduce二分法ip定位之Python实现

    ip定位数据大约12M,采用-chacheFile 分发

    文件来源https://pan.baidu.com/s/1J0pwTafHgt4T0k3vV_gC-A

    格式大致格式如下:

    0.0.0.0 0.255.255.255 NULL IANA保留地址 NULL
    1.0.0.0 1.0.0.255 亚洲 亚太地区 NULL
    1.0.1.0 1.0.1.255 亚洲 中国 福建
    1.0.2.0 1.0.3.255 亚洲 中国 福建
    1.0.4.0 1.0.7.255 大洋洲 澳大利亚 NULL
    1.0.8.0 1.0.15.255 亚洲 中国 广东
    1.0.16.0 1.0.31.255 亚洲 日本 NULL
    1.0.32.0 1.0.63.255 亚洲 中国 广东
    1.0.64.0 1.0.127.255 亚洲 日本 NULL
    1.0.128.0 1.0.255.255 亚洲 泰国 NULL

    用户数据是应该是大量的,这个用于数据输入:

    文件来源:https://pan.baidu.com/s/1l6qqr9U2YObl_pyM4r3VjA

    数据大致格式如下:

    ECEE8FBBBB      113.224.76.226
    ED38780B1D      106.36.217.145
    120BB4FB44      113.109.42.83
    9D4EC87B4B      219.153.212.31
    AF0E43C785      111.77.229.40
    4AAAEB560B      60.13.190.132
    53BAABADD8      124.167.254.130
    6C1FADFF90      60.27.188.251
    478A215D8C      101.47.19.207
    0000000000      106.114.74.120
    A7FDC270DF      112.26.70.229
    1714BE65F3      36.63.12.44
    0EFF6D7DCC      218.75.123.226
    0000000000      211.161.248.231

    map.py思路,每个计算节点都将ip定位数据加载到内存中,然后输入部分用户数据,进行分析,代码如下:

    #!/usr/bin/python
    ip_convert = lambda x:sum([256**j*int(i) for j,i in enumerate(x.split('.')[::-1])])#将ip转换为数字
    def load_ip_lib_func(ip_lib_fd):#将ip定位信息加载到内存
        ip_lib_list = []
        file_in = open(ip_lib_fd, 'r')
        for line in file_in:
            ss = line.strip().split(' ')
            if len(ss) != 5:
                continue
            start_ip = ss[0].strip()
            end_ip = ss[1].strip()
            area = ss[2].strip()
            country = ss[3].strip()
            province = ss[4].strip()
            ip_lib_list.append((ip_convert(start_ip), ip_convert(end_ip), area, country, province))
        return ip_lib_list
    
    def get_addr(ip_lib_list, ip_str):#二分查找获取地址信息
        ip_num = ip_convert(ip_str)
        low_index = 0
        mid_index = 0
        high_index = len(ip_lib_list) - 1
        while (low_index < high_index):
            mid_index = (low_index + high_index) / 2
            sss = ip_lib_list[mid_index]
            start_ip = sss[0]
            end_ip = sss[1]
            provice = sss[4].strip()
            if ip_num < start_ip:
                high_index = mid_index - 1
            elif ip_num > start_ip:
                low_index = mid_index + 1
        if ip_num < start_ip:
            provice = ip_lib_list[mid_index-1][4]
        else:
            provice = ip_lib_list[mid_index][4]
        return provice
    def mapper_func(ip_lib_fd):
        ip_lib_list = load_ip_lib_func(ip_lib_fd)
    
        for line in sys.stdin:
            ss = line.strip().split('	')
            if len(ss) != 2:
                continue
    
            cookie = ss[0].strip()
            ip_str = ss[1].strip()
    
            user_addr = get_addr(ip_lib_list, ip_str)
            print '	'.join([cookie, ip_str, user_addr])
    
    
    
    if __name__ == "__main__":
        module = sys.modules[__name__]
        func = getattr(module, sys.argv[1])
        args = None
        if len(sys.argv) > 1:
            args = sys.argv[2:]
        func(*args)

    run.sh如下:

    HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
    STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
    
    INPUT_FILE_PATH_1="/mapreduce/ip/cookie_ip.txt"
    OUTPUT_PATH="/mapreduce/ip/out"
    
    $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
    
    # Step 1.
    $HADOOP_CMD jar $STREAM_JAR_PATH 
        -input $INPUT_FILE_PATH_1 
        -output $OUTPUT_PATH 
        -mapper "python map.py mapper_func ABC" 
        -reducer "cat" 
        -jobconf "mapred.reduce.tasks=5" 
        -jobconf "mapred.map.tasks=5" 
        -jobconf "mapreduce.reduce.memory.mb=5000" 
        -jobconf  "mapred.job.name=ip_lib_demo" 
        -cacheFile "hdfs://master:9000/mapreduce/ip/ip.lib.txt#ABC" 
        -file "./map.py"

  • 相关阅读:
    【锁】java 锁的技术内幕
    【BlockingQueue】BlockingQueue 阻塞队列实现
    【多线程】获取多个线程任务执行完事件
    【spring cloud】源码分析(一)
    【spring boot】FilterRegistrationBean介绍
    【FAQ】服务下线
    解决org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)...
    实现人民币大写代码解析
    application.yml使用@符合问题:'@' that cannot start any token. (Do not use @ for indentation)
    Maven常见异常及解决方法---测试代码编译错误
  • 原文地址:https://www.cnblogs.com/students/p/8858320.html
Copyright © 2011-2022 走看看