    Concurrent crawler in Go

    Code implementation

    /crawler/main.go
    package main
    
    import (
    	"learn/crawler/engine"
    	"learn/crawler/scheduler"
    	"learn/crawler/zhenai/parser"
    )
    
    func main() {
    	e := engine.ConcurrentEngine{
    		Scheduler:   &scheduler.QueuedScheduler{},
    		WorkerCount: 20,
    	}
    	e.Run(engine.Request{
    		Url:       "http://www.zhenai.com/zhenghun",
    		ParseFunc: parser.ParseCityList,
    	})
    	// Test with a single city (Shanghai):
    	//e.Run(engine.Request{
    	//	Url:       "http://www.zhenai.com/zhenghun/shanghai",
    	//	ParseFunc: parser.ParseCity,
    	//})
    }
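
    main.go wires everything together: a ConcurrentEngine backed by a
    QueuedScheduler with 20 worker goroutines, seeded with the city-list page.
    The commented-out block is handy for debugging a single city's parser in
    isolation.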
    
    /crawler/engine/simple.go
    package engine
    
    import (
    	"learn/crawler/fetcher"
    	"log"
    )
    
    type SimpleEngine struct{}
    
    func (e SimpleEngine) Run(seeds ...Request) {
    	var requests []Request
    	requests = append(requests, seeds...)
    	for len(requests) > 0 {
    		r := requests[0]
    		requests = requests[1:]
    
    		parseResult, err := worker(r)
    		if err != nil {
    			continue
    		}
    		requests = append(requests, parseResult.Requests...)
    		for _, item := range parseResult.Items {
    			log.Printf("Got item %v", item)
    		}
    	}
    }
    func worker(r Request) (ParseResult, error) {
    	log.Printf("Fetching %s", r.Url)
    	body, err := fetcher.Fetch(r.Url)
    	if err != nil {
    		log.Printf("Fetcher: error" + "fetching url %s: %v", r.Url, err)
    		return ParseResult{}, err
    	}
    	return r.ParseFunc(body), nil
    }
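
    SimpleEngine is the single-threaded baseline: it keeps its own in-memory
    request queue and calls worker synchronously, one URL at a time. The
    concurrent version below reuses the same worker function but moves
    scheduling into a separate component.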
    
    
    /crawler/engine/concurrent.go
    package engine
    
    import (
    	"log"
    )
    
    type ConcurrentEngine struct {
    	Scheduler   Scheduler
    	WorkerCount int
    }
    type Scheduler interface {
    	ReadyNotifier
    	Submit(Request)
    	WorkerChan() chan Request
    	Run()
    }
    type ReadyNotifier interface {
    	WorkerReady(chan Request)
    }
    func (e *ConcurrentEngine) Run(seeds ...Request) {
    	// All workers report their parse results on a single out channel.
    	out := make(chan ParseResult)
    	e.Scheduler.Run()
    
    	for i := 0; i < e.WorkerCount; i++ {
    		createWork(e.Scheduler.WorkerChan(), out, e.Scheduler)
    	}
    	for _, r := range seeds {
    		e.Scheduler.Submit(r)
    	}
    	itemCount := 0
    	for {
    		result := <-out
    		for _, item := range result.Items {
    			log.Printf("Got item #%d: %v", itemCount, item)
    			itemCount++
    		}
    		for _, request := range result.Requests {
    			e.Scheduler.Submit(request)
    		}
    	}
    }
    func createWork(in chan Request, out chan ParseResult, ready ReadyNotifier) {
    	go func() {
    		for {
    			// Tell the scheduler this worker is idle before blocking on in.
    			ready.WorkerReady(in)
    			request := <-in
    			result, err := worker(request)
    			if err != nil {
    				continue
    			}
    			out <- result
    		}
    	}()
    }
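
    Run starts the scheduler, spawns WorkerCount worker goroutines, submits
    the seed requests, and then loops: every ParseResult coming back on out
    yields items to print and new requests to resubmit. Note the loop never
    terminates; detecting that the queue has drained is left out of this
    version for simplicity.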
    
    /crawler/engine/typers.go
    package engine
    
    type Request struct {
    	Url       string
    	ParseFunc func([]byte) ParseResult
    }
    type ParseResult struct {
    	Requests []Request
    	Items    []interface{}
    }
    func NilParser([]byte) ParseResult {
    	return ParseResult{}
    }
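
    NilParser is for leaf pages whose outgoing links should not be followed. A
    request using it might look like this (the profile URL is made up for
    illustration):

    	engine.Request{
    		Url:       "http://album.zhenai.com/u/12345",
    		ParseFunc: engine.NilParser,
    	}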
    
    /crawler/fetcher/fetcher.go
    package fetcher
    
    import (
    	"bufio"
    	"fmt"
    	"golang.org/x/net/html/charset"
    	"golang.org/x/text/encoding"
    	"golang.org/x/text/encoding/unicode"
    	"golang.org/x/text/transform"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"time"
    )
    
    // Global rate limiter: at most one request every 100ms (~10 requests/s),
    // shared by all workers.
    var rateLimiter = time.Tick(100 * time.Millisecond)
    func Fetch(url string) ([]byte, error) {
    	<-rateLimiter
    	client := &http.Client{}
    	req, err := http.NewRequest("GET", url, nil)
    	if err != nil {
    		return nil, err
    	}
    	req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36")
    	resp, err := client.Do(req)
    	if err != nil {
    		return nil, err
    	}
    	defer resp.Body.Close()
    	if resp.StatusCode != http.StatusOK {
    		return nil, fmt.Errorf("wrong status code: %d", resp.StatusCode)
    	}
    	bodyReader := bufio.NewReader(resp.Body)
    	e := determineEncoding(bodyReader)
    	utf8Reader := transform.NewReader(bodyReader, e.NewDecoder())
    	return ioutil.ReadAll(utf8Reader)
    }
    // determineEncoding sniffs the charset from the first 1KB of the response.
    // Peek does not consume the reader, so the body can still be read in full.
    func determineEncoding(r *bufio.Reader) encoding.Encoding {
    	bytes, err := r.Peek(1024)
    	if err != nil {
    		log.Printf("Fetcher error: %v", err)
    		return unicode.UTF8
    	}
    	e, _, _ := charset.DetermineEncoding(bytes, "")
    	return e
    }
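
    The fetcher throttles all requests through rateLimiter, sets a
    browser-like User-Agent (a common way to avoid being served a block page),
    and converts the body from whatever charset the page uses to UTF-8 before
    parsing. A minimal, network-dependent smoke test might look like this
    (hypothetical file /crawler/fetcher/fetcher_test.go, not part of the
    original project):

    	package fetcher

    	import "testing"

    	func TestFetch(t *testing.T) {
    		body, err := Fetch("http://www.zhenai.com/zhenghun")
    		if err != nil {
    			t.Fatalf("Fetch: %v", err)
    		}
    		if len(body) == 0 {
    			t.Error("expected a non-empty body")
    		}
    	}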
    
    /crawler/zhenai/parser/citylist.go
    package parser
    
    import (
    	"learn/crawler/engine"
    	"regexp"
    )
    
    const cityListRe = `<a href="(http://www.zhenai.com/zhenghun/[0-9a-z]+)" [^>]*>([^<]+)</a>`
    func ParseCityList(contents []byte) engine.ParseResult {
    	re := regexp.MustCompile(cityListRe)
    	matches := re.FindAllSubmatch(contents, -1)
    	result := engine.ParseResult{}
    	for _, m := range matches {
    		result.Items = append(result.Items, "City: "+string(m[2]))
    		result.Requests = append(result.Requests, engine.Request{
    			Url:       string(m[1]),
    			ParseFunc: ParseCity,
    		})
    	}
    	return result
    }
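
    ParseCityList pulls each city link's URL and display name off the
    city-list page, records the name as an item, and schedules a ParseCity
    request for every captured URL.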
    
    /crawler/zhenai/parser/city.go
    package parser
    
    import (
    	"learn/crawler/engine"
    	"regexp"
    )
    var (
    	profileRe = regexp.MustCompile(`<a href="(http://album.zhenai.com/u/[0-9]+)" [^>]*>([^<]+)</a>`)
    	cityUrlRe = regexp.MustCompile(`href="(http://www.zhenai.com/zhenghun/[^"]+)"`)
    )
    func ParseCity(contents []byte) engine.ParseResult {
    	matches := profileRe.FindAllSubmatch(contents, -1)
    	result := engine.ParseResult{}
    	for _, m := range matches {
    		name := string(m[2])
    		result.Items = append(result.Items, "User "+name)
    		result.Requests = append(result.Requests, engine.Request{
    			Url:       string(m[1]),
    			ParseFunc: func(c []byte) engine.ParseResult {
    				return ParseProfile(c, "name:"+name)
    			},
    		})
    	}
    	matches = cityUrlRe.FindAllSubmatch(contents, -1)
    	for _, m := range matches {
    		result.Requests = append(result.Requests, engine.Request{
    			Url:       string(m[1]),
    			ParseFunc: ParseCity,
    		})
    	}
    	return result
    }
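
    Two details worth noting: name is redeclared on every iteration, so each
    closure captures its own copy rather than the final loop value; and
    cityUrlRe resubmits every city-page link it finds (pagination and cross
    links), so with no URL de-duplication yet, the same pages can be fetched
    more than once.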
    
    /crawler/zhenai/parser/profile.go
    package parser
    
    import (
    	"learn/crawler/engine"
    	"learn/crawler/model"
    	"regexp"
    )
    
    const all = `<div class="m-btn purple" data-v-8b1eac0c>([^<]+)</div>`
    func ParseProfile(contents []byte, name string) engine.ParseResult {
    	profile := model.Profile{}
    	profile.User = append(profile.User, name)
    	re := regexp.MustCompile(all)
    	match := re.FindAllSubmatch(contents, -1)
    	// Ranging over a nil slice is a no-op, so no nil check is needed.
    	for _, m := range match {
    		profile.User = append(profile.User, string(m[1]))
    	}
    
    	result := engine.ParseResult{
    		Items: []interface{}{profile},
    	}
    	return result
    }
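
    The data-v-8b1eac0c attribute in the pattern is a Vue scoped-style marker
    tied to a particular build of the page, so this regex is brittle: a site
    redeploy can change the attribute and silently break the parser.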
    
    /crawler/model/profile.go
    package model
    
    type Profile struct {
    	User []string
    }
    
    /crawler/scheduler/queued.go
    package scheduler
    
    import "learn/crawler/engine"
    
    type QueuedScheduler struct {
    	requestChan chan engine.Request
    	workChan    chan chan engine.Request
    }
    
    // WorkerChan returns a fresh channel per worker, so the scheduler can hand
    // a request to one specific idle worker.
    func (s *QueuedScheduler) WorkerChan() chan engine.Request {
    	return make(chan engine.Request)
    }
    
    func (s *QueuedScheduler) Submit(r engine.Request) {
    	s.requestChan <- r
    }
    func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    	s.workChan <- w
    }
    func (s *QueuedScheduler) Run() {
    	s.workChan = make(chan chan engine.Request)
    	s.requestChan = make(chan engine.Request)
    	go func() {
    		var requestQ []engine.Request
    		var workerQ []chan engine.Request
    		for {
    			// Enable the dispatch case only when both a pending request and
    			// an idle worker exist; otherwise activeWorker stays nil and
    			// sending on it blocks forever, disabling that select case.
    			var activeRequest engine.Request
    			var activeWorker chan engine.Request
    			if len(requestQ) > 0 && len(workerQ) > 0 {
    				activeRequest = requestQ[0]
    				activeWorker = workerQ[0]
    			}
    			select {
    			case r := <-s.requestChan:
    				requestQ = append(requestQ, r)
    			case w := <-s.workChan:
    				workerQ = append(workerQ, w)
    			case activeWorker <- activeRequest:
    				workerQ = workerQ[1:]
    				requestQ = requestQ[1:]
    			}
    		}
    	}()
    }
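
    The scheduler's select relies on a Go idiom: sending on a nil channel
    blocks forever, so a case on a nil channel can never be chosen. A
    standalone sketch of the idiom (not part of the project):

    	package main

    	import "fmt"

    	func main() {
    		var disabled chan int // nil: a send on it blocks forever
    		ready := make(chan int, 1)
    		ready <- 42
    		select {
    		case disabled <- 1:
    			fmt.Println("never reached") // case is disabled
    		case v := <-ready:
    			fmt.Println("got", v) // always chosen here
    		}
    	}

    This is why the dispatch case is harmless when either queue is empty.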
    
    
    
    /crawler/scheduler/simple.go
    package scheduler
    
    import "learn/crawler/engine"
    
    type SimpleScheduler struct {
    	workerChan chan engine.Request
    }
    
    func (s *SimpleScheduler) WorkerChan() chan engine.Request {
    	return s.workerChan
    }
    
    // WorkerReady is a no-op: all workers share one channel, so there is no
    // per-worker readiness to track.
    func (s *SimpleScheduler) WorkerReady(chan engine.Request) {
    }
    
    func (s *SimpleScheduler) Run() {
    	s.workerChan = make(chan engine.Request)
    }
    
    
    func (s *SimpleScheduler) Submit(r engine.Request) {
    	// Send from a new goroutine so Submit never blocks the engine loop,
    	// even when every worker is busy.
    	go func() { s.workerChan <- r }()
    }
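
    SimpleScheduler and QueuedScheduler are interchangeable behind the
    Scheduler interface: the simple one shares a single channel among all
    workers and spawns a goroutine per Submit, while the queued one tracks
    idle workers and pending requests explicitly, with no extra goroutine per
    request. To switch, change one line in main.go:

    	Scheduler: &scheduler.SimpleScheduler{},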
    

    Full project

    https://gitee.com/FenYiYuan/golang-cpdcrawler.git
