  • Massive data processing: IP Top-N extraction

    import com.google.common.base.Charsets;
    import com.google.common.base.Joiner;
    import com.google.common.base.Predicate;
    import com.google.common.base.Stopwatch;
    import com.google.common.collect.FluentIterable;
    import com.google.common.io.ByteSink;
    import com.google.common.io.FileWriteMode;
    import com.google.common.io.Files;
    import javax.annotation.Nullable;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.io.FileUtils;
    import org.apache.commons.io.LineIterator;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    
    import static lombok.Lombok.checkNotNull;
    
    /**
     * Created by edwin on 2019/4/29.
     * Example: from a massive access log, extract the Top-N most frequently visited IPs.
     * <p>
     * An IP is 32 bits, so there are at most 2^32 = 4G possible values - too many to load into memory all at once.
     * Following the "divide and conquer" idea, hash each IP and use Hash(IP) % 1024 to split the massive IP log into 1024 small files, each holding at most about 4M IP addresses.
     * For each small file, build a HashMap keyed by IP with the occurrence count as the value, and keep track of the IP that occurs most often in that file.
     * This yields the most frequent IP of each of the 1024 small files; a conventional sort over those candidates then gives the overall Top-N IPs.
     *
     * @author edwin
     */
    @Slf4j
    public class SimpleTopN {
    
        /***
         * ByteSink handle for each shard file
         */
        private final Map<Integer, ByteSink> bufferedMap = new HashMap<Integer, ByteSink>();
    
        /***
         * File splitting - in-memory buffer of lines waiting to be written for each shard file
         */
        private final Map<Integer, List<String>> dataMap = new HashMap<Integer, List<String>>();
    
        /***
         * Split the source file.
         * Splits the large source file into small shard files, hashing each line into its shard file.
         * @param sourceFile   source file
         * @param dataShardingPath  directory for the shard files
         * @param dataSharding number of shards
         * @throws Exception
         */
        public void splitSharding(File sourceFile, String dataShardingPath, int dataSharding) throws Exception {
            checkNotNull(sourceFile, "sourceFile must not be null.");
            checkNotNull(dataShardingPath, "dataShardingPath must not be null.");
            checkNotNull(dataSharding, "dataSharding must not be null.");
            Stopwatch stopwatch = Stopwatch.createStarted();
            //create the shard files
            for (int i = 0; i < dataSharding; i++) {
                File file = new File(dataShardingPath + "shard_" + i + ".txt");
                //start from an empty shard file on every run
                if (file.exists()) {
                    file.delete();
                }
                file.createNewFile();
                //open the sink in append mode so each batched flush adds to the file instead of overwriting it
                bufferedMap.put(i, Files.asByteSink(file, FileWriteMode.APPEND));
                dataMap.put(i, new LinkedList<String>());
            }
    
            //read the source file (Guava: loads the whole file into memory)
            //readBigDataFileByGuava(sourceFile,dataSharding);

            //read the source file (Commons IO: streams line by line)
            readBigDataFileByCommonsIO(sourceFile, dataSharding);

            //flush whatever is still buffered (the last, partially filled batch of each shard)
            for (Map.Entry<Integer, List<String>> entry : dataMap.entrySet()) {
                List<String> remaining = entry.getValue();
                if (!remaining.isEmpty()) {
                    bufferedMap.get(entry.getKey()).write(Joiner.on(" ").join(remaining).getBytes(Charsets.UTF_8));
                    remaining.clear();
                }
            }
            long costTimes = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            log.info("sharding file finish, total cost time:{} ms.", costTimes);
        }
    
        /***
         * Read the file with Guava's readLines.
         * <p>Loads the whole file into memory; a very large data file will cause an OutOfMemoryError.</p>
         * @param sourceFile
         * @param dataSharding
         */
        private void readBigDataFileByGuava(File sourceFile, int dataSharding) {
            try {
                List<String> readLines = Files.readLines(sourceFile, Charsets.UTF_8);
                for (String ip : readLines) {
                    //map this IP onto one of the shard files via Hash(ip) % dataSharding
                    int fileIndex = hashCode(ip) % dataSharding;
                    List<String> list = dataMap.get(fileIndex);
                    list.add(ip + "\n");
                    if (list.size() % 1000 == 0) {
                        //flush this shard's buffer to its file, then reset the buffer
                        ByteSink byteSink = bufferedMap.get(fileIndex);
                        byteSink.write(Joiner.on(" ").join(list).getBytes(Charsets.UTF_8));
                        list.clear();
                    }
                }
            } catch (Exception e) {
                log.error("read(Guava readLines) file exception,msg:{}", e.getMessage(), e);
            }
        }
    
    
        /****
         * Read the file with Apache Commons IO.
         * <p>Streams the file instead of loading it fully into memory, so resource usage stays low.</p>
         * @param sourceFile source file
         * @param dataSharding number of shards
         */
        private void readBigDataFileByCommonsIO(File sourceFile, int dataSharding) {
            LineIterator it = null;
            try {
                //use Apache Commons IO's LineIterator to stream the file
                it = FileUtils.lineIterator(sourceFile, "UTF-8");
                while (it.hasNext()) {
                    //read one line (one IP) at a time
                    String ip = it.nextLine();
                    int fileIndex = hashCode(ip) % dataSharding;
                    List<String> list = dataMap.get(fileIndex);
                    list.add(ip + "\n");
                    if (list.size() % 1000 == 0) {
                        //flush this shard's buffer to its file, then reset the buffer
                        ByteSink byteSink = bufferedMap.get(fileIndex);
                        byteSink.write(Joiner.on(" ").join(list).getBytes(Charsets.UTF_8));
                        list.clear();
                    }
                }
            } catch (Exception e) {
                log.error("read(Apache Commons IO) file exception,msg:{}", e.getMessage(), e);
            } finally {
                LineIterator.closeQuietly(it);
            }
        }
    
    
        /***
         * Analyze the shard files.
         * @param dataShardingPath directory containing the shard files
         * @param topNumber how many of the most frequently seen IPs to return
         * @return
         * @throws Exception
         */
        private List<Map.Entry<String, Integer>> analysis(String dataShardingPath, int topNumber) throws Exception {
            checkNotNull(dataShardingPath, "dataShardingPath must not be null.");
            Stopwatch stopwatch = Stopwatch.createStarted();
            File shardingFile = new File(dataShardingPath);
            //get the immediate children of the path
            //Iterable<File> childrens = Files.fileTreeTraverser().children(shardingFile);
            //traverse everything under the path; the traverser also offers preOrderTraversal (pre-order), postOrderTraversal (post-order) and breadthFirstTraversal (breadth-first)
            FluentIterable<File> childrens = Files.fileTreeTraverser().breadthFirstTraversal(shardingFile).filter(new Predicate<File>() {
                @Override
                public boolean apply(@Nullable File file) {
                    //keep regular files only (the traversal also yields directories, including the root) and skip the "analysis" directory
                    return file.isFile() && !file.getName().equals("analysis");
                }
            });
            log.info("scan sharding directory:{}, file total : {}", dataShardingPath, childrens.size());
            //collects the most frequently seen IP from each shard file
            Map<String, Integer> collectMap = new HashMap<String, Integer>();
            for (File file : childrens) {
                //temporary map of IP -> occurrence count for the current shard file
                Map<String, Integer> tempMap = new HashMap<String, Integer>();
                List<String> readLines = Files.readLines(file, Charsets.UTF_8);
                for (String ip : readLines) {
                    ip = ip.replaceAll("\r|\n", "").trim();
                    if (tempMap.containsKey(ip)) {
                        tempMap.put(ip, tempMap.get(ip) + 1);
                    } else {
                        tempMap.put(ip, 1);
                    }
                }
                //skip shard files that contained no IPs
                if (tempMap.isEmpty()) {
                    continue;
                }
                //Collectors.toMap builds a LinkedHashMap already sorted by occurrence count in descending order
                tempMap = tempMap.entrySet().stream()
                        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
                        .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue(), (x1, x2) -> x2, LinkedHashMap<String, Integer>::new));

                //take this shard's most frequent IP and add it to the overall candidate set
                Map.Entry<String, Integer> entry = tempMap.entrySet().iterator().next();
                collectMap.put(entry.getKey(), entry.getValue());
            }
            //copy the map entries into a list
            List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(collectMap.entrySet());
            //sort by occurrence count in descending order
            Collections.sort(list, (o1, o2) -> o2.getValue().compareTo(o1.getValue()));
            //keep only the Top-N entries
            List<Map.Entry<String, Integer>> limitList = list.stream().limit(topNumber).collect(Collectors.toList());
            long costTimes = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            log.info("analysis file finish, total cost time:{} ms.", costTimes);
            return limitList;
        }
    
    
        /***
         * String hash function (one-at-a-time style)
         * @param key
         * @return
         */
        private int hashCode(String key) {
            int hash;
            int i;
            for (hash = 0, i = 0; i < key.length(); ++i) {
                hash += key.charAt(i);
                hash += (hash << 10);
                hash ^= (hash >> 6);
            }
            hash += (hash << 3);
            hash ^= (hash >> 11);
            hash += (hash << 15);
            //mask off the sign bit so the result is never negative (Math.abs(Integer.MIN_VALUE) would still be negative)
            return hash & Integer.MAX_VALUE;
        }
    
        /***
         * Hash function 2 (spreads the high bits of hashCode() into the low bits, as java.util.HashMap does)
         * @param key
         * @return
         */
        private final int hashCode2(Object key) {
            int h;
            return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
        }
    
    
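        /***
         * Generate a test data file containing ipCount random IP addresses, one per line.
         * @param filePathName path of the file to create
         * @param ipCount number of random IPs to generate
         */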
        public void generateIpFiles(String filePathName, int ipCount) {
            try {
                Stopwatch stopwatch = Stopwatch.createStarted();
                File file = new File(filePathName);
                file.createNewFile();
                StringBuilder ipAddress = new StringBuilder();
                for (int i = 0; i < ipCount; i++) {
                    //appending to the file on every iteration is slow because of the repeated I/O, so the generated IPs are buffered here and written out in one pass
                    //depending on the machine and the number of IPs to generate, buffering may not be appropriate: a very large ipCount will cause a java.lang.OutOfMemoryError
                    //Files.append(generateRandomIp() + "\n", file, Charsets.UTF_8);
                    ipAddress.append(generateRandomIp() + "\n");
                }
                Files.write(ipAddress.toString(), file, Charsets.UTF_8);
                long time = stopwatch.elapsed(TimeUnit.MILLISECONDS);
                log.info("Generate ip finish, ip count:{} , total cost time:{} ms.", ipCount, time);
            } catch (IOException e) {
                log.error("generate ip file exception,msg:{}", e.getMessage(), e);
            }
        }
    
        /***
         * Generate one random IP address.
         * Tips:
         * An IPv4 address is a 32-bit binary number, normally split into four 8-bit bytes.
         * It is usually written in "dotted decimal" form (a.b.c.d), where a, b, c and d are decimal integers between 0 and 255.
         * Example: the dotted-decimal address 100.4.5.6 is the 32-bit binary number 01100100.00000100.00000101.00000110.
         * @return
         */
        private String generateRandomIp() {
            int[][] range = {{607649792, 608174079}, // 36.56.0.0-36.63.255.255
                    {1038614528, 1039007743}, // 61.232.0.0-61.237.255.255
                    {1783627776, 1784676351}, // 106.80.0.0-106.95.255.255
                    {2035023872, 2035154943}, // 121.76.0.0-121.77.255.255
                    {2078801920, 2079064063}, // 123.232.0.0-123.235.255.255
                    {-1950089216, -1948778497}, // 139.196.0.0-139.215.255.255
                    {-1425539072, -1425014785}, // 171.8.0.0-171.15.255.255
                    {-1236271104, -1235419137}, // 182.80.0.0-182.92.255.255
                    {-770113536, -768606209}, // 210.25.0.0-210.47.255.255
                    {-569376768, -564133889}, // 222.16.0.0-222.95.255.255
            };
            Random random = new Random();
            int index = random.nextInt(range.length);
            String ip = convert2IpAddress(range[index][0] + random.nextInt(range[index][1] - range[index][0]));
            return ip;
        }
    
        /***
         * Convert a 32-bit integer into a dotted-decimal IP address string.
         * @param ip
         * @return
         */
        private String convert2IpAddress(int ip) {
            int[] ipArray = new int[4];
            ipArray[0] = (int) ((ip >> 24) & 0xff);
            ipArray[1] = (int) ((ip >> 16) & 0xff);
            ipArray[2] = (int) ((ip >> 8) & 0xff);
            ipArray[3] = (int) (ip & 0xff);
            String realIp = Integer.toString(ipArray[0]) + "." + Integer.toString(ipArray[1]) + "." + Integer.toString(ipArray[2]) + "." + Integer.toString(ipArray[3]);
            return realIp;
        }
    }
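
    The class above has no entry point. Below is a minimal driver sketch showing how the pieces could be wired together if a main method were added to SimpleTopN; the file paths, shard count and Top-N value are illustrative assumptions, not part of the original code.

        // Sketch of a possible main method for SimpleTopN (paths, shard count and N are made-up example values).
        public static void main(String[] args) throws Exception {
            SimpleTopN topN = new SimpleTopN();
            String sourcePath = "/tmp/topn/ip.txt";        // hypothetical location of the generated test data
            String shardingPath = "/tmp/topn/sharding/";   // hypothetical directory for the shard files
            new File(shardingPath).mkdirs();               // make sure the directories exist before writing
            // 1. generate a test file with one million random IPs
            topN.generateIpFiles(sourcePath, 1000000);
            // 2. split the big file into 1024 shard files by Hash(ip) % 1024
            topN.splitSharding(new File(sourcePath), shardingPath, 1024);
            // 3. count IPs per shard and print the overall Top 10
            for (Map.Entry<String, Integer> entry : topN.analysis(shardingPath, 10)) {
                log.info("ip:{} count:{}", entry.getKey(), entry.getValue());
            }
        }

    With 1024 shards, each shard file stays small enough for its per-IP HashMap to fit comfortably in memory, which is the point of the hash-based split described in the class comment.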
  • Original post: https://www.cnblogs.com/zzq-include/p/13617937.html