动态

详情 返回 返回

Saga分佈式事務框架執行邏輯 - 动态 详情

Saga分佈式事務框架執行邏輯

📋 目錄

  • 框架概述
  • 核心組件架構
  • 數據庫表設計
  • 完整執行流程
  • 節點發現與調用機制
  • 精簡補償策略設計
  • 總結

框架概述

這是一個基於數據庫驅動的Saga分佈式事務框架,專門用於解決跨服務間數據同步的一致性問題。框架採用了混合編排模式,結合了集中式任務分解和分佈式執行的優勢。

核心設計理念

  • 🎯 分層解耦: 任務分解與任務執行完全分離
  • 🌐 節點自治: 消費端節點獨立執行和管理任務
  • 📊 狀態透明: 完整的執行日誌和狀態追蹤
  • 🔄 容錯恢復: 失敗重試與自動補償機制
  • ⚖️ 負載均衡: 基於節點負載的智能調度

業務場景

  • 空間同步開啓: 跨服務複製空間、頁面、權限等數據
  • 增量數據同步: 已開啓同步的項目進行增量更新
  • 同步關閉清理: 關閉同步時清理相關數據

核心組件架構

``mermaid

graph TB
    subgraph "業務觸發層"
        A1[空間同步開啓] --> B1[業務端拆解步驟]
        A2[增量數據更新] --> B1
        A3[同步關閉清理] --> B1
    end
    
    subgraph "任務分解層"
        B1 --> C1[存儲distribute_event]
        C1 --> C2[存儲distribute_event_step]
        C2 --> C3[HTTP發送步驟數據]
    end
    
    subgraph "消費端接收層"
        C3 --> D1[消費端接收HTTP請求]
        D1 --> D2[存儲distribute_event_step_log]
        D2 --> D3[返回接收確認]
        D3 --> D4[業務端更新狀態為待消費]
    end
    
    subgraph "定時執行層"
        D4 --> E1[定時任務掃描待執行記錄]
        E1 --> E2[2線程併發控制]
        E2 --> E3[執行具體業務邏輯]
        E3 --> E4[HTTP回調通知結果]
        E4 --> E5[業務端更新狀態]
    end

數據庫表設計

📋 核心表結構

1. distribute_event (主事務表)

記錄頂層業務事務的基本信息和整體狀態。

2. distribute_event_step (步驟表)

記錄事務分解後的各個原子步驟信息。

3. distribute_event_step_log (執行日誌表) ✨ 完整設計

記錄消費端節點的執行日誌,實現簡潔而強大的冪等性保證、重試機制和通知狀態管理。

CREATE TABLE distribute_event_step_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主鍵',
    step_code VARCHAR(64) NOT NULL COMMENT '關聯業務端distribute_event_step.code',
    job_code VARCHAR(64) NOT NULL COMMENT '主事務編碼,關聯業務端distribute_event.code',
    consumer_node VARCHAR(50) NOT NULL COMMENT '消費者節點地址',
    
    -- 冪等性保證字段
    execution_key VARCHAR(128) NOT NULL COMMENT '執行唯一鍵: {step_code}_{consumer_node}_{yyyyMMdd}',
    business_key VARCHAR(64) COMMENT '業務唯一鍵,基於業務數據哈希值',
    
    -- 執行狀態管理
    exec_status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '執行狀態: PENDING, EXECUTING, SUCCESS, FAILED, RETRYING',
    retry_count INT DEFAULT 0 COMMENT '當前重試次數',
    max_retry INT DEFAULT 3 COMMENT '最大重試次數',
    
    -- 通知狀態管理(新增)
    notify_status VARCHAR(20) DEFAULT 'NOT_REQUIRED' COMMENT '通知狀態: NOT_REQUIRED, PENDING, SUCCESS, FAILED',
    notify_retry_count INT DEFAULT 0 COMMENT '通知重試次數',
    max_notify_retry INT DEFAULT 3 COMMENT '最大通知重試次數',
    next_notify_time TIMESTAMP NULL COMMENT '下次通知時間',
    notify_url VARCHAR(255) COMMENT '通知回調地址',
    
    -- 執行信息
    payload TEXT COMMENT '執行數據載荷',
    result_data TEXT COMMENT '執行結果數據',
    error_message TEXT COMMENT '執行錯誤信息',
    notify_error_message TEXT COMMENT '通知錯誤信息',
    start_time TIMESTAMP NULL COMMENT '開始執行時間',
    end_time TIMESTAMP NULL COMMENT '結束執行時間',
    next_retry_time TIMESTAMP NULL COMMENT '下次執行重試時間',
    
    -- 回滾支持
    rollback_data TEXT COMMENT '回滾數據快照,JSON格式',
    is_rollback TINYINT(1) DEFAULT 0 COMMENT '是否已回滾',
    
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
    
    -- 冪等性保證索引
    UNIQUE KEY uk_execution_key (execution_key),
    UNIQUE KEY uk_business_key (step_code, business_key),
    
    -- 查詢優化索引
    INDEX idx_exec_status (exec_status),
    INDEX idx_notify_status (notify_status, next_notify_time),
    INDEX idx_step_code (step_code),
    INDEX idx_consumer_node (consumer_node),
    INDEX idx_retry_time (next_retry_time),
    INDEX idx_job_code (job_code)
);

📊 字段設計詳解與使用説明

1. 冪等性保證字段
/**
 * execution_key: 執行唯一鍵
 * 用途: 防止同一步驟在同一節點同一天重複執行
 * 格式: SPACE_SYNC_001_192.168.1.10:8080_20240316
 * 使用場景: 消費端接收HTTP請求時檢查是否已存在相同的execution_key
 */
public String generateExecutionKey(String stepCode, String consumerNode) {
    String dateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    return String.format("%s_%s_%s", stepCode, consumerNode, dateStr);
}

/**
 * business_key: 業務唯一鍵
 * 用途: 基於業務數據內容的去重,防止相同業務數據重複處理
 * 生成: 對payload業務數據進行MD5哈希
 * 使用場景: 當業務數據完全相同時避免重複執行
 */
public String generateBusinessKey(Object payload) {
    String payloadJson = JSON.toJSONString(payload);
    return DigestUtils.md5DigestAsHex(payloadJson.getBytes()).substring(0, 8);
}
2. 狀態管理字段
/**
 * exec_status: 執行狀態
 * PENDING: 待執行 - 剛接收到任務,等待定時器掃描
 * EXECUTING: 執行中 - 正在執行業務邏輯
 * SUCCESS: 執行成功 - 業務邏輯執行完成
 * FAILED: 執行失敗 - 重試次數耗盡後的最終失敗狀態
 * RETRYING: 重試中 - 執行失敗後等待重試的狀態
 */

/**
 * notify_status: 通知狀態
 * NOT_REQUIRED: 無需通知 - 執行中或失敗時的默認狀態
 * PENDING: 待通知 - 執行成功後需要通知業務端
 * SUCCESS: 通知成功 - 業務端已收到通知並確認
 * FAILED: 通知失敗 - 通知重試次數耗盡後的狀態
 */
3. 重試機制字段
/**
 * retry_count / max_retry: 執行重試控制
 * 用途: 控制業務邏輯執行的重試次數,避免無限重試
 * 邏輯: 失敗時retry_count+1,超過max_retry則標記為FAILED
 */

/**
 * notify_retry_count / max_notify_retry: 通知重試控制
 * 用途: 控制通知業務端的重試次數,確保通知到位
 * 邏輯: 通知失敗時notify_retry_count+1,採用指數退避策略
 */

/**
 * next_retry_time / next_notify_time: 重試時間控制
 * 用途: 指數退避算法的時間調度
 * 計算: delay = Math.pow(2, retryCount) * baseDelaySeconds
 */
4. 數據存儲字段
/**
 * payload: 執行數據載荷
 * 用途: 存儲從業務端接收的步驟執行數據
 * 格式: JSON字符串,包含業務邏輯執行所需的所有參數
 */

/**
 * result_data: 執行結果數據
 * 用途: 存儲業務邏輯執行後的結果,用於通知業務端
 * 格式: JSON字符串,包含執行結果、影響的數據ID等
 */

/**
 * rollback_data: 回滾數據快照
 * 用途: 存儲執行前的原始數據狀態,用於失敗回滾
 * 格式: JSON字符串,包含需要刪除的數據ID列表等
 */

完整執行流程

🎆 整體流程時序圖(數據庫分離)

sequenceDiagram
    participant BIZ as 業務服務A
    participant BIZ_DB as 業務端數據庫
    participant BIZ2 as 業務服務B(消費端)
    participant CONSUMER_DB as 消費端數據庫
    participant TASK as 定時任務
    
    Note over BIZ,TASK: 階段一: 業務觸發與任務分解
    BIZ->>BIZ: 1. 業務觸發(空間同步/增量更新等)
    BIZ->>BIZ: 2. 執行步驟拆解邏輯
    BIZ->>BIZ_DB: 3. 存儲主事務 distribute_event
    BIZ->>BIZ_DB: 4. 存儲子步驟 distribute_event_step
    
    Note over BIZ,TASK: 階段二: HTTP任務分發
    BIZ->>BIZ2: 5. HTTP請求發送步驟數據
    BIZ2->>CONSUMER_DB: 6. 存儲執行日誌 distribute_event_step_log
    BIZ2->>BIZ: 7. 返回接收確認
    BIZ->>BIZ_DB: 8. 更新步驟狀態為'待消費'
    
    Note over BIZ,TASK: 階段三: 定時消費執行(限流2線程)
    TASK->>CONSUMER_DB: 9. 掃描待執行狀態記錄
    TASK->>TASK: 10. 併發控制(最多2線程)
    TASK->>CONSUMER_DB: 11. 更新exec_status為'EXECUTING'
    TASK->>TASK: 12. 執行具體業務邏輯
    
    alt 執行成功
        TASK->>CONSUMER_DB: 13a. 更新exec_status為'SUCCESS'
        TASK->>CONSUMER_DB: 14a. 設置notify_status為'PENDING'
        
        Note over BIZ,TASK: 階段四: 通知業務端(指數退避重試)
        TASK->>BIZ: 15a. HTTP回調通知執行結果
        alt 通知成功
            BIZ->>TASK: 16a. 返回200狀態
            TASK->>CONSUMER_DB: 17a. 更新notify_status為'SUCCESS'
            BIZ->>BIZ_DB: 18a. 更新distribute_event_step狀態
            BIZ->>BIZ_DB: 19a. 更新distribute_event主事務狀態
        else 通知失敗
            BIZ->>TASK: 16b. 返回非200狀態或網絡異常
            TASK->>CONSUMER_DB: 17b. notify_retry_count+1
            alt 通知重試次數 < 3
                TASK->>CONSUMER_DB: 18b. 計算next_notify_time(指數退避)
                Note right of TASK: 等待指數退避時間後重新通知
                TASK->>BIZ: 19b. 重新發送通知(循環至16a)
            else 通知重試次數 >= 3
                TASK->>CONSUMER_DB: 20b. 更新notify_status為'FAILED'
                Note right of TASK: 通知失敗,需人工介入
            end
        end
        
    else 執行失敗
        TASK->>CONSUMER_DB: 13c. 更新retry_count+1
        alt 執行重試次數 < 3
            TASK->>CONSUMER_DB: 14c. 更新exec_status為'RETRYING'
            TASK->>CONSUMER_DB: 15c. 刪除當前記錄
            TASK->>CONSUMER_DB: 16c. 重新插入新記錄(計算next_retry_time)
            Note right of TASK: 等待指數退避時間後重新執行
        else 執行重試次數 >= 3
            TASK->>CONSUMER_DB: 17c. 更新exec_status為'FAILED'
            TASK->>BIZ: 18c. 通知執行最終失敗
            BIZ->>BIZ: 19c. 觸發回滾邏輯(刪除重新開始)
        end
    end

整體執行流程圖

flowchart TD
    A[業務觸發] --> B[拆解步驟存儲]
    B --> C[HTTP發送步驟數據]
    C --> D[消費端存儲日誌]
    D --> E[更新狀態為待消費]
    E --> F[定時任務掃描]
    F --> G[2線程併發執行]
    G --> H{執行結果}
    H -->|成功| I[HTTP回調通知]
    H -->|失敗| J{重試次數}
    J -->|<3次| K[指數退避重試]
    J -->|>=3次| L[觸發回滾刪除]
    I --> M[更新主事務狀態]
    L --> N[通知失敗完成]

節點發現與調用機制

服務節點通過心跳註冊,基於負載權重選擇最優節點執行任務。

@Service
public class DistributeNodeRegistry {
    
    @Scheduled(fixedRate = 30000) // 每30秒心跳
    public void heartbeat() {
        String nodeAddress = getLocalNodeAddress();
        NodeMetadata metadata = collectNodeMetadata(); // 收集CPU、內存、任務數量等
        
        nodeRegistryMapper.upsertNode(NodeRegistryRecord.builder()
            .serviceName(getServiceName())
            .nodeAddress(nodeAddress)
            .status(1) // 在線
            .lastHeartbeatTime(new Date())
            .metadata(JSON.toJSONString(metadata))
            .build());
    }
    
    // 選擇最優節點(基於負載權重)
    public String selectOptimalNode(String serviceName) {
        List<NodeRegistryRecord> nodes = nodeRegistryMapper.selectAvailableNodes(serviceName);
        return nodes.stream()
            .min(Comparator.comparing(this::calculateNodeLoad))
            .map(NodeRegistryRecord::getNodeAddress)
            .orElseThrow(() -> new NoAvailableNodeException("無可用節點"));
    }
}

🔄 精簡補償策略設計

執行重試與通知重試機制

@Component
public class RetryAndNotificationService {
    
    /**
     * 處理執行失敗 - 重試機制(刪除重新插入)
     */
    public void handleExecutionFailure(DistributeEventStepLog stepLog, String errorMessage) {
        int currentRetry = stepLog.getRetryCount();
        
        if (currentRetry < stepLog.getMaxRetry()) {
            // 還有重試機會 - 刪除重新插入
            executeRetryByReinsert(stepLog, errorMessage);
        } else {
            // 重試次數耗盡,觸發回滾
            triggerRollback(stepLog, errorMessage);
        }
    }
    
    /**
     * 執行重試邏輯:刪除重新插入
     */
    private void executeRetryByReinsert(DistributeEventStepLog stepLog, String errorMessage) {
        try {
            // 1. 更新狀態為 RETRYING
            stepLogMapper.updateStatus(stepLog.getId(), "RETRYING", errorMessage);
            
            // 2. 計算下次重試時間(指數退避)
            long delaySeconds = (long) Math.pow(2, stepLog.getRetryCount() + 1) * 30;
            Timestamp nextRetryTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);
            
            // 3. 刪除當前記錄
            stepLogMapper.deleteById(stepLog.getId());
            
            // 4. 重新插入新記錄
            DistributeEventStepLog newStepLog = DistributeEventStepLog.builder()
                .stepCode(stepLog.getStepCode())
                .jobCode(stepLog.getJobCode())
                .consumerNode(stepLog.getConsumerNode())
                .executionKey(stepLog.getExecutionKey())
                .businessKey(stepLog.getBusinessKey())
                .execStatus("PENDING")
                .retryCount(stepLog.getRetryCount() + 1)
                .maxRetry(stepLog.getMaxRetry())
                .nextRetryTime(nextRetryTime)
                .payload(stepLog.getPayload())
                .rollbackData(stepLog.getRollbackData())
                .build();
                
            stepLogMapper.insert(newStepLog);
            
            log.info("執行重試安排成功,第{}次重試,下次執行時間: {}", 
                newStepLog.getRetryCount(), nextRetryTime);
                
        } catch (Exception e) {
            log.error("執行重試安排失敗: {}", stepLog.getStepCode(), e);
        }
    }
    
    /**
     * 處理通知失敗 - 指數退避重試
     */
    public void handleNotificationFailure(DistributeEventStepLog stepLog, String notifyErrorMessage) {
        int currentNotifyRetry = stepLog.getNotifyRetryCount();
        
        if (currentNotifyRetry < stepLog.getMaxNotifyRetry()) {
            // 還有通知重試機會 - 指數退避
            scheduleNotificationRetry(stepLog, notifyErrorMessage);
        } else {
            // 通知重試次數耗盡
            markNotificationFailed(stepLog, notifyErrorMessage);
        }
    }
    
    /**
     * 安排通知重試(指數退避)
     */
    private void scheduleNotificationRetry(DistributeEventStepLog stepLog, String notifyErrorMessage) {
        try {
            int nextNotifyRetryCount = stepLog.getNotifyRetryCount() + 1;
            
            // 指數退避算法: 2^n * 60秒
            long delaySeconds = (long) Math.pow(2, nextNotifyRetryCount) * 60;
            Timestamp nextNotifyTime = new Timestamp(System.currentTimeMillis() + delaySeconds * 1000);
            
            stepLogMapper.updateNotificationForRetry(
                stepLog.getId(),
                nextNotifyRetryCount,
                nextNotifyTime,
                notifyErrorMessage
            );
            
            log.info("通知重試安排成功,第{}次重試,下次通知時間: {}", 
                nextNotifyRetryCount, nextNotifyTime);
                
        } catch (Exception e) {
            log.error("通知重試安排失敗: {}", stepLog.getStepCode(), e);
        }
    }
    
    /**
     * 回滾機制:刪除重新開始
     */
    private void triggerRollback(DistributeEventStepLog stepLog, String errorMessage) {
        try {
            // 1. 標記為執行失敗
            stepLogMapper.updateStatus(stepLog.getId(), "FAILED", errorMessage);
            
            // 2. 執行回滾操作(刪除相關數據)
            executeRollbackAction(stepLog);
            
            // 3. 標記回滾完成
            stepLogMapper.markAsRollback(stepLog.getId());
            
            // 4. 通知業務端失敗
            notifyBusinessFailure(stepLog);
            
        } catch (Exception e) {
            log.error("回滾執行失敗: {}", stepLog.getStepCode(), e);
            alertService.sendRollbackFailureAlert(stepLog, e);
        }
    }
    
    // 執行具體的回滾操作
    private void executeRollbackAction(DistributeEventStepLog stepLog) {
        String rollbackData = stepLog.getRollbackData();
        if (StringUtils.isBlank(rollbackData)) return;
        
        RollbackSnapshot snapshot = JSON.parseObject(rollbackData, RollbackSnapshot.class);
        
        switch (snapshot.getStepType()) {
            case "PAGE_CREATE":
                pageService.deleteById(snapshot.getEntityId());
                break;
            case "DATA_COPY":
                dataService.deleteBatch(snapshot.getDataIds());
                break;
            case "PERMISSION_GRANT":
                permissionService.revoke(snapshot.getPermissionIds());
                break;
        }
    }
}

總結

主要特點
流程清晰: 業務拆解 → HTTP分發 → 定時消費 → 狀態同步
冪等性簡單: 一天一次執行保證,避免重複處理
重試機制: 最多3次,指數退避,失敗後智能回滾
回滾策略: 刪除重新開始,簡單有效
併發控制: 2線程限流,避免資源爭搶
狀態追蹤: 完整的執行鏈路監控

核心優勢
🎯 設計精簡: 去除複雜的多重冪等性策略,採用基於日期的簡單方案
💡 實用性強: 回滾即刪除,符合業務實際需求
🔧 易於維護: 清晰的代碼結構和執行流程
性能優化: 合理的併發控制和索引設計
🛡️ 可靠性高: 完善的重試和回滾機制

user avatar segmentfault 头像 aipaobudeshoutao 头像 jianghushinian 头像 boxuegu 头像 changqingdezi 头像 guangmingleiluodebaomihua 头像 aishang 头像 benfangdechaofen 头像 russell221 头像 litongjava 头像 daixiaoyulq 头像 stormjun94 头像
点赞 27 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.