作者:林冠宏 / 指尖下的幽灵
GitHub : https://github.com/af913337456/
目录
- 前序
- 一般的订单流程
- 思考瓶颈点
- 订单队列
- 第一种订单队列
- 第二种订单队列
- 总结
- 实现队列的选择
- 解答
- 实现队列的选择
- 第二种队列的 Go 版本例子代码
前序
目前的开发工作主要是将
传统电商应用
和区块链技术
相结合,区块链平台依然是以太坊
,此外地,这几天由我编写,经清华大学出版社出版的书籍,历经八月,终于出版上架了,名称是:《区块链以太坊DApp开发实战》
,现已可以网购。
本文所要分享的思路就是电商应用中常用的
订单队列
。
一般的订单流程
电商应用中,简单直观的用户从下单到付款,最终完成整个流程的步骤可以用下图表示:
其中,订单信息持久化
,就是存储数据到数据库中。而最终客户端完成支付后的更新订单状态
的操作是由第三方支付平台进行回调设置好的回调链接 NotifyUrl
,来进行的。
补全订单状态的更新流程,如下图表示:
思考瓶颈点
服务端的直接瓶颈点
,首先要考虑 TPS
。去除细分点,我们主要看订单信息持久化
瓶颈点。
在高并发业务场景中,例如 秒杀
、优惠价抢购
等。短时间内的下单请求数会很多,如果订单信息持久化
部分,不做优化,而是直接对数据库层进行频繁的
读写操作,数据库会承受不了,容易成为第一个垮掉的服务,比如下图的所示的常规写单流程:
可以看到,每
持久化一个订单信息,一般要经历网络连接操作(链接数据库),以及多个 I/O
操作。
得益于连接池
技术,我们可以在链接数据库的时候,不用每次都重新发起一次完整的HTTP请求,而可以直接从池中获取已打开了的连接句柄,而直接使用,这点和线程池的原理差不多。
此外,我们还可以在上面的流程中加入更多的优化,例如对于一些需要读取的信息,可以事先存置到内存缓存层,并加于更新维护,这样在使用的时候,可以快速读取。
即使我们都具备了上述的一些优化手段,但是对于写操作
的I/O
阻塞耗时,在高并发请求
的时候,依然容易导致数据库承受不住,容易出现链接多开异常
,操作超时
等问题。
在该层进行优化的操作,除了上面谈到的之外,还有下面一些手段:
- 数据库集群,采用读写分离,减少写时压力
- 分库,不同业务的表放到不同的数据库,会引入分布式事务问题
- 采用队列模型削峰
每种方式有各自的特点,因为本文谈的是订单队列
的架构思想,所以下面我们来看下如何在订单系统中引入订单队列。
订单队列
网上有不少文章谈到订单队列的做法,大部分都漏了说明请求与响应的一致性问题。
第一种订单队列
流程图:
上图是大多文章提到的队列模型,有两个没有解析的问题:
- 如果订单存在第三方支付情况,① 和 ② 的一致性如何保证,比如其中一处处理失败;
- 如果订单存在第三方支付情况,① 完成了支付,且三方支付平台回调了
notifyUrl
,而此时 ② 还在排队等待处理,这种情况又如何处理。
首先,要肯定的是,上面的订单流程图是没有问题的。它有下面的优缺点,所提到的两个问题也是有解决方案的。
优点:
- 用户无需等待订单持久化处理,而能直接获得响应,实现快速下单
- 持久化处理,采用排队的先来先处理,不会像上面谈到的高并发请求一起冲击数据库层面的情况。
- 可变性强,
搭配中间件
的组合性强。
缺点:
- 多订单入队时,② 步骤的处理速度跟不上。从而导致第二点问题。
- 实现较复杂
上面谈及的问题点,我后面都会给出解决方案。下面我们来看下另外一种订单队列流程图。
第二种订单队列
流程图:
第二种订单队列的设计模型,注意它的同步等待
持久化处理的结果,解决了持久化与响应的一致性问题,但是有个严重的耗时等待问题,它的优缺点如下:
优点:
- 持久化与响应的强一致性。
- 持久化处理,采用排队的先来先处理,不会像上面谈到的高并发请求一起冲击数据库层面的情况。
- 实现简单
缺点:
- 多订单入队时,持久化单元处理速度跟不上,造成客户端同步等待响应。
这类订单队列,我下面会放出 Golang
实现的版本代码。
总结
对比上面两种常见的订单模型,如果从用户体验的角度
去优先考虑,第一种不需要用户等待持久化处理
结果的是明显优于第二种的。如果技术团队完善,且技术过硬,也应该考虑第一种的实现方式。
如果仅仅想要达到宁愿用户等待到超时
也不愿意存储层服务被冲垮,那么有限考虑第二种。
实现队列的选择
在这里,我们进一步细分一下,实现队列模块的功能有哪些选择。
相信很多后端开发经验比较老道的同志已经想到了,使用现有的中间件,比如知名的 Redis
、RocketMQ
,以及 Kafka
等,它们都是一种选择。
此外地,我们还可以直接编写代码,在当前的服务系统中实现一个消息队列来达到目的,下面我用图来分类下队列类型。
不同的队列实现方式,能直接导致不同的功能,也有不同的优缺点:
一级缓存优点:
- 一级缓存,最快。无需链接,直接从内存层获取;
- 如果不考虑持久化和集群,那么它实现简单。
一级缓存缺点:
- 如果考虑持久化和集群,那么它实现比较复杂。
- 不考虑持久化情况下,如果服务器断电或其它原因导致服务中断,那么排队中的订单信息将丢失
中间件的优点:
- 软件成熟,一般出名的消息中间件都是经过实践使用的,文档丰富;
- 支持多种持久化的策略,比如 Redis 有
增量
持久化,能最大程度减少因不可预料的崩溃导致订单信息丢失; - 支持集群,主从同步,这对于分布式系统来说,是必不可少的要求。
中间件的缺点:
- 分布式部署时,需要建立链接通讯,导致读写操作需要走网络通讯。
解答
回到第一种订单模型中:
问题1:
如果订单存在第三方支付情况,① 和 ② 的一致性如何保证?
首先我们看下,不一致性的时候,会产生什么结果:
- ① 失败,用户因为网络原因或返回其它页面,不能获取结果。而 ② 成功,那么最终该订单的状态是待支付。用户进入到个人订单中心完成订单支付即可;
- ① 和 ② 都失败,那么下单失败;
- ① 成功,② 失败,此时用户在
响应页面
完成了支付动作,用户查看订单信息为空白。
上述的情况,明显地,只有 3 是需要恢复订单信息的,应对的方案有:
- 当服务端支付回调接口被第三方支付平台访问时,无法找到对应的订单信息。那么先将这类支付了却没订单信息的数据存储起来先,比如存储到
表A
。同时启动一个定时任务B
专门遍历表A,然后去订单列表寻找是否已经有了对应的订单信息,有则更新,没则继续,或跟随制定的检测策略走。 - 当 ② 是由于服务端的
非崩溃性原因
而导致失败时:- 失败的时候同时将原始订单数据重新插入到
队列头部
,等待下一次的重新持久化处理。
- 失败的时候同时将原始订单数据重新插入到
- 当 ② 因服务端的
崩溃性
原因而导致失败时:定时任务B
在进行了多次检测无果后,那么根据第三方支付平台在回调时候传递过来的订单附属信息
对订单进行恢复。
- 整个过程订单恢复的过程,用户查看订单信息为空白。
定时任务B
所在服务最好
和回调链接notifyUrl
所在的接口服务一致,这样能保证当 B 挂掉的时候,回调服务也跟随挂掉,然后第三方支付平台在调用回调失败的情况下,他们会有重试逻辑
,依赖这个,在回调服务重启时,可以完成订单信息恢复。
问题2:
如果订单存在第三方支付情况,① 完成了支付,且三方支付平台回调了 notifyUrl,而此时 ② 还在排队等待处理,这种情况又如何处理?
应对的方案参考 问题1
的 定时任务B
检测修改机制。
第二种队列的 Go 版本例子代码
定义一些常量
const (
QueueOrderKey = "order_queue"
QueueBufferSize = 1024 // 请求队列大小
QueueHandleTime = time.Second * 7 // 单个 mission 超时时间
)
定义出入队接口,方便多种实现
// 定义出入队接口,方便多种实现
type IQueue interface {
Push(key string,data []byte) error
Pop(key string) ([]byte,error)
}
定义请求与响应实体
// 定义请求与响应实体
type QueueTimeoutResp struct {
Timeout bool // 超时标志位
Response chan interface{}
}
type QueueRequest struct {
ReqId string `json:"req_id"` // 单次请求 id
Order *model.OrderCombine `json:"order"` // 订单信息 bean
AccessTime int64 `json:"access_time"` // 请求时间
ResponseChan *QueueTimeoutResp `json:"-"`
}
定义队列实体
// 定义队列实体
type Queue struct {
mapLock sync.Mutex
RequestChan chan *QueueRequest // 缓存管道,装载请求
RequestMap map[string]*QueueTimeoutResp
Queue IQueue
}
实例化队列,接收接口参数
// 实例化队列,接收接口参数
func NewQueue(queue IQueue) *Queue {
return &Queue{
mapLock: sync.Mutex{},
RequestChan: make(chan *QueueRequest, QueueBufferSize),
RequestMap: make(map[string]*QueueTimeoutResp, QueueBufferSize),
Queue: queue,
}
}
接收请求
// 接收请求
func (q *Queue) AcceptRequest(req *QueueRequest) interface{} {
if req.ResponseChan == nil {
req.ResponseChan = &QueueTimeoutResp{
Timeout: false,
Response: make(chan interface{},1),
}
}
userKey := key(req) // 唯一 key 生成函数
req.ReqId = userKey
q.mapLock.Lock()
q.RequestMap[userKey] = req.ResponseChan // 内存层存储对应的 req 的 resp 管道指针
q.mapLock.Unlock()
q.RequestChan <- req // 接收请求
log("userKey : ", userKey)
ticker := time.NewTicker(QueueHandleTime) // 以超时时间 QueueHandleTime 启动一个定时器
defer func() {
ticker.Stop() // 释放定时器
q.mapLock.Lock()
delete(q.RequestMap,userKey) // 当处理完一个 req,从 map 中移出
q.mapLock.Unlock()
}()
select {
case <-ticker.C: // 超时
req.ResponseChan.Timeout = true
Queue_TimeoutCounter++ // 辅助计数,int 类型
log("timeout: ",userKey)
return lghError.HandleTimeOut // 返回超时错误的信息
case result := <-req.ResponseChan.Response: // req 被完整处理
return result
}
}
从请求管道中取出 req 放入到队列容器中,该函数在 gorutine
中运行
// 从请求管道中取出 req 放入到队列容器中,该函数在 gorutine 中运行
func (q *Queue) addToQueue() {
for {
req := <-q.RequestChan // 取出一个 req
data, err := json.Marshal(req)
if err != nil {
log("redis queue parse req failed : ", err.Error())
continue
}
if err = q.Queue.Push(QueueOrderKey, data);err != nil { // push 入队,这里有时间消耗
log("lpush req failed. Error : ", err.Error())
continue
}
log("lpush req success. req time: ", req.AccessTime)
}
}
取出 req 处理,该函数在 gorutine
中运行
// 取出 req 处理,该函数在 gorutine 中运行
func (q *Queue) readFromQueue() {
for {
data, err := q.Queue.Pop(QueueOrderKey) // pop 出队,这里也有时间消耗
if err != nil {
log("lpop failed. Error : ", err.Error())
continue
}
if data == nil || len(data) == 0 {
time.Sleep(time.Millisecond * 100) // 空数据的 req,停顿下再取
continue
}
req := &QueueRequest{}
if err = json.Unmarshal(data, req);err != nil {
log("Lpop: json.Unmarshal failed. Error : ", err.Error())
continue
}
userKey := req.ReqId
q.mapLock.Lock()
resultChan, ok := q.RequestMap[userKey] // 取出对应的 resp 管道指针
q.mapLock.Unlock()
if !ok {
// 中间件重启时,比如 redis 重启而读取旧 key,会进入这里
Queue_KeyNotFound ++ // 计数 int 类型
log("key not found, rollback: ", userKey)
continue
}
simulationTimeOutReq4(req) // 模拟出来任务的函数,入参为 req
if resultChan.Timeout {
// 处理期间,已经超时,这里做可以拓展回滚操作
Queue_MissionTimeout ++
log("handle mission timeout: ", userKey)
continue
}
log("request result send to chan succeee, userKey : ", userKey)
ret := util.GetCommonSuccess(req.AccessTime)
resultChan.Response <- &ret // 输入处理成功
}
}
启动
func (q *Queue) Start() {
go q.addToQueue()
go q.readFromQueue()
}
运行例子
func test(){
...
runtime.GOMAXPROCS(4)
redisQueue := NewQueue(NewFastCacheQueue())
redisQueue.Start()
reqNumber := testReqNumber
wg := sync.WaitGroup{}
wg.Add(reqNumber)
for i :=0;i<reqNumber;i++ {
go func(index int) {
combine := model.OrderCombine{}
ret := AcceptRequest(&QueueRequest{
UserId: int64(index),
Order: &combine,
AccessTime: time.Now().Unix(),
ResponseChan: nil,
})
fmt.Println("ret: ------------- ",ret.String())
wg.Done()
}(i)
}
wg.Wait()
time.Sleep(3*time.Second)
fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout)
}