以前一直使用 Python 的任務調度庫 APScheduler(支持任務持久化,支持多種存儲方式),但由於沒有找到與它功能和使用方式類似的 Golang 庫,所以模仿 APScheduler 3.x 寫了個簡易版本的 AGScheduler。
AGScheduler
Advanced Golang Scheduler (AGScheduler) 是一款適用於 Golang 的任務調度庫,支持多種調度類型,支持動態更改和持久化作業,支持作業隊列,支持作業結果回收,支持事件監聽,支持遠程調用,支持集羣
特性
- 支持三種調度類型
    - [x] 一次性執行
    - [x] 間隔執行
    - [x] Cron 式調度
- 支持多種作業存儲方式
    - [x] Memory (不支持集羣 HA 模式)
    - [x] GORM (任何 GORM 支持的 RDBMS 都能運行)
    - [x] Redis
    - [x] MongoDB
    - [x] etcd
    - [x] Elasticsearch
- 支持多種作業隊列
    - [x] Memory (不支持集羣模式)
    - [x] NSQ
    - [x] RabbitMQ
    - [x] Redis
    - [x] MQTT (不支持歷史作業)
    - [x] Kafka
- 支持多種作業結果後端
    - [x] Memory (不支持集羣模式)
    - [x] GORM (任何 GORM 支持的 RDBMS 都能運行)
    - [x] MongoDB
- 支持事件監聽
    - [x] 調度器事件
    - [x] 作業事件
- 支持遠程調用
    - [x] gRPC
    - [x] HTTP
- 支持集羣
    - [x] 遠程工作節點
    - [x] 調度器高可用 (實驗性)
架構
鏈接
https://github.com/agscheduler/agscheduler
安裝
go get -u github.com/agscheduler/agscheduler
使用
package main
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/agscheduler/agscheduler"
"github.com/agscheduler/agscheduler/stores"
)
// printMsg is the example job function: it logs the full name and the
// arguments of the job it was invoked for.
//
// ctx is accepted but not used here. The named result is never assigned, so
// the function always returns the empty string.
func printMsg(ctx context.Context, j agscheduler.Job) (result string) {
	// Args is filled from a map[string]any in this example; %v formats values
	// of any type, whereas %s would render non-string values as %!s(...).
	slog.Info(fmt.Sprintf("Run job `%s` %v\n\n", j.FullName(), j.Args))
	return
}
// main registers printMsg, creates a scheduler that uses an in-memory store,
// adds one interval job, one cron job and one datetime job, logs each of them
// plus the full job list, and then starts the scheduler.
//
// Any error returned while adding or listing jobs is logged and main returns
// without starting the scheduler.
func main() {
	// Functions must be registered before the scheduler is started.
	agscheduler.RegisterFuncs(
		agscheduler.FuncPkg{Func: printMsg},
	)

	scheduler := &agscheduler.Scheduler{}
	store := &stores.MemoryStore{}
	scheduler.SetStore(store)

	// Interval job: references the function directly through Func.
	job1 := agscheduler.Job{
		Name:     "Job1",
		Type:     agscheduler.JOB_TYPE_INTERVAL,
		Interval: "2s",
		Timezone: "UTC",
		Func:     printMsg,
		Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
	}
	job1, err := scheduler.AddJob(job1)
	if err != nil {
		slog.Error(fmt.Sprintf("Add job `Job1` error: %s\n\n", err))
		return
	}
	slog.Info(fmt.Sprintf("%s.\n\n", job1))

	// Cron job: references the registered function by name through FuncName.
	job2 := agscheduler.Job{
		Name:     "Job2",
		Type:     agscheduler.JOB_TYPE_CRON,
		CronExpr: "*/1 * * * *",
		Timezone: "Asia/Shanghai",
		FuncName: "main.printMsg",
		Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
	}
	job2, err = scheduler.AddJob(job2)
	if err != nil {
		slog.Error(fmt.Sprintf("Add job `Job2` error: %s\n\n", err))
		return
	}
	slog.Info(fmt.Sprintf("%s.\n\n", job2))

	// Datetime job: configured with a single StartAt timestamp.
	job3 := agscheduler.Job{
		Name:     "Job3",
		Type:     agscheduler.JOB_TYPE_DATETIME,
		StartAt:  "2023-09-22 07:30:08",
		Timezone: "America/New_York",
		Func:     printMsg,
		Args:     map[string]any{"arg8": "8", "arg9": "9"},
	}
	job3, err = scheduler.AddJob(job3)
	if err != nil {
		slog.Error(fmt.Sprintf("Add job `Job3` error: %s\n\n", err))
		return
	}
	slog.Info(fmt.Sprintf("%s.\n\n", job3))

	jobs, err := scheduler.GetAllJobs()
	if err != nil {
		slog.Error(fmt.Sprintf("Scheduler get all jobs error: %s\n\n", err))
		return
	}
	slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))

	scheduler.Start()

	// Block forever so the process stays alive after main has started the
	// scheduler.
	select {}
}
註冊函數
由於 Go 無法序列化函數,所以在調用 `scheduler.Start()` 之前,需要先使用 `RegisterFuncs` 註冊函數。
隊列
// Create an in-memory queue and register it in the broker under the queue
// name "default", with Workers set to 2.
mq := &queues.MemoryQueue{}
broker := &agscheduler.Broker{
Queues: map[string]agscheduler.QueuePkg{
"default": {
Queue: mq,
Workers: 2,
},
},
}
// Attach the broker to the scheduler.
scheduler.SetBroker(broker)
結果回收
// Wrap an in-memory backend in a Recorder and attach it to the scheduler.
backend := &backends.MemoryBackend{}
recorder := &agscheduler.Recorder{Backend: backend}
scheduler.SetRecorder(recorder)
// Add a job, then fetch its records from the recorder by job id.
// NOTE: the second return values are discarded here for brevity.
job, _ = scheduler.AddJob(job)
records, _ := recorder.GetRecords(job.Id)
事件監聽
// jobCallback is an event callback that logs the event code and the job id
// carried by the received event package.
func jobCallback(ep agscheduler.EventPkg) {
	msg := fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId)
	slog.Info(msg)
}
......
// Register jobCallback for two events at once: the Event field is the
// bitwise OR of EVENT_JOB_ADDED and EVENT_JOB_DELETED.
listener := &agscheduler.Listener{
Callbacks: []agscheduler.CallbackPkg{
{
Callback: jobCallback,
Event: agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
},
},
}
// Attach the listener to the scheduler.
scheduler.SetListener(listener)
gRPC
// Server: expose the scheduler through a gRPC service on 127.0.0.1:36360.
grservice := services.GRPCService{
Scheduler: scheduler,
Address: "127.0.0.1:36360",
// PasswordSha2: "xxxxxx",
}
grservice.Start()
// Client: connect to the same address without transport security and call
// AddJob through the generated scheduler client.
// NOTE: the error from grpc.NewClient is discarded here for brevity.
conn, _ := grpc.NewClient("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := pb.NewSchedulerClient(conn)
client.AddJob(ctx, job)
HTTP
// Server: expose the scheduler through an HTTP service on 127.0.0.1:36370.
hservice := services.HTTPService{
Scheduler: scheduler,
Address: "127.0.0.1:36370",
// PasswordSha2: "xxxxxx",
}
hservice.Start()
// Client: encode the job as JSON and POST it to /scheduler/job.
// The map contents are elided in this example.
// NOTE: errors are discarded here for brevity; real code should check them
// and close resp.Body when done.
mJob := map[string]any{...}
bJob, _ := json.Marshal(mJob)
resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))
集羣
// Main Node: declares its own cluster, gRPC and HTTP endpoints and uses the
// "default" queue. EndpointMain is not set on this node.
cnMain := &agscheduler.ClusterNode{
Endpoint: "127.0.0.1:36380",
EndpointGRPC: "127.0.0.1:36360",
EndpointHTTP: "127.0.0.1:36370",
Queue: "default",
}
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()
// Worker Node: EndpointMain is set to the main node's Endpoint
// (127.0.0.1:36380); the node has its own endpoints and uses the "worker"
// queue.
cnNode := &agscheduler.ClusterNode{
EndpointMain: "127.0.0.1:36380",
Endpoint: "127.0.0.1:36381",
EndpointGRPC: "127.0.0.1:36361",
EndpointHTTP: "127.0.0.1:36371",
Queue: "worker",
}
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()
集羣 HA (高可用,實驗性)
// HA 需要滿足以下條件:
//
// 1. 集羣中 HA 節點的數量必須為奇數
// 2. 所有 HA 節點都需要連接到同一個存儲(不包含 MemoryStore)
// 3. ClusterNode 的 Mode 屬性需要設置為 `HA`
// 4. HA 主節點必須先啓動
// Main HA Node
// Other fields are elided ("..."); only the Mode setting is shown.
cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}
// HA Node
// Together with the main node this example has three nodes with Mode "HA".
cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}
// Worker Node
// No Mode is shown for the plain worker node in this example.
cnNode3 := &agscheduler.ClusterNode{...}
Base API
| gRPC Function | HTTP Method | HTTP Path |
|---|---|---|
| GetInfo | GET | /info |
| GetFuncs | GET | /funcs |
Scheduler API
| gRPC Function | HTTP Method | HTTP Path |
|---|---|---|
| AddJob | POST | /scheduler/job |
| GetJob | GET | /scheduler/job/:id |
| GetAllJobs | GET | /scheduler/jobs |
| UpdateJob | PUT | /scheduler/job |
| DeleteJob | DELETE | /scheduler/job/:id |
| DeleteAllJobs | DELETE | /scheduler/jobs |
| PauseJob | POST | /scheduler/job/:id/pause |
| ResumeJob | POST | /scheduler/job/:id/resume |
| RunJob | POST | /scheduler/job/run |
| ScheduleJob | POST | /scheduler/job/schedule |
| Start | POST | /scheduler/start |
| Stop | POST | /scheduler/stop |
Broker API
| gRPC Function | HTTP Method | HTTP Path |
|---|---|---|
| GetQueues | GET | /broker/queues |
Recorder API
| gRPC Function | HTTP Method | HTTP Path |
|---|---|---|
| GetRecords | GET | /recorder/records/:job_id |
| GetAllRecords | GET | /recorder/records |
| DeleteRecords | DELETE | /recorder/records/:job_id |
| DeleteAllRecords | DELETE | /recorder/records |
Cluster API
| gRPC Function | HTTP Method | HTTP Path |
|---|---|---|
| GetNodes | GET | /cluster/nodes |
示例
完整示例
開發
# 克隆代碼
git clone git@github.com:agscheduler/agscheduler.git
# 工作目錄
cd agscheduler
# 安裝依賴
make install
# 啓動 CI 服務
make up-ci-services
# 運行檢查
make check-all
Cli
cargo install agscheduler-cli
Web
docker run --rm -p 8080:80 ghcr.io/agscheduler/agscheduler-web:latest
致謝
APScheduler
simple-raft