zoukankan      html  css  js  c++  java
  • hyperledger fabric v0.6 pbft源码分析(二)broadcast.go

    这部分和golang相关的特性紧密相连,所以先大致讲一下golang的特性

    go goroutine

    先看一个例子:

    // 例1
    func main() {
    	go fmt.Println("routine")
    	fmt.Println("main")
    
    	// 至此,程序运行结束,
    	// 所有活跃的goroutine被杀死
    }
    

    这里的go关键字创建了一个goroutine,它可以理解为一个轻量级线程。当main函数执行完时,会杀死所有goroutine,所以运行这个代码有时候输出:

    main

    也可能输出:

    main
    routine

    下面是个类似的例子:

    // 例2
    func main() {
    	go other()
    	fmt.Println("main")
    	time.Sleep(time.Second * 1)        // 等待1秒
    }
    
    func other()  {
    	fmt.Println("before")
    	time.Sleep(time.Second * 2)
    	fmt.Println("after")
    }
    

    这里after一定不会被输出,before可能非常大会输出(除非1s等待后,other还没有执行),main一定会输出。

    但是,这总有不确定的因素,一般来说,通过sleep的方式来实现线程之间的通信是不太可能的。下面介绍golang的另一个特性-channel

    go channel

    // 例3
    var exitChan = make(chan struct{})
    
    func main() {
    	go other()
    	fmt.Println("main")
    
    	<-exitChan
    }
    
    func other()  {
    	fmt.Println("before")
    	time.Sleep(time.Second * 2)
    	fmt.Println("after")
    
    	close(exitChan)
    }
    

    channel是有类型的,这里定义了一个struct{}类型的channel,定义channel需要使用chan修饰。这里使用了空结构体的管道:struct{}。这明确地指明该管道仅用于发信号,而不是传递数据。
    在主函数中,使用<-exitChan来读取channel内容,如果channel是空的,线程就会被阻塞,当调用close(exitChan)关闭管道时,会返回一个零值,使得主函数退出。这段代码一定会输出3个单词(顺序可能不一样)。

    再看一个类似的例子

    // 例4
    var exit1Chan = make(chan struct{})
    var exit2Chan = make(chan struct{})
    
    func main() {
    	go work1("work1")
    	go work2("work2")
    
    	<-exit1Chan
    	<-exit2Chan
    
    	fmt.Println("main")
    }
    
    func work1(text string)  {
    	time.Sleep(time.Second * 2)
    
    	fmt.Println("working:" + text)
    	close(exit1Chan)
    }
    
    func work2(text string)  {
    	time.Sleep(time.Second * 2)
    	fmt.Println("working:" + text)
    	close(exit2Chan)
    }
    

    主goroutine会一直等待两个线程全部完工后才继续,这是典型的master/slave模式的实现。

    死锁

    再看一个非常相似的例子:

    // 例5
    var exit1Chan = make(chan struct{})
    var exit2Chan = make(chan struct{})
    
    var work1Chan = make(chan struct{})
    var work2Chan = make(chan struct{})
    
    func main() {
    	go work1("work1")
    	go work2("work2")
    
    	<-exit1Chan
    	<-exit2Chan
    
    	fmt.Println("main")
    }
    
    func work1(text string)  {
    	time.Sleep(time.Second * 2)
    
    	fmt.Println("working:" + text)
    
    	<-work2Chan
    	work1Chan <- struct{}{}
    
    	close(exit1Chan)
    }
    
    func work2(text string)  {
    	time.Sleep(time.Second * 2)
    	fmt.Println("working:" + text)
    
    	<-work1Chan
    	work2Chan <- struct{}{}
    	close(exit2Chan)
    }
    

    work1与work2相互竞争彼此的资源,导致程序死锁。但golang对死锁提供了检测机制,使得死锁也不是那么难解决。

    select

    select是Go语言并发工具集中非常重要的工具。select用于从一组可能的分支中选择一个进行处理。如果任意一个分支都可以进一步处理,则从中随机选择一个,执行对应的语句。否则,如果又没有默认分支(default case),select语句则会阻塞,直到其中一个分支可以处理。

    // 例6
    var okchanel = make(chan bool)
    
    func main() {
    
    	go work1()
    
    	select {
    	case <-okchanel:
    		fmt.Println("work1 ok")
    	case <-time.After(time.Second * 2):
    		fmt.Println("Time out")
    	}
    
    	go work2()
    
    	select {
    	case <-okchanel:
    		fmt.Println("work2 ok")
    	case <-time.After(time.Second * 2):
    		fmt.Println("Time out")
    	}
    
    	fmt.Println("main")
    }
    
    func work1() {
    	time.Sleep(time.Second)
    	fmt.Print("finished work1
    ")
    	okchanel <- true
    }
    
    func work2() {
    	time.Sleep(time.Second * 3)
    	fmt.Print("finished work2
    ")
    	okchanel <- true
    }
    

    输出结果为:

    finished work1
    work1 ok
    Time out
    main

    work1由于等待时间短,完成了任务,而work2等待时间过长,未完成任务,本例在实际场景中,使用的非常多。

    代码分析

    回到hyperledger来,我们还是从测试看起:

    // consensus/pbft/broadcast_test.go
    func TestBroadcast(t *testing.T) {
    	m := &mockComm{
    		self:  1,
    		n:     4,
    		msgCh: make(chan mockMsg, 4),
    	}
    	sent := make(map[string]int)
    	go func() {
    		for msg := range m.msgCh {
    			sent[msg.dest.Name]++
    		}
    	}()
    
    	b := newBroadcaster(1, 4, 1, time.Second, m)
    
    	msg := &pb.Message{Payload: []byte("hi")}
    	b.Broadcast(msg)
    	time.Sleep(100 * time.Millisecond)
    	b.Close()
    
    	sentCount := 0
    	for _, q := range sent {
    		if q == 1 {
    			sentCount++
    		}
    	}
    
    	if sentCount < 2 {
    		t.Errorf("broadcast did not send to all peers: %v", sent)
    	}
    }
    

    先构造了一个mockComm,它实现了communicator所有接口。

    	m := &mockComm{
    		self:  1,
    		n:     4,
    		msgCh: make(chan mockMsg, 4),
    	}
    

    指定了自己的编号1,节点数4,消息通道缓冲大小为4

    	sent := make(map[string]int)
    	go func() {
    		for msg := range m.msgCh {
    			sent[msg.dest.Name]++
    		}
    	}()
    

    这里开启了一个goroutine,一个带有range子句的for语句会依次读取发往管道的值,直到该管道关闭。这里读取m.msgCh后,将对应节点的消息数加一。

    	b := newBroadcaster(1, 4, 1, time.Second, m)
    
    	msg := &pb.Message{Payload: []byte("hi")}
    	b.Broadcast(msg)
    	time.Sleep(100 * time.Millisecond)
    	b.Close()
    
    	sentCount := 0
    	for _, q := range sent {
    		if q == 1 {
    			sentCount++
    		}
    	}
    
    	if sentCount < 2 {
    		t.Errorf("broadcast did not send to all peers: %v", sent)
    	}
    

    构造了一个新的Broadcaster产生一个消息,并广播,然后收集消息数为1的节点(因为广播要保证消息只能被目标节点群各接收1遍),当节点数为3的时候表示测试成功。

    继续跟踪到源码

    func newBroadcaster(self uint64, N int, f int, broadcastTimeout time.Duration, c communicator) *broadcaster {
    	queueSize := 10 // XXX increase after testing
    
    	chans := make(map[uint64]chan *sendRequest)
    	b := &broadcaster{
    		comm:             c,
    		f:                f,
    		broadcastTimeout: broadcastTimeout,
    		msgChans:         chans,
    		closedCh:         make(chan struct{}),
    	}
    	for i := 0; i < N; i++ {
    		if uint64(i) == self {
    			continue
    		}
    		chans[uint64(i)] = make(chan *sendRequest, queueSize)
    	}
    
    	// We do not start the go routines in the above loop to avoid concurrent map read/writes
    	for i := 0; i < N; i++ {
    		if uint64(i) == self {
    			continue
    		}
    		go b.drainer(uint64(i))
    	}
    
    	return b
    }
    

    先创建了一个broadcaster对象,其中比较关键的是msgChans成员,它是一个map,键对应的是peer的id,值对应的是sendRequest类型的channel,并且将它的缓冲区设置为queueSize。msgChans不包括自己id的channel( != self)。

    创建完后,针对每一个id启动了go b.drainer(uint64(i))

    func (b *broadcaster) drainer(dest uint64) {
    	successLastTime := false
    	destChan, exsit := b.msgChans[dest] // Avoid doing the map lookup every send
    	if !exsit {
    		logger.Warningf("could not get message channel for replica %d", dest)
    		return
    	}
    
    	for {
    		select {
    		case send := <-destChan:
    			successLastTime = b.drainerSend(dest, send, successLastTime)
    		case <-b.closedCh:
    			for {
    				// Drain the message channel to free calling waiters before we shut down
    				select {
    				case send := <-destChan:
    					send.done <- false
    					b.closed.Done()
    				default:
    					return
    				}
    			}
    		}
    	}
    }
    

    先取出id对应的channel,然后就是个死循环。

    当destChan有值的时候,调用drainerSend进行发送。
    当b.closedCh关闭时,将对应的destChan的msg取出来,置为false,然后返回。

    初始的时候destChan没有值,所以阻塞到send函数被调用。

    • b.Broadcast(msg)
    func (b *broadcaster) Broadcast(msg *pb.Message) error {
    	return b.send(msg, nil)
    }
    

    继续看

    func (b *broadcaster) send(msg *pb.Message, dest *uint64) error {
    	select {
    	case <-b.closedCh:
    		return fmt.Errorf("broadcaster closed")
    	default:
    	}
    
    	var destCount int
    	var required int
    	if dest != nil {
    		destCount = 1
    		required = 1
    	} else {
    		destCount = len(b.msgChans)
    		required = destCount - b.f
    	}
    
    	wait := make(chan bool, destCount)
    
    	if dest != nil {
    		b.closed.Add(1)
    		b.unicastOne(msg, *dest, wait)
    	} else {
    		b.closed.Add(len(b.msgChans))
    		for i := range b.msgChans {
    			b.unicastOne(msg, i, wait)
    		}
    	}
    
    	succeeded := 0
    	timer := time.NewTimer(b.broadcastTimeout)
    
    	// This loop will try to send, until one of:
    	// a) the required number of sends succeed
    	// b) all sends complete regardless of success
    	// c) the timeout expires and the required number of sends have returned
    outer:
    	for i := 0; i < destCount; i++ {
    		select {
    		case success := <-wait:
    			if success {
    				succeeded++
    				if succeeded >= required {
    					break outer
    				}
    			}
    		case <-timer.C:
    			for i := i; i < required; i++ {
    				<-wait
    			}
    			break outer
    		}
    	}
    
    	return nil
    }
    

    先确定destCount(目标发送的个数)和 required(pbft要求的个数2f+1),然后一个个调用b.unicastOne(msg, i, wait)(这个函数很简单,就是向destChan放入msg),后面使用一个死循环来监视发送的进程,需要满足3个条件之一才能退出循环:

    1.收到了required个ok
    2.收到了所有的回复
    3.如果超时,收到了required个消息

    与此同时,阻塞的函数drainer由于有了msg,于是调用drainerSend进行真正的发送,由于这里具体发送依赖于Unicast的实现,测试端已经实现了这个函数,就是将所有需要发送的消息,放入m.msgCh所以测试代码最开始才有:

    		for msg := range m.msgCh {
    			sent[msg.dest.Name]++
    		}
    

    看到这里基本上逻辑上就通了。还有就是这个send.done其实就是wait这个channel,每当一个消息发送成功的时候就向wait写入一个true,否则写入false。

    总的流程如下:
    绘图文件
    44.png

    所以正常情况下,只要收到正确的2个以上消息,就会测试成功。

    看下一个例子:

    func TestBroadcastStuck(t *testing.T) {
    	m := &mockStuckComm{
    		mockComm: mockComm{
    			self:  1,
    			n:     4,
    			msgCh: make(chan mockMsg),
    		},
    		done: make(chan struct{}),
    	}
    	sent := make(map[string][]string)
    	go func() {
    		for msg := range m.msgCh {
    			key := string(msg.msg.Payload)
    			sent[key] = append(sent[key], msg.dest.Name)
    		}
    	}()
    
    	b := newBroadcaster(1, 4, 1, time.Second, m)
    
    	maxc := 20
    	for c := 0; c < maxc; c++ {
    		b.Broadcast(&pb.Message{Payload: []byte(fmt.Sprintf("%d", c))})
    	}
    
    	done := make(chan struct{})
    	go func() {
    		select {
    		case <-done:
    			return
    		case <-time.After(time.Second):
    			t.Fatal("blocked")
    		}
    	}()
    	time.Sleep(100 * time.Millisecond)
    	close(m.done)
    	b.Close()
    	close(done)
    
    	sendDone := 0
    	for _, q := range sent {
    		if len(q) >= 2 {
    			sendDone++
    		}
    	}
    	if sendDone != maxc {
    		t.Errorf("expected %d sent messages: %v", maxc, sent)
    	}
    }
    

    这个例子和上面的差不多,但是重写了Unicast,它将自己设定为vp1,并且当收到vp0消息时,总是返回错误(超时或者channel关闭错误)。定义了一个新的map————sent,它将每一个消息及其收到这个消息的节点存起来,最后看节点数超过两个的消息个数等不等于预设的maxc值,相等表示测试成功。

    	for _, q := range sent {
    		fmt.Printf("----%d
    ", len(q))
    		if len(q) >= 2 {
    			sendDone++
    		}
    	}
    	if sendDone != maxc {
    		t.Errorf("expected %d sent messages: %v", maxc, sent)
    	}
    

    (ps..这里我觉得可以改成 : len(q) > 2)

    接下来的 func TestBroadcastUnicast(t *testing.T)非常简单,就是测试单播的函数

    然后

    func TestBroadcastAllFail(t *testing.T)
    将接收到的消息全部失败,如果不阻塞测试成功,否则会抛出超时的错误。

    下面这个例子有点意思

    func TestBroadcastTimeout(t *testing.T) {
    	expectTime := 10 * time.Second
    	deltaTime := 50 * time.Millisecond
    	m := &mockIndefinitelyStuckComm{
    		mockComm: mockComm{
    			self:  1,
    			n:     4,
    			msgCh: make(chan mockMsg),
    		},
    		done: make(chan struct{}),
    	}
    
    	b := newBroadcaster(1, 4, 1, expectTime, m)
    	broadcastDone := make(chan time.Time)
    
    	beginTime := time.Now()
    	go func() {
    		b.Broadcast(&pb.Message{Payload: []byte(fmt.Sprintf("%d", 1))})
    		broadcastDone <- time.Now()
    	}()
    
    	checkTime := expectTime + deltaTime
    	select {
    	case endTime := <-broadcastDone:
    		t.Log("Broadcast consume time: ", endTime.Sub(beginTime))
    		close(broadcastDone)
    		close(m.done)
    		return
    	case <-time.After(checkTime):
    		close(broadcastDone)
    		close(m.done)
    		t.Fatalf("Broadcast timeout after %v, expected %v", checkTime, expectTime)
    	}
    }
    

    它在启动的时候,设置了一个带有超时时间的Broadcaster,然后将得到当前时间记为beginTime,调用发送广播的函数,而函数一定vp0阻塞,且vp2,vp3立即失败,所以Broadcaster超时后返回,返回后将当前时间写下来,看一共花了多久。误差不超过expectTime + deltaTime,算测试成功。

    下一个测试TestBroadcastIndefinitelyStuck与之前的比较类似,区别在于把超时时间降低了,然后让它不断超时,但是不能超时到10s,若正常退出则成功。

    总结

    通过几个测试的案例,基本上覆盖了全部的源码,从中不仅学到了源码的设计思想,而且学到了测试的一些方法。

  • 相关阅读:
    把git项目放到个人服务器上
    关于fcitx无法切换输入法的问题解决
    博客变迁通知
    (欧拉回路 并查集 别犯傻逼的错了) 7:欧拉回路 OpenJudge 数据结构与算法MOOC / 第七章 图 练习题(Excercise for chapter7 graphs)
    (并查集) HDU 1856 More is better
    (并查集 不太会) HDU 1272 小希的迷宫
    (并查集 注意别再犯傻逼的错了) HDU 1213 How Many Tables
    (最小生成树 Kruskal算法) 51nod 1212 无向图最小生成树
    (并查集) HDU 1232 畅通工程
    (最小生成树 Prim) HDU 1233 还是畅通工程
  • 原文地址:https://www.cnblogs.com/xiaodeshan/p/7878135.html
Copyright © 2011-2022 走看看