zoukankan      html  css  js  c++  java
  • 横向同步问题

        最常见到的同步是防止一个资源同时被多个人访问,这里要介绍的同步问题要不一样一点,他是多个线程之间要互相等待的一种同步,就像我们出去爬山,如果有人慢了,就要停下来等等他,这样我们才能一起到达目的地。

        这个问题最初是在金融历史数据反演的时候遇到的,简单的说,拿一份历史的股票或者外汇,期货的数据,按照时间顺序的模拟价格变东,通过一个算法来判断买卖,最后统计一下赢利的曲线。如下图所示:

    image

    举个外汇例子,一年的外汇数据,价格变化大概是2000万次,所以,只是一个非常简单的策略,模拟一下整个历史,也是要非常大的计算量。要解决这个问题一般要采用分布式的计算。当然,同时也要提高单台机器的性能,这就需要并发,把所有的cpu核都用掉。幸运的是,计算量最大的部分,交易策略内部一般是可以并行的,判断买卖内部也是可以并行的,但是有一点不幸的是,判断买卖的过程必须等所有的交易策略计算完成后,才能进行计算。这样就涉及一个同步的问题。如下图,这4个cpu不能随便并行,每一次计算完毕交易策略,都要等其他的交易策略完成,才能进入下一步判断买卖。

    image

    写成代码就是这样的(go语言):

    func main() {
        N := runtime.NumCPU()
        runtime.GOMAXPROCS(N)
        goend := make(chan int)
        t := time.Now()
        for i := 0; i < N; i++ {
            go func (index int, end chan int) {
                //fmt.Println("beg run", index)
                for i := 0; i < 1000000; i++ {
                    Step1(i, index)
                    wait(index, &inc, step)
                    Step2(i, index)
                    wait(index, &inc, step)
                    Step3(i, index)
                    wait(index, &inc, step)
                }
                //fmt.Println("end run", index)
               goend <- 1
            }(i, end)
        }
        for i := 0; i < N; i++ {
            <-goend
        }
        fmt.Println(result3, time.Now().Sub(t))
    }
    go就是打开一个新的线程。线程里面做的事情分成3步 Step1 , Step2, Step3. 每一步都有一个wait等待其他的线程也执行完成。在传统的并发编程里面,这个问题可以用条件变量来实现,但是,在go里面,因为不是非常推荐用锁定,所以我采用了channel来实现。经过我的测试,这channel 和 锁的实现方案都比较慢,而且性能非常的接近。这篇文章,我先介绍如何用channel的方法来实现,锁就不介绍了,不过我还会介绍一种不用锁,也不用channel的方法。
     
    当然,解决方案并不是关键,我想教给大家的是:我是如何想到这些方案的,是根据什么思路?遇到一个问题,如何和分析,如何找到问题的关键,这才是最重要的,要提高编程水平,就要经常这样去想问题。图中的线程,我们发现都是对等的,怎么同步呢?难道是0号线程完毕后,要通知其他线程,同时1号线程完毕后,也要通知其他线程吗?这样做肯定不好,因为通知太多,那怎么办呢?一个高效的组织必须有个领导,有一个领导,那么大家完成以后,都向领导汇报,由领导来决定是不是大家都完成了。那谁来当领导呢?还是按照中国传统,嫡长子当领导(我就是我家的嫡长子,因为我妈就生了我一个)。
    func wait(index int, inc *int32, step []chan int) {
        if index > 0 {
            atomic.AddInt32(inc, 1)
            step[index] <- 1 //往管道里面写,如果领导没有检查通过,那么就阻塞
        } else {
            for {
                if int(*inc) == len(step) - 1 { //大家已经都准备好了
                    break
                }
                runtime.Gosched()
            }
            atomic.StoreInt32(inc, 0)
            for i := 1; i < len(step); i++ {
                <-step[i] //领导检查完毕,可以继续执行了
            }
        }
    }

        inc是一个计数器,线程之间共享的,用来表示有几个线程已经执行完毕了。step是一个channel的数组,每个线程对应一个。代码已经能很清晰的说明问题了。不过眼尖的同学可能会看到,有个死循环啊,会不会很浪费cpu时间。我的答案是不会。runtime.Gosched() 表示把时间片让给别人,自己一般是阻塞了,这是一个很高级的sleep,我们经常会遇到要sleep多久的问题,这里不用考虑了,别人完成后,自然会通知你。根据我最终的统计,runtime.Gosched()  每次wait平均会执行两次左右,而这里如果用sleep,会发现性能非常的差。

        这个方案是非常的清晰,但是,性能上有点差,300万次同步的耗时在20s左右。这个性能,对我们的问题实际上是够用的,一年2000万的数据,我们一个策略一般要跑1个小时以上,损失两分钟同步的时间,非常值得了。但是,也有某些给客户演示的策略,非常简单,比如一个均线,那么可能跑一次一共就10s,调度时间花去了两分钟,这就有点多了。所以,还得改进一些。


    一般写程序,遇到这样的问题是最头痛了。我们用了推荐的方法解决了一个问题,最后发现这个问题太慢了。于是,我想到用采用条件变量,发现,和chan的性能非常接近,就在山重水复疑无路的时候,我玩了两盘三国杀,结果灵感又来了。仔细分析上面版本的程序,实际上,经历了两个过程:一个是大家向领导汇报的过程,一个过程是领导通知大家都已经完成了。于是,我想到用一个变量表示一下完成了,直接发现不行,因为,这样的话,领导要等大家都接到通知后,清除变量,这个大家都接到通知然后通知给领导不好处理。

        如果从一个自动机的角度来说,要比较简单,那么就是经过很多状态后,最终回归到某一个状态。你会发现channel版本就是做了这样一个事情,经过一次wait后,有恢复到初始状态,可以进入下次wait了。

        这个状态自动回归你可能在其他地方,或者书上没有怎么听说过,只是个人的一些经验。很多人,写并发的程序没有思路,而且bug丛生,很多时候,没有考虑到状态回归的问题。如果把并发的状态看成一个自动机,那么基本上,所有状态之间都能跃迁,比串行程序的状态要多的多。因为并发了之后,之前很多很多状态之间发生的顺序不定,就像,愤怒小鸟那个很多皮球跳的那一关,一个小鸟稍微干扰一下跳动,那么皮球就能跳到每个空间,把猪猪都杀死。而这样的多的状态下面,往往会引起一些很不起眼的bug。

        我们设计程序的时候,首先要保证产生的状态要越少越好。比如上面的问题,如果我引入一个变量来表示“大家都已经完成”,那么等通知完毕大家,大家进入下一步的时候,就必须清除这个状态,这样必然会引入一个什么时候清除这样一个态。

        好了,不废话了。看最终版本的代码:

    func wait(index int, inc *int32) {
        if index == (N - 1) {
            for {
                if *inc == READY {
                    atomic.AddInt32(inc, READY << uint(N-1))
                    break
                }
                runtime.Gosched()
            }
        } else {
            atomic.AddInt32(inc, 1 << uint(index))
            for {
                if *inc & (COMMIT << uint(index)) == 0 {
                    runtime.Gosched()
                    continue
                }
                //clean bit
                atomic.AddInt32(inc, -(COMMIT << uint(index) + 1 << uint(index)))
                break
            }
        }
    }

    这里,和上次不一样,不再是嫡长子当领导了,当领导的是最后一个线程(这里这样处理要方便一点,所以要么当鸡头,要么当凤尾,夹在中间做事情,反而没有机会,哈哈)

    0 到 N-2 位 表示线程是否准备好了

    N –1 到 2N – 3 位表示线程是否可以进入下一步(大家都完成了)了

    READY = 1 << (N - 1) - 1,如果N = 4 ,那么表示成二进制就是 00000111

    COMMIT = 1 << (N -1), 如果N = 4,那么表示成二进制就是  00001000

    如果领导检查到 READY(00000111)了,那么把状态就设置为,00111111

    0 号线程通知到了 ,就变成: 00110110

    1 号线程通知到了,就变成  00100100,当然,这不是唯一状态,在这个时候,2号线程可能先清除,这个时候是 00000000

    也可能是0号线程已经进入下一步了,00100101

    这也说明了一个问题,几乎是所有状态都能跑到,但是,我们必须让所有的线程最终都能进入下一个READY,这就是我说的回归。经过这些年的锤炼,我明白的,并发状态要减少复杂性,就是要巧妙的设计一个方案,让状态回归,否则问题复杂了,怎么死都不知道。可以看看golang sync.Mutex的源代码,非常符合我说的让状态最终回归的思想。

    下面我把最终的测试代码贴出来,注意,如果要在实际中使用,还是要注意一下接口的合理设计和包装。不能直接这样使用。

    在我的机器上经过测试,这个基本上100万个并发,消耗1s,比channel版本的有很大的提高。你也可以设计自己的wait同步方法,可以和我交流交流。最终这个程序的result3 == 0 那么,就是wait写的没有问题。题外话,对这类高性能计算问题,go比c#要快很多,而且内存也要省很多(我的意思说,用C#推荐的方式编程,而不是用各种方式死抠性能,不知道C#的这样方式的同步速度有多快)。

    package main
     
    import "fmt"
    import "sync"
    import "sync/atomic"
    import "time"
    import "runtime"
     
    const N = 4
    const READY = int32(1 << (N - 1) - 1)
    const COMMIT = int32(1 << (N -1))
    var mutex sync.Mutex
    var inc int32
     
    var result1 int32
    var result2 int32
    var result3 int32
     
    func main() {
        fmt.Println("ncpu = ", runtime.NumCPU())
        runtime.GOMAXPROCS(runtime.NumCPU())
        end := make(chan int)
        t := time.Now()
        for i := 0; i < N; i++ {
            go run(i, end)
        }
        for i := 0; i < N; i++ {
          <-end
        }
        fmt.Println(result3, time.Now().Sub(t))
    }
     
    func run(index int, end chan int) {
        //fmt.Println("beg run", index)
        for i := 0; i < 1000000; i++ {
            Step1(i, index)
            wait(index, &inc)
            Step2(i, index)
            wait(index, &inc)
            Step3(i, index)
            wait(index, &inc)
        }
        //fmt.Println("end run", index)
        end <- 1
    }
     
    func Step1(i int, index int) {
        atomic.AddInt32(&result1, int32(i))
        //fmt.Println("step 1", index)
    }
     
    func Step2(i int, index int) {
        atomic.AddInt32(&result2, int32(i))
        //fmt.Println("step 2", index)
    }
     
    func Step3(i int, index int) {
        mutex.Lock()
        if result2 != result1 {
            //fmt.Println("error", result1, result2, result2 - result1, i, index)
        }
        atomic.AddInt32(&result3, result2)
        atomic.AddInt32(&result3, -result1)
        atomic.StoreInt32(&result2, 0)
        atomic.StoreInt32(&result1, 0)
        mutex.Unlock()
    }
     
    //no chan wait. no lock. suport ncpu = 16, fast wait
    func wait(index int, inc *int32) {
        if index == (N - 1) {
            for {
                if *inc == READY {
                    atomic.AddInt32(inc, READY << uint(N-1))
                    break
                }
                runtime.Gosched()
            }
        } else {
            atomic.AddInt32(inc, 1 << uint(index))
            for {
                if *inc & (COMMIT << uint(index)) == 0 {
                    runtime.Gosched()
                    continue
                }
                //clean bit
                atomic.AddInt32(inc, -(COMMIT << uint(index) + 1 << uint(index)))
                break
            }
        }
    }
     
  • 相关阅读:
    POJ 2411 状态压缩递,覆盖方案数
    POJ 2774 最长公共子串
    POJ 1743 不可重叠的最长重复子串
    POJ 3294 出现在至少K个字符串中的子串
    POJ 3261 出现至少K次的可重叠最长子串
    POJ 1741/1987 树的点分治
    HDU1556 Color the ball
    解决linux系统时间不对的问题
    CentOS 6.9使用Setup配置网络(解决dhcp模式插入网线不自动获取IP的问题)
    Linux网络配置(setup)
  • 原文地址:https://www.cnblogs.com/niniwzw/p/2743296.html
Copyright © 2011-2022 走看看