url: /posts/65ce343cc5df9faf3a8e2eeaab42ae45/
title: FastAPI秒殺庫存總變負數?Redis分佈式鎖能幫你守住底線嗎
date: 2025-09-17T03:43:34+08:00
lastmod: 2025-09-17T03:43:34+08:00
author: cmdragon
summary:
分佈式鎖在FastAPI中用於解決多實例併發訪問共享資源時的數據一致性問題。其核心原理包括互斥性、安全性、可用性和容錯性,常用Redis分佈式鎖實現。Redlock算法通過多節點投票確保鎖的可靠性。FastAPI中通過aioredis實現異步分佈式鎖,支持鎖的獲取、釋放和續約。測試策略覆蓋單實例、多實例併發及鎖超時等場景,確保鎖的正確性和穩定性。
categories:
- fastapi
tags:
- FastAPI
- 分佈式鎖
- Redis
- 異步編程
- Redlock算法
- 併發控制
- 測試策略
<img src="https://api2.cmdragon.cn/upload/cmder/20250304_012821924.jpg" title="cmdragon_cn.png" alt="cmdragon_cn.png"/>
掃描二維碼關注或者微信搜一搜:編程智域 前端至全棧交流與成長
發現1000+提升效率與開發的AI工具和實用程序:https://tools.cmdragon.cn/
一、分佈式鎖在FastAPI中的作用與原理
1.1 為什麼需要分佈式鎖?
想象一個場景:你做了個FastAPI秒殺接口,商品庫存只有1件。如果同時有100個請求打進來,單實例FastAPI能用asyncio.Lock(本地鎖)保證同一時間只有一個請求處理庫存。但如果部署了3個FastAPI實例(多進程/多機器),本地鎖就失效了——每個實例都有自己的鎖,100個請求會同時衝進3個實例,導致庫存變成-99,徹底亂套。
分佈式鎖的本質:給跨進程、跨機器的資源競爭“上全局鎖”,不管多少個FastAPI實例,同一時間只有一個請求能拿到鎖,確保數據一致。
1.2 分佈式鎖的核心原理
分佈式鎖要滿足4個核心要求:
- 互斥性:同一時間只有一個請求能拿到鎖;
- 安全性:不能讓A的鎖被B釋放;
- 可用性:Redis掛了一個節點,還能正常用;
- 容錯性:持有鎖的進程崩潰,鎖要能自動釋放。
FastAPI裏最常用的是Redis分佈式鎖(輕量、性能高),底層用Redlock算法(解決Redis單點故障問題)。測試環境可以簡化成單Redis節點,生產環境建議用3-5個節點。
1.3 Redlock算法簡化理解
Redlock是“多節點投票制”:
- 向5個Redis節點發“鎖請求”;
- 超過3個節點同意(半數以上),就算拿到鎖;
- 計算總耗時,如果比鎖超時時間短,鎖有效;
- 否則,把所有節點的鎖都刪了,重新來。
測試環境不用這麼複雜——先拿單Redis節點練手,生產再擴展。
二、FastAPI中分佈式鎖的實現
2.1 依賴準備與配置
首先裝依賴:
pip install fastapi==0.109 aioredis==2.0.1 pydantic==2.5.3 pytest-asyncio==0.23.2
用pydantic寫個配置類(統一管理Redis連接參數):
# lock_config.py
from pydantic import BaseModel, Field
class RedisLockConfig(BaseModel):
redis_url: str = Field(default="redis://localhost:6379", description="Redis連接地址")
lock_prefix: str = Field(default="dist_lock:", description="鎖鍵前綴,避免key衝突")
timeout: int = Field(default=10, description="鎖超時時間(秒),防止死鎖")
renew_interval: int = Field(default=3, description="鎖續約間隔(秒),防止業務超時")
2.2 異步分佈式鎖實現(aioredis)
因為FastAPI是異步的,必須用aioredis(異步Redis客户端)。寫個RedisDistributedLock類,封裝鎖的獲取、釋放、續約:
# distributed_lock.py
from aioredis import Redis, RedisError
from pydantic import BaseModel
import uuid
import asyncio
class RedisDistributedLock:
def __init__(self, config: RedisLockConfig):
self.config = config
self.redis: Redis | None = None # Redis客户端實例
self.lock_key: str | None = None # 當前鎖的key
self.lock_value: str | None = None # 唯一標識(防誤刪別人的鎖)
self.renew_task: asyncio.Task | None = None # 鎖續約任務
# 異步上下文管理器:自動連接/斷開Redis
async def __aenter__(self) -> "RedisDistributedLock":
await self._connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.release()
await self._disconnect()
# 連接Redis
async def _connect(self):
if not self.redis:
self.redis = await Redis.from_url(self.config.redis_url)
# 斷開Redis連接
async def _disconnect(self):
if self.redis:
await self.redis.close()
await self.redis.wait_closed()
self.redis = None
# 獲取鎖:原子操作(SETNX + EX)
async def acquire(self, lock_name: str) -> bool:
self.lock_key = f"{self.config.lock_prefix}{lock_name}"
self.lock_value = str(uuid.uuid4()) # 生成唯一值,防誤刪
try:
# SET key value NX(不存在才設置) EX(過期時間)
success = await self.redis.set(
self.lock_key, self.lock_value,
nx=True,
ex=self.config.timeout
)
except RedisError as e:
print(f"獲取鎖失敗: {e}")
return False
if success:
# 啓動鎖續約任務(防止業務超時)
self.renew_task = asyncio.create_task(self._renew_lock())
return True
return False
# 鎖續約:用Lua腳本原子驗證+續期
async def _renew_lock(self):
while self.lock_key and self.lock_value:
try:
# Lua腳本:如果鎖是自己的,就續期
script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('EXPIRE', KEYS[1], ARGV[2])
else
return 0
end
"""
# 執行腳本:KEYS是鎖key,ARGV是鎖值+超時時間
result = await self.redis.eval(
script,
keys=[self.lock_key],
args=[self.lock_value, self.config.timeout]
)
if result == 0: # 續約失敗(鎖不是自己的)
break
except Exception as e:
print(f"續約失敗: {e}")
break
await asyncio.sleep(self.config.renew_interval) # 每隔3秒續一次
# 釋放鎖:用Lua腳本原子驗證+刪除
async def release(self):
# 先取消續約任務
if self.renew_task:
self.renew_task.cancel()
try:
await self.renew_task
except asyncio.CancelledError:
pass
if self.lock_key and self.lock_value and self.redis:
try:
# Lua腳本:只有鎖是自己的,才刪除
script = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
"""
await self.redis.eval(
script,
keys=[self.lock_key],
args=[self.lock_value]
)
except RedisError as e:
print(f"釋放鎖失敗: {e}")
# 重置狀態
self.lock_key = None
self.lock_value = None
2.3 FastAPI路由中使用鎖(依賴注入)
把鎖封裝成依賴,方便路由調用:
# main.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from lock_config import RedisLockConfig
from distributed_lock import RedisDistributedLock
import asyncio
app = FastAPI()
# 1. 配置依賴(讀取Redis連接參數)
async def get_lock_config() -> RedisLockConfig:
return RedisLockConfig() # 實際項目可以從環境變量讀,比如os.getenv("REDIS_URL")
# 2. 鎖依賴:用異步生成器管理生命週期
async def get_distributed_lock(
config: RedisLockConfig = Depends(get_lock_config)
) -> RedisDistributedLock:
async with RedisDistributedLock(config) as lock:
yield lock
# 模擬庫存(實際用數據庫)
fake_inventory = {"iphone15": 1}
# 3. 秒殺接口(用鎖保護庫存扣減)
@app.post("/seckill/{product_id}")
async def seckill(
product_id: str,
lock: RedisDistributedLock = Depends(get_distributed_lock)
):
# 先拿鎖,拿不到返回429(請求過多)
if not await lock.acquire(lock_name=product_id):
raise HTTPException(status_code=429, detail="搶的人太多啦,再試一次~")
try:
# 業務邏輯:扣減庫存
if fake_inventory.get(product_id, 0) <= 0:
raise HTTPException(status_code=400, detail="手慢了,商品已售罄!")
fake_inventory[product_id] -= 1
return {"msg": "秒殺成功!", "剩餘庫存": fake_inventory[product_id]}
finally:
# 不管成功失敗,都釋放鎖(重要!)
await lock.release()
三、分佈式鎖的測試策略與用例設計
3.1 要測什麼?
分佈式鎖的測試要覆蓋正常場景和異常場景:
- 單實例併發:同一FastAPI實例下,多個請求搶鎖;
- 多實例併發:啓動多個FastAPI實例(比如用
uvicorn main:app --port 8000和--port 8001),用Postman批量發請求; - 鎖超時:持有鎖的進程超時,鎖自動釋放;
- 異常崩潰:持有鎖的進程突然死掉,鎖是否自動釋放;
- 鎖續約:業務邏輯超時,續約是否成功。
3.2 異步測試用例(pytest-asyncio)
用pytest-asyncio寫異步測試,示例:
# test_seckill.py
import pytest
from httpx import AsyncClient
from main import app, fake_inventory
import asyncio
# 1. 測試客户端 fixture
@pytest.fixture(scope="module")
async def client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
# 2. 重置庫存 fixture(每個測試前重置)
@pytest.fixture(autouse=True)
def reset_inv():
fake_inventory["iphone15"] = 1
yield
# 3. 測試1:單請求秒殺成功
@pytest.mark.asyncio
async def test_seckill_success(client: AsyncClient):
resp = await client.post("/seckill/iphone15")
assert resp.status_code == 200
assert resp.json() == {"msg": "秒殺成功!", "剩餘庫存": 0}
# 4. 測試2:併發請求,只有1個成功
@pytest.mark.asyncio
async def test_seckill_concurrent(client: AsyncClient):
# 定義併發請求函數
async def send_req():
resp = await client.post("/seckill/iphone15")
return resp.status_code, resp.json()
# 發5個併發請求
tasks = [send_req() for _ in range(5)]
results = await asyncio.gather(*tasks)
# 統計結果:1個200(成功),4個429/400(失敗)
success = sum(1 for status, _ in results if status == 200)
assert success == 1
# 5. 測試3:鎖超時後釋放
@pytest.mark.asyncio
async def test_lock_timeout(client: AsyncClient):
fake_inventory["iphone15"] = 2 # 庫存改成2,方便測試
# 模擬一個持有鎖超時的進程
async def hold_lock():
async with RedisDistributedLock(RedisLockConfig(timeout=2)) as lock:
await lock.acquire("iphone15")
await asyncio.sleep(3) # 超過鎖超時時間(2秒)
# 先啓動hold_lock,1秒後發秒殺請求
task = asyncio.create_task(hold_lock())
await asyncio.sleep(1)
resp = await client.post("/seckill/iphone15")
await task
# 驗證:鎖超時釋放,請求成功
assert resp.status_code == 200
assert fake_inventory["iphone15"] == 1
運行測試:
pytest test_seckill.py -v
四、課後Quiz:鞏固知識
問題1:為什麼FastAPI異步應用要用aioredis而不是redis-py?
答案解析:
redis-py是同步庫,會阻塞FastAPI的事件循環(相當於“堵住了水管”),導致所有請求變慢。而aioredis是異步庫,能和FastAPI的異步機制完美配合,不會阻塞,性能更高。
問題2:鎖的“超時時間”設太短或太長有什麼問題?
答案解析:
- 設太短:如果業務邏輯沒處理完,鎖就自動釋放了,其他請求會拿到鎖,導致數據衝突(比如庫存變成負數);
- 設太長:如果持有鎖的進程崩潰,鎖要等很久才釋放,其他請求一直拿不到鎖,導致假死鎖(系統像“卡住了”)。
五、常見報錯與解決
報錯1:aioredis.exceptions.ConnectionClosedError
原因:Redis沒啓動,或者連接URL錯了(比如端口不是6379)。
解決:
- 檢查Redis是否在運行:
redis-cli ping(返回PONG就對了); - 驗證
RedisLockConfig裏的redis_url是否正確(比如redis://localhost:6379); - 增加連接超時時間:
Redis.from_url(redis_url, timeout=10)。
報錯2:HTTP 429 Too Many Requests
原因:併發請求太多,鎖被佔了。
解決:
- 優化業務邏輯,縮短鎖的持有時間(比如把非核心邏輯移到鎖外面);
- 用隊列限流(比如Redis隊列,把請求排成隊,一個一個處理);
- 返回友好提示(比如“再試一次”)。
報錯3:鎖釋放失敗(Lua腳本返回0)
原因:鎖已經被其他進程釋放了(比如超時),或者鎖值不對。
解決:
- 確保
release方法在finally塊裏(不管成功失敗都釋放); - 檢查鎖的
timeout設置,不要太短; - 用唯一鎖值(
uuid),避免釋放別人的鎖。
六、實戰運行步驟
- 啓動Redis:
redis-server(Windows用redis-server.exe); - 啓動FastAPI:
uvicorn main:app --reload; - 測試接口:用Postman發
POST http://localhost:8000/seckill/iphone15,看返回結果; - 運行測試:
pytest test_seckill.py -v。
餘下文章內容請點擊跳轉至 個人博客頁面 或者 掃碼關注或者微信搜一搜:編程智域 前端至全棧交流與成長,閲讀完整的文章:FastAPI秒殺庫存總變負數?Redis分佈式鎖能幫你守住底線嗎
<details>
<summary>往期文章歸檔</summary>
- FastAPI的CI流水線怎麼自動測端點,還能讓Allure報告美到犯規? - cmdragon's Blog
- 如何用GitHub Actions為FastAPI項目打造自動化測試流水線? - cmdragon's Blog
- 如何用Git Hook和CI流水線為FastAPI項目保駕護航? - cmdragon's Blog
- FastAPI如何用契約測試確保API的「菜單」與「菜品」一致?
- 為什麼TDD能讓你的FastAPI開發飛起來? - cmdragon's Blog
- 如何用FastAPI玩轉多模塊測試與異步任務,讓代碼不再“鬧脾氣”? - cmdragon's Blog
- 如何在FastAPI中玩轉“時光倒流”的數據庫事務回滾測試?
- 如何在FastAPI中優雅地模擬多模塊集成測試? - cmdragon's Blog
- 多環境配置切換機制能否讓開發與生產無縫銜接? - cmdragon's Blog
- 如何在 FastAPI 中巧妙覆蓋依賴注入並攔截第三方服務調用? - cmdragon's Blog
- 為什麼你的單元測試需要Mock數據庫才能飛起來? - cmdragon's Blog
- 如何在FastAPI中巧妙隔離依賴項,讓單元測試不再頭疼? - cmdragon's Blog
- 如何在FastAPI中巧妙隔離依賴項,讓單元測試不再頭疼? - cmdragon's Blog
- 測試覆蓋率不夠高?這些技巧讓你的FastAPI測試無懈可擊! - cmdragon's Blog
- 為什麼你的FastAPI測試覆蓋率總是低得讓人想哭? - cmdragon's Blog
- 如何讓FastAPI測試不再成為你的噩夢? - cmdragon's Blog
- FastAPI測試環境配置的秘訣,你真的掌握了嗎? - cmdragon's Blog
- 全鏈路追蹤如何讓FastAPI微服務架構的每個請求都無所遁形? - cmdragon's Blog
- 如何在API高併發中玩轉資源隔離與限流策略? - cmdragon's Blog
- 任務分片執行模式如何讓你的FastAPI性能飆升? - cmdragon's Blog
- 冷熱任務分離:是提升Web性能的終極秘籍還是技術噱頭? - cmdragon's Blog
- 如何讓FastAPI在百萬級任務處理中依然遊刃有餘? - cmdragon's Blog
- 如何讓FastAPI與消息隊列的聯姻既甜蜜又可靠? - cmdragon's Blog
- 如何在FastAPI中巧妙實現延遲隊列,讓任務乖乖等待? - cmdragon's Blog
- FastAPI的死信隊列處理機制:為何你的消息系統需要它? - cmdragon's Blog
- 如何讓FastAPI任務系統在失敗時自動告警並自我修復? - cmdragon's Blog
- 如何用Prometheus和FastAPI打造任務監控的“火眼金睛”? - cmdragon's Blog
- 如何用APScheduler和FastAPI打造永不宕機的分佈式定時任務系統? - cmdragon's Blog
- 如何在 FastAPI 中玩轉 APScheduler,讓任務定時自動執行? - cmdragon's Blog
- 定時任務系統如何讓你的Web應用自動完成那些煩人的重複工作? - cmdragon's Blog
- Celery任務監控的魔法背後藏着什麼秘密? - cmdragon's Blog
- 如何讓Celery任務像VIP客户一樣享受優先待遇? - cmdragon's Blog
- 如何讓你的FastAPI Celery Worker在壓力下優雅起舞? - cmdragon's Blog
- FastAPI與Celery的完美邂逅,如何讓異步任務飛起來? - cmdragon's Blog
- FastAPI消息持久化與ACK機制:如何確保你的任務永不迷路? - cmdragon's Blog
- FastAPI的BackgroundTasks如何玩轉生產者-消費者模式? - cmdragon's Blog
- BackgroundTasks 還是 RabbitMQ?你的異步任務到底該選誰? - cmdragon's Blog
</details>
<details>
<summary>免費好用的熱門在線工具</summary>
- 歌詞生成工具 - 應用商店 | By cmdragon
- 網盤資源聚合搜索 - 應用商店 | By cmdragon
- ASCII字符畫生成器 - 應用商店 | By cmdragon
- JSON Web Tokens 工具 - 應用商店 | By cmdragon
- Bcrypt 密碼工具 - 應用商店 | By cmdragon
- GIF 合成器 - 應用商店 | By cmdragon
- GIF 分解器 - 應用商店 | By cmdragon
- 文本隱寫術 - 應用商店 | By cmdragon
- CMDragon 在線工具 - 高級AI工具箱與開發者套件 | 免費好用的在線工具
- 應用商店 - 發現1000+提升效率與開發的AI工具和實用程序 | 免費好用的在線工具
- CMDragon 更新日誌 - 最新更新、功能與改進 | 免費好用的在線工具
- 支持我們 - 成為贊助者 | 免費好用的在線工具
- AI文本生成圖像 - 應用商店 | 免費好用的在線工具
- 臨時郵箱 - 應用商店 | 免費好用的在線工具
- 二維碼解析器 - 應用商店 | 免費好用的在線工具
- 文本轉思維導圖 - 應用商店 | 免費好用的在線工具
- 正則表達式可視化工具 - 應用商店 | 免費好用的在線工具
- 文件隱寫工具 - 應用商店 | 免費好用的在線工具
- IPTV 頻道探索器 - 應用商店 | 免費好用的在線工具
- 快傳 - 應用商店 | 免費好用的在線工具
- 隨機抽獎工具 - 應用商店 | 免費好用的在線工具
- 動漫場景查找器 - 應用商店 | 免費好用的在線工具
- 時間工具箱 - 應用商店 | 免費好用的在線工具
- 網速測試 - 應用商店 | 免費好用的在線工具
- AI 智能摳圖工具 - 應用商店 | 免費好用的在線工具
- 背景替換工具 - 應用商店 | 免費好用的在線工具
- 藝術二維碼生成器 - 應用商店 | 免費好用的在線工具
- Open Graph 元標籤生成器 - 應用商店 | 免費好用的在線工具
- 圖像對比工具 - 應用商店 | 免費好用的在線工具
- 圖片壓縮專業版 - 應用商店 | 免費好用的在線工具
- 密碼生成器 - 應用商店 | 免費好用的在線工具
- SVG優化器 - 應用商店 | 免費好用的在線工具
- 調色板生成器 - 應用商店 | 免費好用的在線工具
- 在線節拍器 - 應用商店 | 免費好用的在線工具
- IP歸屬地查詢 - 應用商店 | 免費好用的在線工具
- CSS網格佈局生成器 - 應用商店 | 免費好用的在線工具
- 郵箱驗證工具 - 應用商店 | 免費好用的在線工具
- 書法練習字帖 - 應用商店 | 免費好用的在線工具
- 金融計算器套件 - 應用商店 | 免費好用的在線工具
- 中國親戚關係計算器 - 應用商店 | 免費好用的在線工具
- Protocol Buffer 工具箱 - 應用商店 | 免費好用的在線工具
- IP歸屬地查詢 - 應用商店 | 免費好用的在線工具
- 圖片無損放大 - 應用商店 | 免費好用的在線工具
- 文本比較工具 - 應用商店 | 免費好用的在線工具
- IP批量查詢工具 - 應用商店 | 免費好用的在線工具
- 域名查詢工具 - 應用商店 | 免費好用的在線工具
- DNS工具箱 - 應用商店 | 免費好用的在線工具
- 網站圖標生成器 - 應用商店 | 免費好用的在線工具
- XML Sitemap
</details>