Stories

Detail Return Return

FastAPI秒殺庫存總變負數?Redis分佈式鎖能幫你守住底線嗎 - Stories Detail


url: /posts/65ce343cc5df9faf3a8e2eeaab42ae45/
title: FastAPI秒殺庫存總變負數?Redis分佈式鎖能幫你守住底線嗎
date: 2025-09-17T03:43:34+08:00
lastmod: 2025-09-17T03:43:34+08:00
author: cmdragon

summary:
分佈式鎖在FastAPI中用於解決多實例併發訪問共享資源時的數據一致性問題。其核心原理包括互斥性、安全性、可用性和容錯性,常用Redis分佈式鎖實現。Redlock算法通過多節點投票確保鎖的可靠性。FastAPI中通過aioredis實現異步分佈式鎖,支持鎖的獲取、釋放和續約。測試策略覆蓋單實例、多實例併發及鎖超時等場景,確保鎖的正確性和穩定性。

categories:

  • fastapi

tags:

  • FastAPI
  • 分佈式鎖
  • Redis
  • 異步編程
  • Redlock算法
  • 併發控制
  • 測試策略

<img src="https://api2.cmdragon.cn/upload/cmder/20250304_012821924.jpg" title="cmdragon_cn.png" alt="cmdragon_cn.png"/>

掃描二維碼關注或者微信搜一搜:編程智域 前端至全棧交流與成長

發現1000+提升效率與開發的AI工具和實用程序:https://tools.cmdragon.cn/

一、分佈式鎖在FastAPI中的作用與原理

1.1 為什麼需要分佈式鎖?

想象一個場景:你做了個FastAPI秒殺接口,商品庫存只有1件。如果同時有100個請求打進來,單實例FastAPI能用asyncio.Lock(本地鎖)保證同一時間只有一個請求處理庫存。但如果部署了3個FastAPI實例(多進程/多機器),本地鎖就失效了——每個實例都有自己的鎖,100個請求會同時衝進3個實例,導致庫存變成-99,徹底亂套。

分佈式鎖的本質:給跨進程、跨機器的資源競爭“上全局鎖”,不管多少個FastAPI實例,同一時間只有一個請求能拿到鎖,確保數據一致。

1.2 分佈式鎖的核心原理

分佈式鎖要滿足4個核心要求:

  • 互斥性:同一時間只有一個請求能拿到鎖;
  • 安全性:不能讓A的鎖被B釋放;
  • 可用性:Redis掛了一個節點,還能正常用;
  • 容錯性:持有鎖的進程崩潰,鎖要能自動釋放。

FastAPI裏最常用的是Redis分佈式鎖(輕量、性能高),底層用Redlock算法(解決Redis單點故障問題)。測試環境可以簡化成單Redis節點,生產環境建議用3-5個節點。

1.3 Redlock算法簡化理解

Redlock是“多節點投票制”:

  1. 向5個Redis節點發“鎖請求”;
  2. 超過3個節點同意(半數以上),就算拿到鎖;
  3. 計算總耗時,如果比鎖超時時間短,鎖有效;
  4. 否則,把所有節點的鎖都刪了,重新來。

測試環境不用這麼複雜——先拿單Redis節點練手,生產再擴展。

二、FastAPI中分佈式鎖的實現

2.1 依賴準備與配置

首先裝依賴:

pip install fastapi==0.109 aioredis==2.0.1 pydantic==2.5.3 pytest-asyncio==0.23.2

pydantic寫個配置類(統一管理Redis連接參數):

# lock_config.py
from pydantic import BaseModel, Field

class RedisLockConfig(BaseModel):
    redis_url: str = Field(default="redis://localhost:6379", description="Redis連接地址")
    lock_prefix: str = Field(default="dist_lock:", description="鎖鍵前綴,避免key衝突")
    timeout: int = Field(default=10, description="鎖超時時間(秒),防止死鎖")
    renew_interval: int = Field(default=3, description="鎖續約間隔(秒),防止業務超時")

2.2 異步分佈式鎖實現(aioredis)

因為FastAPI是異步的,必須用aioredis(異步Redis客户端)。寫個RedisDistributedLock類,封裝鎖的獲取、釋放、續約:

# distributed_lock.py
from aioredis import Redis, RedisError
from pydantic import BaseModel
import uuid
import asyncio

class RedisDistributedLock:
    def __init__(self, config: RedisLockConfig):
        self.config = config
        self.redis: Redis | None = None  # Redis客户端實例
        self.lock_key: str | None = None  # 當前鎖的key
        self.lock_value: str | None = None  # 唯一標識(防誤刪別人的鎖)
        self.renew_task: asyncio.Task | None = None  # 鎖續約任務

    # 異步上下文管理器:自動連接/斷開Redis
    async def __aenter__(self) -> "RedisDistributedLock":
        await self._connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.release()
        await self._disconnect()

    # 連接Redis
    async def _connect(self):
        if not self.redis:
            self.redis = await Redis.from_url(self.config.redis_url)

    # 斷開Redis連接
    async def _disconnect(self):
        if self.redis:
            await self.redis.close()
            await self.redis.wait_closed()
            self.redis = None

    # 獲取鎖:原子操作(SETNX + EX)
    async def acquire(self, lock_name: str) -> bool:
        self.lock_key = f"{self.config.lock_prefix}{lock_name}"
        self.lock_value = str(uuid.uuid4())  # 生成唯一值,防誤刪

        try:
            # SET key value NX(不存在才設置) EX(過期時間)
            success = await self.redis.set(
                self.lock_key, self.lock_value,
                nx=True,
                ex=self.config.timeout
            )
        except RedisError as e:
            print(f"獲取鎖失敗: {e}")
            return False

        if success:
            # 啓動鎖續約任務(防止業務超時)
            self.renew_task = asyncio.create_task(self._renew_lock())
            return True
        return False

    # 鎖續約:用Lua腳本原子驗證+續期
    async def _renew_lock(self):
        while self.lock_key and self.lock_value:
            try:
                # Lua腳本:如果鎖是自己的,就續期
                script = """
                    if redis.call('GET', KEYS[1]) == ARGV[1] then
                        return redis.call('EXPIRE', KEYS[1], ARGV[2])
                    else
                        return 0
                    end
                """
                # 執行腳本:KEYS是鎖key,ARGV是鎖值+超時時間
                result = await self.redis.eval(
                    script,
                    keys=[self.lock_key],
                    args=[self.lock_value, self.config.timeout]
                )
                if result == 0:  # 續約失敗(鎖不是自己的)
                    break
            except Exception as e:
                print(f"續約失敗: {e}")
                break
            await asyncio.sleep(self.config.renew_interval)  # 每隔3秒續一次

    # 釋放鎖:用Lua腳本原子驗證+刪除
    async def release(self):
        # 先取消續約任務
        if self.renew_task:
            self.renew_task.cancel()
            try:
                await self.renew_task
            except asyncio.CancelledError:
                pass

        if self.lock_key and self.lock_value and self.redis:
            try:
                # Lua腳本:只有鎖是自己的,才刪除
                script = """
                    if redis.call('GET', KEYS[1]) == ARGV[1] then
                        return redis.call('DEL', KEYS[1])
                    else
                        return 0
                    end
                """
                await self.redis.eval(
                    script,
                    keys=[self.lock_key],
                    args=[self.lock_value]
                )
            except RedisError as e:
                print(f"釋放鎖失敗: {e}")

        # 重置狀態
        self.lock_key = None
        self.lock_value = None

2.3 FastAPI路由中使用鎖(依賴注入)

把鎖封裝成依賴,方便路由調用:

# main.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from lock_config import RedisLockConfig
from distributed_lock import RedisDistributedLock
import asyncio

app = FastAPI()

# 1. 配置依賴(讀取Redis連接參數)
async def get_lock_config() -> RedisLockConfig:
    return RedisLockConfig()  # 實際項目可以從環境變量讀,比如os.getenv("REDIS_URL")

# 2. 鎖依賴:用異步生成器管理生命週期
async def get_distributed_lock(
    config: RedisLockConfig = Depends(get_lock_config)
) -> RedisDistributedLock:
    async with RedisDistributedLock(config) as lock:
        yield lock

# 模擬庫存(實際用數據庫)
fake_inventory = {"iphone15": 1}

# 3. 秒殺接口(用鎖保護庫存扣減)
@app.post("/seckill/{product_id}")
async def seckill(
    product_id: str,
    lock: RedisDistributedLock = Depends(get_distributed_lock)
):
    # 先拿鎖,拿不到返回429(請求過多)
    if not await lock.acquire(lock_name=product_id):
        raise HTTPException(status_code=429, detail="搶的人太多啦,再試一次~")

    try:
        # 業務邏輯:扣減庫存
        if fake_inventory.get(product_id, 0) <= 0:
            raise HTTPException(status_code=400, detail="手慢了,商品已售罄!")
        fake_inventory[product_id] -= 1
        return {"msg": "秒殺成功!", "剩餘庫存": fake_inventory[product_id]}
    finally:
        # 不管成功失敗,都釋放鎖(重要!)
        await lock.release()

三、分佈式鎖的測試策略與用例設計

3.1 要測什麼?

分佈式鎖的測試要覆蓋正常場景異常場景

  1. 單實例併發:同一FastAPI實例下,多個請求搶鎖;
  2. 多實例併發:啓動多個FastAPI實例(比如用uvicorn main:app --port 8000--port 8001),用Postman批量發請求;
  3. 鎖超時:持有鎖的進程超時,鎖自動釋放;
  4. 異常崩潰:持有鎖的進程突然死掉,鎖是否自動釋放;
  5. 鎖續約:業務邏輯超時,續約是否成功。

3.2 異步測試用例(pytest-asyncio)

pytest-asyncio寫異步測試,示例:

# test_seckill.py
import pytest
from httpx import AsyncClient
from main import app, fake_inventory
import asyncio

# 1. 測試客户端 fixture
@pytest.fixture(scope="module")
async def client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

# 2. 重置庫存 fixture(每個測試前重置)
@pytest.fixture(autouse=True)
def reset_inv():
    fake_inventory["iphone15"] = 1
    yield

# 3. 測試1:單請求秒殺成功
@pytest.mark.asyncio
async def test_seckill_success(client: AsyncClient):
    resp = await client.post("/seckill/iphone15")
    assert resp.status_code == 200
    assert resp.json() == {"msg": "秒殺成功!", "剩餘庫存": 0}

# 4. 測試2:併發請求,只有1個成功
@pytest.mark.asyncio
async def test_seckill_concurrent(client: AsyncClient):
    # 定義併發請求函數
    async def send_req():
        resp = await client.post("/seckill/iphone15")
        return resp.status_code, resp.json()

    # 發5個併發請求
    tasks = [send_req() for _ in range(5)]
    results = await asyncio.gather(*tasks)

    # 統計結果:1個200(成功),4個429/400(失敗)
    success = sum(1 for status, _ in results if status == 200)
    assert success == 1

# 5. 測試3:鎖超時後釋放
@pytest.mark.asyncio
async def test_lock_timeout(client: AsyncClient):
    fake_inventory["iphone15"] = 2  # 庫存改成2,方便測試

    # 模擬一個持有鎖超時的進程
    async def hold_lock():
        async with RedisDistributedLock(RedisLockConfig(timeout=2)) as lock:
            await lock.acquire("iphone15")
            await asyncio.sleep(3)  # 超過鎖超時時間(2秒)

    # 先啓動hold_lock,1秒後發秒殺請求
    task = asyncio.create_task(hold_lock())
    await asyncio.sleep(1)
    resp = await client.post("/seckill/iphone15")
    await task

    # 驗證:鎖超時釋放,請求成功
    assert resp.status_code == 200
    assert fake_inventory["iphone15"] == 1

運行測試:

pytest test_seckill.py -v

四、課後Quiz:鞏固知識

問題1:為什麼FastAPI異步應用要用aioredis而不是redis-py?

答案解析
redis-py是同步庫,會阻塞FastAPI的事件循環(相當於“堵住了水管”),導致所有請求變慢。而aioredis是異步庫,能和FastAPI的異步機制完美配合,不會阻塞,性能更高。

問題2:鎖的“超時時間”設太短或太長有什麼問題?

答案解析

  • 設太短:如果業務邏輯沒處理完,鎖就自動釋放了,其他請求會拿到鎖,導致數據衝突(比如庫存變成負數);
  • 設太長:如果持有鎖的進程崩潰,鎖要等很久才釋放,其他請求一直拿不到鎖,導致假死鎖(系統像“卡住了”)。

五、常見報錯與解決

報錯1:aioredis.exceptions.ConnectionClosedError

原因:Redis沒啓動,或者連接URL錯了(比如端口不是6379)。
解決

  1. 檢查Redis是否在運行:redis-cli ping(返回PONG就對了);
  2. 驗證RedisLockConfig裏的redis_url是否正確(比如redis://localhost:6379);
  3. 增加連接超時時間:Redis.from_url(redis_url, timeout=10)

報錯2:HTTP 429 Too Many Requests

原因:併發請求太多,鎖被佔了。
解決

  1. 優化業務邏輯,縮短鎖的持有時間(比如把非核心邏輯移到鎖外面);
  2. 用隊列限流(比如Redis隊列,把請求排成隊,一個一個處理);
  3. 返回友好提示(比如“再試一次”)。

報錯3:鎖釋放失敗(Lua腳本返回0)

原因:鎖已經被其他進程釋放了(比如超時),或者鎖值不對。
解決

  1. 確保release方法在finally塊裏(不管成功失敗都釋放);
  2. 檢查鎖的timeout設置,不要太短;
  3. 用唯一鎖值(uuid),避免釋放別人的鎖。

六、實戰運行步驟

  1. 啓動Redisredis-server(Windows用redis-server.exe);
  2. 啓動FastAPIuvicorn main:app --reload
  3. 測試接口:用Postman發POST http://localhost:8000/seckill/iphone15,看返回結果;
  4. 運行測試pytest test_seckill.py -v

餘下文章內容請點擊跳轉至 個人博客頁面 或者 掃碼關注或者微信搜一搜:編程智域 前端至全棧交流與成長,閲讀完整的文章:FastAPI秒殺庫存總變負數?Redis分佈式鎖能幫你守住底線嗎

<details>
<summary>往期文章歸檔</summary>

  • FastAPI的CI流水線怎麼自動測端點,還能讓Allure報告美到犯規? - cmdragon's Blog
  • 如何用GitHub Actions為FastAPI項目打造自動化測試流水線? - cmdragon's Blog
  • 如何用Git Hook和CI流水線為FastAPI項目保駕護航? - cmdragon's Blog
  • FastAPI如何用契約測試確保API的「菜單」與「菜品」一致?
  • 為什麼TDD能讓你的FastAPI開發飛起來? - cmdragon's Blog
  • 如何用FastAPI玩轉多模塊測試與異步任務,讓代碼不再“鬧脾氣”? - cmdragon's Blog
  • 如何在FastAPI中玩轉“時光倒流”的數據庫事務回滾測試?
  • 如何在FastAPI中優雅地模擬多模塊集成測試? - cmdragon's Blog
  • 多環境配置切換機制能否讓開發與生產無縫銜接? - cmdragon's Blog
  • 如何在 FastAPI 中巧妙覆蓋依賴注入並攔截第三方服務調用? - cmdragon's Blog
  • 為什麼你的單元測試需要Mock數據庫才能飛起來? - cmdragon's Blog
  • 如何在FastAPI中巧妙隔離依賴項,讓單元測試不再頭疼? - cmdragon's Blog
  • 如何在FastAPI中巧妙隔離依賴項,讓單元測試不再頭疼? - cmdragon's Blog
  • 測試覆蓋率不夠高?這些技巧讓你的FastAPI測試無懈可擊! - cmdragon's Blog
  • 為什麼你的FastAPI測試覆蓋率總是低得讓人想哭? - cmdragon's Blog
  • 如何讓FastAPI測試不再成為你的噩夢? - cmdragon's Blog
  • FastAPI測試環境配置的秘訣,你真的掌握了嗎? - cmdragon's Blog
  • 全鏈路追蹤如何讓FastAPI微服務架構的每個請求都無所遁形? - cmdragon's Blog
  • 如何在API高併發中玩轉資源隔離與限流策略? - cmdragon's Blog
  • 任務分片執行模式如何讓你的FastAPI性能飆升? - cmdragon's Blog
  • 冷熱任務分離:是提升Web性能的終極秘籍還是技術噱頭? - cmdragon's Blog
  • 如何讓FastAPI在百萬級任務處理中依然遊刃有餘? - cmdragon's Blog
  • 如何讓FastAPI與消息隊列的聯姻既甜蜜又可靠? - cmdragon's Blog
  • 如何在FastAPI中巧妙實現延遲隊列,讓任務乖乖等待? - cmdragon's Blog
  • FastAPI的死信隊列處理機制:為何你的消息系統需要它? - cmdragon's Blog
  • 如何讓FastAPI任務系統在失敗時自動告警並自我修復? - cmdragon's Blog
  • 如何用Prometheus和FastAPI打造任務監控的“火眼金睛”? - cmdragon's Blog
  • 如何用APScheduler和FastAPI打造永不宕機的分佈式定時任務系統? - cmdragon's Blog
  • 如何在 FastAPI 中玩轉 APScheduler,讓任務定時自動執行? - cmdragon's Blog
  • 定時任務系統如何讓你的Web應用自動完成那些煩人的重複工作? - cmdragon's Blog
  • Celery任務監控的魔法背後藏着什麼秘密? - cmdragon's Blog
  • 如何讓Celery任務像VIP客户一樣享受優先待遇? - cmdragon's Blog
  • 如何讓你的FastAPI Celery Worker在壓力下優雅起舞? - cmdragon's Blog
  • FastAPI與Celery的完美邂逅,如何讓異步任務飛起來? - cmdragon's Blog
  • FastAPI消息持久化與ACK機制:如何確保你的任務永不迷路? - cmdragon's Blog
  • FastAPI的BackgroundTasks如何玩轉生產者-消費者模式? - cmdragon's Blog
  • BackgroundTasks 還是 RabbitMQ?你的異步任務到底該選誰? - cmdragon's Blog

</details>

<details>
<summary>免費好用的熱門在線工具</summary>

  • 歌詞生成工具 - 應用商店 | By cmdragon
  • 網盤資源聚合搜索 - 應用商店 | By cmdragon
  • ASCII字符畫生成器 - 應用商店 | By cmdragon
  • JSON Web Tokens 工具 - 應用商店 | By cmdragon
  • Bcrypt 密碼工具 - 應用商店 | By cmdragon
  • GIF 合成器 - 應用商店 | By cmdragon
  • GIF 分解器 - 應用商店 | By cmdragon
  • 文本隱寫術 - 應用商店 | By cmdragon
  • CMDragon 在線工具 - 高級AI工具箱與開發者套件 | 免費好用的在線工具
  • 應用商店 - 發現1000+提升效率與開發的AI工具和實用程序 | 免費好用的在線工具
  • CMDragon 更新日誌 - 最新更新、功能與改進 | 免費好用的在線工具
  • 支持我們 - 成為贊助者 | 免費好用的在線工具
  • AI文本生成圖像 - 應用商店 | 免費好用的在線工具
  • 臨時郵箱 - 應用商店 | 免費好用的在線工具
  • 二維碼解析器 - 應用商店 | 免費好用的在線工具
  • 文本轉思維導圖 - 應用商店 | 免費好用的在線工具
  • 正則表達式可視化工具 - 應用商店 | 免費好用的在線工具
  • 文件隱寫工具 - 應用商店 | 免費好用的在線工具
  • IPTV 頻道探索器 - 應用商店 | 免費好用的在線工具
  • 快傳 - 應用商店 | 免費好用的在線工具
  • 隨機抽獎工具 - 應用商店 | 免費好用的在線工具
  • 動漫場景查找器 - 應用商店 | 免費好用的在線工具
  • 時間工具箱 - 應用商店 | 免費好用的在線工具
  • 網速測試 - 應用商店 | 免費好用的在線工具
  • AI 智能摳圖工具 - 應用商店 | 免費好用的在線工具
  • 背景替換工具 - 應用商店 | 免費好用的在線工具
  • 藝術二維碼生成器 - 應用商店 | 免費好用的在線工具
  • Open Graph 元標籤生成器 - 應用商店 | 免費好用的在線工具
  • 圖像對比工具 - 應用商店 | 免費好用的在線工具
  • 圖片壓縮專業版 - 應用商店 | 免費好用的在線工具
  • 密碼生成器 - 應用商店 | 免費好用的在線工具
  • SVG優化器 - 應用商店 | 免費好用的在線工具
  • 調色板生成器 - 應用商店 | 免費好用的在線工具
  • 在線節拍器 - 應用商店 | 免費好用的在線工具
  • IP歸屬地查詢 - 應用商店 | 免費好用的在線工具
  • CSS網格佈局生成器 - 應用商店 | 免費好用的在線工具
  • 郵箱驗證工具 - 應用商店 | 免費好用的在線工具
  • 書法練習字帖 - 應用商店 | 免費好用的在線工具
  • 金融計算器套件 - 應用商店 | 免費好用的在線工具
  • 中國親戚關係計算器 - 應用商店 | 免費好用的在線工具
  • Protocol Buffer 工具箱 - 應用商店 | 免費好用的在線工具
  • IP歸屬地查詢 - 應用商店 | 免費好用的在線工具
  • 圖片無損放大 - 應用商店 | 免費好用的在線工具
  • 文本比較工具 - 應用商店 | 免費好用的在線工具
  • IP批量查詢工具 - 應用商店 | 免費好用的在線工具
  • 域名查詢工具 - 應用商店 | 免費好用的在線工具
  • DNS工具箱 - 應用商店 | 免費好用的在線工具
  • 網站圖標生成器 - 應用商店 | 免費好用的在線工具
  • XML Sitemap

</details>

user avatar ivictor Avatar life2refuel Avatar pingcap Avatar icollection Avatar
Favorites 4 users favorite the story!
Favorites

Add a new Comments

Some HTML is okay.