Stories

Detail Return Return

訂單支付後庫存不扣減,如何用RabbitMQ來優化? - Stories Detail

上週在Review學員代碼的時候,我們發現了一個很基礎但很重要的問題:支付回調流程中缺少了庫存扣減環節。這類問題雖然基礎,但如果直接進入生產環境,可能導致庫存的數據和實際銷售的情況不一致,出現超賣的情況。能夠及時發現這種問題,這就是Review代碼的重要性。

先看這段有問題的代碼:

// 原來的支付回調邏輯(問題代碼)
func PaymentCallback(ctx context.Context, orderID uint32) error {
    // 只更新訂單狀態為已支付
    _, err := dao.OrderInfo.Ctx(ctx).Where("id=?", orderID).
        Data(g.Map{"status": consts.OrderStatusPaid}).Update()
    if err != nil {
        return err
    }
    
    // 缺少庫存扣減邏輯!商品庫存還是原樣
    return nil
}

這個問題的核心在於流程設計的不完整,用户支付成功後只是更新了訂單狀態,卻沒有同步調整商品庫存,可能導致其他用户購買時看到的庫存數據不正確。

想要解決這個問題,需要補充缺失的邏輯,更要考慮分佈式系統下的流程合理性,這裏我們選擇引入RabbitMQ實現事件驅動架構,既能解決當前問題,也能方便後續的業務擴展。

問題分析

業務邏輯理解不正確

原邏輯對訂單流程的理解是"創建訂單→支付成功→完成交易",但正確的流程應該要包含庫存相關的環節:

創建訂單→預扣庫存→支付成功→確認交易→後續處理

不同服務之間的協作

在微服務架構中:

  • 訂單服務負責訂單狀態流轉
  • 商品服務負責庫存數據維護

兩個服務需要通過規範的協作機制保證數據一致性,而不是簡單的同步調用。

解決方案

我們重新設計了包含庫存管理的訂單流程,通過RabbitMQ實現服務間的解耦通信:

創建訂單時預扣庫存

將庫存扣減提前到訂單創建的階段,通過數據庫事務保證操作的原子性:

// app/goods/internal/logic/goods_info/goods_info.go
func ReduceStock(ctx context.Context, req *rabbitmq.OrderCreatedEvent) error {
    // 使用數據庫事務確保原子性
    err := g.DB().Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
        for _, goods := range req.GoodsInfo {
            // 1. 查詢當前庫存
            var goodsInfo entity.GoodsInfo
            if err := dao.GoodsInfo.Ctx(ctx).TX(tx).
                Where("id = ?", goods.GoodsId).
                Fields("stock").
                Scan(&goodsInfo); err != nil {
                return gerror.Wrapf(err, "查詢商品{%d}庫存失敗", goods.GoodsId)
            }

            // 2. 判斷庫存是否足夠
            if goodsInfo.Stock < goods.Count {
                return gerror.Newf("商品{%d}庫存不足(當前:%d, 需要:%d)", 
                    goods.GoodsId, goodsInfo.Stock, goods.Count)
            }

            // 3. 扣減庫存
            newStock := goodsInfo.Stock - goods.Count
            g.Log().Infof(ctx, "商品{%d}新庫存:%d", goods.GoodsId, newStock)
            if _, err := dao.GoodsInfo.Ctx(ctx).TX(tx).
                Where("id = ?", goods.GoodsId).
                Data(g.Map{"stock": newStock}).
                Update(); err != nil {
                return gerror.Wrapf(err, "更新商品{%d}庫存失敗", goods.GoodsId)
            }
        }
        return nil
    })
    
    return err
}

設計思路

  • 提前鎖定庫存,避免支付過程中商品被重複購買
  • 事務保證庫存檢查與扣減的原子性,防止併發問題
  • 庫存不足時直接阻斷訂單創建,提升用户體驗

支付成功後的確認處理

支付完成後,通過事件通知觸發後續清理工作:

// 支付回調邏輯
func PaymentCallback(ctx context.Context, orderID uint32) error {
    // 1. 更新訂單狀態
    _, err := dao.OrderInfo.Ctx(ctx).Where("id=?", orderID).
        Data(g.Map{"status": consts.OrderStatusPaid}).Update()
    if err != nil {
        return err
    }
    
    // 2. 獲取訂單詳情(包含商品信息)
    orderDetail, err := GetOrderDetail(ctx, orderID)
    if err != nil {
        return err
    }
    
    // 3. 發佈庫存確認事件(這裏庫存已在創建訂單時預扣)
    // 主要是清理緩存等後續操作
    go func() {
        // 異步清理商品緩存
        if err := goodsRedis.DeleteKeys(context.Background(), orderDetail.GoodsIDs); err != nil {
            g.Log().Errorf(ctx, "清理商品緩存失敗: %v", err)
        }
    }()
    
    return nil
}

訂單超時的庫存返還機制

為避免用户下單後未支付導致庫存長時間鎖定,設計超時返還邏輯:

// app/order/utility/consumer/order_timeout_consumer.go
func (c *OrderTimeoutConsumer) HandleMessage(ctx context.Context, msg amqp.Delivery) error {
    // 解析訂單超時事件
    var event rabbitmq.OrderTimeoutEvent
    err := rabbitmq.UnmarshalEvent(msg.Body, &event)
    if err != nil {
        return err
    }
    
    // 判斷是否真正超時(30分鐘未支付)
    eventTime, _ := time.Parse(time.RFC3339, event.TimeStamp)
    if time.Now().After(eventTime.Add(30 * time.Minute)) {
        // 處理訂單超時
        err = order_info.HandleOrderTimeoutResult(ctx, event.OrderId)
        if err != nil {
            return err
        }
        
        // 發佈庫存返還事件
        eventReq, err := order_info.GetOrderDetail(ctx, event.OrderId)
        if err == nil {
            go rabbitmq.PublishReturnStockEvent(event.OrderId, eventReq)
        }
    }
    
    return nil
}

庫存返還的具體實現

通過併發處理提升庫存返還效率:

// app/goods/internal/logic/goods_info/goods_info.go
func ReturnStock(ctx context.Context, req *rabbitmq.OrderStockReturnEvent) ([]*rabbitmq.OrderGoodsInfo, error) {
    // 使用goroutine併發處理每個商品
    resultChan := make(chan *rabbitmq.OrderGoodsInfo, len(req.GoodsInfo))
    var wg sync.WaitGroup
    wg.Add(len(req.GoodsInfo))

    for _, stockInfo := range req.GoodsInfo {
        go func(stockInfo *rabbitmq.OrderGoodsInfo) {
            defer wg.Done()
            
            defer func() {
                if r := recover(); r != nil {
                    g.Log().Errorf(ctx, "庫存返還panic: %v", r)
                }
            }()

            // 查詢當前庫存
            var goodsInfo entity.GoodsInfo
            err := dao.GoodsInfo.Ctx(ctx).Where("id=?", stockInfo.GoodsId).
                Fields("stock").Scan(&goodsInfo)
            if err != nil {
                resultChan <- &rabbitmq.OrderGoodsInfo{
                    GoodsId: stockInfo.GoodsId,
                    Count:   stockInfo.Count,
                }
                return
            }

            // 返還庫存
            newStock := goodsInfo.Stock + stockInfo.Count
            _, err = dao.GoodsInfo.Ctx(ctx).Where("id=?", stockInfo.GoodsId).
                Data(g.Map{"stock": newStock}).Update()
            if err != nil {
                resultChan <- &rabbitmq.OrderGoodsInfo{
                    GoodsId: stockInfo.GoodsId,
                    Count:   stockInfo.Count,
                }
                return
            }

            g.Log().Infof(ctx, "商品{%d}庫存返還成功,新庫存:%d", stockInfo.GoodsId, newStock)
        }(stockInfo)
    }

    wg.Wait()
    close(resultChan)
    
    // 收集處理結果
    var resultArr []*rabbitmq.OrderGoodsInfo
    for res := range resultChan {
        resultArr = append(resultArr, res)
    }
    
    return resultArr, nil
}

消息隊列的事件驅動架構

定義核心事件實現服務解耦:

// 事件定義
type OrderCreatedEvent struct {
    OrderId   uint32             `json:"order_id"`
    GoodsInfo []*OrderGoodsInfo `json:"goods_info"`
}

type OrderStockReturnEvent struct {
    OrderId   uint32             `json:"order_id"`
    GoodsInfo []*OrderGoodsInfo `json:"goods_info"`
}

事件流設計

用户下單→OrderCreated事件→商品服務扣減庫存
    ↓
支付超時→OrderTimeout事件→商品服務返還庫存
    ↓
支付成功→訂單狀態更新+緩存清理

技術難點與解決方案

難點1:分佈式系統的數據一致性

問題:訂單與庫存數據分屬不同服務,如何保證操作協同?

解決方案

  • 採用最終一致性模型,通過事件重試確保數據對齊
  • 每個事件處理都設計冪等性,避免重複執行導致錯誤

難點2:高併發下的庫存準確性

問題:多用户同時購買時如何防止庫存數據混亂?

解決方案

// 數據庫事務+行級鎖保證併發安全
err := g.DB().Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
    // 事務內查詢自動加行鎖,阻止併發修改
    var goodsInfo entity.GoodsInfo
    dao.GoodsInfo.Ctx(ctx).TX(tx).Where("id=?", goodsId).Scan(&goodsInfo)
    
    // 檢查並更新庫存
    if goodsInfo.Stock >= count {
        dao.GoodsInfo.Ctx(ctx).TX(tx).Data(g.Map{"stock": goodsInfo.Stock - count}).Update()
    }
    return nil
})

難點3:系統性能與用户體驗平衡

問題:庫存操作頻繁,如何避免影響響應速度?

解決方案

  • 核心流程同步處理,確保用户體驗
  • 非核心操作(如緩存清理)異步化,不阻塞主流程
  • 批量操作使用併發處理提升效率

結語

很多時候一些嚴重的錯誤往往出現在一些小細節上面。通過這次庫存相關的優化案例可以發現:看似簡單的業務流程,在分佈式架構下需要考慮服務協作、併發控制、異常處理等等多個方面的因素

通過引入RabbitMQ,不僅解決了已經存在的庫存同步問題,更讓整個系統具備了更好的擴展性,比如未來要新增物流通知、積分等功能的時候,只需新增事件的消費者就ok了,不需要再去修改現有的核心代碼。

本文基於真實的GoFrame微服務電商項目,所有代碼都經過生產環境驗證,這裏是項目的介紹:(https://mp.weixin.qq.com/s/ACzEHtvGh2YsU_4fxo83fQ)。如果你也遇到類似問題,歡迎交流討論!

user avatar mianlengxincidehongjiu Avatar wuliaodechaye Avatar javaedge Avatar vistart Avatar
Favorites 4 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.