博客 / 詳情

返回

適用於 Golang 的任務調度程序 AGScheduler

以前一直使用 Python 的任務調度庫 APScheduler(支持任務持久化,支持多種存儲方式),但由於沒有找到與它功能和使用方式類似的 Golang 庫,所以模仿 APScheduler 3.x 寫了個簡易版本的 AGScheduler。

AGScheduler

Advanced Golang Scheduler (AGScheduler) 是一款適用於 Golang 的任務調度庫,支持多種調度類型,支持動態更改和持久化作業,支持作業隊列,支持作業結果回收,支持事件監聽,支持遠程調用,支持集羣

特性

  • 支持三種調度類型

    • [x] 一次性執行
    • [x] 間隔執行
    • [x] Cron 式調度
  • 支持多種作業存儲方式

    • [x] Memory (不支持集羣 HA 模式)
    • [x] GORM (任何 GORM 支持的 RDBMS 都能運行)
    • [x] Redis
    • [x] MongoDB
    • [x] etcd
    • [x] Elasticsearch
  • 支持多種作業隊列

    • [x] Memory (不支持集羣模式)
    • [x] NSQ
    • [x] RabbitMQ
    • [x] Redis
    • [x] MQTT (不支持歷史作業)
    • [x] Kafka
  • 支持多種作業結果後端

    • [x] Memory (不支持集羣模式)
    • [x] GORM (任何 GORM 支持的 RDBMS 都能運行)
    • [x] MongoDB
  • 支持事件監聽

    • [x] 調度器事件
    • [x] 作業事件
  • 支持遠程調用

    • [x] gRPC
    • [x] HTTP
  • 支持集羣

    • [x] 遠程工作節點
    • [x] 調度器高可用 (實驗性)

架構

framework.png

鏈接

https://github.com/agscheduler/agscheduler

安裝

go get -u github.com/agscheduler/agscheduler

使用

package main

import (
    "context"
    "fmt"
    "log/slog"
    "time"

    "github.com/agscheduler/agscheduler"
    "github.com/agscheduler/agscheduler/stores"
)

func printMsg(ctx context.Context, j agscheduler.Job) (result string) {
    slog.Info(fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args))
    return
}

func main() {
    agscheduler.RegisterFuncs(
        agscheduler.FuncPkg{Func: printMsg},
    )

    scheduler := &agscheduler.Scheduler{}

    store := &stores.MemoryStore{}
    scheduler.SetStore(store)

    job1 := agscheduler.Job{
        Name:     "Job1",
        Type:     agscheduler.JOB_TYPE_INTERVAL,
        Interval: "2s",
        Timezone: "UTC",
        Func:     printMsg,
        Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
    }
    job1, _ = scheduler.AddJob(job1)
    slog.Info(fmt.Sprintf("%s.\n\n", job1))

    job2 := agscheduler.Job{
        Name:     "Job2",
        Type:     agscheduler.JOB_TYPE_CRON,
        CronExpr: "*/1 * * * *",
        Timezone: "Asia/Shanghai",
        FuncName: "main.printMsg",
        Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
    }
    job2, _ = scheduler.AddJob(job2)
    slog.Info(fmt.Sprintf("%s.\n\n", job2))

    job3 := agscheduler.Job{
        Name:     "Job3",
        Type:     agscheduler.JOB_TYPE_DATETIME,
        StartAt:  "2023-09-22 07:30:08",
        Timezone: "America/New_York",
        Func:     printMsg,
        Args:     map[string]any{"arg8": "8", "arg9": "9"},
    }
    job3, _ = scheduler.AddJob(job3)
    slog.Info(fmt.Sprintf("%s.\n\n", job3))

    jobs, _ := scheduler.GetAllJobs()
    slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))

    scheduler.Start()

    select {}
}

註冊函數

由於 golang 無法序列化函數,所以 scheduler.Start() 之前需要使用 RegisterFuncs 註冊函數

隊列

mq := &queues.MemoryQueue{}
broker := &agscheduler.Broker{
    Queues: map[string]agscheduler.QueuePkg{
        "default": {
            Queue:   mq,
            Workers: 2,
        },
    },
}

scheduler.SetBroker(broker)

結果回收

backend := &backends.MemoryBackend{}
recorder := &agscheduler.Recorder{Backend: backend}

scheduler.SetRecorder(recorder)

job, _ = scheduler.AddJob(job)
records, _ := recorder.GetRecords(job.Id)

事件監聽

func jobCallback(ep agscheduler.EventPkg) {
    slog.Info(fmt.Sprintf("Event code: `%d`, job `%s`.\n\n", ep.Event, ep.JobId))
}

......

listener := &agscheduler.Listener{
    Callbacks: []agscheduler.CallbackPkg{
        {
            Callback: jobCallback,
            Event:    agscheduler.EVENT_JOB_ADDED | agscheduler.EVENT_JOB_DELETED,
        },
    },
}

scheduler.SetListener(listener)

gRPC

// Server
grservice := services.GRPCService{
    Scheduler: scheduler,
    Address:   "127.0.0.1:36360",
    // PasswordSha2: "xxxxxx",
}
grservice.Start()

// Client
conn, _ := grpc.NewClient("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := pb.NewSchedulerClient(conn)
client.AddJob(ctx, job)

HTTP

// Server
hservice := services.HTTPService{
    Scheduler: scheduler,
    Address:   "127.0.0.1:36370",
    // PasswordSha2: "xxxxxx",
}
hservice.Start()

// Client
mJob := map[string]any{...}
bJob, _ := json.Marshal(mJob)
resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))

集羣

// Main Node
cnMain := &agscheduler.ClusterNode{
    Endpoint:     "127.0.0.1:36380",
    EndpointGRPC: "127.0.0.1:36360",
    EndpointHTTP: "127.0.0.1:36370",
    Queue:        "default",
}
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()

// Worker Node
cnNode := &agscheduler.ClusterNode{
    EndpointMain: "127.0.0.1:36380",
    Endpoint:     "127.0.0.1:36381",
    EndpointGRPC: "127.0.0.1:36361",
    EndpointHTTP: "127.0.0.1:36371",
    Queue:        "worker",
}
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()

集羣 HA (高可用,實驗性)

// HA 需要滿足以下條件:
//
// 1. 集羣中 HA 節點的數量必須為奇數
// 2. 所有 HA 節點都需要連接到同一個存儲(不包含 MemoryStore)
// 3. ClusterNode 的 Mode 屬性需要設置為 `HA`
// 4. HA 主節點必須先啓動

// Main HA Node
cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}

// HA Node
cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}

// Worker Node
cnNode3 := &agscheduler.ClusterNode{...}

Base API

gRPC Function HTTP Method HTTP Path
GetInfo GET /info
GetFuncs GET /funcs

Scheduler API

gRPC Function HTTP Method HTTP Path
AddJob POST /scheduler/job
GetJob GET /scheduler/job/:id
GetAllJobs GET /scheduler/jobs
UpdateJob PUT /scheduler/job
DeleteJob DELETE /scheduler/job/:id
DeleteAllJobs DELETE /scheduler/jobs
PauseJob POST /scheduler/job/:id/pause
ResumeJob POST /scheduler/job/:id/resume
RunJob POST /scheduler/job/run
ScheduleJob POST /scheduler/job/schedule
Start POST /scheduler/start
Stop POST /scheduler/stop

Broker API

gRPC Function HTTP Method HTTP Path
GetQueues GET /broker/queues

Recorder API

gRPC Function HTTP Method HTTP Path
GetRecords GET /recorder/records/:job_id
GetAllRecords GET /recorder/records
DeleteRecords DELETE /recorder/records/:job_id
DeleteAllRecords DELETE /recorder/records

Cluster API

gRPC Function HTTP Method HTTP Path
GetNodes GET /cluster/nodes

示例

完整示例

開發

# 克隆代碼
git clone git@github.com:agscheduler/agscheduler.git

# 工作目錄
cd agscheduler

# 安裝依賴
make install

# 啓動 CI 服務
make up-ci-services

# 運行檢查
make check-all

Cli

cargo install agscheduler-cli

Web

docker run --rm -p 8080:80 ghcr.io/agscheduler/agscheduler-web:latest

致謝

APScheduler

simple-raft

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.