zoukankan      html  css  js  c++  java
  • Apache Nutch 1.3 学习笔记五(FetchThread)

    上一节看了Fetcher中主要几个类的实现,这一节会来分析一下其中用到的消费者FetcherThread,来看看它是干嘛的。

    1. FetcherMapp模型

    Fetcher.java代码中可以看到,Fetcher继承自MapRunable,它是Mapper的抽象接口,实现这个接口的子类能够更好的对Map的流程进行控制,包括多线程与异步Maper

    1.1 Fetcher的入口函数fetch(Path segment,int threads, boolean parsing)

    下面是它的源代码,来分析一下:

     

    1. // 对配置进行检测,看一些必要的配置是否已经配置了,如http.agent.name等参数  
    2.         checkConfiguration();  
    3.     
    4.     
    5.         // 记录fetch的开始时间  
    6.         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
    7.         long start = System.currentTimeMillis();  
    8.         if (LOG.isInfoEnabled()) {  
    9.             LOG.info("Fetcher: starting at " + sdf.format(start));  
    10.           LOG.info("Fetcher: segment: " + segment);  
    11.         }  
    12.     
    13.     
    14.         // 这里对抓取的时候进行限制,FetchItemQueue中会用到这个参数  
    15.         // set the actual time for the timelimit relative  
    16.         // to the beginning of the whole job and not of a specific task  
    17.         // otherwise it keeps trying again if a task fails  
    18.         long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);  
    19.         if (timelimit != -1) {  
    20.           timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);  
    21.           LOG.info("Fetcher Timelimit set for : " + timelimit);  
    22.           getConf().setLong("fetcher.timelimit", timelimit);  
    23.         }  
    24.             
    25.         // 生成一个NutchMap-Reduce配置  
    26.         JobConf job = new NutchJob(getConf());  
    27.         job.setJobName("fetch " + segment);  
    28.         
    29.         // 配置抓取线程数,  
    30.         job.setInt("fetcher.threads.fetch", threads);  
    31.         job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());  
    32.         // 配置是否对抓取的内容进行解析  
    33.         job.setBoolean("fetcher.parse", parsing);  
    34.         
    35.         // for politeness, don't permit parallel execution of a single task  
    36.         job.setSpeculativeExecution(false);  
    37.         
    38.         // 配置输出的路径名  
    39.         FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));  
    40.         // 配置输入的文件格式,这里类继承自SequenceFileInputFormat  
    41.         // 它主要是覆盖了其getSplits方法,其作用是不对文件进行切分,以文件数量作为splits的依据  
    42.         // 就是有几个文件,就有几个Map操作  
    43.         job.setInputFormat(InputFormat.class);  
    44.         
    45.         // 配置Map操作的类  
    46.         job.setMapRunnerClass(Fetcher.class);  
    47.         
    48.         // 配置输出路径  
    49.         FileOutputFormat.setOutputPath(job, segment);  
    50.         // 这里配置输出文件方法,这个类在前面已经分析过  
    51.         job.setOutputFormat(FetcherOutputFormat.class);  
    52.         // 配置输出<key,value>类型  
    53.         job.setOutputKeyClass(Text.class);  
    54.         job.setOutputValueClass(NutchWritable.class);  
    55.         
    56.         JobClient.runJob(job);  

    1.2 Fetcherrun方法分析

      这个是Map类的入口,用于启动抓取的生产者与消费者,下面是部分源代码:

     

    1. // 生成生产者,用于读取Generate出来的CrawlDatum,把它们放到共享队列中  
    2.     feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);  
    3.     //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);  
    4.       
    5.     // the value of the time limit is either -1 or the time where it should finish  
    6.     long timelimit = getConf().getLong("fetcher.timelimit", -1);  
    7.     if (timelimit != -1) feeder.setTimeLimit(timelimit);  
    8.     feeder.start();  
    9.     
    10.     
    11.     // set non-blocking & no-robots mode for HTTP protocol plugins.  
    12.     getConf().setBoolean(Protocol.CHECK_BLOCKING, false);  
    13.     getConf().setBoolean(Protocol.CHECK_ROBOTS, false);  
    14.       
    15. // 启动消费者线程  
    16.     for (int i = 0; i < threadCount; i++) {       // spawn threads  
    17.       new FetcherThread(getConf()).start();  
    18.     }  
    19.     
    20.     
    21.     // select a timeout that avoids a task timeout  
    22.     long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;  
    23.     
    24.     
    25. // 这里用一个循环来等待线程结束  
    26.     do {                                          // wait for threads to exit  
    27.       try {  
    28.         Thread.sleep(1000);  
    29.       } catch (InterruptedException e) {}  
    30.     
    31.     
    32.     // 这个函数是得到相前线程的抓取状态,如抓取了多少网页,多少网页抓取失败,抓取速度是多少  
    33.       reportStatus();  
    34.         LOG.info("-activeThreads=" + activeThreads + "spinWaiting=" + spinWaiting.get()  
    35.             + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());  
    36.     
    37.     
    38. // 输出抓取队列中的信息  
    39.      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {  
    40.        fetchQueues.dump();  
    41.      }  
    42.         
    43.     // 查看timelimit的值,这里只要返回的hitByTimeLimit不为0,checkTimelimit方法会清空抓取队列中的所有数据  
    44.      // check timelimit  
    45.      if (!feeder.isAlive()) {  
    46.         int hitByTimeLimit = fetchQueues.checkTimelimit();  
    47.          if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus",  
    48.             "hitByTimeLimit", hitByTimeLimit);  
    49.      }  
    50.         
    51.     // 查看抓取抓取线程是否超时,如果超时,就退出等待  
    52.         // some requests seem to hang, despite all intentions  
    53.         if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {  
    54.         if (LOG.isWarnEnabled()) {  
    55.           LOG.warn("Aborting with "+activeThreads+" hung threads.");  
    56.         }  
    57.         return;  
    58.         }  
    59.     
    60.     
    61.     } while (activeThreads.get() > 0);  
    62.     LOG.info("-activeThreads=" + activeThreads);  

    2. Fetcher.FetcherThread

    2.1 这个类主要是用来从队列中得到FetchItem,下面来看一下其run方法,其大概做了几件事:

    • 从抓取队列中得到一个FetchItem,如果返回为null,判断生产者是否还活着或者队列中是否还有数据,  如果队列中还有数据,那就等待,如果上面条件没有满足,就认为所有FetchItem都已经处理完了,退出当前抓取线程
    • 得到FetchItem, 抽取其url,从这个url中分析出所使用的协议,调用相应的plugin来解析这个协议
    • 得到相当urlrobotRules,看是否符合抓取规则,如果不符合或者其delayTime大于我们配置的maxDelayTime,那就不抓取这个网页
    • 对网页进行抓取,得到其抓取的Content和抓取状态,调用FetchItemQueuesfinishFetchItem方法,表明当前url已经抓取完成
    • 根据抓取协议的状态来进行下一步操作
    1. 如果状态为WOULDBLOCK,那就进行retry,把当前url放加FetchItemQueues中,进行重试
    2. 如果是MOVED或者TEMP_MOVED,这时这个网页可以被重定向了,对其重定向的内容进行解析,得到重定向的网址,这时要生成一个新的FetchItem,根据其QueueID放到相应的队列的inProgress集合中,然后再对这个重定向的网页进行抓取
    3. 如果状态是EXCEPTION,对当前url所属的FetchItemQueue进行检测,看其异常的网页数有没有超过最大异常网页数,如果大于,那就清空这个队列,认为这个队列中的所有网页都有问题。
    4. 如果状态是RETRY或者是BLOCKED,那就输出CrawlDatum,将其状态设置成STATUS_FETCH_RETRY,在下一轮进行重新抓取
    5. 如果状态是GONE,NOTFOUND,ACCESS_DENIED,ROBOTS_DENIED,那就输出CrawlDatum,设置其状态为STATUS_FETCH_GONE,可能在下一轮中就不进行抓取了,
    6. 如果状态是NOTMODIFIED,那就认为这个网页没有改变过,那就输出其CrawlDatum,将其状态设成成STATUS_FETCH_NOTMODIFIED.
    7. 如果所有状态都没有找到,那默认输出其CrawlDatum,将其状态设置成STATUS_FETCH_RETRY,在下一轮抓取中再重试
    • 判断网页重定向的次数,如果超过最大重定向次数,就输出其CrawlDatum,将其状态设置成STATUS_FETCH_GONE


    这里有一些细节没有说明,如网页被重定向以后如果操作,相应的协议是如果产生的,这个是通过插件产生的,具体插件是怎么调用的,这里就不说了,以后有机会会再分析一下。

    2.2 下面分析FetcherThread中的另外一个比较重要的方法,就是output

    具体这个output大概做了如下几件事:

    • 判断抓取的content是否为空,如果不为空,那调用相应的解析插件来对其内容进行解析,然后就是设置当前url所对应的CrawlDatum的一些参数,如当前内容的MD5码,分数等信息
    • 然后就是使用FetchOutputFormat输出当前urlCrawlDatum,Content和解析的结果ParseResult

    下面分析一下FetcherOutputFormat中所使用到的ParseOutputFormat.RecordWriter
    在生成相应的ParseOutputFormatRecordWriter过程中,这个RecordWriter会再生成三个RecordWriter来写出parse_text(MapFile),parse_data(MapFile)crawl_parse(SequenceFile),我们在segments下具体的segment中看到的三个这样的目录就是这个对象生成的,分别输出了网页的源代码;网页的解析数据,如网页title、外链接、元数据、状态等信息,这里会对外链接进行过滤、规格化,并且用插件计算每一个外链接的初始分数;另一个是网页解析后的CrawlDatum对象,这里会分析当前CrawlDatum中的metadata,从中生成两种新的CrawlDatum,还有就是它会对外链接生成相应的CrawlDatum,放入crawl_parse目录中,这里我还没有看明白。


    3. 总结

    有点晕了,这里的代码有点复杂,我们来整理一下思路。

    3.1 从目录生成的角度 

    • Generate后会在segments目录下生成一些要抓取的具体的segment,这里每一个segment下会有一个叫crawl_generate的目录,其中放着要抓取CrawlDatum信息
    • Fetch的时候,会输出另外五个目录
    1. content: 这个目录只有在配置了要输出抓取内容时才会输出
    2. crawl_fetch: 这个目录是输出抓取成功后的CrawlDatum信息,这里是对原来crawl_generate目录中的信息进行了一些修改,下面三个目录只有配置了解析参数后才会输出,如果后面调用bin/nutch parse命令
    3. parse_text: 这个目录存放了抓取的网页内容,以提后面建立索引用
    4. parse_data: 这里存入了网页解析后的一些数据,如网页title,外链接信息等
    5. crawl_parse: 这里存储了一些新生成的CrawlDatum信息,如外链接等,以供下一次迭代抓取使用

    3.2 从数据流的角度

    • Generate生成的CrawlDatum数据首先经过QueueFeeder生产者,放入共享队列
    • 多个消费者(FetcherThread)从共享队列中取得要抓取的FetchItem数据
    • FetchItem所对应的url进行抓取,得到相应的抓取内容,对抓取的状态进行判断,回调相应的操作
    • 对抓取的内容进行解析,产生网页的外链接,生成新的CrawlDatum抓取数据,产生解析后的数据
    • 调用FetcherOutputFormat.Writer对象,把CrawlDatum,Content,ParseResult分别写入crawl_fetch,content,(parse_text,parse_data,crawl_parse)目录中


    好了,Fetcher的分析也差不多了,可能有一些细节还没有分析到,下面有机会再补上吧。

    作者:http://blog.csdn.net/amuseme_lu


    相关文章阅读及免费下载:

    Apache Nutch 1.3 学习笔记目录

    Apache Nutch 1.3 学习笔记一

    Apache Nutch 1.3 学习笔记二

    Apache Nutch 1.3 学习笔记三(Inject)

    Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)

    Apache Nutch 1.3 学习笔记四(Generate)

    Apache Nutch 1.3 学习笔记四(SegmentReader分析)

    Apache Nutch 1.3 学习笔记五(FetchThread)

    Apache Nutch 1.3 学习笔记五(Fetcher流程)

    Apache Nutch 1.3 学习笔记六(ParseSegment)

    Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)

    Apache Nutch 1.3 学习笔记八(LinkDb)

    Apache Nutch 1.3 学习笔记九(SolrIndexer)

    Apache Nutch 1.3 学习笔记十(Ntuch 插件机制简单介绍)

    Apache Nutch 1.3 学习笔记十(插件扩展)

    Apache Nutch 1.3 学习笔记十(插件机制分析)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)

    Apache Nutch 1.3 学习笔记十二(Nutch 2.0 的主要变化)

    更多《Apache Nutch文档》,尽在开卷有益360 http://www.docin.com/book_360

  • 相关阅读:
    python 标准库和第3方库的介绍
    delphi延时函数
    delphi控件的安装
    Hadoop 第一课:linux常用命令
    复习 struts1 开发步骤
    struts学习:传统方法完成struts注册表单校验与回显数据
    struts入门第一个案例
    Java Ant build.xml详解
    HTTP 方法:GET 对比 POST
    HTTP 返回的状态消息
  • 原文地址:https://www.cnblogs.com/ibook360/p/2222168.html
Copyright © 2011-2022 走看看