  • Nutch2: Analysis of How a WebPage Is Written to the Database

    Version: Nutch 2.2.1

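    This article follows InjectorJob to trace the whole life of a WebPage: how it is defined, created, passed along, serialized, and finally written to the database. Key lines are excerpted from the source code, each labeled with its file name and line number.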

    1. Defining the schema
    The schema is written directly into the source code:
    //file: org/apache/nutch/storage/WebPage.java
    //line: 42
    public class WebPage extends PersistentBase {
      public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.apache.nutch.storage\",\"fields\":[{\"name\":\"baseUrl\",\"type\":\"string\"},{\"name\":\"status\",\"type\":\"int\"},{\"name\":\"fetchTime\",\"type\":\"long\"},{\"name\":\"prevFetchTime\",\"type\":\"long\"},{\"name\":\"fetchInterval\",\"type\":\"int\"},{\"name\":\"retriesSinceFetch\",\"type\":\"int\"},{\"name\":\"modifiedTime\",\"type\":\"long\"},{\"name\":\"prevModifiedTime\",\"type\":\"long\"},{\"name\":\"protocolStatus\",\"type\":{\"type\":\"record\",\"name\":\"ProtocolStatus\",\"fields\":[{\"name\":\"code\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"lastModified\",\"type\":\"long\"}]}},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"contentType\",\"type\":\"string\"},{\"name\":\"prevSignature\",\"type\":\"bytes\"},{\"name\":\"signature\",\"type\":\"bytes\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"parseStatus\",\"type\":{\"type\":\"record\",\"name\":\"ParseStatus\",\"fields\":[{\"name\":\"majorCode\",\"type\":\"int\"},{\"name\":\"minorCode\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}},{\"name\":\"score\",\"type\":\"float\"},{\"name\":\"reprUrl\",\"type\":\"string\"},{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"inlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"markers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\"values\":\"bytes\"}},{\"name\":\"batchId\",\"type\":\"string\"}]}");
      //...
      public Schema getSchema() { return _SCHEMA; }
      //...
    }

    This is a JSON-formatted string, which Avro parses into a Schema object.
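A minimal stdlib-only sketch of the same pattern, assuming a hypothetical MiniWebPage class: like WebPage, it keeps its schema in a static constant returned by getSchema(), and it stores field values by position in schema order, which is what the persistent.get(i) call in SqlStore.put relies on later. Avro is not used here; the JSON stays an opaque string and the field order is repeated by hand.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Gora persistent class such as WebPage.
// No Avro: the schema is an opaque JSON string, and FIELDS repeats the
// order of its top-level fields by hand.
class MiniWebPage {
    // Inside a Java string literal every JSON quote must be escaped.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"MiniWebPage\",\"fields\":["
        + "{\"name\":\"baseUrl\",\"type\":\"string\"},"
        + "{\"name\":\"status\",\"type\":\"int\"},"
        + "{\"name\":\"fetchTime\",\"type\":\"long\"}]}";

    // Field names in the same order as they appear in SCHEMA_JSON.
    static final List<String> FIELDS = Arrays.asList("baseUrl", "status", "fetchTime");

    // One slot per schema field; slot i holds the value of FIELDS.get(i).
    private final Object[] values = new Object[FIELDS.size()];

    String getSchema() { return SCHEMA_JSON; }

    Object get(int i) { return values[i]; }

    void put(int i, Object v) { values[i] = v; }

    public static void main(String[] args) {
        MiniWebPage page = new MiniWebPage();
        page.put(FIELDS.indexOf("status"), 1);
        page.put(FIELDS.indexOf("baseUrl"), "http://example.com/");
        // Walk the fields by position, the way a store would.
        for (int i = 0; i < FIELDS.size(); i++) {
            System.out.println(i + " " + FIELDS.get(i) + " = " + page.get(i));
        }
    }
}
```

Positional access keeps the generated class and the schema in lockstep: reordering fields in the JSON without regenerating the class would silently shift every index.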

    2. Passing the Schema
    This happens during initialization, before the job is submitted:
    //file: org/apache/nutch/crawl/InjectorJob.java
    //InjectorJob.run(Map<String,Object>) line: 221
    {
        DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
          String.class, WebPage.class);
    }


    persistentClass is passed down through the layers of calls:

    //file: gora-core-0.2.1/org/apache/gora/store/DataStoreFactory.java
    //DataStoreFactory.createDataStore(Class<D>, Class<K>, Class<T>, Configuration, String) line: 135
    {
        return createDataStore(dataStoreClass, keyClass, persistent, conf, createProps(), schemaName);
    }
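The chain above amounts to handing Class objects downward until the store can instantiate them. The sketch below is a hypothetical, stdlib-only illustration; HasSchema, SimpleStore, DemoPage, DemoStore and DemoStoreFactory are invented names, not Gora's API. The factory creates the store by reflection and forwards the key and persistent classes, and initialize() builds a throwaway persistent instance to read its schema, similar in spirit to what DataStoreBase.initialize does through getCachedPersistent().

```java
import java.util.Properties;

// Hypothetical minimal contracts; these are not Gora's real interfaces.
interface HasSchema {
    String getSchema();
}

interface SimpleStore {
    void initialize(Class<?> keyClass, Class<? extends HasSchema> persistentClass,
                    Properties props) throws Exception;
    String getSchema();
}

// Hypothetical persistent class that carries its schema, like WebPage._SCHEMA.
class DemoPage implements HasSchema {
    static final String SCHEMA = "{\"type\":\"record\",\"name\":\"DemoPage\",\"fields\":[]}";
    public DemoPage() {}
    @Override public String getSchema() { return SCHEMA; }
}

// Hypothetical store: initialize() instantiates the persistent class once
// and asks that instance for its schema.
class DemoStore implements SimpleStore {
    private Class<?> keyClass;   // kept only to show it is passed along
    private String schema;
    public DemoStore() {}

    @Override
    public void initialize(Class<?> keyClass, Class<? extends HasSchema> persistentClass,
                           Properties props) throws Exception {
        this.keyClass = keyClass;
        HasSchema prototype = persistentClass.getDeclaredConstructor().newInstance();
        this.schema = prototype.getSchema();
    }

    @Override public String getSchema() { return schema; }
}

class DemoStoreFactory {
    // Creates the store reflectively and forwards the classes, loosely
    // following the shape of DataStoreFactory.createDataStore(...).
    static SimpleStore createDataStore(Class<? extends SimpleStore> storeClass,
                                       Class<?> keyClass,
                                       Class<? extends HasSchema> persistentClass) throws Exception {
        SimpleStore store = storeClass.getDeclaredConstructor().newInstance();
        store.initialize(keyClass, persistentClass, new Properties());
        return store;
    }

    public static void main(String[] args) throws Exception {
        SimpleStore store = createDataStore(DemoStore.class, String.class, DemoPage.class);
        System.out.println(store.getSchema());
    }
}
```

Passing Class objects rather than instances lets the configuration choose the store implementation at runtime while the caller only names the key and record types.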


    Gora calls getSchema() on a (cached) WebPage instance to obtain the Schema, then builds a field map from it:
    //file: gora-core-0.2.1/org/apache/gora/store/DataStoreBase.java
    //SqlStore<K,T>(DataStoreBase<K,T>).initialize(Class<K>, Class<T>, Properties) line: 81
    {
        schema = this.beanFactory.getCachedPersistent().getSchema();
        fieldMap = AvroUtils.getFieldMap(schema);
    }
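AvroUtils.getFieldMap(schema) indexes the schema's fields by name so later code can look them up. As far as I can tell, the real method returns Avro Field objects keyed by name; the stdlib-only analogue below, with the hypothetical name FieldMapSketch, works on plain field-name strings instead and maps each name to its position.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical analogue of AvroUtils.getFieldMap(schema): maps each
// top-level field name to its position in the record. Plain strings are
// used instead of org.apache.avro.Schema.Field objects.
class FieldMapSketch {
    static Map<String, Integer> getFieldMap(List<String> orderedFieldNames) {
        Map<String, Integer> map = new LinkedHashMap<>();   // preserves schema order
        for (int i = 0; i < orderedFieldNames.size(); i++) {
            String name = orderedFieldNames.get(i);
            // A record schema may not declare the same field twice.
            if (map.putIfAbsent(name, i) != null) {
                throw new IllegalArgumentException("duplicate field: " + name);
            }
        }
        return Collections.unmodifiableMap(map);
    }

    public static void main(String[] args) {
        // The first few top-level fields of the WebPage schema, in declaration order.
        Map<String, Integer> fields =
            getFieldMap(List.of("baseUrl", "status", "fetchTime", "prevFetchTime"));
        System.out.println(fields); // {baseUrl=0, status=1, fetchTime=2, prevFetchTime=3}
    }
}
```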



    3. Passing the data and serialization
    This happens in the Map phase.

    The map method creates the WebPage (row) and, at the end, writes it to the context:
    //file: org/apache/nutch/crawl/InjectorJob.java
    //InjectorJob$UrlMapper.map(LongWritable, Text, Mapper<LongWritable,Text,String,WebPage>.Context) line: 191
    {
          context.write(reversedUrl, row);
    }
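The key written here, reversedUrl, is the page URL with its host labels reversed, so pages from the same domain sort next to each other in the store. Nutch 2.x derives it with a utility method (TableUtil.reverseUrl in org.apache.nutch.util, as far as I know). The sketch below reimplements the idea with java.net.URI under an assumed format of reversed host, then ":" and the protocol, an optional ":" and port, then path and query; ReverseUrlSketch is a hypothetical name and this is not a copy of Nutch's code.

```java
import java.net.URI;

// Hypothetical sketch of a reversed-URL row key. Assumed layout:
// reversedHost ":" protocol [":" port] path ["?" query]
class ReverseUrlSketch {
    static String reverseUrl(String url) {
        URI uri = URI.create(url);
        String[] hostParts = uri.getHost().split("\\.");
        StringBuilder buf = new StringBuilder();
        // Reverse the host labels: bar.foo.com -> com.foo.bar
        for (int i = hostParts.length - 1; i >= 0; i--) {
            buf.append(hostParts[i]);
            if (i > 0) buf.append('.');
        }
        buf.append(':').append(uri.getScheme());
        // Only an explicit port is recorded; getPort() is -1 when absent.
        if (uri.getPort() != -1) {
            buf.append(':').append(uri.getPort());
        }
        String path = uri.getRawPath() == null ? "" : uri.getRawPath();
        if (uri.getRawQuery() != null) {
            path = path + "?" + uri.getRawQuery();
        }
        if (!path.isEmpty() && path.charAt(0) != '/') {
            buf.append('/');
        }
        buf.append(path);
        return buf.toString();
    }

    public static void main(String[] args) {
        System.out.println(reverseUrl("http://bar.foo.com:8983/to/index.html?a=b"));
        // com.foo.bar:http:8983/to/index.html?a=b
    }
}
```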


    Hadoop core passes the WebPage down through several layers:
    //file: hadoop-src/org/apache/hadoop/mapred/MapTask.java
    //MapTask$NewDirectOutputCollector<K,V>.write(K, V) line: 638
    {
          reporter.progress();
          long bytesOutPrev = getOutputBytes(fsStats);
          out.write(key, value);
    }

    The out object above is of type GoraRecordWriter:

    //file: gora-core-0.2.1/org/apache/gora/mapreduce/GoraRecordWriter.java
    //GoraRecordWriter<K,T>.write(K, T) line: 60
    {
        store.put(key, (Persistent) value);
    }
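Each layer here simply forwards the key/value pair to the next writer. A hypothetical, stdlib-only sketch of that delegation, in which KvWriter, DirectCollector, StoreRecordWriter and MemoryStore are invented stand-ins for the mapper context, MapTask's collector, GoraRecordWriter and the data store:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical single-method writer contract shared by every layer.
interface KvWriter<K, V> {
    void write(K key, V value);
}

// Stand-in for the data store: put() just records the pair in memory.
class MemoryStore<K, V> {
    final Map<K, V> rows = new LinkedHashMap<>();
    void put(K key, V value) { rows.put(key, value); }
}

// Plays the role of GoraRecordWriter.write: hand the pair to store.put().
class StoreRecordWriter<K, V> implements KvWriter<K, V> {
    private final MemoryStore<K, V> store;
    StoreRecordWriter(MemoryStore<K, V> store) { this.store = store; }
    @Override public void write(K key, V value) { store.put(key, value); }
}

// Plays the role of NewDirectOutputCollector.write: bookkeeping, then delegate.
class DirectCollector<K, V> implements KvWriter<K, V> {
    private final KvWriter<K, V> out;
    long recordsWritten = 0;   // stands in for reporter.progress() and byte counters
    DirectCollector(KvWriter<K, V> out) { this.out = out; }
    @Override public void write(K key, V value) {
        recordsWritten++;
        out.write(key, value);
    }
}

class WriteChainDemo {
    public static void main(String[] args) {
        MemoryStore<String, String> store = new MemoryStore<>();
        KvWriter<String, String> context = new DirectCollector<>(new StoreRecordWriter<>(store));
        // Like context.write(reversedUrl, row) in the mapper.
        context.write("com.example:http/", "row for http://example.com/");
        System.out.println(store.rows);
    }
}
```

The mapper never sees the store type; swapping MemoryStore for another backend only changes what the innermost writer is constructed with.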


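    The runtime type of store is SqlStore, which extends gora-core's DataStoreBase class and handles reads and writes against MySQL. K is the primary key and T is a WebPage object. put() does not write to the database immediately: it builds a PreparedStatement and adds it to writeCache.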

    //file: gora-sql-0.1.1-incubating/org/apache/gora/sql/store/SqlStore.java
    //SqlStore<K,T>.put(K, T) line: 616

      public void put(K key, T persistent)
      {
          List<Field> fields = schema.getFields();

          for (int i = 0; i < fields.size(); i++) {
            Field field = fields.get(i);
            Column column = mapping.getColumn(field.name());
            insertStatement.setObject(persistent.get(i), field.schema(), column);
          }

          //jdbc already should cache the ps
          PreparedStatement insert = insertStatement.toStatement(connection);
          synchronized (writeCache) {
            writeCache.add(insert);
          }

      }
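The shape of put() can be reproduced without a database: every record becomes one parameterized statement appended to a shared cache under a lock. This sketch assumes a hypothetical table name, an id column for the key, and a plain field-to-column map standing in for mapping.getColumn(); BufferedSqlSketch and PendingInsert are invented names, and the SQL text is illustrative, not what gora-sql actually generates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical buffered store mirroring the shape of SqlStore.put():
// build one parameterized INSERT per record and queue it in writeCache.
class BufferedSqlSketch {
    // A statement waiting to be executed: SQL text plus bound parameters.
    static final class PendingInsert {
        final String sql;
        final List<Object> params;
        PendingInsert(String sql, List<Object> params) {
            this.sql = sql;
            this.params = Collections.unmodifiableList(params);
        }
    }

    private final String table;
    private final List<String> fields;               // schema order
    private final Map<String, String> fieldToColumn; // like mapping.getColumn(name)
    final List<PendingInsert> writeCache = new ArrayList<>();

    BufferedSqlSketch(String table, List<String> fields, Map<String, String> fieldToColumn) {
        this.table = table;
        this.fields = fields;
        this.fieldToColumn = fieldToColumn;
    }

    // values.get(i) plays the role of persistent.get(i).
    void put(String key, List<?> values) {
        if (values.size() != fields.size()) {
            throw new IllegalArgumentException("expected " + fields.size() + " values");
        }
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        StringJoiner marks = new StringJoiner(", ", "(", ")");
        List<Object> params = new ArrayList<>();
        cols.add("id");                                  // hypothetical key column
        marks.add("?");
        params.add(key);
        for (int i = 0; i < fields.size(); i++) {
            cols.add(fieldToColumn.get(fields.get(i)));
            marks.add("?");
            params.add(values.get(i));
        }
        PendingInsert insert =
            new PendingInsert("INSERT INTO " + table + " " + cols + " VALUES " + marks, params);
        // Several threads may call put(), so the cache is guarded as in SqlStore.
        synchronized (writeCache) {
            writeCache.add(insert);
        }
    }
}
```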


    toStatement() calls setField(). The serialization itself is performed by Avro and is not examined in depth here:
    //file: gora-sql-0.1.1-incubating/org/apache/gora/sql/store/SqlStore.java
    //SqlStore<K,T>.setField(PreparedStatement, Column, Schema, int, Object) line: 718
    {
       IOUtils.serialize(os, datumWriter, schema, object);
    }
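The excerpt hands the value to Avro through IOUtils.serialize and a DatumWriter; which encoder Gora sets up is not shown. Assuming Avro's binary encoding, this sketch encodes two primitives by hand following the Avro specification: int and long values as zigzag variable-length integers, and strings as a length prefix (itself a long) followed by the UTF-8 bytes. AvroBinarySketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-rolled encoder for two Avro binary primitives, per the Avro spec.
class AvroBinarySketch {
    static void writeLong(long n, ByteArrayOutputStream out) {
        long z = (n << 1) ^ (n >> 63);            // zigzag: 0,-1,1,-2,... -> 0,1,2,3,...
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits with continuation bit
            z >>>= 7;
        }
        out.write((int) z);                       // last byte, continuation bit clear
    }

    static void writeString(String s, ByteArrayOutputStream out) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(utf8.length, out);              // byte length, not char count
        out.write(utf8, 0, utf8.length);
    }

    static byte[] encodeLong(long n) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(n, out);
        return out.toByteArray();
    }

    static byte[] encodeString(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(s, out);
        return out.toByteArray();
    }

    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) sb.append(String.format("%02x", b & 0xFF));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(hex(encodeLong(64)));      // 8001
        System.out.println(hex(encodeLong(-1)));      // 01
        System.out.println(hex(encodeString("foo"))); // 06666f6f
    }
}
```

Zigzag keeps small negative numbers short: -1 takes one byte instead of the ten a plain two's-complement varint would need.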



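    4. The flush operation
    When the mapper finishes, MapTask closes the output, and GoraRecordWriter.close() in turn closes the store: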
    //file: hadoop-src/org/apache/hadoop/mapred/MapTask.java
    //MapTask.runNewMapper(JobConf, TaskSplitIndex, TaskUmbilicalProtocol, TaskReporter) line: 767
    {
        output.close(mapperContext);
    }

    //file: gora-core-0.2.1/org/apache/gora/mapreduce/GoraRecordWriter.java
    //GoraRecordWriter<K,T>.close(TaskAttemptContext) line: 55
    {
        store.close();
    }

    Below is the flush() method, which is called from SqlStore.close():
    //file: gora-sql-0.1.1-incubating/org/apache/gora/sql/store/SqlStore.java
    //SqlStore<K,T>.flush() line: 342
    {
        connection.commit();
    }

    At this point the WebPage has been written to the MySQL database (through JDBC underneath).
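The put/close/flush split traced in this section can be summarized with a small buffered-writer sketch. Beyond what the excerpts show, it assumes that flush() applies every cached statement and then commits once; FlushOnCloseSketch is a hypothetical class whose in-memory lists stand in for the JDBC connection and the table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffered writer: put() only queues work, flush() applies
// the queue and "commits", and close() flushes.
class FlushOnCloseSketch implements AutoCloseable {
    private final List<String> writeCache = new ArrayList<>();
    private final List<String> uncommitted = new ArrayList<>();
    final List<String> committed = new ArrayList<>();   // what a reader would see

    void put(String row) {
        synchronized (writeCache) { writeCache.add(row); }
    }

    void flush() {
        synchronized (writeCache) {
            uncommitted.addAll(writeCache);   // like executing each cached statement
            writeCache.clear();
        }
        commit();
    }

    private void commit() {                   // like connection.commit()
        committed.addAll(uncommitted);
        uncommitted.clear();
    }

    @Override
    public void close() { flush(); }

    public static void main(String[] args) {
        FlushOnCloseSketch store = new FlushOnCloseSketch();
        store.put("com.example:http/");
        System.out.println("before close: " + store.committed); // before close: []
        store.close();
        System.out.println("after close: " + store.committed);  // after close: [com.example:http/]
    }
}
```

Because the sketch implements AutoCloseable, a try-with-resources block guarantees the flush. In this model, rows that are put but never flushed are never committed, which is why the close chain from MapTask down to the store matters.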


  • Original article: https://www.cnblogs.com/jpfss/p/7903851.html