zoukankan      html  css  js  c++  java
  • Pipeline & PageProcessor

    Pipeline & PageProcessor

    这两部分是应该由程序员自己实现的部分:PageProcessor决定如何解析页面,而Pipeline负责存储结果。推荐使用OOSpider,也就是注解式编程。

    Downloader

    /**
     * Fetches pages for a spider.
     */
    public interface Downloader {

        /**
         * Fetches the resource described by {@code request} and wraps the result in a {@link Page}.
         *
         * @param request the request to fetch
         * @param task    the task the request belongs to
         * @return the fetched page
         */
        Page download(Request request, Task task);

        /**
         * Informs the downloader of the number of threads the spider runs with.
         * Implementations may use this to size internal resources (for example a connection pool).
         *
         * @param threadNum number of spider threads
         */
        void setThread(int threadNum);
    }

    主要的实现类有3个,这里只重点介绍一下HttpClientDownloader,其余的有兴趣可以自己看看源码。

    @ThreadSafe
    public class HttpClientDownloader extends AbstractDownloader {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private final Map<String, CloseableHttpClient> httpClients = new HashMap<String, CloseableHttpClient>();

    private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();

    private CloseableHttpClient getHttpClient(Site site, Proxy proxy) {
    if (site == null) {
    return httpClientGenerator.getClient(null, proxy);
    }
    String domain = site.getDomain();
    CloseableHttpClient httpClient = httpClients.get(domain);
    if (httpClient == null) {
    synchronized (this) {
    httpClient = httpClients.get(domain);
    if (httpClient == null) {
    httpClient = httpClientGenerator.getClient(site, proxy);
    httpClients.put(domain, httpClient);
    }
    }
    }
    return httpClient;
    }

    @Override
    public Page download(Request request, Task task) {
    Site site = null;
    if (task != null) {
    site = task.getSite();
    }
    Set<Integer> acceptStatCode;
    String charset = null;
    Map<String, String> headers = null;
    if (site != null) {
    acceptStatCode = site.getAcceptStatCode();
    charset = site.getCharset();
    headers = site.getHeaders();
    } else {
    acceptStatCode = Sets.newHashSet(200);
    }
    logger.info("downloading page {}", request.getUrl());
    CloseableHttpResponse httpResponse = null;
    int statusCode=0;
    try {
    HttpHost proxyHost = null;
    Proxy proxy = null; //TODO
    if (site.getHttpProxyPool() != null && site.getHttpProxyPool().isEnable()) {
    proxy = site.getHttpProxyFromPool();
    proxyHost = proxy.getHttpHost();
    } else if(site.getHttpProxy()!= null){
    proxyHost = site.getHttpProxy();
    }

    HttpUriRequest httpUriRequest = getHttpUriRequest(request, site, headers, proxyHost);
    httpResponse = getHttpClient(site, proxy).execute(httpUriRequest);��֤
    statusCode = httpResponse.getStatusLine().getStatusCode();
    request.putExtra(Request.STATUS_CODE, statusCode);
    if (statusAccept(acceptStatCode, statusCode)) {
    Page page = handleResponse(request, charset, httpResponse, task);
    onSuccess(request);
    return page;
    } else {
    logger.warn("code error " + statusCode + " " + request.getUrl());
    return null;
    }
    } catch (IOException e) {
    logger.warn("download page " + request.getUrl() + " error", e);
    if (site.getCycleRetryTimes() > 0) {
    return addToCycleRetry(request, site);
    }
    onError(request);
    return null;
    } finally {
    request.putExtra(Request.STATUS_CODE, statusCode);
    if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
    site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY), (Integer) request
    .getExtra(Request.STATUS_CODE));
    }
    try {
    if (httpResponse != null) {
    //ensure the connection is released back to pool
    EntityUtils.consume(httpResponse.getEntity());
    }
    } catch (IOException e) {
    logger.warn("close response fail", e);
    }
    }
    }

    @Override
    public void setThread(int thread) {
    httpClientGenerator.setPoolSize(thread);
    }

    protected boolean statusAccept(Set<Integer> acceptStatCode, int statusCode) {
    return acceptStatCode.contains(statusCode);
    }

    protected HttpUriRequest getHttpUriRequest(Request request, Site site, Map<String, String> headers,HttpHost proxy) {
    RequestBuilder requestBuilder = selectRequestMethod(request).setUri(request.getUrl());
    if (headers != null) {
    for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
    requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
    }
    }
    RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
    .setConnectionRequestTimeout(site.getTimeOut())
    .setSocketTimeout(site.getTimeOut())
    .setConnectTimeout(site.getTimeOut())
    .setCookieSpec(CookieSpecs.BEST_MATCH);
    if (proxy !=null) {
    requestConfigBuilder.setProxy(proxy);
    request.putExtra(Request.PROXY, proxy);
    }
    requestBuilder.setConfig(requestConfigBuilder.build());
    return requestBuilder.build();
    }

    protected RequestBuilder selectRequestMethod(Request request) {
    String method = request.getMethod();
    if (method == null || method.equalsIgnoreCase(HttpConstant.Method.GET)) {
    //default get
    return RequestBuilder.get();
    } else if (method.equalsIgnoreCase(HttpConstant.Method.POST)) {
    RequestBuilder requestBuilder = RequestBuilder.post();
    NameValuePair[] nameValuePair = (NameValuePair[]) request.getExtra("nameValuePair");
    if (nameValuePair != null && nameValuePair.length > 0) {
    requestBuilder.addParameters(nameValuePair);
    }
    return requestBuilder;
    } else if (method.equalsIgnoreCase(HttpConstant.Method.HEAD)) {
    return RequestBuilder.head();
    } else if (method.equalsIgnoreCase(HttpConstant.Method.PUT)) {
    return RequestBuilder.put();
    } else if (method.equalsIgnoreCase(HttpConstant.Method.DELETE)) {
    return RequestBuilder.delete();
    } else if (method.equalsIgnoreCase(HttpConstant.Method.TRACE)) {
    return RequestBuilder.trace();
    }
    throw new IllegalArgumentException("Illegal HTTP Method " + method);
    }

    protected Page handleResponse(Request request, String charset, HttpResponse httpResponse, Task task) throws IOException {
    String content = getContent(charset, httpResponse);
    Page page = new Page();
    page.setRawText(content);
    page.setUrl(new PlainText(request.getUrl()));
    page.setRequest(request);
    page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
    return page;
    }

    protected String getContent(String charset, HttpResponse httpResponse) throws IOException {
    if (charset == null) {
    byte[] contentBytes = IOUtils.toByteArray(httpResponse.getEntity().getContent());
    String htmlCharset = getHtmlCharset(httpResponse, contentBytes);
    if (htmlCharset != null) {
    return new String(contentBytes, htmlCharset);
    } else {
    logger.warn("Charset autodetect failed, use {} as charset. Please specify charset in Site.setCharset()", Charset.defaultCharset());
    return new String(contentBytes);
    }
    } else {
    return IOUtils.toString(httpResponse.getEntity().getContent(), charset);
    }
    }

    protected String getHtmlCharset(HttpResponse httpResponse, byte[] contentBytes) throws IOException {
    String charset;
    // charset
    // 1、encoding in http header Content-Type
    String value = httpResponse.getEntity().getContentType().getValue();
    charset = UrlUtils.getCharset(value);
    if (StringUtils.isNotBlank(charset)) {
    logger.debug("Auto get charset: {}", charset);
    return charset;
    }
    // use default charset to decode first time
    Charset defaultCharset = Charset.defaultCharset();
    String content = new String(contentBytes, defaultCharset.name());
    // 2、charset in meta
    if (StringUtils.isNotEmpty(content)) {
    Document document = Jsoup.parse(content);
    Elements links = document.select("meta");
    for (Element link : links) {
    // 2.1、html4.01 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
    String metaContent = link.attr("content");
    String metaCharset = link.attr("charset");
    if (metaContent.indexOf("charset") != -1) {
    metaContent = metaContent.substring(metaContent.indexOf("charset"), metaContent.length());
    charset = metaContent.split("=")[1];
    break;
    }
    // 2.2、html5 <meta charset="UTF-8" />
    else if (StringUtils.isNotEmpty(metaCharset)) {
    charset = metaCharset;
    break;
    }
    }
    }
    logger.debug("Auto get charset: {}", charset);
    // 3、todo use tools as cpdetector for content decode
    return charset;
    }
    }

    其中还包括添加http proxy的部分,这部分官方文档都没有介绍,如果需要就自行看源码吧 - -b
    再看代码中的这部分:

    // Excerpt from HttpClientDownloader.download(): the response status is checked against
    // acceptStatCode ({200} when no Site is available).
    if (statusAccept(acceptStatCode, statusCode)) {
    Page page = handleResponse(request, charset, httpResponse, task);
    onSuccess(request);
    return page;
    } else {
    // Rejected status: only logged, then dropped -- neither onError() nor a cycle retry is triggered.
    logger.warn("code error " + statusCode + " " + request.getUrl());
    return null;
    }

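    acceptStatCode默认是200,如果出现其他状态码,那么会直接return null。需要注意的是,在Java中即使try块里执行了return,下面的finally块依然会执行,所以HttpClient的连接仍会通过EntityUtils.consume释放,这里并不存在资源泄漏。真正的问题在于:非200的页面只打了一条warn日志就被静默丢弃,既不会调用onError,也不会进入重试,这也算是一个bug吧。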

    // Excerpt from HttpClientDownloader.download(). A finally block runs on every exit path of its
    // try block, including the `return null` taken for rejected status codes.
    finally {
        request.putExtra(Request.STATUS_CODE, statusCode);
        // Return the borrowed proxy to the pool together with the final status code.
        if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
            site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY), (Integer) request
                    .getExtra(Request.STATUS_CODE));
        }
        try {
            if (httpResponse != null) {
                //ensure the connection is released back to pool
                EntityUtils.consume(httpResponse.getEntity());
            }
        } catch (IOException e) {
            logger.warn("close response fail", e);
        }
    }

    到此为止,所有的关于WebMagic的主体源码都介绍完毕了,如果你需要使用那么目前的知识已经足够了,如果出现bug还是需要自行更改,还好WebMagic给我们提供了更换组件的接口,使用起来还是很方便的。

  • 相关阅读:
    hadoop balance
    随笔
    ubuntu server 使用parted分区
    程序员内功续
    hadoop——hdfs多硬盘挂载
    hdfs老数据压缩备份的一些问题20120521
    hadoop balance failed
    hoj 2524 Allocate Dormitories 二分图的最大匹配
    HDOJ 分类(转)
    hoj 3008 Matryoshka Dolls Again 最大独立子集
  • 原文地址:https://www.cnblogs.com/timssd/p/5975852.html
Copyright © 2011-2022 走看看