zoukankan      html  css  js  c++  java
  • 文本解析-多线程-分片解析

    /**技术:  CyclicBarrier + AtomicLong + BlockingQueue + RandomAccessFile
     */
    public class MainTest {
        private static Logger logger = LoggerFactory.getLogger(MainTest.class);
    
        //给定线程数
        static int threadNum = 4;
        private static String path =      "C:/Users/70403/Downloads/txtTest/txtTest7.txt"; //
        private static String writePath = "C:/Users/70403/Downloads/txtTest/txtTestWriteMe2.txt";
    
        /**
         *
         */
        public static void main(String[] args) {
            logger.info("开始时间:" + new Date());
            // 声明缓存队列
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
            BigFileReaderByPart.Builder builder = new BigFileReaderByPart.Builder(path, new IDataHandle() {
                public void handle(String line) {
                    System.out.println("--100----->" + Thread.currentThread() + line);
                    writeFun(writePath, line);
                    //解析执行的数量时间3460ms,解析数量为5000
                    //解析执行的数量时间3226ms,解析数量为5000
                    //解析执行的数量时间3907ms,解析数量为5000
                    //解析执行的数量时间3279ms,解析数量为5000
                }
            }, queue);
    //        builder.withCharset("UTF-8").withTreahdSize(threadNum).withBufferSize(524288);
            BigFileReaderByPart bigFileReader = builder.build();
            bigFileReader.start();
    
    //        // 线程写入数据到同一个文件
    //        BufferedWriter  fw = new BufferedWriter(new FileWriter(ConfigUtil.getValue("log.target.path"), true));
    //        // 线程池
    //        ExecutorService executor = Executors.newCachedThreadPool();
    //        executor.execute(new QueueResult2File(fw, queue));
    //
    //        // 关闭线程
    //        executor.shutdown();
    //        executor.awaitTermination(5, TimeUnit.DAYS);
    //
    //        // 关闭数据流
    //        fw.close();
    
            System.out.println("结束时间:" + new Date());
            logger.info("app1 Shutdown...");
        }
    }
    /**
     *  技术实现:  jsoup
     */
    public class BigFileReaderByPart {
        /****************** 初始化  *********************/
    //    private Logger logger = Logger.getLogger(BigFileReaderByPart.class); // 日志对象
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        private int threadSize; // 线程大小
        private String charset; // 编码
        private int bufferSize; // 缓存字节大小
        private IDataHandle handle; // 数据返回定义接口
        private ExecutorService  executorService; // 线程池
        private long fileLength; // 文件长度
        private RandomAccessFile rAccessFile; // 文件读写对象
        private Set<StartEndPair> startEndPairs; // 分片
        private CyclicBarrier cyclicBarrier;
        private AtomicLong counter = new AtomicLong(0); // 计数
        private BlockingQueue<String> queue; // 队列
        private Map<String, String> filterMap; // 过滤URL
    //    private CheckSensitiveWord checkSensitiveWord;
        /**
         * @param file
         * @param handle
         * @param charset
         * @param bufferSize
         * @param threadSize
         */
        private BigFileReaderByPart(File file,IDataHandle handle, BlockingQueue<String> queue, String charset,int bufferSize,int threadSize){
            this.fileLength = file.length();
            this.handle = handle;
            this.charset = charset;
            this.bufferSize = bufferSize;
            this.threadSize = threadSize;
            try {
                this.rAccessFile = new RandomAccessFile(file,"rw");
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            this.executorService = newFixedThreadPool(threadSize);
            startEndPairs = new HashSet<BigFileReaderByPart.StartEndPair>();
            this.queue = queue;
            this.filterMap =  new HashMap<String, String>();
    //        checkSensitiveWord = new CheckSensitiveWord();
        }
    
        /**
         * 开启任务
         */
        public void start(){
            long everySize = this.fileLength / this.threadSize;
            try {
                calculateStartEnd(0, everySize);
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
    
            final long startTime = System.currentTimeMillis();
            cyclicBarrier = new CyclicBarrier(startEndPairs.size(), new Runnable() {
                /* (non-Javadoc)
                 * @see java.lang.Runnable#run()
                 */
                public void run() {
                    logger.info("use time---5--->: " + (System.currentTimeMillis() - startTime));
                    logger.info("all line---8--->: " + counter.get());
                    logger.info("--50--->解析执行的数量时间{}ms,解析数量为{} : ", System.currentTimeMillis() - startTime, counter.get());
    
                }
            });
            logger.info("总分配分片数量----10---->:" + startEndPairs.size());
            for (StartEndPair pair : startEndPairs) {
                logger.info("分配分片----13---->:" + pair);
                this.executorService.execute(new SliceReaderTask(pair));
            }
        }
    
        /**
         * 计算指针读取开始和结尾即分片
         */
        private void calculateStartEnd(long start, long size) throws IOException {
            if (start > fileLength - 1) {
                return;
            }
            StartEndPair pair = new StartEndPair();
            pair.start = start;
            long endPosition = start + size - 1;
            if (endPosition >= fileLength - 1) {
                pair.end = fileLength - 1;
                startEndPairs.add(pair);
                return;
            }
            // 移动指针读取
            rAccessFile.seek(endPosition);
            byte tmp = (byte) rAccessFile.read();
            while (tmp != '
    ' && tmp != '
    ') {
                endPosition++;
                if (endPosition >= fileLength - 1) {
                    endPosition = fileLength - 1;
                    break;
                }
                rAccessFile.seek(endPosition);
                tmp = (byte) rAccessFile.read();
            }
            pair.end = endPosition;
            startEndPairs.add(pair);
    
            // 迭代计算
            calculateStartEnd(endPosition + 1, size);
        }
    
        /**
         * 关闭
         */
        public void shutdown(){
            try {
                this.rAccessFile.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            this.executorService.shutdown();
        }
    
        /**
         * @throws UnsupportedEncodingException
         */
        private void handle(byte[] bytes) throws UnsupportedEncodingException{
            String line = null;
            if (this.charset == null) {
                line = new String(bytes);
            } else {
                line = new String(bytes, charset);
            }
            if (line != null && !"".equals(line)) {
                this.handle.handle(line);
    
    //            String writePath="C:/Users/70403/Downloads/txtTest/txtTestWriteMe.txt";
    //            writeFun(writePath, line);
    
    //            // 抓取内容
    //            String [] lineArray = line.split("    ");
    //            if (null != lineArray && lineArray.length == 3 && null != lineArray[2]) {
    //                if (!checkSensitiveWord.isContaintSensitiveWord(lineArray[2])) {
    //                    // 如果.html直接访问
    //                    if (!filterMap.containsKey(StringUtil.getDomain(lineArray[2]))) {
    //                        String contentStr = grabTitle(lineArray[2]);
    //                        if (null != contentStr) {
    //                            queue.add(contentStr);
    //                        }
    //                    }
    //                    logger.info("第" + counter.get() + "行 : 已请求" + line);
    //                } else {
    //                    logger.info("第" + counter.get() + "行 : 已过滤URL: " + line);
    //                }
    //            }
    
                counter.incrementAndGet(); // 递增 保证数据计数同步
            }
        }
    
    
        /**
         * 抓取网站url的title和keywords
         */
        public String grabTitle(String url) {
            Document doc = null;
            try {
                Connection con = Jsoup.connect(url).timeout(5000);
                con.userAgent("Mozilla/5.0 (Windows NT 10.0; WOW64; rv:44.0) Gecko/20100101 Firefox/44.0");
                doc = con.get();
            } catch (Exception e) {
                //logger.info(url);
                String domainStr = StringUtil.getDomain(url);
                if (!filterMap.containsKey(domainStr)) {
                    filterMap.put(domainStr, url); //过滤
                }
                return null;
            }
            if (null == doc) {
                return null;
            }
    
    
            String title  = doc.title();
            String keywords = null;
            Element el = doc.getElementsByAttributeValue("name", "keywords").first();
            if (null != el) {
                keywords = el.attr("content");
            }
            if (StringUtil.isNullOrEmpty(title) && StringUtil.isNullOrEmpty(keywords)) {
                return null;
            }
    
            if (StringUtil.isNullOrEmpty(title)) {
                title = "null";
            }
            if (StringUtil.isNullOrEmpty(keywords)) {
                title = "null";
            }
            StringBuilder sb = new StringBuilder();
            sb.append(doc.title()).append("-&-").append(keywords);
            return sb.toString();
        }
    
        /**
         * </pre>
         */
        private static class StartEndPair{
            public long start;
            public long end;
    
            /* (non-Javadoc)
             * @see java.lang.Object#toString()
             */
            @Override
            public String toString() {
                return "star----15---->" + start + ";end----18---->" + end;
            }
    
            /* (non-Javadoc)
             * @see java.lang.Object#hashCode()
             */
            @Override
            public int hashCode() {
                final int prime = 31;
                int result = 1;
                result = prime * result + (int) (end ^ (end >>> 32));
                result = prime * result + (int) (start ^ (start >>> 32));
                return result;
            }
    
            /* (non-Javadoc)
             * @see java.lang.Object#equals(java.lang.Object)
             */
            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null) {
                    return false;
                }
                if (getClass() != obj.getClass()) {
                    return false;
                }
                StartEndPair other = (StartEndPair) obj;
                if (end != other.end) {
                    return false;
                }
                if (start != other.start) {
                    return false;
                }
                return true;
            }
    
        }
    
        /**
         * </pre>
         */
        private class SliceReaderTask implements Runnable{
            private long start;
            private long sliceSize;
            private byte[] readBuff;
            /**
             * @paramm start     read position (include)
             * @paramm end     the position read to(include)
             */
            public SliceReaderTask(StartEndPair pair) {
                this.start = pair.start;
                this.sliceSize = pair.end-pair.start+1;
                this.readBuff = new byte[bufferSize];
            }
    
            /* (non-Javadoc)
             * @see java.lang.Runnable#run()
             */
            public void run() {
                try {
                    MappedByteBuffer mapBuffer = rAccessFile.getChannel().map(MapMode.READ_ONLY, start, this.sliceSize);
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    for (int offset = 0; offset < sliceSize; offset += bufferSize) {
                        int readLength;
                        if (offset + bufferSize <= sliceSize) {
                            readLength = bufferSize;
                        } else {
                            readLength = (int) (sliceSize - offset);
                        }
                        mapBuffer.get(readBuff, 0, readLength);
                        for (int i = 0; i < readLength; i++) {
                            byte tmp = readBuff[i];
                            if (tmp == '
    ' || tmp == '
    ') {
                                handle(bos.toByteArray());
                                bos.reset();
                            } else {
                                bos.write(tmp);
                            }
                        }
                    }
    
                    if (bos.size() > 0) {
                        handle(bos.toByteArray());
                    }
                    cyclicBarrier.await();// 等待其他线程操作完毕
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    
        /**
         * </pre>
         */
        public static class Builder{
            //给定线程4
            private int threadSize = 4;
    //        private int threadSize = 1;
            private String charset = null;
            private int bufferSize = 1024 * 1024;
            private IDataHandle handle;
            private File file;
            private BlockingQueue<String> queue;
            public Builder(String file, IDataHandle handle, BlockingQueue<String> queue){
                this.file = new File(file);
                if(!this.file.exists()){
                    throw new IllegalArgumentException("文件不存在!");
                }
                this.handle = handle;
                this.queue = queue;
            }
    
            public Builder withTreahdSize(int size){
                this.threadSize = size;
                return this;
            }
    
            public Builder withCharset(String charset){
                this.charset= charset;
                return this;
            }
    
            public Builder withBufferSize(int bufferSize){
                this.bufferSize = bufferSize;
                return this;
            }
    
            public BigFileReaderByPart build(){
                return new BigFileReaderByPart(this.file,this.handle,this.queue, this.charset,this.bufferSize,this.threadSize);
            }
        }
    }
    /**
     * Callback that receives the lines parsed by BigFileReaderByPart.
     */
    public interface IDataHandle {

        /**
         * Processes one line of text.
         *
         * @param line the decoded line; BigFileReaderByPart strips line terminators and never
         *             passes an empty line
         */
        void handle(String line);
    }
    /**
     * Small string and URL helpers.
     */
    public class StringUtil {
        /**
         * Dotted host name such as {@code www.example.com}. Compiled once because Pattern is
         * immutable and thread-safe.
         */
        private static final Pattern DOMAIN_PATTERN = Pattern.compile("(?<=//|)((\\w)+\\.)+\\w+");

        /** Utility class: not instantiable. */
        private StringUtil() {
        }

        /**
         * Returns whether a string is {@code null} or has length zero.
         *
         * @param value string to test, may be {@code null}
         * @return {@code true} if {@code value} is {@code null} or {@code ""}
         */
        public static final boolean isNullOrEmpty(String value) {
            return value == null || value.length() == 0;
        }

        /**
         * Extracts the host part of a URL using {@link URL#getHost()}.
         *
         * @param curl URL text
         * @return the host, or {@code ""} when {@code curl} is not a well-formed URL
         */
        public static String getDomain(String curl) {
            try {
                return new URL(curl).getHost();
            } catch (MalformedURLException e) {
                // Best effort: malformed input yields "" rather than an exception.
                return "";
            }
        }

        /**
         * Extracts the first dotted host name (e.g. {@code www.example.com}) found in a URL string.
         *
         * @param url initial request URL, may be {@code null}
         * @return the matched domain, or {@code null} if {@code url} is null/empty or has no match
         */
        public static String getDomainName(String url) {
            // isEmpty() instead of the former `url != ""`, which compared references.
            if (url == null || url.isEmpty()) {
                return null;
            }
            Matcher m = DOMAIN_PATTERN.matcher(url);
            return m.find() ? m.group() : null;
        }

        /** Manual smoke check of {@link #getDomain(String)}. */
        public static void main(String[] args) {
            System.out.println(getDomain("http://tf.360.cn/e/wb?_=76b6fa2e03fe712e&ip=49.69.92.204&reduce=0&width=0&height=0"));
        }
    }

    https://github.com/butter-fly/big_file_read

  • 相关阅读:
    U盘 格式化 ext3 ext4
    MBR
    CentOS开机的时候卡在进度条一直进不去 F5(是关键)
    redis储存中文,客服端读取出现乱码
    redis 做为缓存服务器 注项!
    redis监控
    keepalived virtual_router_id 44
    你真的会用Gson吗?Gson使用指南
    你真的会用Retrofit2吗?Retrofit2完全教程
    Kotlin 初级读本
  • 原文地址:https://www.cnblogs.com/hahajava/p/10265382.html
Copyright © 2011-2022 走看看