Saga分佈式事務框架執行邏輯
📋 目錄
- 框架概述
- 核心組件架構
- 數據庫表設計
- 完整執行流程
- 節點發現與調用機制
- 精簡補償策略設計
- 總結
框架概述
這是一個基於數據庫驅動的Saga分佈式事務框架,專門用於解決跨服務間數據同步的一致性問題。框架採用了混合編排模式,結合了集中式任務分解和分佈式執行的優勢。
核心設計理念
- 🎯 分層解耦: 任務分解與任務執行完全分離
- 🌐 節點自治: 消費端節點獨立執行和管理任務
- 📊 狀態透明: 完整的執行日誌和狀態追蹤
- 🔄 容錯恢復: 失敗重試與自動補償機制
- ⚖️ 負載均衡: 基於節點負載的智能調度
業務場景
- 空間同步開啓: 跨服務複製空間、頁面、權限等數據
- 增量數據同步: 已開啓同步的項目進行增量更新
- 同步關閉清理: 關閉同步時清理相關數據
核心組件架構
``mermaid
數據庫表設計
📋 核心表結構
1. distribute_event (主事務表)
記錄頂層業務事務的基本信息和整體狀態。
2. distribute_event_step (步驟表)
記錄事務分解後的各個原子步驟信息。
3. distribute_event_step_log (執行日誌表) ✨ 完整設計
記錄消費端節點的執行日誌,實現簡潔而強大的冪等性保證、重試機制和通知狀態管理。
CREATE TABLE distribute_event_step_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主鍵',
step_code VARCHAR(64) NOT NULL COMMENT '關聯業務端distribute_event_step.code',
job_code VARCHAR(64) NOT NULL COMMENT '主事務編碼,關聯業務端distribute_event.code',
consumer_node VARCHAR(50) NOT NULL COMMENT '消費者節點地址',
-- 冪等性保證字段
execution_key VARCHAR(128) NOT NULL COMMENT '執行唯一鍵: {step_code}_{consumer_node}_{yyyyMMdd}',
business_key VARCHAR(64) COMMENT '業務唯一鍵,基於業務數據哈希值',
-- 執行狀態管理
exec_status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '執行狀態: PENDING, EXECUTING, SUCCESS, FAILED, RETRYING',
retry_count INT DEFAULT 0 COMMENT '當前重試次數',
max_retry INT DEFAULT 3 COMMENT '最大重試次數',
-- 通知狀態管理(新增)
notify_status VARCHAR(20) DEFAULT 'NOT_REQUIRED' COMMENT '通知狀態: NOT_REQUIRED, PENDING, SUCCESS, FAILED',
notify_retry_count INT DEFAULT 0 COMMENT '通知重試次數',
max_notify_retry INT DEFAULT 3 COMMENT '最大通知重試次數',
next_notify_time TIMESTAMP NULL COMMENT '下次通知時間',
notify_url VARCHAR(255) COMMENT '通知回調地址',
-- 執行信息
payload TEXT COMMENT '執行數據載荷',
result_data TEXT COMMENT '執行結果數據',
error_message TEXT COMMENT '執行錯誤信息',
notify_error_message TEXT COMMENT '通知錯誤信息',
start_time TIMESTAMP NULL COMMENT '開始執行時間',
end_time TIMESTAMP NULL COMMENT '結束執行時間',
next_retry_time TIMESTAMP NULL COMMENT '下次執行重試時間',
-- 回滾支持
rollback_data TEXT COMMENT '回滾數據快照,JSON格式',
is_rollback TINYINT(1) DEFAULT 0 COMMENT '是否已回滾',
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
-- 冪等性保證索引
UNIQUE KEY uk_execution_key (execution_key),
UNIQUE KEY uk_business_key (step_code, business_key),
-- 查詢優化索引
INDEX idx_exec_status (exec_status),
INDEX idx_notify_status (notify_status, next_notify_time),
INDEX idx_step_code (step_code),
INDEX idx_consumer_node (consumer_node),
INDEX idx_retry_time (next_retry_time),
INDEX idx_job_code (job_code)
);
📊 字段設計詳解與使用説明
1. 冪等性保證字段
/**
* execution_key: 執行唯一鍵
* 用途: 防止同一步驟在同一節點同一天重複執行
* 格式: SPACE_SYNC_001_192.168.1.10:8080_20240316
* 使用場景: 消費端接收HTTP請求時檢查是否已存在相同的execution_key
*/
public String generateExecutionKey(String stepCode, String consumerNode) {
String dateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
return String.format("%s_%s_%s", stepCode, consumerNode, dateStr);
}
/**
* business_key: 業務唯一鍵
* 用途: 基於業務數據內容的去重,防止相同業務數據重複處理
* 生成: 對payload業務數據進行MD5哈希
* 使用場景: 當業務數據完全相同時避免重複執行
*/
public String generateBusinessKey(Object payload) {
String payloadJson = JSON.toJSONString(payload);
return DigestUtils.md5DigestAsHex(payloadJson.getBytes()).substring(0, 8);
}
2. 狀態管理字段
/**
* exec_status: 執行狀態
* PENDING: 待執行 - 剛接收到任務,等待定時器掃描
* EXECUTING: 執行中 - 正在執行業務邏輯
* SUCCESS: 執行成功 - 業務邏輯執行完成
* FAILED: 執行失敗 - 重試次數耗盡後的最終失敗狀態
* RETRYING: 重試中 - 執行失敗後等待重試的狀態
*/
/**
* notify_status: 通知狀態
* NOT_REQUIRED: 無需通知 - 執行中或失敗時的默認狀態
* PENDING: 待通知 - 執行成功後需要通知業務端
* SUCCESS: 通知成功 - 業務端已收到通知並確認
* FAILED: 通知失敗 - 通知重試次數耗盡後的狀態
*/
3. 重試機制字段
/**
* retry_count / max_retry: 執行重試控制
* 用途: 控制業務邏輯執行的重試次數,避免無限重試
* 邏輯: 失敗時retry_count+1,超過max_retry則標記為FAILED
*/
/**
* notify_retry_count / max_notify_retry: 通知重試控制
* 用途: 控制通知業務端的重試次數,確保通知到位
* 邏輯: 通知失敗時notify_retry_count+1,採用指數退避策略
*/
/**
* next_retry_time / next_notify_time: 重試時間控制
* 用途: 指數退避算法的時間調度
* 計算: delay = Math.pow(2, retryCount) * baseDelaySeconds
*/
4. 數據存儲字段
/**
* payload: 執行數據載荷
* 用途: 存儲從業務端接收的步驟執行數據
* 格式: JSON字符串,包含業務邏輯執行所需的所有參數
*/
/**
* result_data: 執行結果數據
* 用途: 存儲業務邏輯執行後的結果,用於通知業務端
* 格式: JSON字符串,包含執行結果、影響的數據ID等
*/
/**
* rollback_data: 回滾數據快照
* 用途: 存儲執行前的原始數據狀態,用於失敗回滾
* 格式: JSON字符串,包含需要刪除的數據ID列表等
*/
完整執行流程
🎆 整體流程時序圖(數據庫分離)
整體執行流程圖
節點發現與調用機制
服務節點通過心跳註冊,基於負載權重選擇最優節點執行任務。
@Service
public class DistributeNodeRegistry {
@Scheduled(fixedRate = 30000) // 每30秒心跳
public void heartbeat() {
String nodeAddress = getLocalNodeAddress();
NodeMetadata metadata = collectNodeMetadata(); // 收集CPU、內存、任務數量等
nodeRegistryMapper.upsertNode(NodeRegistryRecord.builder()
.serviceName(getServiceName())
.nodeAddress(nodeAddress)
.status(1) // 在線
.lastHeartbeatTime(new Date())
.metadata(JSON.toJSONString(metadata))
.build());
}
// 選擇最優節點(基於負載權重)
public String selectOptimalNode(String serviceName) {
List<NodeRegistryRecord> nodes = nodeRegistryMapper.selectAvailableNodes(serviceName);
return nodes.stream()
.min(Comparator.comparing(this::calculateNodeLoad))
.map(NodeRegistryRecord::getNodeAddress)
.orElseThrow(() -> new NoAvailableNodeException("無可用節點"));
}
}
🔄 精簡補償策略設計
執行重試與通知重試機制
@Component
public class RetryAndNotificationService {
/**
* 處理執行失敗 - 重試機制(刪除重新插入)
*/
public void handleExecutionFailure(DistributeEventStepLog stepLog, String errorMessage) {
int currentRetry = stepLog.getRetryCount();
if (currentRetry < stepLog.getMaxRetry()) {
// 還有重試機會 - 刪除重新插入
executeRetryByReinsert(stepLog, errorMessage);
} else {
// 重試次數耗盡,觸發回滾
triggerRollback(stepLog, errorMessage);
}
}
/**
* 執行重試邏輯:刪除重新插入
*/
private void executeRetryByReinsert(DistributeEventStepLog stepLog, String errorMessage) {
try {
// 1. 更新狀態為 RETRYING
stepLogMapper.updateStatus(stepLog.getId(), "RETRYING", errorMessage);
// 2. 計算下次重試時間(指數退避)
long delaySeconds = (long) Math.pow(2, stepLog.getRetryCount() + 1) * 30;
Timestamp nextRetryTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);
// 3. 刪除當前記錄
stepLogMapper.deleteById(stepLog.getId());
// 4. 重新插入新記錄
DistributeEventStepLog newStepLog = DistributeEventStepLog.builder()
.stepCode(stepLog.getStepCode())
.jobCode(stepLog.getJobCode())
.consumerNode(stepLog.getConsumerNode())
.executionKey(stepLog.getExecutionKey())
.businessKey(stepLog.getBusinessKey())
.execStatus("PENDING")
.retryCount(stepLog.getRetryCount() + 1)
.maxRetry(stepLog.getMaxRetry())
.nextRetryTime(nextRetryTime)
.payload(stepLog.getPayload())
.rollbackData(stepLog.getRollbackData())
.build();
stepLogMapper.insert(newStepLog);
log.info("執行重試安排成功,第{}次重試,下次執行時間: {}",
newStepLog.getRetryCount(), nextRetryTime);
} catch (Exception e) {
log.error("執行重試安排失敗: {}", stepLog.getStepCode(), e);
}
}
/**
* 處理通知失敗 - 指數退避重試
*/
public void handleNotificationFailure(DistributeEventStepLog stepLog, String notifyErrorMessage) {
int currentNotifyRetry = stepLog.getNotifyRetryCount();
if (currentNotifyRetry < stepLog.getMaxNotifyRetry()) {
// 還有通知重試機會 - 指數退避
scheduleNotificationRetry(stepLog, notifyErrorMessage);
} else {
// 通知重試次數耗盡
markNotificationFailed(stepLog, notifyErrorMessage);
}
}
/**
* 安排通知重試(指數退避)
*/
private void scheduleNotificationRetry(DistributeEventStepLog stepLog, String notifyErrorMessage) {
try {
int nextNotifyRetryCount = stepLog.getNotifyRetryCount() + 1;
// 指數退避算法: 2^n * 60秒
long delaySeconds = (long) Math.pow(2, nextNotifyRetryCount) * 60;
Timestamp nextNotifyTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);
stepLogMapper.updateNotificationForRetry(
stepLog.getId(),
nextNotifyRetryCount,
nextNotifyTime,
notifyErrorMessage
);
log.info("通知重試安排成功,第{}次重試,下次通知時間: {}",
nextNotifyRetryCount, nextNotifyTime);
} catch (Exception e) {
log.error("通知重試安排失敗: {}", stepLog.getStepCode(), e);
}
}
/**
* 回滾機制:刪除重新開始
*/
private void triggerRollback(DistributeEventStepLog stepLog, String errorMessage) {
try {
// 1. 標記為執行失敗
stepLogMapper.updateStatus(stepLog.getId(), "FAILED", errorMessage);
// 2. 執行回滾操作(刪除相關數據)
executeRollbackAction(stepLog);
// 3. 標記回滾完成
stepLogMapper.markAsRollback(stepLog.getId());
// 4. 通知業務端失敗
notifyBusinessFailure(stepLog);
} catch (Exception e) {
log.error("回滾執行失敗: {}", stepLog.getStepCode(), e);
alertService.sendRollbackFailureAlert(stepLog, e);
}
}
// 執行具體的回滾操作
private void executeRollbackAction(DistributeEventStepLog stepLog) {
String rollbackData = stepLog.getRollbackData();
if (StringUtils.isBlank(rollbackData)) return;
RollbackSnapshot snapshot = JSON.parseObject(rollbackData, RollbackSnapshot.class);
switch (snapshot.getStepType()) {
case "PAGE_CREATE":
pageService.deleteById(snapshot.getEntityId());
break;
case "DATA_COPY":
dataService.deleteBatch(snapshot.getDataIds());
break;
case "PERMISSION_GRANT":
permissionService.revoke(snapshot.getPermissionIds());
break;
}
}
}
總結
主要特點:
✅ 流程清晰: 業務拆解 → HTTP分發 → 定時消費 → 狀態同步
✅ 冪等性簡單: 一天一次執行保證,避免重複處理
✅ 重試機制: 最多3次,指數退避,失敗後智能回滾
✅ 回滾策略: 刪除重新開始,簡單有效
✅ 併發控制: 2線程限流,避免資源爭搶
✅ 狀態追蹤: 完整的執行鏈路監控
核心優勢:
🎯 設計精簡: 去除複雜的多重冪等性策略,採用基於日期的簡單方案
💡 實用性強: 回滾即刪除,符合業務實際需求
🔧 易於維護: 清晰的代碼結構和執行流程
⚡ 性能優化: 合理的併發控制和索引設計
🛡️ 可靠性高: 完善的重試和回滾機制