通过Go来处理每分钟达百万的数据请求
我读《通过Go来处理每分钟达百万的数据请求》
我读《通过Go来处理每分钟达百万的数据请求》
原文
原文作者为Malwarebytes公司的首席架构师Marcio Castilho http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
问题描述
当我们的服务端需要处理大量的耗时任务时,我们一般都会考虑将耗时任务异步处理。
简单粗暴法
golang恰恰给我们的异步处理带来了很大的便利--go func()
。然而,绝大多数的时候,我们不能简单粗暴的创建协程来处理异步任务,原因是不可控。虽然协程相对于线程占用的系统资源更少,但这并不代表我们可以无休止的创建协程。积水成江,不停创建协程也有压垮系统的风险。这里引用原作者的demo,一个执行耗时任务的handler。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
这就是我们遇到的第一个问题,简单粗暴起协程处理耗时任务导致的系统不可控性。我们自然而然就会想,怎么能让系统更可控呢。
优雅的方法
一个很自然的思路是,建立任务队列。golang提供了线程安全的任务队列实现方式--带缓冲的channal。但是这样只是延后了请求的爆发。作者意识到,要解决这一问题,必须控制协程的数量。如何控制协程的数量?Job/Worker模式!这里我将作者的代码修改了一下,单文件执行,以记录这一模式。
package main
import (
"fmt"
"reflect"
"time"
)
var (
MaxWorker = 10
)
type Payload struct {
Num int
}
//待执行的工作
type Job struct {
Payload Payload
}
//任务channal
var JobQueue chan Job
//执行任务的工作者单元
type Worker struct {
WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
JobChannel chan Job //每个工作者单元包含一个任务管道 用于获取任务
quit chan bool //退出信号
no int //编号
}
//创建一个新工作者单元
func NewWorker(workerPool chan chan Job, no int) Worker {
fmt.Println("创建一个新工作者单元")
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool),
no: no,
}
}
//循环 监听任务和结束信号
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
fmt.Println("w.WorkerPool <- w.JobChannel", w)
select {
case job := <-w.JobChannel:
fmt.Println("job := <-w.JobChannel")
// 收到任务
fmt.Println(job)
time.Sleep(100 * time.Second)
case <-w.quit:
// 收到退出信号
return
}
}
}()
}
// 停止信号
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
//调度中心
type Dispatcher struct {
//工作者池
WorkerPool chan chan Job
//工作者数量
MaxWorkers int
}
//创建调度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}
//工作者池的初始化
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 1; i < d.MaxWorkers+1; i++ {
worker := NewWorker(d.WorkerPool, i)
worker.Start()
}
go d.dispatch()
}
//调度
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
fmt.Println("job := <-JobQueue:")
go func(job Job) {
//等待空闲worker (任务多的时候会阻塞这里)
jobChannel := <-d.WorkerPool
fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
// 将任务放到上述woker的私有任务channal中
jobChannel <- job
fmt.Println("jobChannel <- job")
}(job)
}
}
}
func main() {
JobQueue = make(chan Job, 10)
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
time.Sleep(1 * time.Second)
go addQueue()
time.Sleep(1000 * time.Second)
}
func addQueue() {
for i := 0; i < 20; i++ {
// 新建一个任务
payLoad := Payload{Num: 1}
work := Job{Payload: payLoad}
// 任务放入任务队列channal
JobQueue <- work
fmt.Println("JobQueue <- work")
time.Sleep(1 * time.Second)
}
}
/*
一个任务的执行过程如下
JobQueue <- work 新任务入队
job := <-JobQueue: 调度中心收到任务
jobChannel := <-d.WorkerPool 从工作者池取到一个工作者
jobChannel <- job 任务给到工作者
job := <-w.JobChannel 工作者取出任务
{{1}} 执行任务
w.WorkerPool <- w.JobChannel 工作者在放回工作者池