zoukankan      html  css  js  c++  java
  • Java Nio 多线程网络下载

    --> 默认最多50个线程 同一文件下载失败延迟超过30秒就结束下载

    --> 下载5分钟超时时间,假设5分钟内未下载完就结束下载

    --> 依赖 commons-httpclient 与 commons-io 包

    package com.leunpha;
    
    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.methods.GetMethod;
    import org.apache.commons.httpclient.params.HttpClientParams;
    import org.apache.commons.io.IOUtils;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import java.io.*;
    import java.lang.reflect.Method;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.util.Observable;
    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.zip.ZipFile;
    
    /**
     * User: zhoujingjie
     * Date: 14-4-18
     * Time: 下午12:52
     */
    public class Downloader extends Observable {
        protected String url, savePath;             //下载地址与保存路径
        protected FileChannel channel;              //保存文件的通道
        protected long size, perSize;              //文件大小与每一个小文件的大小
        protected volatile long downloaded;       // 已下载的
        protected int connectCount;                 //连接数
        protected Connection[] connections;         //连接对象
        protected boolean isSupportRange;         //是否支持断点下载
        protected long timeout;                     //超时
        protected boolean exists;                   //是否存在
        private RandomAccessFile randomAccessFile;
        protected volatile boolean stop;            //停止
        private static volatile  boolean exception; //是否异常
        private AtomicLong prevDownloaded = new AtomicLong(0); //上一次的下载结果
        private static Log log = LogFactory.getLog(Downloader.class);
        private AtomicInteger loseNum = new AtomicInteger(0);
        private int maxThread;
    
        public Downloader(String url, String savePath) throws IOException {
            //超时一小时
            this(url, savePath, 1000 * 60*5,50);
        }
    
        public Downloader(String url, String savePath, long timeout,int maxThread) throws FileNotFoundException {
            this.timeout = timeout;
            this.url = url;
            this.maxThread = maxThread;
            File file = new File(savePath);
            if (!file.exists()) file.mkdirs();
            this.savePath= file.getAbsolutePath() + "/" + url.substring(url.lastIndexOf("/"));
            exists = new File(this.savePath).exists();
            if(!exists){
                randomAccessFile=   new RandomAccessFile(this.savePath+".temp", "rw");
                channel =randomAccessFile.getChannel();
            }
        }
    
    
        public GetMethod method(long start, long end) throws IOException {
            GetMethod method = new GetMethod(Downloader.this.url);
            method.setRequestHeader("User-Agent", "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36");
            if (end > 0) {
                method.setRequestHeader("Range", "bytes=" + start + "-" + (end - 1));
            } else {
                method.setRequestHeader("Range", "bytes=" + start + "-");
            }
            HttpClientParams clientParams = new HttpClientParams();
            //5秒超时
            clientParams.setConnectionManagerTimeout(5000);
            HttpClient client = new HttpClient(clientParams);
            client.executeMethod(method);
            int statusCode = method.getStatusCode();
            if (statusCode >= 200 && statusCode < 300) {
                isSupportRange = (statusCode == 206) ? true : false;
            }
            return method;
        }
    
        public void init() throws IOException {
            size = method(0, -1).getResponseContentLength();
            if (isSupportRange) {
                if (size < 4 * 1024 * 1024) {  //假设小于4M
                    connectCount = 1;
                } else if (size < 10 * 1024 * 1024) { //假设文件小于10M 则两个连接
                    connectCount = 2;
                } else if (size < 30 * 1024 * 1024) { //假设文件小于80M 则使用6个连接
                    connectCount = 3;
                } else if (size < 60 * 1024 * 1024) {          //假设小于60M 则使用10个连接
                    connectCount = 4;
                } else {
                    //否则为10个连接
                    connectCount = 5;
                }
            } else {
                connectCount = 1;
            }
            log.debug(String.format("%s size:%s connectCount:%s", this.url, this.size, this.connectCount));
            perSize = size / connectCount;
            connections = new Connection[connectCount];
            long offset = 0;
            for (int i = 0; i < connectCount - 1; i++) {
                connections[i] = new Connection(offset, offset + perSize);
                offset += perSize;
            }
            connections[connectCount - 1] = new Connection(offset, size);
        }
    
    
        /**
         * 强制释放内存映射
         *
         * @param mappedByteBuffer
         */
        static void unmapFileChannel(final MappedByteBuffer mappedByteBuffer) {
            try {
                if (mappedByteBuffer == null) {
                    return;
                }
                mappedByteBuffer.force();
                AccessController.doPrivileged(new PrivilegedAction<Object>() {
                    @Override
                    public Object run() {
                        try {
                            Method getCleanerMethod = mappedByteBuffer.getClass().getMethod("cleaner", new Class[0]);
                            getCleanerMethod.setAccessible(true);
                            sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(mappedByteBuffer, new Object[0]);
                            cleaner.clean();
                        } catch (Exception e) {
                            //LOG.error("unmapFileChannel." + e.getMessage());
                        }
                        return null;
                    }
                });
            } catch (Exception e) {
                log.debug("异常->exception=true");
                exception = true;
                log.error(e);
            }
        }
    
    
        private void timer() {
            Timer timer = new Timer();
            //延迟3秒,3秒执行一次
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    log.debug(String.format("已下载-->%s -> %s",(((double) downloaded) / size * 100) + "%", url));
                    //假设上一次的下载大小与当前的一样就退出
                    if(prevDownloaded.get() ==downloaded && downloaded<size){
                        if(loseNum.getAndIncrement()>=10){
                            log.debug(String.format("上次下载%s与当前下载%s一致,exception->true  url:%s ",prevDownloaded.get(),downloaded,url));
                            exception = true;
                        }
                    }
                    //假设下载完毕或者异常就退出
                    if(downloaded>=size || exception){
                        stop = true;
                        cancel();
                    }
                    //设置上次下载的大小等于如今的大小
                    prevDownloaded.set(downloaded);
                }
            },3000,3000);
        }
    
        public void start() throws IOException {
            if (exists) {
                log.info("文件已存在." + this.url);
                Thread.currentThread().interrupt();
                return;
            }
            while (Thread.activeCount()>maxThread){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {}
            }
            init();
            timer();
            CountDownLatch countDownLatch = new CountDownLatch(connections.length);
            log.debug("開始下载:" + url);
            for (int i = 0; i < connections.length; i++) {
                new DownloadPart(countDownLatch, i).start();
            }
            end(countDownLatch);
        }
    
        private boolean rename(File tempFile){
            File file = new File(this.savePath);
            boolean isRename=tempFile.renameTo(file);
            if(!isRename){
                try {
                    IOUtils.copy(new FileInputStream(tempFile),new FileOutputStream(file));
                } catch (IOException e) {
                    log.error(e);
                }
            }
            return true;
        }
    
        public void end(CountDownLatch countDownLatch){
            try {
                //超过指定时间就直接结束
                countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                exception = true;
                log.error(e);
                log.info("下载失败:"+this.url);
            } finally {
                try {
                    channel.force(true);
                    channel.close();
                    randomAccessFile.close();
                } catch (IOException e) {
                    log.error(e);
                }
                File temp = new File(this.savePath+".temp");
                log.debug(String.format("%s  %s", exception, this.url));
                //假设有异常则删除已下载的暂时文件
                if(exception){
                    if(!temp.delete()){
                        if(temp!=null)temp.delete();
                    }
                }else{
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {}
                    rename(temp);
                    setChanged();
                    notifyObservers(this.url);
                    log.info("下载成功:"+this.url);
                }
            }
        }
    
    
    
        private class Connection {
            long start, end;
    
            public Connection(long start, long end) {
                this.start = start;
                this.end = end;
            }
    
            public InputStream getInputStream() throws IOException {
                return method(start, end).getResponseBodyAsStream();
            }
        }
    
    
        private class DownloadPart implements Runnable {
            CountDownLatch countDownLatch;
            int i;
    
            public DownloadPart(CountDownLatch countDownLatch, int i) {
                this.countDownLatch = countDownLatch;
                this.i = i;
            }
            public void start() {
                new Thread(this).start();
            }
            @Override
            public void run() {
                MappedByteBuffer buffer = null;
                InputStream is = null;
                try {
                    is = connections[i].getInputStream();
                    buffer = channel.map(FileChannel.MapMode.READ_WRITE, connections[i].start, connections[i].end - connections[i].start);
                    byte[] bytes = new byte[4 * 1024];
                    int len;
                    while ((len = is.read(bytes)) != -1 && !exception && !stop) {
                        buffer.put(bytes, 0, len);
                        downloaded+= len;
                    }
                    log.debug(String.format("file block had downloaded.%s %s",i,url));
                } catch (IOException e) {
                    log.error(e);
                } finally {
                    unmapFileChannel(buffer);
                    if(buffer != null)buffer.clear();
                    if (is != null) try {
                        is.close();
                    } catch (IOException e) {
                    }
                    countDownLatch.countDown();
                }
            }
        }
    
    
    }
    



  • 相关阅读:
    StringGrid 实例3: 本例功能: 1、修改 TStringGrid的默认宽与高; 2、添加行; 3、确认当前单元并赋值.
    StringGrid 实例2:1、获取 StringGrid 的行数、列数; 2、给单元赋值.
    StringGrid 实例1:初始化StirngGrid的首行和首列
    stringgrid事件大全
    firemonkey中stringgrid属性大全
    DELPHI声明一个指针变量,什么时候需要分配内存,什么时候不需要分配内存?
    CoolTrayIcon4.0
    Delphi七个版本
    基础
    按钮打开链接,按钮click代码
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/3765890.html
Copyright © 2011-2022 走看看