最近我們作為一個sass系統的客户,我們需要sass系統服務商給我實時同步數據,對方要求我們提供方案,英文實時同步,我這裏採用了基於mysql的binlog來做,總共分為2部分:一個是基於某個商户的配置把現在有數據的表發送給作為商户的我們,返回時間戳和時間節點或者binlogId之類的;二是他實現對binlog的解析然後過濾出來,是某個商户的數據,是需要推送的表,推送的表要推送那些字段,過濾出來後,形成一個待推送數據推送給sass的商户;三最好根據用要同步的表做一個count統計表的數據,用於和商户同步時的數據驗證。


初步完整的想法如下:

1:基於某個商户配置商户的數據修改事件地址接口;

2:配置商户的用於同步的推送數據的接口地址;

3:配置基於商户的要同步的表的名稱以及對應表下面的各個要同步的字段;字段是屬性這些,我初步考慮還是線下同步和客户商量;

{
  "merchantId": "merchant_001",
  "syncConfig": {
    "tables": [
      {
        "tableName": "tableA",
        "fields": ["fieldA1", "fieldA2"],
        "enabled": true,
        "description": "用户表"
      },
      {
        "tableName": "tableB",
        "fields": ["fieldB1", "fieldB2", "fieldB3", "fieldB4"],
        "enabled": true,
        "description": "訂單表"
      },
      {
        "tableName": "tableC",
        "fields": ["fieldC1", "fieldC2", "fieldC3", "fieldC4"],
        "enabled": false,
        "description": "商品表"
      },
      {
        "tableName": "tableD",
        "fields": ["fieldD1", "fieldD2", "fieldD3", "fieldD4"],
        "enabled": true,
        "description": "庫存表"
      }
    ]
  }
}

當然也可以有更復雜的模式:

{
  "merchantId": "merchant_001",
  "merchantName": "測試商户",
  "version": "1.0.0",
  "syncConfig": {
    "tables": [
      {
        "tableName": "tableA",
        "alias": "users",
        "description": "用户表",
        "enabled": true,
        "priority": 1,
        "syncMode": "incremental",
        "primaryKey": "id",
        "fields": [
          {
            "name": "fieldA1",
            "type": "string",
            "required": true,
            "description": "用户ID"
          },
          {
            "name": "fieldA2",
            "type": "string",
            "required": false,
            "description": "用户名",
            "transform": "trim"
          }
        ],
        "filters": {
          "where": "status = 'active'",
          "dateRange": {
            "field": "created_at",
            "from": "2024-01-01"
          }
        },
        "schedule": {
          "frequency": "daily",
          "time": "02:00:00"
        }
      },
      {
        "tableName": "tableB",
        "alias": "orders",
        "description": "訂單表",
        "enabled": true,
        "priority": 2,
        "syncMode": "full",
        "primaryKey": "order_id",
        "fields": [
          {
            "name": "fieldB1",
            "type": "int",
            "required": true,
            "description": "訂單ID"
          },
          {
            "name": "fieldB2",
            "type": "decimal",
            "required": true,
            "description": "訂單金額"
          },
          {
            "name": "fieldB3",
            "type": "datetime",
            "required": true,
            "description": "創建時間"
          },
          {
            "name": "fieldB4",
            "type": "string",
            "required": false,
            "description": "訂單狀態"
          }
        ],
        "schedule": {
          "frequency": "hourly"
        }
      }
    ],
    "globalSettings": {
      "batchSize": 1000,
      "timeout": 300,
      "retryTimes": 3,
      "errorHandling": "skip"
    }
  }
}

4:客户對數據修改事件接收後保存不處理,交給定時任務處理。定時任務處理前,先有個判斷變量,開啓了才處理事件,沒有開啓就不處理接收的事件。

5:請求Sass服務器同步接口,服務商按照配置把所有的數據查詢出來同步返回binlogId或者時間戳啥的,異步post到我們的同步接口上,同步完成後,我設置了對應的binlogId或者時間戳,然後再開啓修開時間出來,定時任務就從對應的binlogId或者時間戳開始往後執行。