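InjectorJob reads site information from the seed file and writes the number of injected sites and their URLs back to context, an instance of the Context class. To speed up reads and writes, each URL is stored in the database in a reversed form, roughly reversed-domain:protocol:port/path.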
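The reversed URL form can be illustrated with a small stand-alone sketch. This is not the Nutch implementation: the class name ReversedUrlSketch and its reverse method are hypothetical, and the exact layout produced by TableUtil.reverseUrl may differ in detail. It only shows the idea of putting the reversed host first so that pages from the same domain sort next to each other.

```java
import java.net.URI;

// Hypothetical illustration of a reversed-URL key; not the Nutch source.
final class ReversedUrlSketch {

    /** Builds reversed-host:protocol[:port]/path[?query] from an absolute URL. */
    static String reverse(String url) {
        URI uri = URI.create(url);
        // Assumes a well-formed absolute URL, so getHost() is not null.
        String[] labels = uri.getHost().split("\\.");
        StringBuilder key = new StringBuilder();
        // Append the host labels from last to first: bar.foo.com -> com.foo.bar
        for (int i = labels.length - 1; i >= 0; i--) {
            key.append(labels[i]);
            if (i > 0) {
                key.append('.');
            }
        }
        key.append(':').append(uri.getScheme());
        if (uri.getPort() != -1) {
            key.append(':').append(uri.getPort());
        }
        if (uri.getRawPath() != null) {
            key.append(uri.getRawPath());
        }
        if (uri.getRawQuery() != null) {
            key.append('?').append(uri.getRawQuery());
        }
        return key.toString();
    }

    public static void main(String[] args) {
        System.out.println(reverse("http://bar.foo.com:8983/to/index.html?a=b"));
    }
}
```

Because keys that share a domain share a prefix, a store that keeps rows sorted by key can scan one site's pages as a contiguous range.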
The InjectorJob class runs as follows:
public static void main(String[] args) throws Exception {
  int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(),
      args);
  System.exit(res);
}
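This is the main function, the entry point of the program. It calls ToolRunner.run, which first loads the Nutch configuration (by default the two files nutch-default.xml and nutch-site.xml, with settings in nutch-site.xml overriding those in nutch-default.xml), then receives the command-line arguments args and runs a newly created InjectorJob instance.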
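The layering of the two configuration files can be pictured with a minimal sketch. It uses java.util.Properties as a stand-in for the Hadoop Configuration class, so it is an analogy rather than the Nutch loading code. The class name LayeredConfigSketch and the override value 86400 are hypothetical; the two default values are the ones that appear later in UrlMapper.setup.

```java
import java.util.Properties;

// Stand-in for the default/site layering; not the Hadoop Configuration API.
final class LayeredConfigSketch {

    /** Returns site settings backed by defaults, the way a site file overrides a default file. */
    static Properties layered() {
        Properties defaults = new Properties();
        defaults.setProperty("db.fetch.interval.default", "2592000"); // 30 days in seconds
        defaults.setProperty("db.score.injected", "1.0");

        // Properties created with a defaults table fall back to it for missing keys.
        Properties site = new Properties(defaults);
        site.setProperty("db.fetch.interval.default", "86400"); // hypothetical override: 1 day
        return site;
    }

    public static void main(String[] args) {
        Properties conf = layered();
        System.out.println(conf.getProperty("db.fetch.interval.default"));
        System.out.println(conf.getProperty("db.score.injected"));
    }
}
```

A key set in the site layer wins, and a key that is missing there falls through to the default layer.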
Next, the program checks whether the arguments are valid, among other things:
public int run(String[] args) throws Exception {
  if (args.length < 1) {
    System.err.println("Usage: InjectorJob <url_dir> [-crawlId <id>]");
    return -1;
  }
  for (int i = 1; i < args.length; i++) {
    if ("-crawlId".equals(args[i])) {
      // store the crawl ID in the configuration so later steps can read it
      getConf().set(Nutch.CRAWL_ID_KEY, args[i + 1]);
      i++;
    } else {
      System.err.println("Unrecognized arg " + args[i]);
      return -1;
    }
  }
  try {
    inject(new Path(args[0]));
    return 0;
  } catch (Exception e) {
    LOG.error("InjectorJob: " + StringUtils.stringifyException(e));
    return -1;
  }
}
If no arguments are supplied, the program prints the correct usage. If the arguments are valid, control passes to inject(new Path(args[0])) for the next step.
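The argument check can be tried in isolation with a short sketch. The class InjectorArgsSketch and its Parsed type are hypothetical and exist only for illustration. Unlike the original loop, this version also rejects a trailing -crawlId that has no value after it, which is an addition for safety, not something the Nutch code does.

```java
// Hypothetical stand-alone version of the command-line check; not the Nutch source.
final class InjectorArgsSketch {

    /** Parsed command line: the seed directory plus an optional crawl ID. */
    static final class Parsed {
        final String urlDir;
        final String crawlId; // null when -crawlId was not given

        Parsed(String urlDir, String crawlId) {
            this.urlDir = urlDir;
            this.crawlId = crawlId;
        }
    }

    /** Returns the parsed arguments, or null when the command line is invalid. */
    static Parsed parse(String[] args) {
        if (args.length < 1) {
            return null; // the seed directory is mandatory
        }
        String crawlId = null;
        for (int i = 1; i < args.length; i++) {
            // Added guard: -crawlId must be followed by a value.
            if ("-crawlId".equals(args[i]) && i + 1 < args.length) {
                crawlId = args[++i];
            } else {
                return null; // unrecognized or incomplete option
            }
        }
        return new Parsed(args[0], crawlId);
    }

    public static void main(String[] args) {
        Parsed parsed = parse(new String[] {"urls", "-crawlId", "demo"});
        System.out.println(parsed.urlDir + " " + parsed.crawlId);
    }
}
```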
public void inject(Path urlDir) throws Exception {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("InjectorJob: starting at " + sdf.format(start));
  LOG.info("InjectorJob: Injecting urlDir: " + urlDir);
  run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
  long end = System.currentTimeMillis();
  LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
After logging some basic information, this function calls run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir)), which carries out the core work of the injection.
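The call above packs the seed directory into a map keyed by argument name. The following is a minimal sketch of how a helper of that kind might turn alternating name/value arguments into a map. ArgMapSketch is a hypothetical class, the key "seedDir" is an assumed value for Nutch.ARG_SEEDDIR, and the real ToolUtil.toArgMap may behave differently, for example in how it reports an odd number of arguments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mimics packing name/value pairs into a map.
final class ArgMapSketch {

    /** Builds a map from alternating name, value, name, value ... arguments. */
    static Map<String, Object> toArgMap(Object... args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("expected name/value pairs");
        }
        Map<String, Object> result = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            // Skip pairs whose value is null so lookups never see a null entry.
            if (args[i + 1] != null) {
                result.put(String.valueOf(args[i]), args[i + 1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "seedDir" is an assumed key name used only for this example.
        System.out.println(toArgMap("seedDir", "urls"));
    }
}
```

Passing arguments as a map lets the same run method be driven both from the command line and from other code that builds the map directly.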
public Map<String, Object> run(Map<String, Object> args) throws Exception {
  // setLong(String name, long value) sets the value of the name property to a long
  getConf().setLong("injector.current.time", System.currentTimeMillis());
  Path input;
  Object path = args.get(Nutch.ARG_SEEDDIR);
  if (path instanceof Path) {
    input = (Path) path;
  } else {
    input = new Path(path.toString());
  }
  numJobs = 1;
  currentJobNum = 0;
  currentJob = NutchJob.getInstance(getConf(), "inject " + input);
  // add a path to the list of inputs for the map-reduce job
  FileInputFormat.addInputPath(currentJob, input);
  // Job.setMapperClass(Class<? extends Mapper> cls) sets the Mapper for the job
  currentJob.setMapperClass(UrlMapper.class);
  // set the key class for the map output data; this lets the map output key
  // class differ from the key class of the job's final output
  currentJob.setMapOutputKeyClass(String.class);
  // set the value class for the map output data
  currentJob.setMapOutputValueClass(WebPage.class);
  // set the output format of the job: the results are stored through Gora
  currentJob.setOutputFormatClass(GoraOutputFormat.class);
  DataStore<String, WebPage> store = StorageUtils.createWebStore(
      currentJob.getConfiguration(), String.class, WebPage.class);
  GoraOutputFormat.setOutput(currentJob, store, true);
  // NUTCH-1471 Make explicit which datastore class we use
  Class<? extends DataStore<Object, Persistent>> dataStoreClass = StorageUtils
      .getDataStoreClass(currentJob.getConfiguration());
  LOG.info("InjectorJob: Using " + dataStoreClass
      + " as the Gora storage class.");
  // set the reducer for the job
  currentJob.setReducerClass(Reducer.class);
  // set the number of reduce tasks; with 0 the job is map-only and no reducer runs
  currentJob.setNumReduceTasks(0);
  // debugging shows that this statement invokes the map function of the inner class UrlMapper
  currentJob.waitForCompletion(true);
  ToolUtil.recordJobStatus(null, currentJob, results);
  // NUTCH-1370 Make explicit #URLs injected @runtime
  long urlsInjected = currentJob.getCounters()
      .findCounter("injector", "urls_injected").getValue();
  long urlsFiltered = currentJob.getCounters()
      .findCounter("injector", "urls_filtered").getValue();
  LOG.info("InjectorJob: total number of urls rejected by filters: "
      + urlsFiltered);
  LOG.info("InjectorJob: total number of urls injected after normalization and filtering: "
      + urlsInjected);
  return results;
}
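When currentJob.waitForCompletion(true) executes, the job invokes the setup and map functions of the inner class UrlMapper.
UrlMapper handles the basic settings for each page: URL normalization (urlNormalizers), the fetch interval, the injected score, URL filtering (filters), score assignment (scfilters), the current time, and so on.
The setup(Context context) function initializes the data members of the class, so it plays a role similar to a constructor.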
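The order in which the framework calls these hooks can be sketched without Hadoop. The sketch below is a simplified stand-in, not the Hadoop Mapper class: MapperLifecycleSketch and TracingMapper are hypothetical names, and the record offsets are computed naively. It only shows that setup runs once before any record, map runs once per input line, and cleanup runs once at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the mapper lifecycle; not the Hadoop Mapper class.
final class MapperLifecycleSketch {

    /** Records the order in which its hooks are invoked. */
    static final class TracingMapper {
        final List<String> trace = new ArrayList<>();

        void setup() {
            trace.add("setup");
        }

        void map(long offset, String line) {
            trace.add("map:" + line);
        }

        void cleanup() {
            trace.add("cleanup");
        }

        /** Driver loop: one setup, one map call per record, one cleanup. */
        void run(List<String> lines) {
            setup();
            long offset = 0;
            for (String line : lines) {
                map(offset, line);
                offset += line.length() + 1; // naive byte offset of the next line
            }
            cleanup();
        }
    }

    /** Runs the mapper over two sample seed lines and returns the call trace. */
    static List<String> demo() {
        TracingMapper mapper = new TracingMapper();
        mapper.run(Arrays.asList("http://a.example/", "http://b.example/"));
        return mapper.trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```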
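The map(LongWritable key, Text value, Context context) function does the following:
1. Reads the URL from value, one URL per line; if the line is empty or starts with "#", it returns immediately.
2. Parses the metaname=metavalue pairs that follow the URL. Two names are handled specially: nutchScoreMDName sets a custom score and nutchFetchIntervalMDName sets a custom fetch interval. All other pairs are stored in a Map named metadata.
3. Normalizes and filters the URL, then gives each newly injected URL an initial score. Scoring goes through the ScoringFilters class in the org.apache.nutch.scoring package, which works through the ScoringFilter interface and finally calls whichever scoring plugin the user has chosen, such as opic or tld. To change how scores are injected, edit the value of plugin.includes in nutch-default.xml under the conf folder (or override it in nutch-site.xml).
4. Records the fetch time of the injected page and its regular interval between two fetches.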
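Steps 1 and 2 above can be sketched as a small stand-alone parser. SeedLineSketch and its Seed type are hypothetical, and the key names nutch.score and nutch.fetchInterval are assumed values for nutchScoreMDName and nutchFetchIntervalMDName. The sketch follows the tab-separated name=value layout described in the mapper's own comments.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone parser for one seed line; not the Nutch source.
final class SeedLineSketch {
    static final String SCORE_KEY = "nutch.score";            // assumed value of nutchScoreMDName
    static final String INTERVAL_KEY = "nutch.fetchInterval"; // assumed value of nutchFetchIntervalMDName

    /** Result of parsing one seed line. */
    static final class Seed {
        String url;
        float score = -1f; // -1 means "no custom score given"
        int interval;
        final Map<String, String> metadata = new TreeMap<>();
    }

    /** Returns the parsed seed, or null for blank lines and "#" comment lines. */
    static Seed parse(String line, int defaultInterval) {
        String text = line.trim();
        if (text.isEmpty() || text.startsWith("#")) {
            return null;
        }
        Seed seed = new Seed();
        seed.interval = defaultInterval;
        String[] parts = text.split("\t");
        seed.url = parts[0];
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq == -1) {
                continue; // skip anything that is not name=value
            }
            String name = parts[i].substring(0, eq);
            String value = parts[i].substring(eq + 1);
            try {
                if (name.equals(SCORE_KEY)) {
                    seed.score = Float.parseFloat(value);
                } else if (name.equals(INTERVAL_KEY)) {
                    seed.interval = Integer.parseInt(value);
                } else {
                    seed.metadata.put(name, value);
                }
            } catch (NumberFormatException e) {
                // malformed number: keep the default score or interval
            }
        }
        return seed;
    }

    public static void main(String[] args) {
        Seed seed = parse("http://example.com/\tnutch.score=2.5\tlang=en", 2592000);
        System.out.println(seed.url + " " + seed.score + " " + seed.interval + " " + seed.metadata);
    }
}
```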
The source code of the UrlMapper class is shown below:
public static class UrlMapper extends
    Mapper<LongWritable, Text, String, WebPage> {
  private URLNormalizers urlNormalizers; // URL normalization
  private int interval; // fetch interval in seconds, 30 days by default
  private float scoreInjected; // default score for injected pages
  private URLFilters filters; // URL filtering
  private ScoringFilters scfilters; // scoring plugins
  private long curTime; // current time

  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    urlNormalizers = new URLNormalizers(context.getConfiguration(),
        URLNormalizers.SCOPE_INJECT);
    interval = context.getConfiguration().getInt("db.fetch.interval.default",
        2592000);
    filters = new URLFilters(context.getConfiguration());
    scfilters = new ScoringFilters(context.getConfiguration());
    scoreInjected = context.getConfiguration().getFloat("db.score.injected",
        1.0f);
    curTime = context.getConfiguration().getLong("injector.current.time",
        System.currentTimeMillis());
  }

  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // value is a line of text; trim() returns a copy of the string without
    // leading and trailing whitespace
    String url = value.toString().trim();
    System.out.println("Input seed URL: " + url);
    // skip blank lines and comment lines that start with "#"; the null check
    // is redundant because trim() never returns null
    if (url != null && (url.length() == 0 || url.startsWith("#"))) {
      /* Ignore line that start with # */
      return;
    }
    // if tabs : metadata that could be stored
    // must be name=value and separated by \t
    float customScore = -1f;
    int customInterval = interval;
    Map<String, String> metadata = new TreeMap<String, String>();
    if (url.indexOf("\t") != -1) {
      String[] splits = url.split("\t");
      url = splits[0];
      for (int s = 1; s < splits.length; s++) {
        // find separation between name and value
        int indexEquals = splits[s].indexOf("=");
        if (indexEquals == -1) {
          // skip anything without a =
          continue;
        }
        String metaname = splits[s].substring(0, indexEquals);
        String metavalue = splits[s].substring(indexEquals + 1);
        //System.out.println("metaname:" + metaname +" metavalue:"+metavalue);
        if (metaname.equals(nutchScoreMDName)) {
          try {
            customScore = Float.parseFloat(metavalue);
          } catch (NumberFormatException nfe) {
            // ignore a malformed score and keep the default
          }
        } else if (metaname.equals(nutchFetchIntervalMDName)) {
          try {
            customInterval = Integer.parseInt(metavalue);
          } catch (NumberFormatException nfe) {
            // ignore a malformed interval and keep the default
          }
        } else
          metadata.put(metaname, metavalue);
      }
    }
    try {
      url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
      url = filters.filter(url); // filter the url
    } catch (Exception e) {
      LOG.warn("Skipping " + url + ":" + e);
      url = null;
    }
    if (url == null) {
      context.getCounter("injector", "urls_filtered").increment(1);
      return;
    } else { // if it passes
      String reversedUrl = TableUtil.reverseUrl(url); // collect it
      WebPage row = WebPage.newBuilder().build();
      row.setFetchTime(curTime);
      row.setFetchInterval(customInterval);
      // now add the metadata
      Iterator<String> keysIter = metadata.keySet().iterator();
      while (keysIter.hasNext()) {
        String keymd = keysIter.next();
        String valuemd = metadata.get(keymd);
        row.getMetadata().put(new Utf8(keymd),
            ByteBuffer.wrap(valuemd.getBytes()));
      }
      //System.out.println("customScore:"+customScore);
      if (customScore != -1) {
        //System.out.println("customScore:"+customScore);
        row.setScore(customScore);
      } else
        row.setScore(scoreInjected);
      //System.out.println("scoreInjected:" + scoreInjected);
      try {
        scfilters.injectedScore(url, row);
        //System.out.println("score of the page with content " + row.getContent() + ": " + row.getScore());
      } catch (ScoringFilterException e) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Cannot filter injected score for url " + url
              + ", using default (" + e.getMessage() + ")");
        }
      }
      context.getCounter("injector", "urls_injected").increment(1);
      row.getMarkers()
          .put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));
      Mark.INJECT_MARK.putMark(row, YES_STRING);
      context.write(reversedUrl, row);
    }
  }
}
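The map function of UrlMapper takes a key, its corresponding value, and a Context context parameter, which matches Hadoop's map/reduce model. Once it has done the work described above, map writes the count of injected pages and the processed URL back to context.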
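How the counters and the output records end up in the context can be sketched with a toy stand-in. Everything here is hypothetical: InjectPassSketch, its Context class, and the simple normalize and filter rules are placeholders for the Hadoop context and the Nutch URLNormalizers and URLFilters plugins. Only the counter names urls_injected and urls_filtered are taken from the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for one inject pass; not the Hadoop or Nutch API.
final class InjectPassSketch {

    /** Collects the written records and the named counters. */
    static final class Context {
        final List<String> written = new ArrayList<>();
        final Map<String, Long> counters = new HashMap<>();

        void increment(String name) {
            counters.merge(name, 1L, Long::sum);
        }

        long count(String name) {
            return counters.getOrDefault(name, 0L);
        }
    }

    /** Placeholder normalizer: drops the fragment part after "#". */
    static String normalize(String url) {
        int hash = url.indexOf('#');
        return hash == -1 ? url : url.substring(0, hash);
    }

    /** Placeholder filter: keeps http and https URLs, returns null for the rest. */
    static String filter(String url) {
        return (url.startsWith("http://") || url.startsWith("https://")) ? url : null;
    }

    /** Processes one seed line and reports the outcome through the context. */
    static void map(String line, Context context) {
        String url = filter(normalize(line.trim()));
        if (url == null) {
            context.increment("urls_filtered");
            return;
        }
        context.increment("urls_injected");
        context.written.add(url);
    }

    /** Feeds every line through map and returns the filled context. */
    static Context run(List<String> lines) {
        Context context = new Context();
        for (String line : lines) {
            map(line, context);
        }
        return context;
    }

    public static void main(String[] args) {
        Context context = run(Arrays.asList(
            "http://a.example/page#top", "ftp://b.example/", "https://c.example/"));
        System.out.println(context.written + " " + context.counters);
    }
}
```

The driver can then read the two counters after the pass finishes, which is what the run method does with findCounter once waitForCompletion returns.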
Next, the program returns to public Map