1. Fetcher模块的简单介绍
Fetcher这个模块在Nutch中有单独一个包在实现,在org.apache.nutch.fetcher,其中有Fetcher.java, FetcherOutput 和FetcherOutputFormat来组成,看上去很简单,但其中使用到了多线程,多线程的生产者与消费者模型,MapReduce的多路径输出等方法。
下面我们来看一下Fetcher的注释,从中我们可以得到很多有用的信息。
首先,这是一种基于队列的fetcher方法,它使用了一种经典的线程模型,生产者(a-QueueFeeder)与消费者(many-FetcherThread)模型,注意,这里有多个消费者。生产者从Generate产生的fetchlists中分类得到一批FetchItemQueue,每一个FetchItmeQueue都是由一类相同host的FetchItem组成,这些FetchItem是用来描述被抓取的对象。当一个FetchItem从FetchItemQueue中取出后,QueueFeeder这个生产者会不断的向队列中加入新的FetchItem,直到这个队列满了为止或者已经没有fetchlist可读取,当队列中的所有FetchItem都被抓取完成后,所有抓取线程都会退出运行。每一个FetchItemQueue都有一套自己的抓取策略,如最大的并行抓取个数,两次抓取的间隔等,如果当FetcherThread向队列申请一个FetchItem时,FetchItemQueue发现当前的FetchItem没有满足抓取策略,那这里它就会返回null,表达当前FetchItem还没有准备好被抓取。如果这些所有FetchItem都没有准备好被抓取,那这时FetchThread就会进入等待状态,直到条件满足被促发或者是等待超时,它会认为任务已经被挂起,这时FetchThread会自动退出。
2. FetcherOutputFormat的介绍
这个类是用来把FetcherOutput对象切分到不同的Map文件中的,也就是说它会根据对象的类型来判断输出到哪一个文件中,这里用到了一个多文件的输出。
FetcherOutputFormat继承自MapReduce框架的OutputFormat模板,其输出的<key,value>类型为<Text,NutchWritable>。
这里的OutputFormat定义了Map-Reduce任务的输出描述,Map-Reduce框架依赖任务的OutputFormat来做如下二件事情,一是用来验证输出源的可用性,如是否已经建立了相应的目录,数据库是否已经连上;另一件事是提供RecordWriter抽象来对数据进行写出到特定的数据源,一般输出文件定义在FileSystem里面。
FetcherOutputFormat主要是实现了getRecordWriter这个方法,用于得到相应的数据写出对象,我们来分析一下其源代码:
-
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
-
final JobConf job,
-
final String name,
-
final Progressable progress) throws IOException {
-
// 定义输出目录
-
Path out = FileOutputFormat.getOutputPath(job);
-
// 定义抓取的输出目录
-
final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
-
// 定义抓取内容的输出目录
-
final Path content = new Path(new Path(out, Content.DIR_NAME), name);
-
// 定义数据压缩格式
-
final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
-
-
-
// 定义抓取的输出抽象类
-
final MapFile.Writer fetchOut =
-
new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
-
compType, progress);
-
-
// 这里使用了inner class来定义相应的RecordWriter
-
return new RecordWriter<Text, NutchWritable>() {
-
private MapFile.Writer contentOut;
-
private RecordWriter<Text, Parse> parseOut;
-
-
-
{
-
// 这里看如果Fetcher定义了输出内容,就生成相应的Content输出抽象
-
if (Fetcher.isStoringContent(job)) {
-
contentOut = new MapFile.Writer(job, fs, content.toString(),
-
Text.class, Content.class,
-
compType, progress);
-
}
-
// 如果Fetcher对抓取的内容进行了解析,这里就定义相应的解析输出抽象
-
// 注意这里使用了ParseOutputFormat的getReocrdWriter,主要是解析网页,抽取其外链接
-
if (Fetcher.isParsing(job)) {
-
parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
-
}
-
}
-
-
-
public void write(Text key, NutchWritable value)
-
throws IOException {
-
-
-
Writable w = value.get();
-
// 对对象类型进行判断,调用相应的抽象输出,写到不同的文件中去
-
if (w instanceof CrawlDatum)
-
fetchOut.append(key, w);
-
else if (w instanceof Content)
-
contentOut.append(key, w);
-
else if (w instanceof Parse)
-
parseOut.write(key, (Parse)w);
-
}
-
-
-
public void close(Reporter reporter) throws IOException {
-
fetchOut.close();
-
if (contentOut != null) {
-
contentOut.close();
-
}
-
if (parseOut != null) {
-
parseOut.close(reporter);
-
}
-
}
-
-
-
};
3. 生产者QueueFeeder的介绍
这个类作用是用于生产被抓取的FetchItem对象,把其放入抓取队列中。下来我们来对其源代码进行分析
-
// 这个类继承自Thread,是用一个单独的线程来做的
-
private static class QueueFeeder extends Thread {
-
private RecordReader<Text, CrawlDatum> reader; // 这里是InputFormat产生的ReocrdReader,用于读取Generate的产生的数据
-
private FetchItemQueues queues; // 这是生产者与消费者所使用的共享队列,这个对列是分层的,分一层对应一个host
-
private int size; // 队列的大小
-
private long timelimit = -1; // 这是一个过滤机制的策略,用于过滤所有的FetchItem
-
-
// 构造方法
-
public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
-
FetchItemQueues queues, int size) {
-
this.reader = reader;
-
this.queues = queues;
-
this.size = size;
-
this.setDaemon(true);
-
this.setName("QueueFeeder");
-
}
-
-
public void setTimeLimit(long tl) {
-
timelimit = tl;
-
}
-
-
-
// 函数的run方法
-
public void run() {
-
boolean hasMore = true; // while的循环条件
-
int cnt = 0;
-
int timelimitcount = 0;
-
while (hasMore) {
-
// 这里判断是否设置了这个过滤机制,如果设置了,判断相前时间是否大于这个timelimit,如果大于timelimit,过滤所有的FetchItem
-
if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
-
// enough .. lets' simply
-
// read all the entries from the input without processing them
-
try {
-
// 读出<key,value>对,过滤之
-
Text url = new Text();
-
CrawlDatum datum = new CrawlDatum();
-
hasMore = reader.next(url, datum);
-
timelimitcount++;
-
} catch (IOException e) {
-
LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
-
return;
-
}
-
continue; // 过滤之
-
}
-
int feed = size - queues.getTotalSize();
-
// 判断剩余的队列空间是否为0
-
if (feed <= 0) {
-
// queues are full - spin-wait until they have some free space
-
try {
-
// 休息1秒种
-
Thread.sleep(1000);
-
} catch (Exception e) {};
-
continue;
-
} else {
-
LOG.debug("-feeding " + feed + " input urls ...");
-
// 如果队列还有空间(feed>0)并且recordRedder中还有数据(hasMore)
-
while (feed > 0 && hasMore) {
-
try {
-
Text url = new Text();
-
CrawlDatum datum = new CrawlDatum();
-
// 读出<key,value>
-
hasMore = reader.next(url, datum);
-
if (hasMore) { // 判断是否成功读出数据
-
queues.addFetchItem(url, datum); // 放入对列,这个队列应该是thread-safe的,下面我们可以看到
-
cnt++; // 统计总数
-
feed--; // 剩余队列空间减1
-
}
-
} catch (IOException e) {
-
LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
-
return;
-
}
-
}
-
}
-
}
-
LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
-
+ timelimitcount);
-
}
-
}
这个类主要负责向队列中放数据。
4. 下面我们来看一下这个队列是如果工作的
这里的共享对列主要如三个类组成,一个是FetchItem,存储队列中的元素;另一个是FetchItemQueue,用于存储相同host的FetchItem,最后一个是FetchItemQueues,看名字我们就知道,这是用于存储所有的FetchItemQueue的。
4.1 先让我们来看一下FetchItem的结构:
-
FetchItem =>
-
{
-
queueID:String, // 用于存储队列的ID号
-
url:Text, // 用于存储CrawlDatum的url地址
-
u:URL, // 也是存储url,但是以URL的类型来存储,不过我看了一下,这东东在判断RobotRules的时候用了一下
-
datum:CrawlDatum // 这是存储抓取对象的一些元数据信息àà
-
}
下面我们来看一下它的create方法,是用来生成相应的FetchItem的,源代码如下:
-
//从注释中我们可以看到,队列ID是由protocol+hotname或者是protocol+IP组成的
-
/** Create an item. Queue id will be created based on <code>byIP</code>
-
* argument, either as a protocol + hostname pair, or protocol + IP
-
* address pair.
-
*/
-
public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
-
String queueID;
-
URL u = null;
-
try {
-
u = new URL(url.toString()); // 得到其URL
-
} catch (Exception e) {
-
LOG.warn("Cannot parse url: " + url, e);
-
return null;
-
}
-
// 得到协议号
-
String proto = u.getProtocol().toLowerCase();
-
String host;
-
if (byIP) {
-
// 如果是基于IP的,那得到其IP地址
-
try {
-
InetAddress addr = InetAddress.getByName(u.getHost());
-
host = addr.getHostAddress();
-
} catch (UnknownHostException e) {
-
// unable to resolve it, so don't fall back to host name
-
LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
-
return null;
-
}
-
} else {
-
// 否则得到Hostname
-
host = u.getHost();
-
if (host == null) {
-
LOG.warn("Unknown host for url: " + url + ", skipping.");
-
return null;
-
}
-
hosthost = host.toLowerCase(); // 统一变小写
-
}
-
// 得成相应的队列ID号,放入FetchItemQueue中
-
queueID = proto + "://" + host;
-
return new FetchItem(url, u, datum, queueID);
-
}
4.2 下面我们来看一下FetchQueue的组成结构
这个类主要是用于收集相同QueueID的FetchItem对象,对正在抓取的FetchItem进行跟踪,使用的是一个inProgress集合,还有计算两次请求的间隔时间,我们来看一下其结构:
-
FetchQueue =>
-
{
-
// 用于收集相同QueueID的FetchItem, 这里使用的是线程安全的对象
-
List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
-
// 用于收集正在抓取的FetchItem
-
Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
-
// 用于存储下一个FetchItem的抓取时候,如果没有到达这个时间,就返回给FetchThread为null
-
AtomicLong nextFetchTime = new AtomicLong();
-
// 存储抓取的出错次数
-
AtomicInteger exceptionCounter = new AtomicInteger();
-
// 存储FetchItem抓取间隔,这个配置只有当同时抓取最大线程数为1时才有用
-
long crawlDelay;
-
// 存储最小的抓取间隔,这个配置当同时抓取的最大线程数大于1时有用
-
long minCrawlDelay;
-
// 同时抓取的最大线程数
-
int maxThreads;
-
Configuration conf;
-
}
我们主要还看一下其getFetchItem方法:
-
public FetchItem getFetchItem() {
-
// 当正在抓取的FetchItem数大于同时抓取的线程数时,返回null,这是一个politness策略
-
// 就是对于同一个网站,不能同时有大于maxThreads个线程在抓取,不然人家网站会认为你是在攻击它
-
if (inProgress.size() >= maxThreads) return null;
-
long now = System.currentTimeMillis();
-
// 计算两个抓取的间隔时间,如果没有到达这个时间,就返回null,这个是保证不会有多个线程同时在抓取一个网站
-
if (nextFetchTime.get() > now) return null;
-
FetchItem it = null;
-
// 判断队列是否为空
-
if (queue.size() == 0) return null;
-
try {
-
// 从准备队列中移除一个FetchItem,把其放到inProcess集合中
-
it = queue.remove(0);
-
inProgress.add(it);
-
} catch (Exception e) {
-
LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
-
}
-
return it;
-
}
这里还有一个方法是finishFetchItem,就是当这个FetchItem被抓了完成后,会调用这个方法,这个方法会把这个FetchTime从inProgress集合中删除,然后再更新一下nextFetchTime,nextFetchTime = endTime + (maxThread > 1) ? minCrawlDelay : crawlDelay)
4.3 下面再来看一下FetchItemQueues
这个类主要是用来管理FetchItemQueue,下面介绍一下其主要的几个方法:
- synchronized addFetchItem(FetchItem it): 是用来把FetchItem根据其QueueID号放到对应的FetchItemQueue中
- synchronized getFetchItem() : 它是遍历FetchItemQueue,从中得到一个FetchItem,如果没有就返回null
- synchronized checkExceptionThreshold : 用于查看特定FetchItemQueue的抓取失败次数,当这个次数大于maxExceptionsPerQueue时,就清空这个FetchItemQueue中的其它FetchItem.
5. 总结
这里我们看了一下Fetcher的抓取模型和其中使用的一些主要的类结构,还有FetcherOutputFormat的多文件轮子,下来我会对FetcherThread进行分析。
作者:http://blog.csdn.net/amuseme_lu
相关文章阅读及免费下载:
《Apache Nutch 1.3 学习笔记三(Inject)》
《Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)》
《Apache Nutch 1.3 学习笔记四(Generate)》
《Apache Nutch 1.3 学习笔记四(SegmentReader分析)》
《Apache Nutch 1.3 学习笔记五(FetchThread)》
《Apache Nutch 1.3 学习笔记五(Fetcher流程)》
《Apache Nutch 1.3 学习笔记六(ParseSegment)》
《Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)》
《Apache Nutch 1.3 学习笔记八(LinkDb)》
《Apache Nutch 1.3 学习笔记九(SolrIndexer)》
《Apache Nutch 1.3 学习笔记十(Ntuch 插件机制简单介绍)》
《Apache Nutch 1.3 学习笔记十(插件扩展)》
《Apache Nutch 1.3 学习笔记十(插件机制分析)》
《Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)》
《Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)》