版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
一、问题描述
这个问题在大数据面试中容易出现,问题如下:
有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M,要求返回频数最高的100个词。
二、思路
- 此处1G文件远远大于1M内存,分治法,先hash映射把大文件分成很多个小文件,具体操作如下:读文件中,对于每个词x,取hash(x)%5000,然后按照该值存到5000个小文件(记为f0,f1,...,f4999)中,这样每个文件大概是200k左右(每个相同的词一定被映射到了同一文件中);
- 对于每个文件fi,都用hash_map做词和出现频率的统计,取出频率大的前100个词(怎么取?topK问题,建立一个100个节点的最小堆),把这100个词和出现频率再单独存入一个文件;
- 根据上述处理,我们又得到了5000个文件,归并文件取出top100。
三、解法
1 产生了一个7G大文件,每行一个[0,100000]区间的整数
2 top n 求解:
- 大文件分成小文件:把这个7G左右的大文件,按照读入数字的hashcode值分成1024个小文件(每个文件平均最大就7M左右)
- 小文件统计:对每个小文件,可以用堆,hash,内部排序等等方法进行处理;
-
小文件的统计结果:再做一次统计 求出出现频数最高的那个数
四、代码
本例使用Java的HashMap
import java.io.*; import java.util.*; class IP implements Serializable { private static final long serialVersionUID = -8903000680469719698L; private String ip = ""; private int count; public IP(String ip2, Integer integer) { this.ip = ip2; this.count = integer; } public int getCount() { return count; } public String getIp() { return ip; } public void setCount(int count) { this.count = count; } public void setIp(String ip) { this.ip = ip; } } public class Data { static String fileLoc = "D:\hadoop\numsout\nums.txt"; /** * 将打文件hash成1024个小文件 * * @throws FileNotFoundException * @throws IOException */ private static void hashToSmallFiles() throws FileNotFoundException, IOException { BufferedReader br = new BufferedReader(new FileReader(fileLoc)); String ip = ""; HashMap<String, FileWriter> fileWriters = new HashMap<String, FileWriter>(); while ((ip = br.readLine()) != null) { int tmp = Math.abs(ip.hashCode() % 1024); String fileName = fileLoc + tmp + ".txt"; FileWriter fw = null; if (fileWriters.containsKey(fileName)) { fw = fileWriters.get(fileName); } else { fw = new FileWriter(fileName, true); fileWriters.put(fileName, fw); } fw.write(ip + " "); } br.close(); for (FileWriter ff : fileWriters.values()) { ff.close(); } } /** * 利用HashMap<> 统计每个小文件频数最高的ip; * @return 所有小文件的结果 组成List 返回 * @throws FileNotFoundException * @throws IOException */ private static List<IP> countEverySmallFile() throws FileNotFoundException, IOException { List<IP> list = new ArrayList<IP>(); for (int i = 0; i < 1024; i++) { File file = new File(fileLoc + i + ".txt"); if (file.exists()) { long startTime = System.currentTimeMillis(); BufferedReader br1 = new BufferedReader(new FileReader(file)); String ip1 = ""; HashMap<String, Integer> hm = new HashMap<String, Integer>(); while ((ip1 = br1.readLine()) != null) { if (!hm.containsKey(ip1)) { hm.put(ip1, 1); } else { hm.put(ip1, hm.get(ip1) + 1); } } IP[] ips = new IP[hm.size()]; int index = 0; for (String temp : hm.keySet()) { ips[index] = new IP(temp, hm.get(temp)); index++; } int max = 0; for (int j = 1; j < ips.length; j++) { if (ips[j].getCount() > ips[max].getCount()) { max = j; } } list.add(ips[max]); long endTime = System.currentTimeMillis(); System.out.println("已经统计文件:" + fileLoc + i + ".txt,用时:" + (endTime - startTime) + " 毫秒"); } } return list; } /** * 从每个文件出现频率最高ip中,计算出所有文件中出现频率最高ip。 * * @param list */ private static IP calculateResult(List<IP> list) { IP[] ips = new IP[list.size()]; ips = list.toArray(ips); int max = 0; for (int j = 1; j < ips.length; j++) { if (ips[j].getCount() > ips[max].getCount()) { max = j; } } return ips[max]; } public static void findIp() throws IOException, ClassNotFoundException { long start = System.currentTimeMillis(); //hashToSmallFiles(); long end1 = System.currentTimeMillis(); System.out.println("将大文件映射成小文件,用时:" + (end1 - start) + "毫秒"); System.out.println("将大文件映射成小文件,约:" + (end1 - start) / 60000 + "分钟"); // 测试时大约28分钟 System.out.println("映射到小文件完成,开始统计每个小文件中出现频率最高的ip"); long start1 = System.currentTimeMillis(); List<IP> list = countEverySmallFile(); long end2 = System.currentTimeMillis(); System.out.println("统计所有文件共用时:" + (end2 - start1) + " 毫秒"); System.out.println("统计所有文件共用时,约:" + (end2 - start1) / 60000 + "分钟"); // 测试时大约13分钟 System.out.println("统计完成,开始计算所有ip中出现频率最高的ip"); IP ip = calculateResult(list); System.out.println("访问次数最多的ip是:" + ip.getIp() + ":" + ip.getCount()); long end = System.currentTimeMillis(); System.out.println("公用时:" + (end - start) + "毫秒"); } // 产生大文件 public static void getBigFile() { try { File file = new File(fileLoc); if(!file.exists()) { //如果不存在则创建 file.createNewFile(); System.out.println("文件创建完成,开始写入"); } FileWriter fw = new FileWriter(file); //创建文件写入 BufferedWriter bw = new BufferedWriter(fw); //产生随机数据,写入文件 Random random = new Random(); for(int i=0;i<1024*1024*1024;i++) { int randint = (int)Math.floor((random.nextDouble()*100000.0));//产生【0,10000】之间随机数 bw.write(String.valueOf(randint)); //写入一个随机数 bw.newLine(); //新的一行 } bw.close(); fw.close(); System.out.println("文件写入完成"); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //getBigFile(); // 产生文件:有1024*1024*1024行(1G行),每一行一个数字 findIp(); System.out.println("Finished"); } }
五、总结
统计词频在大数据领域应用非常广泛,我们在学习大数据技术中的第一个Demo就是WordCount,所以大家必须把这个思想掌握到位,这样在使用Hadoop中的MapReduce进行数据归并处理时才不至于懵逼。
六、在海量日志数据中,找出出现次数最多的IP地址(扩展)
有一个12G的文本文件,每行记录的是一个IP地址,现要找出这个文件中出现次数最多的那个ip。
import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; class IP implements Serializable { private static final long serialVersionUID = -8903000680469719698L; private String ip = ""; private int count; public IP(String ip2, Integer integer) { this.ip = ip2; this.count = integer; } public int getCount() { return count; } public String getIp() { return ip; } public void setCount(int count) { this.count = count; } public void setIp(String ip) { this.ip = ip; } } /** * 1、海量日志数据,提取出某日访问百度次数最多的那个IP。 * * 首先是这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有个2^32个IP。同样可以采用映射的方法, * 比如模1000 * ,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率 * 。然后再在这1000个最大的IP中,找出那个频率最大的IP * * @author WilsonPeng 846106184@qq.com * */ public class No2 { static String fileLoc = "D:\bigdata_ip.txt"; public static void findIp() throws IOException, ClassNotFoundException { long start = System.currentTimeMillis(); hashToSmallFiles(); long end1 = System.currentTimeMillis(); System.out.println("将大文件映射成小文件,用时:" + (end1 - start) + "毫秒"); System.out.println("映射到小文件完成,开始统计每个小文件中出现频率最高的ip"); long start1 = System.currentTimeMillis(); List<IP> list = countEverySmallFile(); long end2 = System.currentTimeMillis(); System.out.println("统计所有文件共用时:" + (end2 - start1) + " 毫秒"); System.out.println("统计完成,开始计算所有ip中出现频率最高的ip"); IP ip = calculateResult(list); System.out.println("访问次数最多的ip是:" + ip.getIp() + ":" + ip.getCount()); long end = System.currentTimeMillis(); System.out.println("公用时:" + (end - start) + "毫秒"); } /** * 从每个文件出现频率最高ip中,计算出所有文件中出现频率最高ip。 * * @param list */ private static IP calculateResult(List<IP> list) { IP[] ips = new IP[list.size()]; ips = list.toArray(ips); int max = 0; for (int j = 1; j < ips.length; j++) { if (ips[j].getCount() > ips[max].getCount()) { max = j; } } return ips[max]; } /** * 统计生成的每一个小文件,返回一个List,这个List的每一项就是每个小文件的统计结果,即每个小文件中出现频率最高的ip和出现次数 * * @return * @throws FileNotFoundException * @throws IOException */ private static List<IP> countEverySmallFile() throws FileNotFoundException, IOException { List<IP> list = new ArrayList<IP>(); for (int i = 0; i < 1024; i++) { File file = new File(fileLoc + i + ".txt"); if (file.exists()) { long startTime = System.currentTimeMillis(); BufferedReader br1 = new BufferedReader(new FileReader(file)); String ip1 = ""; HashMap<String, Integer> hm = new HashMap<String, Integer>(); while ((ip1 = br1.readLine()) != null) { if (!hm.containsKey(ip1)) { hm.put(ip1, 1); } else { hm.put(ip1, hm.get(ip1) + 1); } } IP[] ips = new IP[hm.size()]; int index = 0; for (String temp : hm.keySet()) { ips[index] = new IP(temp, hm.get(temp)); index++; } int max = 0; for (int j = 1; j < ips.length; j++) { if (ips[j].getCount() > ips[max].getCount()) { max = j; } } list.add(ips[max]); long endTime = System.currentTimeMillis(); System.out.println("已经统计文件:" + fileLoc + i + ".txt,用时:" + (endTime - startTime) + " 毫秒"); } } return list; } /** * 将打文件hash成1024个小文件 * * @throws FileNotFoundException * @throws IOException */ private static void hashToSmallFiles() throws FileNotFoundException, IOException { BufferedReader br = new BufferedReader(new FileReader(fileLoc)); String ip = ""; HashMap<String, FileWriter> fileWriters = new HashMap<String, FileWriter>(); while ((ip = br.readLine()) != null) { int tmp = Math.abs(ip.hashCode() % 1024); String fileName = fileLoc + tmp + ".txt"; FileWriter fw = null; if (fileWriters.containsKey(fileName)) { fw = fileWriters.get(fileName); } else { fw = new FileWriter(fileName, true); fileWriters.put(fileName, fw); } fw.write(ip + " "); } br.close(); for (FileWriter ff : fileWriters.values()) { ff.close(); } } /** * 随机生成ip地址,生成大文本文件 * * @throws IOException */ private static void generateFile() throws IOException { FileWriter fw = new FileWriter(fileLoc, true); for (int i = 0; i < 100000000; i++) { for (int j = 0; j < 100000000; j++) { fw.write(generateIp() + " "); } } fw.close(); System.out.println("done"); } /** * 随机生成ip地址 * * @return */ private static String generateIp() { String ip = ""; for (int i = 0; i < 4; i++) { int temp = (int) (Math.random() * 255); ip += temp + "."; } return ip.substring(0, ip.length() - 1); } public static void main(String[] args) { try { findIp(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
我的微信公众号:架构真经(id:gentoo666),分享Java干货,高并发编程,热门技术教程,微服务及分布式技术,架构设计,区块链技术,人工智能,大数据,Java面试题,以及前沿热门资讯等。每日更新哦!
参考资料: