Tested and working | Free real-time MySQL→Oracle sync with SeaTunnel, step by step in detail

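If you need a tool to sync MySQL to Oracle, then besides the traditional OGG, ST (SeaTunnel) is the one to consider. It is remarkably pleasant to use ~

It has been running stably in production for 4 months, with 3 databases on the source side, and it handles them all without problems. Recommended for everyone to try~

1. How do you use SeaTunnel? Start with the help output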

./bin/seatunnel.sh -h
Usage: seatunnel.sh [options]
  Options:
    --async                         Run the job asynchronously, when the job 
                                    is submitted, the client will exit
                                    (default: false)
    -can, --cancel-job              Cancel job by JobId
    --check                         Whether check config (default: false)
    -cj, --close-job                Close client the task will also be closed 
                                    (default: true)
    -cn, --cluster                  The name of cluster
    -c, --config                    Config file
    --decrypt                       Decrypt config file, When both --decrypt 
                                    and --encrypt are specified, only 
                                    --encrypt will take effect (default: false)
    -m, --master, -e, --deploy-mode SeaTunnel job submit master, support 
                                    [local, cluster] (default: cluster)
    --encrypt                       Encrypt config file, when both --decrypt 
                                    and --encrypt are specified, only 
                                    --encrypt will take effect (default: false)
    --get_running_job_metrics       Gets metrics for running jobs (default: false)
    -h, --help                      Show the usage message
    -j, --job-id                    Get job status by JobId
    -l, --list                      list job status (default: false)
    --metrics                       Get job metrics by JobId
    -n, --name                      SeaTunnel job name (default: SeaTunnel)
    -r, --restore                   restore with savepoint by jobId
    -s, --savepoint                 savepoint job by jobId
    --set-job-id                    Set custom job id for job
    -i, --variable                  Variable substitution, such as -i 
                                    city=beijing, or -i date=20190318.We use 
                                    ',' as separator, when inside "", ',' are 
                                    treated as normal characters instead of 
                                    delimiters. For example, -i 
                                    city="beijing,shanghai". If you want to 
                                    use dynamic parameters, you can use the 
                                    following format: -i date=$(date 
                                    +"%Y%m%d"). (default: [])

<u>Reference notes:</u>

1) Parameters related to job submission

2) Parameters related to job management
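
As a rough illustration of the two groups, the sketch below assembles one submission command and one management command using only flags listed in the help output above. The install path default `/opt/seatunnel`, the `DRY_RUN` switch, and the helper names `run`, `submit_job`, and `list_jobs` are assumptions made for this example, not part of SeaTunnel.

```shell
#!/usr/bin/env bash
# Sketch only: thin wrappers around seatunnel.sh.
# SEATUNNEL_HOME default is an assumed install path.
SEATUNNEL_HOME="${SEATUNNEL_HOME:-/opt/seatunnel}"

# Print the command when DRY_RUN=1 (hypothetical switch), otherwise execute it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Job submission: config file (-c), job name (-n), detached client (--async).
submit_job() {
  local config="$1" name="$2"
  run "$SEATUNNEL_HOME/bin/seatunnel.sh" -c "$config" -n "$name" --async
}

# Job management: list job status (-l).
list_jobs() {
  run "$SEATUNNEL_HOME/bin/seatunnel.sh" -l
}
```

Calling `DRY_RUN=1 submit_job "$SEATUNNEL_HOME/config/mysql_virdb_config" job_mysql_virdb` prints the command instead of running it, which is a convenient way to check the arguments first.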

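2. For installation, see the earlier article

[Data Sync] A First Look at SeaTunnel: a 5,000-character plain-language guide to getting started with Oracle-CDC

3. Day-to-day operations: job management

3.1 List jobs: RUNNING means the job is currently running; you will see other states as well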

./bin/seatunnel.sh -l

3.2 Pause a job (-s takes a savepoint of the job by its job ID)

./bin/seatunnel.sh -s 967714059992432641

3.3 Restart a paused job

./bin/seatunnel.sh -r 967714059992432641 -c $SEATUNNEL_HOME/config/mysql_virdb_config 

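Because the restore above was not run in the background, the client stayed attached in the foreground. After I exited with Ctrl+C, the job status changed to something that looks much like a canceled job (see the PS in 3.4).

PS: When restoring, be sure to add the relevant parameters: the job name, whether to run in the background, and so on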

./bin/seatunnel.sh -r 967714059992432641 -c $SEATUNNEL_HOME/config/mysql_virdb_config --async -n job_mysql_virdb

Check the job status again; this is the result we were hoping for
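
The pause and restore steps above can be sketched as a pair of helpers so the `--async` and `-n` flags are never forgotten on restore. The helper names, the `DRY_RUN` switch, and the `/opt/seatunnel` default are assumptions for illustration; the flags themselves come from the help output in section 1.

```shell
#!/usr/bin/env bash
# Sketch only: pause a job with a savepoint, then restore it detached.
SEATUNNEL_HOME="${SEATUNNEL_HOME:-/opt/seatunnel}"   # assumed install path

# Print the command when DRY_RUN=1 (hypothetical switch), otherwise execute it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Savepoint the job by job ID (-s).
pause_job() {
  local job_id="$1"
  run "$SEATUNNEL_HOME/bin/seatunnel.sh" -s "$job_id"
}

# Restore from the savepoint (-r) with the same config (-c).
# --async keeps the client from holding the terminal; -n sets the job name.
restore_job() {
  local job_id="$1" config="$2" name="$3"
  run "$SEATUNNEL_HOME/bin/seatunnel.sh" -r "$job_id" -c "$config" --async -n "$name"
}
```

For example, `restore_job 967714059992432641 "$SEATUNNEL_HOME/config/mysql_virdb_config" job_mysql_virdb` issues the same restore command as shown above.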

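3.4 Cancel a job

This command cancels the specified job. Once canceled, the job is stopped and its status becomes CANCELED.

Batch cancellation is supported: several jobs can be canceled at once.

All checkpoint information of a canceled job is deleted, so it cannot be restored with seatunnel.sh -r.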

./bin/seatunnel.sh -can 967714059992432641

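PS: The status after a cancel turned out to be very similar to the status after exiting directly with Ctrl+C. If the job is started again from that state, some data will probably be lost.

4. SeaTunnel log configuration

Config file: $SEATUNNEL_HOME/config/log4j2.properties

4.1 Configure a separate log file for each job (takes effect after restarting one of the jobs)

The existing configuration is: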

rootLogger.appenderRef.file.ref = fileAppender
appender.file.layout.pattern = [%X{ST-JID}] %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n

Change it to:

rootLogger.appenderRef.file.ref = routingAppender
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n

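4.2 SeaTunnel supports scheduled deletion of old log files to avoid running out of disk space

You can add the following to the $SEATUNNEL_HOME/config/seatunnel.yaml config file.
The defaults are shown below (the time is in minutes, so 1440 means 1440 minutes, i.e. one day)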

seatunnel:
  engine:
    history-job-expire-minutes: 1440
    telemetry:
      logs:
        scheduled-deletion-enable: true

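<u>Notes:</u>

  • history-job-expire-minutes: how long historical jobs and their logs are kept, in minutes. Expired job information and log files are cleared automatically after this time.
  • scheduled-deletion-enable: turns on scheduled cleanup; the default is true. Once a job passes the history-job-expire-minutes limit, its log files are deleted automatically. With this turned off, logs stay on disk permanently and you have to manage them yourself, otherwise disk usage may become a problem. Configure it according to your needs.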
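
Since the retention value is given in minutes, a small helper can save mental arithmetic when you think in days. This is only a sketch; `days_to_minutes` is a hypothetical helper name, not a SeaTunnel command.

```shell
#!/usr/bin/env bash
# Sketch only: convert a retention period in days to the minutes value
# used by history-job-expire-minutes.
days_to_minutes() {
  local days="$1"
  echo $(( days * 24 * 60 ))
}
```

For instance, `days_to_minutes 7` gives the value to use for a one-week retention.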

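5. Viewing jobs in the Web UI

PS: At last there is a Web UI for monitoring jobs, which is great news! Here is how to configure and access it.

5.1 Configure the Web UI

Config file: $SEATUNNEL_HOME/config/seatunnel.yaml; the default configuration is: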

seatunnel:
  engine:
    http:
      enable-http: true
      port: 8080

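5.2 Open the Web UI page

Open a browser and go to http://ip:8080, where ip is the address of the SeaTunnel server

5.3 Comparing the synchronized data

Compare the two sides using the latest creation time, the latest update time, and the row count

6. Common errors

6.1 The database name and table name must be configured, otherwise an error is reported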
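
One possible way to do this comparison is to collect a one-line summary per table from each side and compare the two strings. The sketch below assumes each summary has the form `row_count|max_create_time|max_update_time`, for example from a query like `SELECT COUNT(*), MAX(create_time), MAX(update_time) FROM some_table`; the column names `create_time` and `update_time`, the table name, and the helper `compare_summary` are all hypothetical. Timestamps should be formatted the same way on MySQL and Oracle before comparing.

```shell
#!/usr/bin/env bash
# Sketch only: compare the source and target summaries of one table.
# Arguments: table name, source summary, target summary.
compare_summary() {
  local table="$1" src="$2" dst="$3"
  if [ "$src" = "$dst" ]; then
    printf 'OK   %s %s\n' "$table" "$src"
    return 0
  fi
  printf 'DIFF %s source=%s target=%s\n' "$table" "$src" "$dst"
  return 1
}
```

A non-zero return code marks a table whose counts or timestamps differ, so the function can be called in a loop over all synchronized tables. On a live CDC stream a small, short-lived difference may just be replication lag.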

2025-04-24 17:59:42,640 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error, 
2025-04-24 17:59:42,641 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues
2025-04-24 17:59:42,641 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed 
2025-04-24 17:59:42,642 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a sink for identifier 'jdbc'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:669)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:592)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:240)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:165)
        ... 2 more
Caused by: org.apache.seatunnel.api.configuration.util.OptionValidationException: ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - There are unconfigured options, the options('database') are required because ['generate_sink_sql' == true] is true.
        at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:200)
        at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:107)
        at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:47)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:239)
        ... 8 more
2025-04-24 17:59:42,642 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a sink for identifier 'jdbc'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:669)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:592)
        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:240)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123)
        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:165)
        ... 2 more
Caused by: org.apache.seatunnel.api.configuration.util.OptionValidationException: ErrorCode:[API-02], ErrorDescription:[Option item validate failed] - There are unconfigured options, the options('database') are required because ['generate_sink_sql' == true] is true.
        at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:200)
        at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:107)
        at org.apache.seatunnel.api.configuration.util.ConfigValidator.validate(ConfigValidator.java:47)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:239)
        ... 8 more
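
To catch this before submitting, a simple pre-flight check can look for the `database` and `table` options whenever `generate_sink_sql = true` appears in the job config. This is a minimal sketch: it is a plain text search over the whole file rather than a real parser of the config format, and `check_sink_options` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch only: warn if generate_sink_sql is enabled but database/table are missing.
# Plain grep over the file; it does not understand nesting or comments.
check_sink_options() {
  local config="$1" missing="" opt
  # Nothing to check unless generate_sink_sql = true is present.
  grep -Eq '^[[:space:]]*generate_sink_sql[[:space:]]*=[[:space:]]*true' "$config" || return 0
  for opt in database table; do
    grep -Eq "^[[:space:]]*${opt}[[:space:]]*=" "$config" || missing="$missing $opt"
  done
  if [ -n "$missing" ]; then
    printf 'missing sink options:%s\n' "$missing"
    return 1
  fi
  return 0
}
```

It can be chained in front of a submission, for example `check_sink_options "$SEATUNNEL_HOME/config/mysql_virdb_config" && ./bin/seatunnel.sh -c "$SEATUNNEL_HOME/config/mysql_virdb_config" --async`.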

6.2 Error 2: the ID field cannot be found

[967787050499571713] 2025-04-24 22:00:16,190 ERROR [.s.e.s.c.CheckpointCoordinator] [hz.main.generic-operation.thread-14] - report error from task
org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.IllegalArgumentException: can't find field [ID]
        at org.apache.seatunnel.api.table.type.SeaTunnelRowType.indexOf(SeaTunnelRowType.java:87)
        at org.apache.seatunnel.api.table.type.SeaTunnelRowType.indexOf(SeaTunnelRowType.java:77)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:133)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSink.createWriter(JdbcSink.java:66)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSink.createWriter(MultiTableSink.java:82)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:342)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
        at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:391) ~[seatunnel-starter.jar:2.3.10]
        at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:182) ~[seatunnel-starter.jar:2.3.10]
        at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48) ~[seatunnel-starter.jar:2.3.10]
        at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.10]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.10]

6.3 If the object already exists, this message is shown

Error Msg = ORA-00955: name is already used by an existing object
        at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:530)
        ... 43 more
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$6(CoordinatorService.java:656)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

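6.4 The jobs shown in the Web UI do not always match what the server reports

Job ID 967961857958608897 had been paused earlier; after it was started again, the Web UI showed two records for it.

This was probably caused by a delay in the data: when I checked again a few days later, the extra record was gone.

7. References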

  • Download page
    https://seatunnel.apache.org/download/
  • Source side: mysql-cdc configuration
    https://seatunnel.apache.org/zh-CN/docs/2.3.10/connector-v2/s...
  • Sink (target side): Oracle configuration
    https://seatunnel.apache.org/zh-CN/docs/2.3.10/connector-v2/s...
  • Separate configuration for each job
    https://seatunnel.apache.org/zh-CN/docs/2.3.10/seatunnel-engi...