zoukankan      html  css  js  c++  java
  • 如何优雅的关闭golang的channel

    How to Gracefully Close Channels ,这篇博客讲了如何优雅的关闭channel的技巧,好好研读,收获良多。

    众所周知,在golang中,关闭一个已经关闭的channel,或者向一个已经关闭的channel发送数据,都会引发panic。

    谨遵优雅关闭channel的原则

    • 不要在接收一端(receiver)关闭channel
    • 不要在有多个并发的senders中关闭channel。反过来说,如果只有一个协程充当sender,那么我们可以在这个sender协程内关闭掉channel。

    一个简单的方法

    • SafeClose
    // MyChannel pairs a channel of T with a flag that records whether the
    // channel has already been closed through SafeClose, so that closing it
    // more than once does not panic.
    type MyChannel struct {
    	// C is the wrapped channel.
    	C chan T
    	// closed becomes true once SafeClose has closed C.
    	closed bool
    	// mutex guards closed and serializes SafeClose and IsClosed.
    	mutex sync.Mutex
    }

    // NewMyChannel returns a MyChannel wrapping a freshly made unbuffered channel.
    func NewMyChannel() *MyChannel {
    	mc := new(MyChannel)
    	mc.C = make(chan T)
    	return mc
    }

    // SafeClose closes mc.C the first time it is called and does nothing on
    // later calls.
    func (mc *MyChannel) SafeClose() {
    	mc.mutex.Lock()
    	defer mc.mutex.Unlock()

    	if mc.closed {
    		return
    	}
    	close(mc.C)
    	mc.closed = true
    }

    // IsClosed reports whether SafeClose has already closed mc.C.
    func (mc *MyChannel) IsClosed() bool {
    	mc.mutex.Lock()
    	state := mc.closed
    	mc.mutex.Unlock()
    	return state
    }
    
    • SafeSend
    // SafeSend sends value on ch and reports whether the send failed because
    // ch was closed. A send on a closed channel panics; the deferred function
    // recovers from that panic and turns it into the closed result.
    func SafeSend(ch chan T, value T) (closed bool) {
    	defer func() {
    		// recover returns non-nil only when the send below panicked.
    		closed = recover() != nil
    	}()

    	ch <- value // panics if ch is closed
    	return
    }
    
    • 英文原博客中有这样一句话:

    One drawback of the above SafeSend function is that its calls can't be used as send operations which follow the case keyword in select blocks.

    这里指的是:对SafeSend的调用不能作为select...case...中case关键字后面的发送操作来使用(它是一次普通的函数调用,而不是channel操作),例如下面的写法是无法通过编译的,即

    select {
        case <- SafeSend(ch, 1)
    }
    

    因为case后面必须是对channel的发送或接收操作,而SafeSend(ch, 1)是一次函数调用,返回值是bool而不是channel。

    优雅关闭channel的设计

    • 多个receivers,一个sender的情况。
    package main
    
    import (
    	"time"
    	"math/rand"
    	"sync"
    	"log"
    )
    
    // main runs the "one sender, many receivers" scenario: a single goroutine
    // produces random numbers on dataCh and, being the only sender, closes the
    // channel itself; the receivers drain the channel until it is closed.
    func main() {
    	rand.Seed(time.Now().UnixNano())
    	log.SetFlags(0)

    	const MaxRandomNumber = 100000
    	const NumReceivers = 100

    	var wgReceivers sync.WaitGroup
    	wgReceivers.Add(NumReceivers)

    	dataCh := make(chan int, 100)

    	// The sender: drawing 0 is the signal to stop producing.
    	go func() {
    		for {
    			value := rand.Intn(MaxRandomNumber)
    			if value == 0 {
    				// The only sender can close the channel safely.
    				close(dataCh)
    				return
    			}
    			dataCh <- value
    		}
    	}()

    	// The receivers: each one keeps receiving until dataCh is closed
    	// and its buffer has been drained.
    	for i := 0; i < NumReceivers; i++ {
    		go func() {
    			defer wgReceivers.Done()

    			for {
    				value, ok := <-dataCh
    				if !ok {
    					return
    				}
    				log.Println(value)
    			}
    		}()
    	}

    	wgReceivers.Wait()
    }
    
    • 一个receiver,多个senders的情况。
    package main
    
    import (
    	"time"
    	"math/rand"
    	"sync"
    	"log"
    )
    
    // main runs the "many senders, one receiver" scenario: the receiver never
    // closes dataCh; instead it closes an extra signal channel to tell the
    // senders to stop sending.
    func main() {
    	rand.Seed(time.Now().UnixNano())
    	log.SetFlags(0)

    	const MaxRandomNumber = 100000
    	const NumSenders = 1000

    	var wgReceivers sync.WaitGroup
    	wgReceivers.Add(1)

    	dataCh := make(chan int, 100)

    	// stopCh is an additional signal channel.
    	// Its sender is the receiver of channel dataCh.
    	// Its receivers are the senders of channel dataCh.
    	stopCh := make(chan struct{})

    	// produce is the body of every sender goroutine.
    	produce := func() {
    		for {
    			// This try-receive lets the goroutine exit as early as
    			// possible once stopCh is closed. For this example it
    			// is not essential.
    			select {
    			case <-stopCh:
    				return
    			default:
    			}

    			// When both branches are ready select picks one at
    			// random, so the send may still win for a few loops
    			// after stopCh is closed; that is acceptable here.
    			select {
    			case <-stopCh:
    				return
    			case dataCh <- rand.Intn(MaxRandomNumber):
    			}
    		}
    	}
    	for i := 0; i < NumSenders; i++ {
    		go produce()
    	}

    	// The receiver: logs every value until the stop value shows up.
    	go func() {
    		defer wgReceivers.Done()

    		for value := range dataCh {
    			if value != MaxRandomNumber-1 {
    				log.Println(value)
    				continue
    			}
    			// The receiver of dataCh is also the only sender of
    			// stopCh, so closing stopCh here is safe.
    			close(stopCh)
    			return
    		}
    	}()

    	wgReceivers.Wait()
    }
    
    • 多个receivers和多个senders
    package main
    
    import (
    	"time"
    	"math/rand"
    	"sync"
    	"log"
    	"strconv"
    )
    
    // main runs the "many senders, many receivers" scenario. No sender or
    // receiver closes dataCh. Any of them may ask a moderator goroutine to
    // close the extra signal channel stopCh, which tells everybody to stop.
    func main() {
    	rand.Seed(time.Now().UnixNano())
    	log.SetFlags(0)

    	const MaxRandomNumber = 100000
    	const NumReceivers = 10
    	const NumSenders = 1000

    	wgReceivers := sync.WaitGroup{}
    	wgReceivers.Add(NumReceivers)

    	dataCh := make(chan int, 100)

    	// stopCh is an additional signal channel.
    	// Its sender is the moderator goroutine shown below.
    	// Its receivers are all senders and receivers of dataCh.
    	stopCh := make(chan struct{})

    	// toStop is used to notify the moderator to close stopCh.
    	// Its senders are any senders and receivers of dataCh.
    	// Its receiver is the moderator goroutine shown below.
    	// It must be buffered: the notifications are try-sends, and with an
    	// unbuffered channel a notification sent before the moderator is
    	// ready to receive would be dropped.
    	toStop := make(chan string, 1)

    	// stoppedBy is written by the moderator before it closes stopCh and
    	// read by main only after main has observed stopCh being closed.
    	var stoppedBy string

    	// moderator
    	go func() {
    		stoppedBy = <-toStop
    		close(stopCh)
    	}()

    	// senders
    	for i := 0; i < NumSenders; i++ {
    		go func(id string) {
    			for {
    				value := rand.Intn(MaxRandomNumber)
    				if value == 0 {
    					// Try-send: notify the moderator to close
    					// stopCh. If a notification is already
    					// pending, this one is simply dropped.
    					select {
    					case toStop <- "sender#" + id:
    					default:
    					}
    					return
    				}

    				// Try-receive: exit the sender goroutine as early
    				// as possible once stopCh is closed.
    				select {
    				case <-stopCh:
    					return
    				default:
    				}

    				// Even if stopCh is closed, the first branch in this
    				// select block may still not be selected for some
    				// loops if the send to dataCh is also non-blocking.
    				// The try-receive above bounds that.
    				select {
    				case <-stopCh:
    					return
    				case dataCh <- value:
    				}
    			}
    		}(strconv.Itoa(i))
    	}

    	// receivers
    	for i := 0; i < NumReceivers; i++ {
    		go func(id string) {
    			defer wgReceivers.Done()

    			for {
    				// Same as in the senders: try to exit as early as
    				// possible once stopCh is closed.
    				select {
    				case <-stopCh:
    					return
    				default:
    				}

    				// Even if stopCh is closed, the first branch in this
    				// select block may still not be selected for some
    				// loops if the receive from dataCh is also
    				// non-blocking. The try-receive above bounds that.
    				select {
    				case <-stopCh:
    					return
    				case value := <-dataCh:
    					if value == MaxRandomNumber-1 {
    						// Same trick as in the senders: notify
    						// the moderator to close stopCh.
    						select {
    						case toStop <- "receiver#" + id:
    						default:
    						}
    						return
    					}

    					log.Println(value)
    				}
    			}
    		}(strconv.Itoa(i))
    	}

    	wgReceivers.Wait()

    	// A receiver that triggers the stop returns without observing
    	// stopCh, so Wait alone does not order the moderator's write to
    	// stoppedBy before the read below. Receiving from stopCh does. It
    	// cannot block forever: every receiver exit path implies that a
    	// value has been placed on toStop, so the moderator will close
    	// stopCh.
    	<-stopCh
    	log.Println("stopped by", stoppedBy)
    }
    
  • 相关阅读:
    k8s podpreset 参数注入
    python pipenv 包管理
    使用火狐浏览器访问双向认证的k8s api
    k8s api
    常用的排序算法的时间复杂度和空间复杂度
    flask 启动
    Redis系列-第四篇持久化与事务
    Redis系列-第五篇分布式锁与主从复制
    Redis系列-第六篇哨兵模式
    Docker系列-第一篇Docker简介
  • 原文地址:https://www.cnblogs.com/linyihai/p/10612409.html
Copyright © 2011-2022 走看看