zoukankan      html  css  js  c++  java
  • golang异步任务

    golang异步任务

    1. 文件目录

    ├── go.mod
    ├── main.go
    └── workpool
        └── workpool.go
    

    2. workpool/workpool.go文件

    package workpool
    
    import (
    	"errors"
    	"fmt"
    	"log"
    	"runtime"
    	"sync"
    	"sync/atomic"
    )
    
    var (
    	ErrCapacity = errors.New("Thread Pool At Capacity")
    )
    
    type (
    	// poolWork is passed into the queue for work to be performed.
    	poolWork struct {
    		work          PoolWorker // The Work to be performed.
    		resultChannel chan error // Used to inform the queue operaion is complete.
    	}
    
    	// WorkPool implements a work pool with the specified concurrency level and queue capacity.
    	WorkPool struct {
    		shutdownQueueChannel chan string     // Channel used to shut down the queue routine.
    		shutdownWorkChannel  chan struct{}   // Channel used to shut down the work routines.
    		shutdownWaitGroup    sync.WaitGroup  // The WaitGroup for shutting down existing routines.
    		queueChannel         chan poolWork   // Channel used to sync access to the queue.
    		workChannel          chan PoolWorker // Channel used to process work.
    		queuedWork           int32           // The number of work items queued.
    		activeRoutines       int32           // The number of routines active.
    		queueCapacity        int32           // The max number of items we can store in the queue.
    	}
    )
    
    // PoolWorker must be implemented by the object we will perform work on, now.
    type PoolWorker interface {
    	DoWork(workRoutine int)
    }
    
    // init is called when the system is inited.
    func init() {
    	log.SetPrefix("TRACE: ")
    	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
    }
    
    // New creates a new WorkPool.
    func New(numberOfRoutines int, queueCapacity int32) *WorkPool {
    	workPool := WorkPool{
    		shutdownQueueChannel: make(chan string),
    		shutdownWorkChannel:  make(chan struct{}),
    		queueChannel:         make(chan poolWork),
    		workChannel:          make(chan PoolWorker, queueCapacity),
    		queuedWork:           0,
    		activeRoutines:       0,
    		queueCapacity:        queueCapacity,
    	}
    
    	// Add the total number of routines to the wait group
    	workPool.shutdownWaitGroup.Add(numberOfRoutines)
    
    	// Launch the work routines to process work
    	for workRoutine := 0; workRoutine < numberOfRoutines; workRoutine++ {
    		go workPool.workRoutine(workRoutine)
    	}
    
    	// Start the queue routine to capture and provide work
    	go workPool.queueRoutine()
    
    	return &workPool
    }
    
    // Shutdown will release resources and shutdown all processing.
    func (workPool *WorkPool) Shutdown(goRoutine string) (err error) {
    	defer catchPanic(&err, goRoutine, "Shutdown")
    
    	writeStdout(goRoutine, "Shutdown", "Started")
    	writeStdout(goRoutine, "Shutdown", "Queue Routine")
    
    	workPool.shutdownQueueChannel <- "Down"
    	<-workPool.shutdownQueueChannel
    
    	close(workPool.queueChannel)
    	close(workPool.shutdownQueueChannel)
    
    	writeStdout(goRoutine, "Shutdown", "Shutting Down Work Routines")
    
    	// Close the channel to shut things down.
    	close(workPool.shutdownWorkChannel)
    	workPool.shutdownWaitGroup.Wait()
    
    	close(workPool.workChannel)
    
    	writeStdout(goRoutine, "Shutdown", "Completed")
    	return err
    }
    
    // PostWork will post work into the WorkPool. This call will block until the Queue routine reports back
    // success or failure that the work is in queue.
    func (workPool *WorkPool) PostWork(goRoutine string, work PoolWorker) (err error) {
    	defer catchPanic(&err, goRoutine, "PostWork")
    
    	poolWork := poolWork{work, make(chan error)}
    
    	defer close(poolWork.resultChannel)
    
    	workPool.queueChannel <- poolWork
    	err = <-poolWork.resultChannel
    
    	return err
    }
    
    // QueuedWork will return the number of work items in queue.
    func (workPool *WorkPool) QueuedWork() int32 {
    	return atomic.AddInt32(&workPool.queuedWork, 0)
    }
    
    // ActiveRoutines will return the number of routines performing work.
    func (workPool *WorkPool) ActiveRoutines() int32 {
    	return atomic.AddInt32(&workPool.activeRoutines, 0)
    }
    
    // CatchPanic is used to catch any Panic and log exceptions to Stdout. It will also write the stack trace.
    func catchPanic(err *error, goRoutine string, functionName string) {
    	if r := recover(); r != nil {
    		// Capture the stack trace
    		buf := make([]byte, 10000)
    		runtime.Stack(buf, false)
    
    		writeStdoutf(goRoutine, functionName, "PANIC Defered [%v] : Stack Trace : %v", r, string(buf))
    
    		if err != nil {
    			*err = fmt.Errorf("%v", r)
    		}
    	}
    }
    
    // writeStdout is used to write a system message directly to stdout.
    func writeStdout(goRoutine string, functionName string, message string) {
    	log.Printf("%s : %s : %s
    ", goRoutine, functionName, message)
    }
    
    // writeStdoutf is used to write a formatted system message directly stdout.
    func writeStdoutf(goRoutine string, functionName string, format string, a ...interface{}) {
    	writeStdout(goRoutine, functionName, fmt.Sprintf(format, a...))
    }
    
    // workRoutine performs the work required by the work pool
    func (workPool *WorkPool) workRoutine(workRoutine int) {
    	for {
    		select {
    		// Shutdown the WorkRoutine.
    		case <-workPool.shutdownWorkChannel:
    			writeStdout(fmt.Sprintf("WorkRoutine %d", workRoutine), "workRoutine", "Going Down")
    			workPool.shutdownWaitGroup.Done()
    			return
    
    		// There is work in the queue.
    		case poolWorker := <-workPool.workChannel:
    			workPool.safelyDoWork(workRoutine, poolWorker)
    			break
    		}
    	}
    }
    
    // safelyDoWork executes the user DoWork method.
    func (workPool *WorkPool) safelyDoWork(workRoutine int, poolWorker PoolWorker) {
    	defer catchPanic(nil, "WorkRoutine", "SafelyDoWork")
    	defer atomic.AddInt32(&workPool.activeRoutines, -1)
    
    	// Update the counts
    	atomic.AddInt32(&workPool.queuedWork, -1)
    	atomic.AddInt32(&workPool.activeRoutines, 1)
    
    	// Perform the work
    	poolWorker.DoWork(workRoutine)
    }
    
    // queueRoutine captures and provides work.
    func (workPool *WorkPool) queueRoutine() {
    	for {
    		select {
    		// Shutdown the QueueRoutine.
    		case <-workPool.shutdownQueueChannel:
    			writeStdout("Queue", "queueRoutine", "Going Down")
    			workPool.shutdownQueueChannel <- "Down"
    			return
    
    		// Post work to be processed.
    		case queueItem := <-workPool.queueChannel:
    			// If the queue is at capacity don't add it.
    			if atomic.AddInt32(&workPool.queuedWork, 0) == workPool.queueCapacity {
    				queueItem.resultChannel <- ErrCapacity
    				continue
    			}
    
    			// Increment the queued work count.
    			atomic.AddInt32(&workPool.queuedWork, 1)
    
    			// Queue the work for the WorkRoutine to process.
    			workPool.workChannel <- queueItem.work
    
    			// Tell the caller the work is queued.
    			queueItem.resultChannel <- nil
    			break
    		}
    	}
    }
    

    3. main.go 调用

    package main
    import (
    	"bufio"
    	"fmt"
    	"goroutine-demo/workpool"
    	"os"
    	"runtime"
    	"strconv"
    	"time"
    )
    
    type MyWork struct {
    	Name      string "The Name of a person"
    	BirthYear int    "The Yea the person was born"
    	WP        *workpool.WorkPool
    }
    
    func (workPool *MyWork) DoWork(workRoutine int) {
    	fmt.Printf("%s : %d
    ", workPool.Name, workPool.BirthYear)
    	fmt.Printf("*******> WR: %d  QW: %d  AR: %d
    ", workRoutine, workPool.WP.QueuedWork(), workPool.WP.ActiveRoutines())
    	time.Sleep(100 * time.Millisecond)
    	//panic("test")
    }
    func main() {
    	runtime.GOMAXPROCS(runtime.NumCPU())
    	workPool := workpool.New(runtime.NumCPU()*3, 10)
    	shutdown := false // Just for testing, I Know
    	go func() {
    		for i := 0; i < 1000; i++ {
    			work := &MyWork{
    				Name:      "A" + strconv.Itoa(i),
    				BirthYear: i,
    				WP:        workPool,
    			}
    			err := workPool.PostWork("name_routine", work)
    			if err != nil {
    				fmt.Printf("ERROR: %s
    ", err)
    				time.Sleep(100 * time.Millisecond)
    			}
    			if shutdown == true {
    				return
    			}
    		}
    	}()
    	fmt.Println("Hit any key to exit")
    	reader := bufio.NewReader(os.Stdin)
    	reader.ReadString('
    ')
    	shutdown = true
    	fmt.Println("Shutting Down
    ")
    	workPool.Shutdown("name_routine")
    }
    

    相关链接

    https://studygolang.com/articles/14481
    https://github.com/goinggo/workpool

    【励志篇】: 古之成大事掌大学问者,不惟有超世之才,亦必有坚韧不拔之志。
  • 相关阅读:
    C#多线程操作界面控件的解决方案
    InvokeHelper,让跨线程访问/修改主界面控件不再麻烦
    .netCF中后台多线程与UI界面交互的冻结问题
    c#设计模式第一天
    C#代理
    界面
    第一章面向对象涉及原则
    C# 为webBrowser设置代理
    设计模式等
    下载: Intel® 64 and IA32 Architectures Software Developer Manuals
  • 原文地址:https://www.cnblogs.com/tomtellyou/p/13870226.html
Copyright © 2011-2022 走看看