zoukankan      html  css  js  c++  java
  • CountableThreadPool

    Spider剩下的CountableThreadPool

    在上一篇的Spider中我们一定注意到了threadpool这个变量,这个变量是Spider中的线程池,具体代码

    public class CountableThreadPool {

    private int threadNum;

    private AtomicInteger threadAlive = new AtomicInteger();

    private ReentrantLock reentrantLock = new ReentrantLock();

    private Condition condition = reentrantLock.newCondition();

    public CountableThreadPool(int threadNum) {
    this.threadNum = threadNum;
    this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    public CountableThreadPool(int threadNum, ExecutorService executorService) {
    this.threadNum = threadNum;
    this.executorService = executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
    this.executorService = executorService;
    }

    public int getThreadAlive() {
    return threadAlive.get();
    }

    public int getThreadNum() {
    return threadNum;
    }

    private ExecutorService executorService;

    public void execute(final Runnable runnable) {


    if (threadAlive.get() >= threadNum) {
    try {
    reentrantLock.lock();
    while (threadAlive.get() >= threadNum) {
    try {
    condition.await();
    } catch (InterruptedException e) {
    }
    }
    } finally {
    reentrantLock.unlock();
    }
    }
    threadAlive.incrementAndGet();
    executorService.execute(new Runnable() {
    @Override
    public void run() {
    try {
    runnable.run();
    } finally {
    try {
    reentrantLock.lock();
    threadAlive.decrementAndGet();
    condition.signal();
    } finally {
    reentrantLock.unlock();
    }
    }
    }
    });
    }

    public boolean isShutdown() {
    return executorService.isShutdown();
    }

    public void shutdown() {
    executorService.shutdown();
    }


    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    CountableThreadPool提供了设置Executor或者默认创建的方式,如果不是很懂Java的线程池先去补习一下~最主要的三个变量

    private AtomicInteger threadAlive = new AtomicInteger();

    private ReentrantLock reentrantLock = new ReentrantLock();

    private Condition condition = reentrantLock.newCondition();
    1
    2
    3
    4
    5
    1
    2
    3
    4
    5
    threadAlive表示目前正在执行的线程,reentrantLock是一个自旋锁,用于对条件变量操作的同步,condition用户唤醒阻塞线程的条件变量。

    关键的方法:

    public void execute(final Runnable runnable) {


    if (threadAlive.get() >= threadNum) {
    try {
    reentrantLock.lock();
    while (threadAlive.get() >= threadNum) {
    try {
    condition.await();
    } catch (InterruptedException e) {
    }
    }
    } finally {
    reentrantLock.unlock();
    }
    }
    threadAlive.incrementAndGet();
    executorService.execute(new Runnable() {
    @Override
    public void run() {
    try {
    runnable.run();
    } finally {
    try {
    reentrantLock.lock();
    threadAlive.decrementAndGet();
    condition.signal();
    } finally {
    reentrantLock.unlock();
    }
    }
    }
    });
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    使用threadAlive这个变量来控制目前的活动线程,如果超出定义的线程数就阻塞,为什么这样呢,因为我们创建的是固定大小的线程池,默认的newFixedThreadPool创建的最大线程数就是传入的参数,如果线程数量超过线程池中的数值,对于默认的操作就是抛异常了。可以看一下这篇博客:
    http://uule.iteye.com/blog/1123185
    http://blog.csdn.net/sd0902/article/details/8395677

    Spider剩下的SpiderMonitor

    先说一句

    SpiderMonitor是负责监控Spider的运行状态的,建议仔细阅读官方文档
    http://webmagic.io/docs/zh/posts/ch4-basic-page-processor/monitor.html
    http://my.oschina.net/xpbug/blog/221547
    所以如果这部分对你没什么用,你可以跳过去,我就没用到~

    开始吧

    在Spider的代码中我们看到了这个

    public void run() {
    try {
    processRequest(requestFinal);
    onSuccess(requestFinal);
    } catch (Exception e) {
    onError(requestFinal);
    logger.error("process request " + requestFinal + " error", e);
    } finally {
    pageCount.incrementAndGet();
    signalNewUrl();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    对于onSuccess(requestFinal)和onError(requestFinal)这个方法名,如果你看的多了一眼就知道这是个接口,那么回调在哪里?也就是SpiderMonitor的内部类

    public class SpiderMonitor {

    private static SpiderMonitor INSTANCE = new SpiderMonitor();

    private AtomicBoolean started = new AtomicBoolean(false);

    private Logger logger = LoggerFactory.getLogger(getClass());

    private MBeanServer mbeanServer;

    private String jmxServerName;

    private List<SpiderStatusMXBean> spiderStatuses = new ArrayList<SpiderStatusMXBean>();

    protected SpiderMonitor() {
    jmxServerName = "WebMagic";
    mbeanServer = ManagementFactory.getPlatformMBeanServer();
    }

    /**
    * Register spider for monitor.
    *
    * @param spiders spiders
    * @return this
    */
    public synchronized SpiderMonitor register(Spider... spiders) throws JMException {
    for (Spider spider : spiders) {
    MonitorSpiderListener monitorSpiderListener = new MonitorSpiderListener();
    if (spider.getSpiderListeners() == null) {
    List<SpiderListener> spiderListeners = new ArrayList<SpiderListener>();
    spiderListeners.add(monitorSpiderListener);
    spider.setSpiderListeners(spiderListeners);
    } else {
    spider.getSpiderListeners().add(monitorSpiderListener);
    }
    SpiderStatusMXBean spiderStatusMBean = getSpiderStatusMBean(spider, monitorSpiderListener);
    registerMBean(spiderStatusMBean);
    spiderStatuses.add(spiderStatusMBean);
    }
    return this;
    }

    protected SpiderStatusMXBean getSpiderStatusMBean(Spider spider, MonitorSpiderListener monitorSpiderListener) {
    return new SpiderStatus(spider, monitorSpiderListener);
    }

    public static SpiderMonitor instance() {
    return INSTANCE;
    }

    public class MonitorSpiderListener implements SpiderListener {

    private final AtomicInteger successCount = new AtomicInteger(0);

    private final AtomicInteger errorCount = new AtomicInteger(0);

    private List<String> errorUrls = Collections.synchronizedList(new ArrayList<String>());

    @Override
    public void onSuccess(Request request) {
    successCount.incrementAndGet();
    }

    @Override
    public void onError(Request request) {
    errorUrls.add(request.getUrl());
    errorCount.incrementAndGet();
    }

    public AtomicInteger getSuccessCount() {
    return successCount;
    }

    public AtomicInteger getErrorCount() {
    return errorCount;
    }

    public List<String> getErrorUrls() {
    return errorUrls;
    }
    }

    protected void registerMBean(SpiderStatusMXBean spiderStatus) throws MalformedObjectNameException, InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException {
    ObjectName objName = new ObjectName(jmxServerName + ":name=" + spiderStatus.getName());
    mbeanServer.registerMBean(spiderStatus, objName);
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    我们看到了在onSuccess和onError做了一些记录,主要是为了监控,如果你希望在爬虫成功或者失败实现一些自己方法也可以实现这个接口

    public interface SpiderListener {

    public void onSuccess(Request request);

    public void onError(Request request);
    }
    1
    2
    3
    4
    5
    6
    1
    2
    3
    4
    5
    6
    如果你对于Java的接口回调不是很懂那么推荐你看看《Head First设计模式》第一章,策略模式。

    写在后面

    这篇博客还是很简单的,主要完善了Spider的细小模块,后面将会介绍Spider的四大组件,如果喜欢多多支持~

  • 相关阅读:
    Android应用开发之避免内存泄露
    史上最经典的数据库面试题之二
    某大型银行深化系统之二十一:Log4j执行性能
    ruby支持批量数组的定义
    为VIM提供python代码提示功能
    使用win7登陆远程机器时自动保存密码
    安装Beanstalk
    在linux下安装或者卸载nginx
    python的数据类型
    使Ruby自动定位查找本地路径
  • 原文地址:https://www.cnblogs.com/timssd/p/5975853.html
Copyright © 2011-2022 走看看