最近在做的一个项目涉及数据的下载、解析和存储。最初的实现方案是对下载得到的原始数据逐条处理，即下载、解析和存储按顺序串行执行，
这种方案的效率显然难以提高。因此后来重新实现了一个让下载与解析存储并行执行的方案，具体如下。
1. 数据输出接口：对将数据存储到数据库或其他存储介质这一操作的抽象
/**
 * Output sink for processed data: an abstraction over storing records in a
 * database or some other storage backend.
 */
public interface IDataOut {

    /**
     * Stores a single record.
     *
     * @param object the record to store
     */
    void save(Object object);
}
2. 参数类
public class DownloadSaveArg { public String encode; //编码 public int maxRetryCount; //下载失败最大重试次数 public List<IDataOut> saveList; //数据存储列表 public List<BaseModule> workModules; //解析模块列表 public List<URLink> links; //下载接口的链接封装 public CrawlerLog crawlerLog; //下载存储日志记录 }
3. DownloadSaver 下载和存储过程的封装
public abstract class DownloadSaver { private static Logger log = Logger.getLogger(DownloadSaver.class); private static final int MAX_RETRY_COUNT = 3; private String encode; private int maxRetryCount; private List<IDataOut> saveList; private List<BaseModule> workModules; private CrawlerLog crawlerLog; /** * 是否添加url队列完成, * 为ture时,表示不会再继续添加url */ boolean addUrlFinish = false; boolean downloadFinished = false; boolean saveFinished = false; synchronized boolean isAddUrlFinish() { return this.addUrlFinish; } synchronized void setUrlFinish() { if (!this.addUrlFinish) this.addUrlFinish = true; } //url队列 private BlockingQueue<URLink> urlQueue = new LinkedBlockingQueue<>(); public void addUrl(URLink url) { try { urlQueue.put(url); } catch (InterruptedException e) { e.printStackTrace(); } } private void addUrls(List<URLink> links) { if (links == null) return; for (URLink l : links) try { this.urlQueue.put(l); } catch (InterruptedException e) { e.printStackTrace(); } } // 下载结果缓存队列 private LinkedBlockingQueue<JSONObject> resultQueue = new LinkedBlockingQueue<>( 1000); public DownloadSaver(DownloadSaveArg arg) { this.encode = Strings.isNullOrEmpty(arg.encode) ? "utf-8" : arg.encode; this.maxRetryCount = arg.maxRetryCount > 0 ? arg.maxRetryCount : MAX_RETRY_COUNT; this.saveList = arg.saveList; this.addUrls(arg.links); this.workModules = arg.workModules; this.crawlerLog = arg.crawlerLog; } private void addUrls(List<URLink> links) { if (links == null) return; for (URLink l : links) try { this.urlQueue.put(l); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 下载操作 * 消费url队列里的url,下载数据,经过解析缓存到resultQueue中 * addUrlFinish为true并且url队列为空时,结束操作 */ public void download() { } /** * 保存操作 * 消费下载结果队列里的数据,存储 * 当下载完成并且队列为空时,任务操作结束. 
*/ public void save() { } public static class Downloader{} public static class Saver{} /** * 并行执行下载 解析流程 */ public void execute() { ExecutorService service = Executors.newCachedThreadPool(); Downloader downloader = new Downloader("downloader01", this); Saver saver = new Saver("saver01", this); Future<?> fs1 = service.submit(downloader); Future<?> fs2 = service.submit(saver); try { log.debug(fs1.get() + " downloader finished!"); log.debug(fs2.get() + " saver finished!"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { service.shutdown(); } } }
4. Downloader 下载器
/**
 * Downloader worker: a Runnable that, when run, logs its name and then runs
 * {@link DownloadSaver#download()} on the current thread.
 */
public static class Downloader implements Runnable {
    /** Worker name used in log output. */
    private final String instance;
    private final DownloadSaver downloadSaver;

    public Downloader(String instance, DownloadSaver downloadSaver) {
        this.instance = instance;
        this.downloadSaver = downloadSaver;
    }

    @Override
    public void run() {
        // Log through the enclosing class's logger, as Saver does, instead of System.out.
        log.debug(this.instance + " running...");
        downloadSaver.download();
    }
}
5. Saver 存储器
/**
 * Saver worker: a Runnable that, when run, logs its name and then runs
 * {@link DownloadSaver#save()} on the current thread.
 */
public static class Saver implements Runnable {
    private final DownloadSaver target;
    private final String name;

    public Saver(String instance, DownloadSaver downloadSaver) {
        name = instance;
        target = downloadSaver;
    }

    @Override
    public void run() {
        String banner = name + " running...";
        log.debug(banner);
        target.save();
    }
}
6. 下载操作
/**
 * Download loop, intended to run on the downloader thread.
 *
 * <p>Consumes URLs from {@code urlQueue}, downloads each one, parses the
 * response into JSON objects and puts them into {@code resultQueue}. The loop
 * ends once {@link #isAddUrlFinish()} is true and the URL queue is empty, or
 * when the thread is interrupted.
 *
 * <p>{@code downloadFinished} is set in a {@code finally} block, so the save
 * loop can still terminate if a download or parse step throws.
 */
public void download() {
    // Poll with a timeout instead of take(): take() blocks forever if the queue
    // is already empty when setUrlFinish() is called, so the exit condition
    // below would never be re-checked.
    final long pollTimeoutMillis = 500;
    this.crawlerLog.setCrawlerStartTime();
    try {
        while (true) {
            try {
                URLink link = urlQueue.poll(pollTimeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
                if (link != null) {
                    String result = startDownload(link.realLink);
                    List<JSONObject> results = parseContent2Json(result, link);
                    if (results != null) {
                        for (JSONObject json : results) {
                            resultQueue.put(json);
                            crawlerLog.incrCrawlerNum();
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Treat interruption as cancellation: restore the flag and stop
                // (continuing would just make the next poll throw again).
                Thread.currentThread().interrupt();
                log.warn("downloader interrupted, stopping", e);
                break;
            }
            // Read the finish flag BEFORE checking emptiness. Once no more URLs
            // can be added, an empty queue means every URL has been taken.
            // The synchronized getter makes the producer's write visible here.
            if (isAddUrlFinish() && urlQueue.isEmpty()) {
                break;
            }
        }
    } finally {
        downloadFinished = true;
        this.crawlerLog.setCrawlerEndTime();
    }
}
7. 存储操作
/**
 * Save loop, intended to run on the saver thread.
 *
 * <p>Consumes parsed items from {@code resultQueue} and passes each one to
 * {@code parseSaveItem}, logging the running count every 100 items. The loop
 * ends (setting {@code saveFinished}) once {@code downloadFinished} is true and
 * the result queue has been drained. It also stops, without setting
 * {@code saveFinished}, if the thread is interrupted.
 */
public void save() {
    // Poll with a timeout instead of take(). If the downloader finishes right
    // after the last item was taken, take() would block forever, because
    // nothing else will ever be put into the queue.
    final long pollTimeoutMillis = 500;
    this.crawlerLog.setStoreStartTime();
    while (true) {
        try {
            JSONObject item = resultQueue.poll(pollTimeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
            if (item != null) {
                parseSaveItem(item);
                int saveCount = this.incSaveCount();
                if (saveCount % 100 == 0) {
                    log.debug("saveCount: " + saveCount);
                }
            }
        } catch (InterruptedException e) {
            // Treat interruption as cancellation: restore the flag and stop.
            Thread.currentThread().interrupt();
            log.warn("saver interrupted, stopping", e);
            break;
        }
        // Check downloadFinished BEFORE emptiness, because every put happens
        // before the flag is set.
        // NOTE(review): downloadFinished is written by the downloader thread and
        // must be volatile for this read to be reliable.
        if (downloadFinished && resultQueue.isEmpty()) {
            saveFinished = true;
            break;
        }
    }
    this.crawlerLog.setStoreEndTime();
}