这里主要看一下CrawlDb中的updatedb,它主要是用来更新CrawlDb数据库的
1. bin/nutch updatedb
我们用nutch的命令行时会看到一个方法叫updatedb,其实这个方法就是调用CrawlDb.java类中的update方法,它的参数帮助如下:
-
Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]
-
crawldb CrawlDb to update
-
-dir segments parent directory containing all segments to update from
-
seg1 seg2 ... list of segment names to update from
-
-force force update even if CrawlDb appears to be locked (CAUTION advised)
-
-normalize use URLNormalizer on urls in CrawlDb and segment (usually not needed)
-
-filter use URLFilters on urls in CrawlDb and segment
-
-noAdditions only update already existing URLs, don't add any newly discovered URLs
2. 下面我们来分析一下其update方法到底做了些什么
2.1 update的任务提交参数,部分代码如下
-
// 生成一个新的任务,这里面也做了一些相应的配置,
-
// 加入了current目录,就是初始的CrawlDb目录,设置了输入格式为SequenceFileInputFormat
-
// 配置了Map-Reducer为CrawlDbFilter-CrawlDbReducer
-
// 配置了输出格式为MapFileOutputFormat
-
// 还配置了输出的<key,value>类型<Text,CrawlDatum>
-
JobConf job = CrawlDb.createJob(getConf(), crawlDb);
-
// 配置一些参数
-
job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);
-
job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
-
job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);
-
// 加入输入目录,一个是crawl_fetch,另一个是crawl_parse
-
for (int i = 0; i < segments.length; i++) {
-
Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
-
Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME);
-
if (fs.exists(fetch) && fs.exists(parse)) {
-
FileInputFormat.addInputPath(job, fetch);
-
FileInputFormat.addInputPath(job, parse);
-
} else {
-
LOG.info(" - skipping invalid segment " + segments[i]);
-
}
-
}
2.2 分析一下其任务的Map-Reducer做了些什么
CrawlDbFilter主要是对url进行过滤和正规化。
CrawlDbReducer主要是用来聚合相同url(老的与新产生的)的,这东东写得很复杂,下面来分析一下其源代码:
-
/**
 * Merges every CrawlDatum collected for one URL into a single CrawlDb record.
 *
 * <p>The first loop sorts the incoming values into: the existing db entry
 * ({@code old}), the newest fetch result ({@code fetch}), linked datums
 * (queued in {@code linked}), the content signature and the parse metadata.
 * The rest of the method derives the new db status from the fetch status,
 * lets {@code schedule} adjust the fetch schedule, lets {@code scfilters}
 * update the score, and emits the record.
 *
 * <p>At most one (key, datum) pair is written to {@code output}; several
 * paths return without writing anything.
 *
 * <p>NOTE(review): {@code linked}, {@code result}, {@code schedule},
 * {@code scfilters}, {@code additionsAllowed}, {@code maxInterval} and
 * {@code retryMax} are declared outside this method and are not visible in
 * this listing.
 *
 * @param key      the URL shared by all values
 * @param values   all CrawlDatum values grouped under {@code key}
 * @param output   collector that receives the merged record
 * @param reporter not referenced anywhere in this method body
 * @throws IOException declared by the signature; presumably raised by
 *                     {@code output.collect} - confirm against the Hadoop API
 */
public void reduce(Text key, Iterator<CrawlDatum> values,
-
OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-
throws IOException {
-
-
-
CrawlDatum fetch = new CrawlDatum();
-
CrawlDatum old = new CrawlDatum();
-
-
-
boolean fetchSet = false;
-
boolean oldSet = false;
-
byte[] signature = null;
-
boolean multiple = false; // avoid deep copy when only single value exists
-
linked.clear();
-
org.apache.hadoop.io.MapWritable metaFromParse = null;
-
-
// This loop walks every value (CrawlDatum) that shares this URL and fills in the old and fetch variables,
-
// and collects the linked datums into a priority queue (ordered by score according to the original note; the queue is declared elsewhere).
-
while (values.hasNext()) {
-
CrawlDatum datum = (CrawlDatum)values.next();
-
// decide whether the CrawlDatum must be deep-copied: true as soon as more than one value is seen
-
if (!multiple && values.hasNext()) multiple = true;
-
// does this CrawlDatum carry a db status, i.e. STATUS_DB_(UNFETCHED|FETCHED|GONE|REDIR_TEMP|REDIR_PERM|NOTMODIFIED)
-
if (CrawlDatum.hasDbStatus(datum)) {
-
if (!oldSet) {
-
if (multiple) {
-
old.set(datum);
-
} else {
-
// no need for a deep copy - this is the only value
-
old = datum;
-
}
-
oldSet = true;
-
} else {
-
// always take the latest version
-
// (the db datum with the greater fetch time wins)
-
if (old.getFetchTime() < datum.getFetchTime()) old.set(datum);
-
}
-
continue;
-
}
-
-
-
// does this CrawlDatum carry a fetch status, i.e. STATUS_FETCH_(SUCCESS|RETRY|REDIR_TEMP|REDIR_PERM|GONE|NOTMODIFIED)
-
if (CrawlDatum.hasFetchStatus(datum)) {
-
if (!fetchSet) {
-
if (multiple) {
-
fetch.set(datum);
-
} else {
-
fetch = datum;
-
}
-
fetchSet = true;
-
} else {
-
// always take the latest version
-
if (fetch.getFetchTime() < datum.getFetchTime()) fetch.set(datum);
-
}
-
continue;
-
}
-
-
-
// collect additional information depending on the CrawlDatum's status
-
switch (datum.getStatus()) { // collect other info
-
// a linked datum is put into the priority queue (descending score order according to the original note)
-
case CrawlDatum.STATUS_LINKED:
-
CrawlDatum link;
-
if (multiple) {
-
link = new CrawlDatum();
-
link.set(datum);
-
} else {
-
link = datum;
-
}
-
linked.insert(link);
-
break;
-
case CrawlDatum.STATUS_SIGNATURE:
-
// take the signature; it is compared with the old signature further down to decide whether the page was modified
-
signature = datum.getSignature();
-
break;
-
case CrawlDatum.STATUS_PARSE_META:
-
// take the metadata carried by this datum
-
metaFromParse = datum.getMetaData();
-
break;
-
default:
-
LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
-
}
-
}
-
-
// copy the content of the queue into a List
-
// in reversed order
-
int numLinks = linked.size();
-
List<CrawlDatum> linkList = new ArrayList<CrawlDatum>(numLinks);
-
for (int i = numLinks - 1; i >= 0; i--) {
-
linkList.add(linked.pop());
-
}
-
-
// If no datum with a db status was seen (i.e. this URL has no existing entry) AND additions are not allowed, return without output
-
// if it doesn't already exist, skip it
-
if (!oldSet && !additionsAllowed) return;
-
-
// if there is no fetched datum, perhaps there is a link
-
// i.e. no datum with a fetch status was seen and there is at least one link
-
if (!fetchSet && linkList.size() > 0) {
-
fetch = linkList.get(0); // take the first link in the list
-
fetchSet = true;
-
}
-
-
// still no new data - record only unchanged old data, if exists, and return
-
// no fetch-status datum and no link either, so only an old entry can be present
-
if (!fetchSet) {
-
// emit the old entry if there is one; otherwise only log a warning and return
-
if (oldSet) {// at this point at least "old" should be present
-
output.collect(key, old);
-
} else {
-
LOG.warn("Missing fetch and old value, signature=" + signature);
-
}
-
return;
-
}
-
-
// the following initializes the latest CrawlDatum version
-
// (when no STATUS_SIGNATURE datum was seen, fall back to the signature stored on fetch)
-
if (signature == null) signature = fetch.getSignature();
-
long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L;
-
long prevFetchTime = oldSet ? old.getFetchTime() : 0L;
-
-
-
// initialize with the latest version, be it fetch or link
-
result.set(fetch);
-
if (oldSet) {
-
// copy metadata from old, if exists
-
if (old.getMetaData().size() > 0) {
-
result.putAllMetaData(old);
-
// overlay with new, if any
-
if (fetch.getMetaData().size() > 0)
-
result.putAllMetaData(fetch);
-
}
-
// set the most recent valid value of modifiedTime
-
if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) {
-
result.setModifiedTime(old.getModifiedTime());
-
}
-
}
-
-
// NOTE(review): the next line is explanatory prose ("the following determines the latest status")
// that lost its "//" marker in the listing; it is not Java code.
下面是用来确定其最新的状态
-
switch (fetch.getStatus()) { // determine new status
-
-
-
case CrawlDatum.STATUS_LINKED: // it was link
-
if (oldSet) { // if old exists
-
result.set(old); // use it
-
} else {
-
// URL seen for the first time: let the schedule initialize it, mark it unfetched and ask the scoring filters for an initial score
-
result = schedule.initializeSchedule((Text)key, result);
-
result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
-
try {
-
scfilters.initialScore((Text)key, result);
-
} catch (ScoringFilterException e) {
-
if (LOG.isWarnEnabled()) {
-
LOG.warn("Cannot filter init score for url " + key +
-
", using default: " + e.getMessage());
-
}
-
result.setScore(0.0f);
-
}
-
}
-
break;
-
-
case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch
-
case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected
-
case CrawlDatum.STATUS_FETCH_REDIR_PERM:
-
case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified
-
// determine the modification status
-
int modified = FetchSchedule.STATUS_UNKNOWN;
-
if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
-
modified = FetchSchedule.STATUS_NOTMODIFIED;
-
} else {
-
// only decidable when both an old and a new signature are available; otherwise modified stays STATUS_UNKNOWN
-
if (oldSet && old.getSignature() != null && signature != null) {
-
if (SignatureComparator._compare(old.getSignature(), signature) != 0) {
-
modified = FetchSchedule.STATUS_MODIFIED;
-
} else {
-
modified = FetchSchedule.STATUS_NOTMODIFIED;
-
}
-
}
-
}
-
// set the schedule
-
result = schedule.setFetchSchedule((Text)key, result, prevFetchTime,
-
prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified);
-
// set the result status and signature
-
if (modified == FetchSchedule.STATUS_NOTMODIFIED) {
-
result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED);
-
if (oldSet) result.setSignature(old.getSignature());
-
} else {
-
switch (fetch.getStatus()) {
-
case CrawlDatum.STATUS_FETCH_SUCCESS:
-
result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
-
break;
-
case CrawlDatum.STATUS_FETCH_REDIR_PERM:
-
result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM);
-
break;
-
case CrawlDatum.STATUS_FETCH_REDIR_TEMP:
-
result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);
-
break;
-
default:
-
LOG.warn("Unexpected status: " + fetch.getStatus() + " resetting to old status.");
-
if (oldSet) result.setStatus(old.getStatus());
-
else result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
-
}
-
result.setSignature(signature);
-
if (metaFromParse != null) {
-
for (Entry<Writable, Writable> e : metaFromParse.entrySet()) {
-
result.getMetaData().put(e.getKey(), e.getValue());
-
}
-
}
-
}
-
// if fetchInterval is larger than the system-wide maximum, trigger
-
// an unconditional recrawl. This prevents the page to be stuck at
-
// NOTMODIFIED state, when the old fetched copy was already removed with
-
// old segments.
-
if (maxInterval < result.getFetchInterval())
-
result = schedule.forceRefetch((Text)key, result, false);
-
break;
-
case CrawlDatum.STATUS_SIGNATURE:
-
if (LOG.isWarnEnabled()) {
-
LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
-
}
-
return;
-
case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure
-
if (oldSet) {
-
result.setSignature(old.getSignature()); // use old signature
-
}
-
result = schedule.setPageRetrySchedule((Text)key, result, prevFetchTime,
-
prevModifiedTime, fetch.getFetchTime());
-
// below retryMax the page stays unfetched so it can be tried again; otherwise it is marked gone
-
if (result.getRetriesSinceFetch() < retryMax) {
-
result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
-
} else {
-
result.setStatus(CrawlDatum.STATUS_DB_GONE);
-
}
-
break;
-
-
-
case CrawlDatum.STATUS_FETCH_GONE: // permanent failure
-
if (oldSet)
-
result.setSignature(old.getSignature()); // use old signature
-
result.setStatus(CrawlDatum.STATUS_DB_GONE);
-
result = schedule.setPageGoneSchedule((Text)key, result, prevFetchTime,
-
prevModifiedTime, fetch.getFetchTime());
-
break;
-
-
-
default:
-
throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key);
-
}
-
-
-
// update the score of result here; a failure is only logged and the record is still emitted
-
try {
-
scfilters.updateDbScore((Text)key, oldSet ? old : null, result, linkList);
-
} catch (Exception e) {
-
if (LOG.isWarnEnabled()) {
-
LOG.warn("Couldn't update score, key=" + key + ": " + e);
-
}
-
}
-
// remove generation time, if any
-
result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
-
output.collect(key, result); // emit the merged record
-
}
-
-
}
3. 总结
- 这里大概分析了一下CrawlDb的更新流程,有一些地方还是没有看得太明白,可能要通过测试来更深入的理解。
- 其中流程就是对三个目录(CrawlDb的current目录,以及segment下的crawl_fetch和crawl_parse)进行合并,对相同的url的value(CrawlDatum)进行聚合,产生新的CrawlDatum,再写回原来的数据库中。
- 其复杂的地方在于如何对聚合后的结果进行处理,这个有空还要再看一下。
作者:http://blog.csdn.net/amuseme_lu
相关文章阅读及免费下载:
《Apache Nutch 1.3 学习笔记三(Inject)》
《Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)》
《Apache Nutch 1.3 学习笔记四(Generate)》
《Apache Nutch 1.3 学习笔记四(SegmentReader分析)》
《Apache Nutch 1.3 学习笔记五(FetchThread)》
《Apache Nutch 1.3 学习笔记五(Fetcher流程)》
《Apache Nutch 1.3 学习笔记六(ParseSegment)》
《Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)》
《Apache Nutch 1.3 学习笔记八(LinkDb)》
《Apache Nutch 1.3 学习笔记九(SolrIndexer)》
《Apache Nutch 1.3 学习笔记十(Nutch 插件机制简单介绍)》
《Apache Nutch 1.3 学习笔记十(插件扩展)》
《Apache Nutch 1.3 学习笔记十(插件机制分析)》
《Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)》
《Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)》