zoukankan      html  css  js  c++  java
  • [分布式系统学习] 6.824 LEC2 RPC和线程 笔记

    6.824的课程通常是在课前让你做一些准备。一般来说是先读一篇论文,然后请你提一个问题,再请你回答一个问题。然后上课,然后布置Lab。

    第二课的准备-Crawler

    第二课的准备不是论文,是让你实现Go Tour里面的crawler。Go Tour里面原有的实现是串行的,并且可能爬到相同的url。要求让你并行并去重。

    简单想法就是,为了实现并行,爬每个url都是用goroutine;为了实现去重,每次开爬就把url放到map中。

    不过这里有个知识点,Crawler函数最后返回成功,所有url都要爬完,所以需要一个机制去等待所有goroutine完成。查了一下可以用sync.WaitGroup。那一个直观的实现:

    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    func Crawl(url string, depth int, fetcher Fetcher) {
    var collector Collector;
    collector.fetchedUrl = make(map[string]bool)
    CrawlInt(url, depth, fetcher, &collector)
    collector.Wait()
    }

    type Collector struct {
    sync.Mutex
    sync.WaitGroup
    fetchedUrl map[string]bool
    }

    func CrawlInt(url string, depth int, fetcher Fetcher, collector *Collector) {
    if depth <= 0 {
    return
    }
    collector.Lock()
    if _, ok := collector.fetchedUrl[url]; ok {
    //visited,
    collector.Unlock()
    return
    }
    collector.fetchedUrl[url] = true
    collector.Unlock()
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
    fmt.Println(err)
    return
    }
    collector.Add(len(urls))
    fmt.Printf("found: %s %q ", url, body)
    for _, u := range urls {
    go func (u string) {
    CrawlInt(u, depth-1, fetcher, collector)
    collector.Done()
    }(u)
    }
    return
    }

    不过看到了答案,觉得答案很简洁,不仅没有用到WaitGroup,甚至连一个Lock都没有使用。

    //
    // Concurrent crawler with channels
    //
    
    func dofetch(url1 string, ch chan []string, fetcher Fetcher) {
    	body, urls, err := fetcher.Fetch(url1)
    	if err != nil {
    		fmt.Println(err)
    		ch <- []string{}
    	} else {
    		fmt.Printf("found: %s %q
    ", url1, body)
    		ch <- urls
    	}
    }
    
    func master(ch chan []string, fetcher Fetcher) {
    	n := 1
    	fetched := make(map[string]bool)
    	for urls := range ch {
    		for _, u := range urls {
    			if _, ok := fetched[u]; ok == false {
    				fetched[u] = true
    				n += 1
    				go dofetch(u, ch, fetcher)
    			}
    		}
    		n -= 1
    		if n == 0 {
    			break
    		}
    	}
    }
    
    func CrawlConcurrentChannel(url string, fetcher Fetcher) {
    	ch := make(chan []string)
    	go func() {
    		ch <- []string{url}
    	}()
    	master(ch, fetcher)
    }

    Crawler函数是那个CrawlConcurrentChannel。ch里面放的是每次fetch返回的页面数组。为什么不用到Lock呢?因为fetched map的判断和加入都在主线程中。

    ch里面的urls当然可能重复,但是在主线程中已经判断过了不会重复fetch。

    而通过n来判断是否所有页面都被爬取了。所以有n==sizeof (ch) == sizeof (fetched)。这里的sizeof指的是所有放入的,不是某一时刻的。

    Go 的RPC

    我们在前面一个Lab里面已经遇到过了。觉得有点像Soap的方式,不过完全没有Soap那么复杂,需要定义wsdl。

    至少发送一次 vs 至多发送一次

    至少发送一次:RPC lib 等待返回,如果超时,再发。这样多尝试几次,始终没有返回,就报错。

    这样能解决问题么?如果是发送的克扣余额会出现什么问题?

    所以“至少发送一次”对于只读操作,和可重入操作是有效的。比如我们上一个Lab中的Map和Reduce,都是可重入的。

    至多发送一次:问题在于如何检测重复请求。

    client可以发送一个唯一的ID(XID)用于验证重复。服务器做如下处理。

    server:
        if seen[xid]:
          r = old[xid]
        else
          r = handler()
          old[xid] = r
          seen[xid] = true

    这里要处理的问题是:

    1. client怎么保证XID唯一?现在UUID可以做到,另外也可以通过ip地址加上序列号来做hash值。

    2. 服务器要在某时刻清理调之前的请求,否则每个请求都放到seen map里面,那要爆掉了。client可以在每条RPC中都包含一个”已经收到#<X的回复“的信息,这样,服务器就可以抛弃它们。

    3. 服务器正在处理某个request,但是新的request已经进来了,服务器不想做第二次,那么他可以设置一个”pending“flag,让新的request等待或者忽略。

    Go语言的RPC策略是”至多发送一次“。

  • 相关阅读:
    Spring Boot 中使用 @Transactional 注解配置事务管理
    springboot 整合Swagger2的使用
    Vue的参数请求与传递
    SpringMVC的全局异常处理
    SpringBoot集成MyBatis的Bean配置方式
    Springboot整合通用mapper
    个人作业——软件工程实践总结作业
    团队作业第二次—项目选题报告(追光的人)
    结对第二次—文献摘要热词统计及进阶需求
    结对第一次—原型设计(文献摘要热词统计)
  • 原文地址:https://www.cnblogs.com/lichen782/p/7020464.html
Copyright © 2011-2022 走看看