/**技术: CyclicBarrier + AtomicLong + BlockingQueue + RandomAccessFile */ public class MainTest { private static Logger logger = LoggerFactory.getLogger(MainTest.class); //给定线程数 static int threadNum = 4; private static String path = "C:/Users/70403/Downloads/txtTest/txtTest7.txt"; // private static String writePath = "C:/Users/70403/Downloads/txtTest/txtTestWriteMe2.txt"; /** * */ public static void main(String[] args) { logger.info("开始时间:" + new Date()); // 声明缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); BigFileReaderByPart.Builder builder = new BigFileReaderByPart.Builder(path, new IDataHandle() { public void handle(String line) { System.out.println("--100----->" + Thread.currentThread() + line); writeFun(writePath, line); //解析执行的数量时间3460ms,解析数量为5000 //解析执行的数量时间3226ms,解析数量为5000 //解析执行的数量时间3907ms,解析数量为5000 //解析执行的数量时间3279ms,解析数量为5000 } }, queue); // builder.withCharset("UTF-8").withTreahdSize(threadNum).withBufferSize(524288); BigFileReaderByPart bigFileReader = builder.build(); bigFileReader.start(); // // 线程写入数据到同一个文件 // BufferedWriter fw = new BufferedWriter(new FileWriter(ConfigUtil.getValue("log.target.path"), true)); // // 线程池 // ExecutorService executor = Executors.newCachedThreadPool(); // executor.execute(new QueueResult2File(fw, queue)); // // // 关闭线程 // executor.shutdown(); // executor.awaitTermination(5, TimeUnit.DAYS); // // // 关闭数据流 // fw.close(); System.out.println("结束时间:" + new Date()); logger.info("app1 Shutdown..."); } }
/** * 技术实现: jsoup */ public class BigFileReaderByPart { /****************** 初始化 *********************/ // private Logger logger = Logger.getLogger(BigFileReaderByPart.class); // 日志对象 private Logger logger = LoggerFactory.getLogger(getClass()); private int threadSize; // 线程大小 private String charset; // 编码 private int bufferSize; // 缓存字节大小 private IDataHandle handle; // 数据返回定义接口 private ExecutorService executorService; // 线程池 private long fileLength; // 文件长度 private RandomAccessFile rAccessFile; // 文件读写对象 private Set<StartEndPair> startEndPairs; // 分片 private CyclicBarrier cyclicBarrier; private AtomicLong counter = new AtomicLong(0); // 计数 private BlockingQueue<String> queue; // 队列 private Map<String, String> filterMap; // 过滤URL // private CheckSensitiveWord checkSensitiveWord; /** * @param file * @param handle * @param charset * @param bufferSize * @param threadSize */ private BigFileReaderByPart(File file,IDataHandle handle, BlockingQueue<String> queue, String charset,int bufferSize,int threadSize){ this.fileLength = file.length(); this.handle = handle; this.charset = charset; this.bufferSize = bufferSize; this.threadSize = threadSize; try { this.rAccessFile = new RandomAccessFile(file,"rw"); } catch (FileNotFoundException e) { e.printStackTrace(); } this.executorService = newFixedThreadPool(threadSize); startEndPairs = new HashSet<BigFileReaderByPart.StartEndPair>(); this.queue = queue; this.filterMap = new HashMap<String, String>(); // checkSensitiveWord = new CheckSensitiveWord(); } /** * 开启任务 */ public void start(){ long everySize = this.fileLength / this.threadSize; try { calculateStartEnd(0, everySize); } catch (IOException e) { e.printStackTrace(); return; } final long startTime = System.currentTimeMillis(); cyclicBarrier = new CyclicBarrier(startEndPairs.size(), new Runnable() { /* (non-Javadoc) * @see java.lang.Runnable#run() */ public void run() { logger.info("use time---5--->: " + (System.currentTimeMillis() - startTime)); logger.info("all line---8--->: 
" + counter.get()); logger.info("--50--->解析执行的数量时间{}ms,解析数量为{} : ", System.currentTimeMillis() - startTime, counter.get()); } }); logger.info("总分配分片数量----10---->:" + startEndPairs.size()); for (StartEndPair pair : startEndPairs) { logger.info("分配分片----13---->:" + pair); this.executorService.execute(new SliceReaderTask(pair)); } } /** * 计算指针读取开始和结尾即分片 */ private void calculateStartEnd(long start, long size) throws IOException { if (start > fileLength - 1) { return; } StartEndPair pair = new StartEndPair(); pair.start = start; long endPosition = start + size - 1; if (endPosition >= fileLength - 1) { pair.end = fileLength - 1; startEndPairs.add(pair); return; } // 移动指针读取 rAccessFile.seek(endPosition); byte tmp = (byte) rAccessFile.read(); while (tmp != ' ' && tmp != ' ') { endPosition++; if (endPosition >= fileLength - 1) { endPosition = fileLength - 1; break; } rAccessFile.seek(endPosition); tmp = (byte) rAccessFile.read(); } pair.end = endPosition; startEndPairs.add(pair); // 迭代计算 calculateStartEnd(endPosition + 1, size); } /** * 关闭 */ public void shutdown(){ try { this.rAccessFile.close(); } catch (IOException e) { e.printStackTrace(); } this.executorService.shutdown(); } /** * @throws UnsupportedEncodingException */ private void handle(byte[] bytes) throws UnsupportedEncodingException{ String line = null; if (this.charset == null) { line = new String(bytes); } else { line = new String(bytes, charset); } if (line != null && !"".equals(line)) { this.handle.handle(line); // String writePath="C:/Users/70403/Downloads/txtTest/txtTestWriteMe.txt"; // writeFun(writePath, line); // // 抓取内容 // String [] lineArray = line.split(" "); // if (null != lineArray && lineArray.length == 3 && null != lineArray[2]) { // if (!checkSensitiveWord.isContaintSensitiveWord(lineArray[2])) { // // 如果.html直接访问 // if (!filterMap.containsKey(StringUtil.getDomain(lineArray[2]))) { // String contentStr = grabTitle(lineArray[2]); // if (null != contentStr) { // queue.add(contentStr); // } // } // 
logger.info("第" + counter.get() + "行 : 已请求" + line); // } else { // logger.info("第" + counter.get() + "行 : 已过滤URL: " + line); // } // } counter.incrementAndGet(); // 递增 保证数据计数同步 } } /** * 抓取网站url的title和keywords */ public String grabTitle(String url) { Document doc = null; try { Connection con = Jsoup.connect(url).timeout(5000); con.userAgent("Mozilla/5.0 (Windows NT 10.0; WOW64; rv:44.0) Gecko/20100101 Firefox/44.0"); doc = con.get(); } catch (Exception e) { //logger.info(url); String domainStr = StringUtil.getDomain(url); if (!filterMap.containsKey(domainStr)) { filterMap.put(domainStr, url); //过滤 } return null; } if (null == doc) { return null; } String title = doc.title(); String keywords = null; Element el = doc.getElementsByAttributeValue("name", "keywords").first(); if (null != el) { keywords = el.attr("content"); } if (StringUtil.isNullOrEmpty(title) && StringUtil.isNullOrEmpty(keywords)) { return null; } if (StringUtil.isNullOrEmpty(title)) { title = "null"; } if (StringUtil.isNullOrEmpty(keywords)) { title = "null"; } StringBuilder sb = new StringBuilder(); sb.append(doc.title()).append("-&-").append(keywords); return sb.toString(); } /** * </pre> */ private static class StartEndPair{ public long start; public long end; /* (non-Javadoc) * @see java.lang.Object#toString() */ @Override public String toString() { return "star----15---->" + start + ";end----18---->" + end; } /* (non-Javadoc) * @see java.lang.Object#hashCode() */ @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + (int) (end ^ (end >>> 32)); result = prime * result + (int) (start ^ (start >>> 32)); return result; } /* (non-Javadoc) * @see java.lang.Object#equals(java.lang.Object) */ @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } StartEndPair other = (StartEndPair) obj; if (end != other.end) { return false; } if (start != 
other.start) { return false; } return true; } } /** * </pre> */ private class SliceReaderTask implements Runnable{ private long start; private long sliceSize; private byte[] readBuff; /** * @paramm start read position (include) * @paramm end the position read to(include) */ public SliceReaderTask(StartEndPair pair) { this.start = pair.start; this.sliceSize = pair.end-pair.start+1; this.readBuff = new byte[bufferSize]; } /* (non-Javadoc) * @see java.lang.Runnable#run() */ public void run() { try { MappedByteBuffer mapBuffer = rAccessFile.getChannel().map(MapMode.READ_ONLY, start, this.sliceSize); ByteArrayOutputStream bos = new ByteArrayOutputStream(); for (int offset = 0; offset < sliceSize; offset += bufferSize) { int readLength; if (offset + bufferSize <= sliceSize) { readLength = bufferSize; } else { readLength = (int) (sliceSize - offset); } mapBuffer.get(readBuff, 0, readLength); for (int i = 0; i < readLength; i++) { byte tmp = readBuff[i]; if (tmp == ' ' || tmp == ' ') { handle(bos.toByteArray()); bos.reset(); } else { bos.write(tmp); } } } if (bos.size() > 0) { handle(bos.toByteArray()); } cyclicBarrier.await();// 等待其他线程操作完毕 }catch (Exception e) { e.printStackTrace(); } } } /** * </pre> */ public static class Builder{ //给定线程4 private int threadSize = 4; // private int threadSize = 1; private String charset = null; private int bufferSize = 1024 * 1024; private IDataHandle handle; private File file; private BlockingQueue<String> queue; public Builder(String file, IDataHandle handle, BlockingQueue<String> queue){ this.file = new File(file); if(!this.file.exists()){ throw new IllegalArgumentException("文件不存在!"); } this.handle = handle; this.queue = queue; } public Builder withTreahdSize(int size){ this.threadSize = size; return this; } public Builder withCharset(String charset){ this.charset= charset; return this; } public Builder withBufferSize(int bufferSize){ this.bufferSize = bufferSize; return this; } public BigFileReaderByPart build(){ return new 
BigFileReaderByPart(this.file,this.handle,this.queue, this.charset,this.bufferSize,this.threadSize); } } }
/**
 * Callback invoked with each line of text a reader produces.
 */
public interface IDataHandle {

    /**
     * Processes one line of text.
     *
     * @param line the line to process
     */
    void handle(String line);
}
/**
 * Static string and URL helpers; not instantiable.
 */
public class StringUtil {

    /**
     * Pattern used by {@link #getDomainName(String)}, compiled once: the first run of
     * dot-separated word characters such as {@code www.example.com}. The lookbehind
     * {@code (?<=//|)} also accepts an empty prefix, so it does not require a preceding "//".
     */
    private static final Pattern DOMAIN_NAME_PATTERN = Pattern.compile("(?<=//|)((\\w)+\\.)+\\w+");

    /**
     * Prevents instantiation.
     */
    private StringUtil() {
    }

    /**
     * Checks whether a string is null or empty.
     *
     * @param value string to check
     * @return {@code true} if {@code value} is {@code null} or has length 0
     */
    public static final boolean isNullOrEmpty(String value) {
        return (value == null) || (value.length() == 0);
    }

    /**
     * Returns the host part of a URL.
     *
     * @param curl URL to parse
     * @return the host as reported by {@link URL#getHost()}, or {@code ""} if {@code curl}
     *         cannot be parsed as a URL
     */
    public static String getDomain(String curl) {
        try {
            return new URL(curl).getHost();
        } catch (MalformedURLException e) {
            // Best-effort: an unparsable URL yields an empty string instead of an exception.
            return "";
        }
    }

    /**
     * Extracts the domain name from a URL with a regular expression.
     *
     * @param url URL to inspect
     * @return the first dot-separated run of word characters, or {@code null} if {@code url}
     *         is {@code null}, empty, or contains no such run
     */
    public static String getDomainName(String url) {
        // isEmpty() instead of the original reference comparison (url != "").
        if (url == null || url.isEmpty()) {
            return null;
        }
        Matcher m = DOMAIN_NAME_PATTERN.matcher(url);
        return m.find() ? m.group() : null;
    }

    public static void main(String[] args) {
        System.out.println(getDomain("http://tf.360.cn/e/wb?_=76b6fa2e03fe712e&ip=49.69.92.204&reduce=0&width=0&height=0"));
    }
}
// Source: https://github.com/butter-fly/big_file_read