zoukankan      html  css  js  c++  java
  • Scheduler

    先看看文档对于Scheduler的作用介绍
    https://code4craft.gitbooks.io/webmagic-in-action/content/zh/posts/ch1-overview/architecture.html
    之前我们也介绍过了,Scheduler主要负责爬虫的下一步爬取的规划,包括一些去重等功能。在主流程中也看到了Scheduler,现在来具体结合源码分析

    源码

    Scheduler是一个接口

    public interface Scheduler {

    /**
    * add a url to fetch
    *
    * @param request
    * @param task
    */
    public void push(Request request, Task task);

    /**
    * get an url to crawl
    *
    * @param task the task of spider
    * @return the url to crawl
    */
    public Request poll(Task task);

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    其主要的实现是DuplicateRemovedScheduler,使用模板模式定义了push的步骤。

    public abstract class DuplicateRemovedScheduler implements Scheduler {

    protected Logger logger = LoggerFactory.getLogger(getClass());

    private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();

    public DuplicateRemover getDuplicateRemover() {
    return duplicatedRemover;
    }

    public DuplicateRemovedScheduler setDuplicateRemover(DuplicateRemover duplicatedRemover) {
    this.duplicatedRemover = duplicatedRemover;
    return this;
    }

    @Override
    public void push(Request request, Task task) {
    logger.trace("get a candidate url {}", request.getUrl());
    if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) {
    logger.debug("push to queue {}", request.getUrl());
    pushWhenNoDuplicate(request, task);
    }
    }

    protected boolean shouldReserved(Request request) {
    return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
    }

    protected void pushWhenNoDuplicate(Request request, Task task) {

    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    我们来看看负责去重的接口DuplicateRemover,其实现类有HashSetDuplicateRemover使用HashSet来去重,RedisScheduler接触Redis来去重和BloomFilterDuplicateRemover使用BloomFilter去重。默认使用HashSetDuplicateRemover

    public class HashSetDuplicateRemover implements DuplicateRemover {

    private Set<String> urls = Sets.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    @Override
    public boolean isDuplicate(Request request, Task task) {
    return !urls.add(getUrl(request));
    }

    protected String getUrl(Request request) {
    return request.getUrl();
    }

    @Override
    public void resetDuplicateCheck(Task task) {
    urls.clear();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
    return urls.size();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    DuplicateRemovedScheduler抽象类有四个具体实现类QueueScheduler,PriorityScheduler,FileCacheQueueScheduler和RedisScheduler。默认使用QueueScheduler

    @ThreadSafe
    public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {

    private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();

    @Override
    public void pushWhenNoDuplicate(Request request, Task task) {
    queue.add(request);
    }

    @Override
    public synchronized Request poll(Task task) {
    return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
    return queue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
    return getDuplicateRemover().getTotalRequestsCount(task);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    其内部是使用了一个LinkedBlockingQueue这个无界队列来存储Request,我们应该看到了@ThreadSafe注解,那我抛一个问题吧。Scheduler是否存在线程同步问题呢,如果存在那是如何解决的呢?
    再来看下一个

    @ThreadSafe
    public class PriorityScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {

    public static final int INITIAL_CAPACITY = 5;

    private BlockingQueue<Request> noPriorityQueue = new LinkedBlockingQueue<Request>();

    private PriorityBlockingQueue<Request> priorityQueuePlus = new PriorityBlockingQueue<Request>(INITIAL_CAPACITY, new Comparator<Request>() {
    @Override
    public int compare(Request o1, Request o2) {
    return -NumberUtils.compareLong(o1.getPriority(), o2.getPriority());
    }
    });

    private PriorityBlockingQueue<Request> priorityQueueMinus = new PriorityBlockingQueue<Request>(INITIAL_CAPACITY, new Comparator<Request>() {
    @Override
    public int compare(Request o1, Request o2) {
    return -NumberUtils.compareLong(o1.getPriority(), o2.getPriority());
    }
    });

    @Override
    public void pushWhenNoDuplicate(Request request, Task task) {
    if (request.getPriority() == 0) {
    noPriorityQueue.add(request);
    } else if (request.getPriority() > 0) {
    priorityQueuePlus.put(request);
    } else {
    priorityQueueMinus.put(request);
    }
    }

    @Override
    public synchronized Request poll(Task task) {
    Request poll = priorityQueuePlus.poll();
    if (poll != null) {
    return poll;
    }
    poll = noPriorityQueue.poll();
    if (poll != null) {
    return poll;
    }
    return priorityQueueMinus.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
    return noPriorityQueue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
    return getDuplicateRemover().getTotalRequestsCount(task);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    我们看到了两个PriorityBlockingQueue和一个LinkedBlockingQueue。在poll的时候存在一个顺序。
    继续

    public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {

    private String filePath = System.getProperty("java.io.tmpdir");

    private String fileUrlAllName = ".urls.txt";

    private Task task;

    private String fileCursor = ".cursor.txt";

    private PrintWriter fileUrlWriter;

    private PrintWriter fileCursorWriter;

    private AtomicInteger cursor = new AtomicInteger();

    private AtomicBoolean inited = new AtomicBoolean(false);

    private BlockingQueue<Request> queue;

    private Set<String> urls;

    public FileCacheQueueScheduler(String filePath) {
    if (!filePath.endsWith("/") && !filePath.endsWith("\")) {
    filePath += "/";
    }
    this.filePath = filePath;
    }

    private void flush() {
    fileUrlWriter.flush();
    fileCursorWriter.flush();
    }

    private void init(Task task) {
    this.task = task;
    File file = new File(filePath);
    if (!file.exists()) {
    file.mkdirs();
    }
    readFile();
    initWriter();
    initFlushThread();
    inited.set(true);
    logger.info("init cache scheduler success");
    }

    private void initFlushThread() {
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    flush();
    }
    }, 10, 10, TimeUnit.SECONDS);
    }

    private void initWriter() {
    try {
    fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
    fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
    } catch (IOException e) {
    throw new RuntimeException("init cache scheduler error", e);
    }
    }

    private void readFile() {
    try {
    queue = new LinkedBlockingQueue<Request>();
    urls = new LinkedHashSet<String>();
    readCursorFile();
    readUrlFile();
    } catch (FileNotFoundException e) {
    //init
    logger.info("init cache file " + getFileName(fileUrlAllName));
    } catch (IOException e) {
    logger.error("init file error", e);
    }
    }

    private void readUrlFile() throws IOException {
    String line;
    BufferedReader fileUrlReader = null;
    try {
    fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
    int lineReaded = 0;
    while ((line = fileUrlReader.readLine()) != null) {
    urls.add(line.trim());
    lineReaded++;
    if (lineReaded > cursor.get()) {
    queue.add(new Request(line));
    }
    }
    } finally {
    if (fileUrlReader != null) {
    IOUtils.closeQuietly(fileUrlReader);
    }
    }
    }

    private void readCursorFile() throws IOException {
    BufferedReader fileCursorReader = null;
    try {
    fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
    String line;
    //read the last number
    while ((line = fileCursorReader.readLine()) != null) {
    cursor = new AtomicInteger(NumberUtils.toInt(line));
    }
    } finally {
    if (fileCursorReader != null) {
    IOUtils.closeQuietly(fileCursorReader);
    }
    }
    }

    private String getFileName(String filename) {
    return filePath + task.getUUID() + filename;
    }

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
    if (!inited.get()) {
    init(task);
    }
    queue.add(request);
    fileUrlWriter.println(request.getUrl());
    }

    @Override
    public synchronized Request poll(Task task) {
    if (!inited.get()) {
    init(task);
    }
    fileCursorWriter.println(cursor.incrementAndGet());
    return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
    return queue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
    return getDuplicateRemover().getTotalRequestsCount(task);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    会将url和已经执行的url指针存在两个文件中,创建了scheduleExecutor定期的flush,所有内存中的url还是存在BlockingQueue中。
    RedisScheduler不是很懂。。目前还没有接触过:)

    使用

    具体使用过程还是需要自己根据自己的爬虫特点然后选择特定的Scheduler及DuplicateRemover,只有懂得其原理才能选择最合适的组件。
    WebMagic组件都可以自行设置这点真的太棒了~

  • 相关阅读:
    sql-lib闯关秘籍之1-10关
    简单的SQL注入
    五分钟带你读懂 TCP全连接队列(图文并茂)
    Ambari HDP集群搭建全攻略
    Spring Cloud Security OAuth2.0 认证授权系列(入门篇)
    敲黑板:InnoDB的Double Write,你必须知道
    重要,知识点:InnoDB的插入缓冲
    你不知道的内存知识
    每日一个知识点:关于磁盘的一些事儿
    Spring Boot 系列:日志动态配置详解
  • 原文地址:https://www.cnblogs.com/timssd/p/5975854.html
Copyright © 2011-2022 走看看