zoukankan      html  css  js  c++  java
  • Nutch 生产者队列的大小如何控制:threadcount * 50

    如果 topN 设置为 1000 万,并不会把这 1000 万条记录一次性都放到内存中的队列里,而是由 QueueFeeder 从文件系统(HDFS)中迭代读取,不断填充抓取队列(FetchItemQueues)。
    队列中默认最多存放 threadcount * 50 条记录;队列满时 QueueFeeder 每隔 1 秒检查一次,等有空位后再继续填充。

    下面的 QueueFeeder 类负责从文件系统读取输入记录并填充抓取队列。

    /**
    * This class feeds the queues with input items, and re-fills them as * items are consumed by FetcherThread-s. */ private static class QueueFeeder extends Thread { private final Context context; private final FetchItemQueues queues; private final int size; private Iterator<FetchEntry> currentIter; //FetchEntry实现了 org.apache.hadoop.io.Writable boolean hasMore; private long timelimit = -1; public QueueFeeder(Context context, FetchItemQueues queues, int size) throws IOException, InterruptedException { this.context = context; this.queues = queues; this.size = size; this.setDaemon(true); this.setName("QueueFeeder"); hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } // the value of the time limit is either -1 or the time where it should finish timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1); } @Override public void run() { int cnt = 0; int timelimitcount = 0; try { while (hasMore) { if (System.currentTimeMillis() >= timelimit && timelimit != -1) { // enough .. 
lets' simply // read all the entries from the input without processing them while (currentIter.hasNext()) { currentIter.next(); timelimitcount++; } hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } continue; } int feed = size - queues.getTotalSize(); if (feed <= 0) { // queues are full - spin-wait until they have some free space try { Thread.sleep(1000); } catch (final Exception e) {}; continue; } if (LOG.isDebugEnabled()) { LOG.debug("-feeding " + feed + " input urls ..."); } while (feed > 0 && currentIter.hasNext()) { FetchEntry entry = currentIter.next(); final String url = TableUtil.unreverseUrl(entry.getKey()); queues.addFetchItem(url, entry.getWebPage()); feed--; cnt++; } if (currentIter.hasNext()) { continue; // finish items in current list before reading next key } hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } } } catch (Exception e) { LOG.error("QueueFeeder error reading input, record " + cnt, e); return; } LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :" + timelimitcount); context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount); } }
  • 相关阅读:
    解决执行sql脚本报错:没有足够的内存继续执行程序。
    正则表达式学习
    art-template模板引擎循环嵌套
    textarea 设置最长字数和显示剩余字数
    display:table-cell
    js 发送 ajax 是数组 后台循环 发送json 到前台的方法
    js 函数内数据调用
    Angular 原文输出
    Angular 路由跳转
    JQ 按钮实现两种功能
  • 原文地址:https://www.cnblogs.com/i80386/p/3958948.html
Copyright © 2011-2022 走看看