在上一节中,我们完成了一个简单的采集示例。本节呢,我们先来小结一下,这个示例可能存在的问题:
- 没有做异常处理
- 没有做反爬应对策略
- 没有做重试机制
- 没有做并发限制
- ……
呃,看似平静的表面下还是隐藏着不少杀机的……
但本节不打算对付上述问题,而是先关注一个隐藏更深的问题,这个问题,可能会牵扯很多人(包括我☹,不包括我☺,包括我☹,不包括我☺)的编程习惯问题。
这里提出一个突出的问题,就是堆栈溢出的问题。
首先,我们以上一节的示例为例,解析一下造成的原因,下图演示了一个内容采集的游走路径,也就是调用过程:
可以看出,方法之间存在着比较明显的依赖关系,也就是说,只有下一级方法执行完毕了,上一级方法才能完成执行,虽然其间,有部分异步方法,但总体来说,还是会有依赖存在。这就造成了堆栈积压,也就是一个方法没有执行完,另一个方法又压入栈中,然后又压入一个,又一个……又一个……最终,就会导致堆栈溢出。
示例的场景应当说是最为简单的,这种依赖还不算严重,但如果量级上来的话,也会是不小的一张关系网,而且被压入堆栈的,不仅仅是这几个方法所占的空间,还有可能会导致这个方法所涉及的类的实例以及其内部一些其他资源都无法被释放,而系统又不得不“保留”这张网,GC也拿它毫无办法(因为引用表都在)。再如果场景更复杂一些,可能一个验证码所需的依赖关系就要比本示例更为严重,再加上后续的流程层级多一些,再加上持久化等处理器的引入、分支结构的增加等等,相应的场景越复杂,耦合度就会越来越高,那对系统的影响将是毁灭性的。
相通的理论,有兴趣的同学,可以查询“递归所带来的问题”,以了解更多。而且这个问题的存在,可不是仅仅存在于爬虫系统中,它存在于我们日常编写的每一行代码中。它与语言无关、与业务无关,稍有不慎,就会留下这么个坑。很可怕,这也是为什么要专门拿出一节来说这个问题。
那么,是什么造成了这种耦合呢?其实,这也是由于“正常”的思维方式所引发的。拿示例来说,我们需要得到书籍列表,才能得到书籍ID,需要得到书籍ID,才能拼接出书籍章节列表的链接,需要得到书籍ID和章节ID,才能拼接出章节内容的链接,所以,理所当然的,就产生了依赖。
另外,当我们在Start方法和Analize方法内部发生错误时,最简单的“重试”方案时什么,就是再次执行Start方法嘛,好么,这种重试,有可能是1次,也有可能是N次,碰到服务器挂掉了,那么就会是永无休止的重试。这种情况造成的堆栈积压可要比一般的N级树带来的毁灭性更大。
了解了问题的严重性以及产生的原因,然后,我们尝试给出一个解决方案,解决这个问题的关键,就在于如何能够打破这个方法间的耦合。
这里,我们举一个生活中的栗子,假设,我是一个轻奢份子,自己不做饭,饿了,就下馆子,这样,我就对馆子产生了依赖,从出门去吃饭的那一刻起,我就无法再享受我的宽屏显示器带来的舒适感了,而外卖小哥的出现,有效的缓解了我的病症,下完单,足不出户,就可以继续抱着显示器写文章了。不用关心小哥什么时候去商家取的餐,也不用操心小哥先送谁的后送谁的,就专心写文章,等餐到了,就开吃,完活。
是不是闻到了“异步”的味道?
其实,我们的示例中,已经使用异步解决了从Start到Analize的耦合,那么从Analize到下一个Start之间甚至是发生错误时的重试呢,我们尝试使用另一种方法 —— 队列。
我们把采集列表页中的每一页,看作一个单独的任务,丢到队列里;
我们把采集每一本书的章节列表,看作一个单独的任务,丢到队列里;
我们把采集每一本书的每一个章节内容,看作一个单独的任务,丢到队列里;
当队列中的任务被执行,又没有执行成功时,就把这个任务再次丢到队列里;(重试)
队列中的任务都是散列的,之间都没有依赖关系,队列可以采用先进先出(FIFO)或后进先出(LIFO)原则来执行,问题不大。这样就可以有效避免了之前提出的堆栈溢出的问题。同时,我们还可以通过控制队列的大小,来限制并发量,一石二鸟:)再加上进入时间为度,还可以对并发频率做限制,一箭三雕:)
好了,既然已经有了解决方案,那么就来对我们的爬虫框架进行一次重构吧:)
第一步,我们在爬虫框架中新创建一个小蚂蚁的领队(LeaderAnt)类:
1 namespace MikeWare.Core.Components.CrawlerFramework 2 { 3 using System; 4 using System.Collections.Concurrent; 5 using System.Collections.Generic; 6 using System.Threading; 7 using System.Threading.Tasks; 8 9 public class LeaderAnt : Ant 10 { 11 public virtual ConcurrentQueue<JobContext> Queue { get; set; } 12 private ManualResetEvent mre = new ManualResetEvent(false); 13 //private List<WorkerAnt> workers = new List<WorkerAnt>(); 14 15 public void Work() 16 { 17 JobContext context = null; 18 19 do 20 { 21 if (Queue.TryDequeue(out context)) 22 { 23 CreateWorker(context).Work(context); 24 } 25 } while (!mre.WaitOne(1)); 26 } 27 28 private WorkerAnt CreateWorker(JobContext context) 29 { 30 return 31 new WorkerAnt() 32 { 33 AntId = (uint)Math.Abs(DateTime.Now.ToString("yyyyMMddHHmmssfff").GetHashCode()), 34 OnJobStatusChanged = (sender, args) => 35 { 36 //Console.WriteLine($"{args.EventAnt.AntId} said: {args.Context.JobName} entered status '{args.Context.JobStatus}'."); 37 switch (args.Context.JobStatus) 38 { 39 case TaskStatus.Created: 40 //if (string.IsNullOrEmpty(args.Context.JobName)) 41 //{ 42 // Console.WriteLine($"Can not execute a job with no name."); 43 // args.Cancel = true; 44 //} 45 //else 46 // Console.WriteLine($"{args.EventAnt.AntId.ToString("000000000")} said: job {args.Context.JobName} created."); 47 break; 48 case TaskStatus.Running: 49 //if (null != args.Context.Memory) 50 // Console.WriteLine($"{args.EventAnt.AntId} said: {args.Context.JobName} already downloaded {args.Context.Memory.Length} bytes."); 51 break; 52 case TaskStatus.RanToCompletion: 53 if (null != args.Context.Buffer && 0 < args.Context.Buffer.Length) 54 args.Context.Analizer.Analize(this, args.Context); 55 if (null != args.Context.Watch) 56 Console.WriteLine($"{args.EventAnt.AntId.ToString("000000000")} said: job {args.Context.JobName} Finished using {(args.Context.Watch.Elapsed.TotalMilliseconds / 100).ToString("000.00")}ms / request ******************** */{Environment.NewLine + Environment.NewLine}"); 57 break; 58 case TaskStatus.Faulted: 59 Console.WriteLine($"{args.EventAnt.AntId} said: job {args.Context.JobName} faulted because {args.Message}."); 60 Queue.Enqueue(args.Context); 61 break; 62 case TaskStatus.WaitingToRun: 63 case TaskStatus.WaitingForChildrenToComplete: 64 case TaskStatus.Canceled: 65 case TaskStatus.WaitingForActivation: 66 default:/* Do nothing on this even. */ 67 break; 68 } 69 }, 70 }; 71 } 72 } 73 }
在这个类中,声明了一个任务队列(ConcurrentQueue<JobContext> Queue),用来提供一个任务池;
一个干活方法(Work),负责从队列中取出任务,并分配给WorkerAnt,支配工蚁去干活~
一个创建工蚁方法(CreateWorker),负责根据任务上下文创建一只小工蚁;这样,我们就无需在业务层直接与工蚁打交道了,只需要往领队的任务池里丢任务就可以了;
领队在创建工蚁的时候,还指定了一项状态监控功能,当任务失败时,就把任务重新丢回任务池,尝试再次执行该任务:
1 case TaskStatus.Faulted: 2 Console.WriteLine($"{args.EventAnt.AntId} said: job {args.Context.JobName} faulted because {args.Message}."); 3 Queue.Enqueue(args.Context); 4 break;
第二步,我们在爬虫框架中又新增了一个解析器的抽象类:
1 namespace MikeWare.Core.Components.CrawlerFramework 2 { 3 using System; 4 5 public abstract class ACrawlerAnalizer 6 { 7 public virtual void Analize(LeaderAnt leader, JobContext context) => throw new NotImplementedException(); 8 } 9 }
这个类是一个抽象类,只提供了一个抽象方法Analize。主要用于实际业务去实现不同的业务节点的解析器,将关注点分离出去;
还是上一节使用的示例,我们在业务层重新提供了三个解析器类型:
1 namespace MikeWare.Crawlers.EBooks.Bizs 2 { 3 using MikeWare.Core.Components.CrawlerFramework; 4 using MikeWare.Crawlers.EBooks.Entities; 5 using System; 6 using System.Collections.Generic; 7 using System.Net; 8 using System.Text; 9 using System.Text.RegularExpressions; 10 11 public class BooksListAnalizer : ACrawlerAnalizer 12 { 13 private static Encoding encoding = new UTF8Encoding(false); 14 private static int total_page = -1; 15 private static Regex regex_list = new Regex(@"<li>[^<]+<div.*?更新:(?<updateTime>d+?-d+?-d+?)[^d].+?<a[^/]+?/Shtml(?<id>d+?).html.+?</li>", RegexOptions.Singleline); 16 private static Regex regex_page = new Regex(@"<div class=""tspage"">.+?<a href='/s/new/index_(?<totalPage>d+?).html'>尾页</a>.+?</div>", RegexOptions.Singleline); 17 18 public override void Analize(LeaderAnt leader, JobContext context) 19 { 20 if (null == context.InParams) return; 21 22 var data = context.Buffer; 23 if (null == data || 0 == data.Length) return; 24 25 var content = encoding.GetString(data); 26 var matches = regex_list.Matches(content); 27 if (!context.InParams.ContainsKey(Consts.LAST_UPDATE_TIME) || null == context.InParams[Consts.LAST_UPDATE_TIME]) return; 28 29 if (null != matches && 0 < matches.Count) 30 { 31 var lastUpdateTime = DateTime.MinValue; 32 if (!DateTime.TryParse(context.InParams[Consts.LAST_UPDATE_TIME].ToString(), out lastUpdateTime)) 33 return; 34 35 var update_time = DateTime.MinValue; 36 var bookId = 0; 37 foreach (Match match in matches) 38 { 39 if (!DateTime.TryParse(match.Groups["updateTime"].Value, out update_time) 40 || !int.TryParse(match.Groups["id"].Value, out bookId)) continue; 41 42 if (update_time > lastUpdateTime) 43 { 44 var newContext = new JobContext 45 { 46 JobName = "“奇书网-电子书-章节列表”", 47 Uri = $"http://www.xqishuta.com/du/{bookId / 1000}/{bookId}/", 48 Method = WebRequestMethods.Http.Get, 49 InParams = new Dictionary<string, object>(), 50 Analizer = new BookSectionsListAnalizer(), 51 }; 52 newContext.InParams.Add(Consts.LAST_UPDATE_TIME, context.InParams[Consts.LAST_UPDATE_TIME]); 53 newContext.InParams.Add(Consts.BOOK_ID, bookId); 54 leader.Queue.Enqueue(newContext); 55 } 56 else 57 return; 58 } 59 } 60 61 if (-1 == total_page) 62 { 63 var match = regex_page.Match(content); 64 if (null != match && match.Success && int.TryParse(match.Groups["totalPage"].Value, out total_page)) ; 65 66 } 67 68 var pageIndex = -1; 69 if (!context.InParams.ContainsKey(Consts.PAGE_INDEX) || null == context.InParams[Consts.PAGE_INDEX] 70 || !int.TryParse(context.InParams[Consts.PAGE_INDEX].ToString(), out pageIndex)) return; 71 72 if (pageIndex < total_page) 73 { 74 pageIndex++; 75 var newContext = new JobContext 76 { 77 JobName = $"奇书网-最新电子书-列表-第{pageIndex.ToString("00000")}页", 78 Uri = $"http://www.xqishuta.com/s/new/index_{pageIndex}.html", 79 Method = WebRequestMethods.Http.Get, 80 InParams = new Dictionary<string, object>(), 81 Analizer = new BooksListAnalizer(), 82 }; 83 newContext.InParams.Add(Consts.PAGE_INDEX, pageIndex); 84 newContext.InParams.Add(Consts.LAST_UPDATE_TIME, context.InParams[Consts.LAST_UPDATE_TIME]); 85 86 leader.Queue.Enqueue(newContext); 87 } 88 } 89 } 90 }
1 namespace MikeWare.Crawlers.EBooks.Bizs 2 { 3 using MikeWare.Core.Components.CrawlerFramework; 4 using MikeWare.Crawlers.EBooks.Entities; 5 using System; 6 using System.Collections.Generic; 7 using System.Net; 8 using System.Text; 9 using System.Text.RegularExpressions; 10 11 public class BookSectionsListAnalizer : ACrawlerAnalizer 12 { 13 private static Encoding encoding = new UTF8Encoding(false); 14 private static Regex regex_section_list = new Regex(@"(?<=<div[^>]+>[^<]+<p[^>]+>[^<]+?正文</p>.+?)(<li><a[^d]+?(?<section_id>d+?).html[^>]*?>(?<section_name>[^<]+?)</a></li>[^<]+?)+?(?=<)", RegexOptions.Singleline); 15 private static Regex regex_book_info = new Regex(@"<img src=""(?<photo>[^""]+)"" onerror=""[^""]+""/>" 16 + @".+?<div class=""info_des"">" 17 + @".+?<h1>(?<name>[^<]+)</h1>" 18 + @".+?<dl>作 者:(?<author>[^<]+)</dl>" 19 + @".+?<dl>最后更新:(?<updateTime>[^<]+)</dl>", RegexOptions.Singleline); 20 21 public override void Analize(LeaderAnt leader, JobContext context) 22 { 23 if (null == context.InParams) return; 24 25 var data = context.Buffer; 26 if (null == data || 0 == data.Length) 27 return; 28 29 var content = encoding.GetString(data); 30 31 var bookId = -1; 32 if (!context.InParams.ContainsKey(Consts.BOOK_ID) 33 || !int.TryParse(context.InParams[Consts.BOOK_ID].ToString(), out bookId)) 34 return; 35 36 var book = new Book { Id = bookId }; 37 var book_info_match = regex_book_info.Match(content); 38 if (null != book_info_match && book_info_match.Success) 39 { 40 book.Name = book_info_match.Groups["name"].Value.Trim(); 41 book.Author = book_info_match.Groups["author"].Value.Trim(); 42 book.PhotoUrl = @"http://www.xqishuta.com" + book_info_match.Groups["photo"].Value; 43 var lastUpdateTime = DateTime.Now; 44 if (DateTime.TryParse(book_info_match.Groups["updateTime"].Value.Trim(), out lastUpdateTime)) 45 book.LastUpdateTime = lastUpdateTime; 46 } 47 48 var matches = regex_section_list.Matches(content); 49 if (null != matches && 0 < matches.Count) 50 { 51 book.Sections = new Dictionary<int, string>(); 52 var section_id = 0; 53 foreach (Match match in matches) 54 { 55 if (!int.TryParse(match.Groups["section_id"].Value, out section_id)) continue; 56 57 if (!book.Sections.ContainsKey(section_id)) 58 book.Sections.Add(section_id, null); 59 book.Sections[section_id] = match.Groups["section_name"].Value.Trim(); 60 61 var newContext = new JobContext 62 { 63 JobName = $"“奇书网-电子书-{section_id}章节内容”", 64 Uri = $"http://www.xqishuta.com/du/{book.Id / 1000}/{book.Id}/{section_id}.html", 65 Method = WebRequestMethods.Http.Get, 66 InParams = new Dictionary<string, object>(), 67 Analizer = new BookSectionAnalizer(), 68 }; 69 70 newContext.InParams.Add(Consts.LAST_UPDATE_TIME, context.InParams[Consts.LAST_UPDATE_TIME]); 71 newContext.InParams.Add(Consts.BOOK_ID, bookId); 72 newContext.InParams.Add(Consts.BOOK_SECTION_ID, section_id); 73 newContext.InParams.Add(Consts.BOOK, book); 74 75 leader.Queue.Enqueue(newContext); 76 } 77 } 78 } 79 } 80 }
1 namespace MikeWare.Crawlers.EBooks.Bizs 2 { 3 using MikeWare.Core.Components.CrawlerFramework; 4 using MikeWare.Crawlers.EBooks.Entities; 5 using System.Collections.Generic; 6 using System.IO; 7 using System.Text; 8 using System.Text.RegularExpressions; 9 10 public class BookSectionAnalizer : ACrawlerAnalizer 11 { 12 private static Encoding encoding = new UTF8Encoding(false); 13 private static Regex regex_section_content = new Regex(@"(?<=<div[^""]+""content1"">)(?<content>.+?)(?=(<p [^<]+</p>)?</div>)", RegexOptions.Singleline); 14 private static Regex regex_html_tag = new Regex(@"(<(w+?)[^>]+>[^<>]+?</2>)|(<(w+?)[^/>]+/>)|&[^;]+;"); 15 16 public override void Analize(LeaderAnt leader, JobContext context) 17 { 18 if (null == context.InParams) return; 19 20 var content = encoding.GetString(context.Buffer); 21 var match = regex_section_content.Match(content); 22 23 if (null != match && match.Success) 24 { 25 if (!context.InParams.ContainsKey(Consts.BOOK) || null == context.InParams[Consts.BOOK]) 26 return; 27 28 var section_id = -1; 29 30 if (!context.InParams.ContainsKey(Consts.BOOK_SECTION_ID) 31 || !int.TryParse(context.InParams[Consts.BOOK_SECTION_ID].ToString(), out section_id)) 32 return; 33 34 content = regex_html_tag.Replace(match.Groups["content"].Value, string.Empty); 35 var builder = new StringBuilder(); 36 using (var reader = new StringReader(content)) 37 { 38 while (0 < reader.Peek()) 39 { 40 var line = reader.ReadLine().Trim(); 41 if (!string.IsNullOrEmpty(line)) builder.AppendLine(line); 42 } 43 } 44 45 var book = context.InParams[Consts.BOOK] as Book; 46 if (null == book.SectionContents) book.SectionContents = new Dictionary<int, string>(); 47 48 if (!book.SectionContents.ContainsKey(section_id)) book.SectionContents[section_id] = builder.ToString(); 49 builder.Clear(); 50 } 51 52 //Console.WriteLine(book.SectionContents[sectionId]); 53 //Console.WriteLine($"{book.Id} - {sectionId} Finished."); 54 } 55 } 56 }
解析器一方面的职责呢,就是解析下载下来的数据,另一方面呢,也根据解析结果,来拼凑出下一步任务,指定该任务的必要如参和对应的解析器,并丢到任务池中。这样,解析器和下一步任务的执行就解开耦合。
在解析器中也提供了一个修改任务参数的机会,我们甚至可以对任务的参数进行任意的排列组合;
同时,在一个解析器中,也可以产生多个子任务;比如,我们在BooksListAnalizer中,一方面产生了采集书籍章节列表的任务,另一方面呢,又产生了采集翻页的任务;
还有其他一些重构的零碎的小点,就不一一列出了。
这里,在抛出一个小问题,如下图所示:
当我们运行几十秒之后,观察一下队列,发现它很长,这是为什么呢,怎么应对呢?我们下节继续,如何制定一些并发策略:)
喜欢本系列丛书的朋友,可以点击链接加入QQ交流群(994761602)【C# 破境之道】
方便各位在有疑问的时候可以及时给我个反馈。同时,也算是给各位志同道合的朋友提供一个交流的平台。
需要源码的童鞋,也可以在群文件中获取最新源代码。