zoukankan      html  css  js  c++  java
  • 在Golang中实现有无限容量的同步Queue

    chan对象是Golang的一个核心卖点,可以轻松实现goroutine之间的通信。Golang允许我们为chan设置不同的缓冲大小。当默认缓冲大小为0的时候,一个goroutine对chan的写入操作必须要等到有其他goroutine对chan进行读取的时候才会返回,反之一个goroutine对chan进行读取的时候要等到另外一个goroutine对chan进行写入才会返回。如果我们不希望每次对chan进行读取和写入都堵塞的话,可以对chan设置缓冲大小。这样,在缓冲区没满之前,goroutine可以不停的对chan进行写入而不会发生堵塞,直到缓冲区被填满。

    有时候我们需要把某个请求或者数据放入到chan中,然后立刻返回去做其他事情。这种情况下为了避免chan发生堵塞,我们需要为chan设置一个足够大的缓冲大小。如果缓冲大小设置的过小,就很难避免出现堵塞,而把缓冲大小设置的过大,又会造成额外的内存开销,因为chan对象在创建(make)的时候就已经分配了足够的内存作为缓冲。

    因此我在实际项目中经常使用一个同步的先入先出队列(SyncQueue)。数据生产者调用队列的Push函数将数据添加到队列中,Push函数在任何情况下不会发生堵塞。数据消费者使用Pop函数获得一个数据。如果队列中当前为空,则Pop函数会挂起当前goroutine,直到有其他goroutine Push新的数据到队列中。SyncQueue不需要提前生成一个巨大的缓存,因此不会占用大量的内存,并且提供无限(除非内存满)的队列长度。

    同步队列(SyncQueue)实现:https://github.com/xiaonanln/go-xnsyncutil/blob/master/xnsyncutil/sync_queue.go

    接口文档:https://godoc.org/github.com/xiaonanln/go-xnsyncutil/xnsyncutil#SyncQueue

     1 package xnsyncutil
     2 
     3 import (
     4     "sync"
     5 
     6     "gopkg.in/eapache/queue.v1"
     7 )
     8 
     9 // Synchronous FIFO queue
    10 type SyncQueue struct {
    11     lock    sync.Mutex
    12     popable *sync.Cond
    13     buffer  *queue.Queue
    14     closed  bool
    15 }
    16 
    17 // Create a new SyncQueue
    18 func NewSyncQueue() *SyncQueue {
    19     ch := &SyncQueue{
    20         buffer: queue.New(),
    21     }
    22     ch.popable = sync.NewCond(&ch.lock)
    23     return ch
    24 }
    25 
    26 // Pop an item from SyncQueue, will block if SyncQueue is empty
    27 func (q *SyncQueue) Pop() (v interface{}) {
    28     c := q.popable
    29     buffer := q.buffer
    30 
    31     q.lock.Lock()
    32     for buffer.Length() == 0 && !q.closed {
    33         c.Wait()
    34     }
    35 
    36     if buffer.Length() > 0 {
    37         v = buffer.Peek()
    38         buffer.Remove()
    39     }
    40 
    41     q.lock.Unlock()
    42     return
    43 }
    44 
    45 // Try to pop an item from SyncQueue, will return immediately with bool=false if SyncQueue is empty
    46 func (q *SyncQueue) TryPop() (v interface{}, ok bool) {
    47     buffer := q.buffer
    48 
    49     q.lock.Lock()
    50 
    51     if buffer.Length() > 0 {
    52         v = buffer.Peek()
    53         buffer.Remove()
    54         ok = true
    55     } else if q.closed {
    56         ok = true
    57     }
    58 
    59     q.lock.Unlock()
    60     return
    61 }
    62 
    63 // Push an item to SyncQueue. Always returns immediately without blocking
    64 func (q *SyncQueue) Push(v interface{}) {
    65     q.lock.Lock()
    66     if !q.closed {
    67         q.buffer.Add(v)
    68         q.popable.Signal()
    69     }
    70     q.lock.Unlock()
    71 }
    72 
    73 // Get the length of SyncQueue
    74 func (q *SyncQueue) Len() (l int) {
    75     q.lock.Lock()
    76     l = q.buffer.Length()
    77     q.lock.Unlock()
    78     return
    79 }
    80 
    81 func (q *SyncQueue) Close() {
    82     q.lock.Lock()
    83     if !q.closed {
    84         q.closed = true
    85         q.popable.Signal()
    86     }
    87     q.lock.Unlock()
    88 }

    Category: Golang 标签:

  • 相关阅读:
    <<一线架构师实践指南>>读书笔记之二PA阶段
    【读书笔记】简约至上交互设计四策略第4章 删除
    大数据量简单数据查询设计思考
    识别项目干系人
    【读书笔记】简约至上交互设计四策略第3章 简约四策略
    【读书笔记】简约至上交互设计四策略第2章 明确认识
    【读书笔记】简约至上交互设计四策略第1章 话说简单
    采购管理计划
    项目管理整体的一些基本概念1
    【读书笔记】简约至上交互设计四策略第5章 组织
  • 原文地址:https://www.cnblogs.com/isaiah/p/7266302.html
Copyright © 2011-2022 走看看