zoukankan      html  css  js  c++  java
  • 生产者消费者模式实现数据下载存储流程

      最近在做的一个项目涉及数据的下载、解析和存储操作。最初的实现方案是对下载的原始数据逐条处理，即下载、解析和存储操作按顺序串行执行。

    这种方案显然难以提高效率，因此后来重新实现了一个让下载与解析存储并行执行的方案（生产者消费者模式）。

    1. 数据输出接口：对“将数据存储到数据库或其他存储介质”这一操作的抽象

    /**
     * Output sink for parsed records: an abstraction over writing data to a
     * database or any other storage backend.
     */
    public interface IDataOut {
        /**
         * Persists a single record.
         *
         * @param item the record to store
         */
        void save(Object item);
    }

    2. 参数类

    /**
     * Parameter object for constructing a download/save pipeline.
     * All fields are public and may be left at their defaults.
     */
    public class DownloadSaveArg {
        // --- download settings ---
        /** Character encoding of downloaded content. */
        public String encode;
        /** Maximum number of retries for a failed download. */
        public int maxRetryCount;
        /** Wrapped links of the download endpoints. */
        public List<URLink> links;

        // --- processing and output ---
        /** Parsing modules. */
        public List<BaseModule> workModules;
        /** Storage targets the parsed data is written to. */
        public List<IDataOut> saveList;

        // --- bookkeeping ---
        /** Log recorder for the download/store run. */
        public CrawlerLog crawlerLog;
    }

    3. DownloadSaver 下载和存储过程的封装

    public abstract class DownloadSaver {
        private static final Logger log = Logger.getLogger(DownloadSaver.class);

        /** Retry count used when {@code DownloadSaveArg.maxRetryCount} is not positive. */
        private static final int MAX_RETRY_COUNT = 3;

        /** Capacity of the result buffer; when it is full, {@code put} blocks until the saver catches up. */
        private static final int RESULT_QUEUE_CAPACITY = 1000;

        private final String encode;
        private final int maxRetryCount;
        private final List<IDataOut> saveList;
        private final List<BaseModule> workModules;
        private final CrawlerLog crawlerLog;

        /**
         * Whether adding URLs to the URL queue is complete; when true, no more URLs will be added.
         * Volatile because it is written and read from different threads.
         */
        volatile boolean addUrlFinish = false;
        /** Set when the download loop has stopped; read by the saver thread to know when to stop. */
        volatile boolean downloadFinished = false;
        /** Set when the save loop has drained the result queue and stopped. */
        volatile boolean saveFinished = false;

        /** URLs waiting to be downloaded (unbounded). */
        private final BlockingQueue<URLink> urlQueue = new LinkedBlockingQueue<>();

        /** Parsed download results waiting to be saved (bounded, see {@link #RESULT_QUEUE_CAPACITY}). */
        private final LinkedBlockingQueue<JSONObject> resultQueue =
                new LinkedBlockingQueue<>(RESULT_QUEUE_CAPACITY);

        /**
         * Builds the pipeline from its argument object, defaulting the encoding to
         * {@code "utf-8"} and the retry count to {@link #MAX_RETRY_COUNT}, and queues
         * any initial links.
         *
         * @param arg pipeline configuration; must not be null
         */
        public DownloadSaver(DownloadSaveArg arg) {
            this.encode = Strings.isNullOrEmpty(arg.encode) ? "utf-8" : arg.encode;
            this.maxRetryCount = arg.maxRetryCount > 0 ? arg.maxRetryCount : MAX_RETRY_COUNT;
            this.saveList = arg.saveList;
            this.workModules = arg.workModules;
            this.crawlerLog = arg.crawlerLog;
            addUrls(arg.links);
        }

        /** Returns whether URL submission has been marked complete. */
        synchronized boolean isAddUrlFinish() {
            return this.addUrlFinish;
        }

        /** Marks URL submission complete; after this the downloader ends once the URL queue drains. */
        synchronized void setUrlFinish() {
            this.addUrlFinish = true;
        }

        /**
         * Queues one URL for download.
         *
         * @param url the link to download; must not be null
         */
        public void addUrl(URLink url) {
            // urlQueue is unbounded, so add() never blocks or fails for capacity reasons.
            // Unlike put(), it cannot throw InterruptedException and silently drop the URL.
            urlQueue.add(url);
        }

        /**
         * Queues the initial links; a null list is ignored.
         * Deliberately does not call the overridable addUrl(), because it runs from the constructor.
         */
        private void addUrls(List<URLink> links) {
            if (links != null) {
                urlQueue.addAll(links);
            }
        }

        /**
         * Download stage (producer): consumes URLs from the URL queue, downloads and parses
         * them, and buffers the parsed records in {@link #resultQueue}. It ends once
         * {@link #addUrlFinish} is true and the URL queue is empty.
         * <p>
         * This is a placeholder in this skeleton; the article lists the full body separately
         * under "6. 下载操作".
         */
        public void download() {
        }

        /**
         * Save stage (consumer): consumes records from {@link #resultQueue} and stores them.
         * It ends once the download stage has finished and the result queue is empty.
         * <p>
         * This is a placeholder in this skeleton; the article lists the full body separately
         * under "存储操作".
         */
        public void save() {
        }

        /** Worker that runs the owning pipeline's {@link #download()} stage. */
        public static class Downloader implements Runnable {
            private final String instance;
            private final DownloadSaver downloadSaver;

            public Downloader(String instance, DownloadSaver downloadSaver) {
                this.instance = instance;
                this.downloadSaver = downloadSaver;
            }

            @Override
            public void run() {
                log.debug(this.instance + " running...");
                downloadSaver.download();
            }
        }

        /** Worker that runs the owning pipeline's {@link #save()} stage. */
        public static class Saver implements Runnable {
            private final String instance;
            private final DownloadSaver downloadSaver;

            public Saver(String instance, DownloadSaver downloadSaver) {
                this.instance = instance;
                this.downloadSaver = downloadSaver;
            }

            @Override
            public void run() {
                log.debug(this.instance + " running...");
                downloadSaver.save();
            }
        }

        /**
         * Runs the download and save stages in parallel and waits for both to finish.
         * If the calling thread is interrupted, or either stage throws, the pool is shut
         * down with {@code shutdownNow()} so that the remaining stage is interrupted
         * instead of being left running.
         */
        public void execute() {
            ExecutorService service = Executors.newCachedThreadPool();
            try {
                Future<?> downloadTask = service.submit(new Downloader("downloader01", this));
                Future<?> saveTask = service.submit(new Saver("saver01", this));

                // get() on a Runnable's Future always returns null, so log the task name instead.
                downloadTask.get();
                log.debug("downloader01 finished!");
                saveTask.get();
                log.debug("saver01 finished!");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                service.shutdownNow();
            } catch (ExecutionException e) {
                log.error("download/save task failed", e.getCause());
                service.shutdownNow();
            } finally {
                service.shutdown();
            }
        }
    }

    4. Downloader 下载器

        /**
         * Downloader thread: announces itself on standard output, then runs the
         * download stage of the given {@code DownloadSaver}.
         */
        public static class Downloader implements Runnable {
            /** Label printed when the thread starts. */
            private final String instance;
            /** Pipeline whose download stage this thread executes. */
            private final DownloadSaver downloadSaver;

            public Downloader(String instance, DownloadSaver downloadSaver) {
                this.downloadSaver = downloadSaver;
                this.instance = instance;
            }

            @Override
            public void run() {
                final String banner = instance + " running...";
                System.out.println(banner);
                this.downloadSaver.download();
            }
        }

    5. Saver 存储器

        /**
         * Saver thread: logs a start message at debug level, then runs the
         * save stage of the given {@code DownloadSaver}.
         */
        public static class Saver implements Runnable {
            /** Label used in the start-up log message. */
            private final String instance;
            /** Pipeline whose save stage this thread executes. */
            private final DownloadSaver downloadSaver;

            public Saver(String instance, DownloadSaver downloadSaver) {
                this.downloadSaver = downloadSaver;
                this.instance = instance;
            }

            @Override
            public void run() {
                final String banner = instance + " running...";
                log.debug(banner);
                this.downloadSaver.save();
            }
        }

    6. 下载操作

    /**
         * Download stage (producer).
         * <p>
         * Consumes URLs from {@code urlQueue}, downloads and parses each one, and puts every
         * parsed record into {@code resultQueue}. The loop ends once URL submission has been
         * marked finished and the URL queue is empty, or when the thread is interrupted.
         * <p>
         * {@code downloadFinished} is set in a {@code finally} block. This releases the saver
         * thread even if a download or parse step throws.
         */
        public void download() {
            this.crawlerLog.setCrawlerStartTime();
            try {
                // isAddUrlFinish() is synchronized, so the flag written by setUrlFinish() is visible here.
                while (!(isAddUrlFinish() && urlQueue.isEmpty())) {
                    // A timed poll instead of take(): if the queue is already empty when submission
                    // is marked finished, take() would block forever.
                    URLink link = urlQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                    if (link == null) {
                        continue;
                    }

                    String result = startDownload(link.realLink);
                    List<JSONObject> results = parseContent2Json(result, link);
                    if (results != null) {
                        for (JSONObject j : results) {
                            // Blocks while resultQueue is full, i.e. until the saver catches up.
                            resultQueue.put(j);
                            crawlerLog.incrCrawlerNum();
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop downloading instead of looping on.
                Thread.currentThread().interrupt();
            } finally {
                downloadFinished = true;
            }

            this.crawlerLog.setCrawlerEndTime();
        }

    7. 存储操作

    /**
         * Save stage (consumer).
         * <p>
         * Consumes records from {@code resultQueue} and stores each one via
         * {@code parseSaveItem}, logging progress every 100 records. The loop ends, setting
         * {@code saveFinished}, once the download stage has finished and the result queue is
         * empty. If the thread is interrupted, the loop also stops. In that case
         * {@code saveFinished} is left false because draining was not confirmed.
         */
        public void save() {
            this.crawlerLog.setStoreStartTime();
            try {
                while (true) {
                    // A timed poll instead of take(): if the downloader finishes while this thread
                    // waits on an empty queue, take() would never return and the saver would hang.
                    JSONObject item = resultQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                    if (item != null) {
                        parseSaveItem(item);
                        int saveCount = this.incSaveCount();
                        if (saveCount % 100 == 0) {
                            log.debug("saveCount: " + saveCount);
                        }
                    }

                    // Read the flag before checking emptiness. Any record put before the flag was
                    // set is then seen by isEmpty().
                    if (downloadFinished && resultQueue.isEmpty()) {
                        saveFinished = true;
                        break;
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of looping on.
                Thread.currentThread().interrupt();
            }

            this.crawlerLog.setStoreEndTime();
        }
  • 相关阅读:
    Day3----《Pattern Recognition and Machine Learning》Christopher M. Bishop
    Day2----《Pattern Recognition and Machine Learning》Christopher M. Bishop
    学习笔记-----《Pattern Recognition and Machine Learning》Christopher M. Bishop
    “数学之美”笔记
    win10下使用nodejs安装及webstorm创建express项目的指导
    deepin/ubuntu下搭建Jekyll环境
    struts2.3.23升级到struts2.3.32
    struts2.5能不能再恶心点
    线程通信、线程同步以及进城通信的真的搞懂了么
    WINFORM数据库操作,有点像安装里面的SQLITE
  • 原文地址:https://www.cnblogs.com/taich-flute/p/7144387.html
Copyright © 2011-2022 走看看