zoukankan      html  css  js  c++  java
  • Nutch 生产者队列的大小如何控制：threadcount * 50

    如果 topN 设置为 1000 万，并不会把这 1000 万条 URL 全部放入内存中的队列，而是由 QueueFeeder 从文件系统（HDFS）中迭代读取，不断填充队列。
    队列中默认最多存放 threadcount * 50 条记录。

    下面这个类（QueueFeeder）的作用是从文件系统读取输入，持续填充队列；当队列已满时，它会等待 1 秒后再重试。

    /**
    * This class feeds the queues with input items, and re-fills them as * items are consumed by FetcherThread-s. */ private static class QueueFeeder extends Thread { private final Context context; private final FetchItemQueues queues; private final int size; private Iterator<FetchEntry> currentIter; //FetchEntry实现了 org.apache.hadoop.io.Writable boolean hasMore; private long timelimit = -1; public QueueFeeder(Context context, FetchItemQueues queues, int size) throws IOException, InterruptedException { this.context = context; this.queues = queues; this.size = size; this.setDaemon(true); this.setName("QueueFeeder"); hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } // the value of the time limit is either -1 or the time where it should finish timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1); } @Override public void run() { int cnt = 0; int timelimitcount = 0; try { while (hasMore) { if (System.currentTimeMillis() >= timelimit && timelimit != -1) { // enough .. 
lets' simply // read all the entries from the input without processing them while (currentIter.hasNext()) { currentIter.next(); timelimitcount++; } hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } continue; } int feed = size - queues.getTotalSize(); if (feed <= 0) { // queues are full - spin-wait until they have some free space try { Thread.sleep(1000); } catch (final Exception e) {}; continue; } if (LOG.isDebugEnabled()) { LOG.debug("-feeding " + feed + " input urls ..."); } while (feed > 0 && currentIter.hasNext()) { FetchEntry entry = currentIter.next(); final String url = TableUtil.unreverseUrl(entry.getKey()); queues.addFetchItem(url, entry.getWebPage()); feed--; cnt++; } if (currentIter.hasNext()) { continue; // finish items in current list before reading next key } hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } } } catch (Exception e) { LOG.error("QueueFeeder error reading input, record " + cnt, e); return; } LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :" + timelimitcount); context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount); } }
  • 相关阅读:
    C# 为WebBrowser设置代理,打开网页
    C# WebBrowser 设置代理完全解决方案
    java读取文件的几种方式性能比较
    .NET 对文件和文件夹操作的介绍
    java利用反射打印出类的结构
    java输出月的日历控制台
    java 实现二分查找算法
    java实现快速排序
    解决window 12 service 不能调用excel，报"System.Runtime.InteropServices.COMException (0x800A03EC)"
    3 webpack 4 加vue 2.0生产环境搭建
  • 原文地址:https://www.cnblogs.com/i80386/p/3958948.html
Copyright © 2011-2022 走看看