zoukankan      html  css  js  c++  java
  • nutch 生产者队列的大小如何控制 threadcount * 50

    如果 topN 设置为 1000 万,并不会把这 1000 万条记录一次性全部放入 QueueFeeder(内存)中,而是从文件系统(HDFS)中迭代读取,不断地填充队列。
    队列中默认最多存放 threadcount * 50 条记录。

    这个类的作用是从文件系统读文件填充队列。

    /**
    * This class feeds the queues with input items, and re-fills them as * items are consumed by FetcherThread-s. */ private static class QueueFeeder extends Thread { private final Context context; private final FetchItemQueues queues; private final int size; private Iterator<FetchEntry> currentIter; //FetchEntry实现了 org.apache.hadoop.io.Writable boolean hasMore; private long timelimit = -1; public QueueFeeder(Context context, FetchItemQueues queues, int size) throws IOException, InterruptedException { this.context = context; this.queues = queues; this.size = size; this.setDaemon(true); this.setName("QueueFeeder"); hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } // the value of the time limit is either -1 or the time where it should finish timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1); } @Override public void run() { int cnt = 0; int timelimitcount = 0; try { while (hasMore) { if (System.currentTimeMillis() >= timelimit && timelimit != -1) { // enough .. 
lets' simply // read all the entries from the input without processing them while (currentIter.hasNext()) { currentIter.next(); timelimitcount++; } hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } continue; } int feed = size - queues.getTotalSize(); if (feed <= 0) { // queues are full - spin-wait until they have some free space try { Thread.sleep(1000); } catch (final Exception e) {}; continue; } if (LOG.isDebugEnabled()) { LOG.debug("-feeding " + feed + " input urls ..."); } while (feed > 0 && currentIter.hasNext()) { FetchEntry entry = currentIter.next(); final String url = TableUtil.unreverseUrl(entry.getKey()); queues.addFetchItem(url, entry.getWebPage()); feed--; cnt++; } if (currentIter.hasNext()) { continue; // finish items in current list before reading next key } hasMore = context.nextKey(); if (hasMore) { currentIter = context.getValues().iterator(); } } } catch (Exception e) { LOG.error("QueueFeeder error reading input, record " + cnt, e); return; } LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :" + timelimitcount); context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount); } }
  • 相关阅读:
    Java数据结构概述·14
    Java之自定义异常·13
    idea spirng项目jsp页面乱码
    HashMap和LinkedHashMap的区别
    jar包导入仓库中
    后台接口接受前端参数的时候使用包装类和基本类型接受
    转 为什么程序员怕改需求?
    thymeleaf常用标签
    linux查看端口占用情况
    查询linux硬件配置
  • 原文地址:https://www.cnblogs.com/i80386/p/3958948.html
Copyright © 2011-2022 走看看