动态

详情 返回 返回

DolphinDB定時作業教程 - 动态 详情

DolphinDB提供的定時作業(scheduled job)功能,可以讓系統在指定的時間以指定的頻率自動執行作業。當我們需要數據庫定時自動執行一些腳本進行計算分析(譬如每日休市後分鍾級的K線計算、每月統計報表生成)、數據庫管理(譬如數據庫備份、數據同步)、操作系統管理(譬如過期的日誌文件刪除)等工作時,可以用這個功能來實現。

定時作業用一個函數來表示,這給了作業定義極大的靈活性。凡是能用函數來表示的工作,都可以作為定時任務來運行。定時作業通過scheduleJob函數提交,並按設定時間在後台運行。作業創建後,作業相關定義信息序列化保存到數據節點的磁盤文件。節點重啓後,系統會反序列化並加載定時作業。定時作業每次運行的結果也會保存到節點磁盤上,我們可以使用getJobMessage和getJobReturn來查看每個作業的運行日誌和返回值。

1.功能介紹

1.1 創建定時作業

創建定時作業使用函數scheduleJob。作業創建後,系統會序列化作業定義信息並保存到文件<homeDir>/sysmgmt/jobEditlog.meta。函數語法如下:

scheduleJob(jobId, jobDesc, jobFunc, scheduledTime, startDate, endDate, frequency, [days])

其中要注意的是:

  • 參數jobFunc(作業函數)是一個不帶參數的函數。
  • 參數scheduledTime(預定時間)可以是minute類型的標量或向量。當它為向量時,注意相鄰2個時間點的間隔不能小於30分鐘。
  • 函數返回值是定時作業的作業ID。如果輸入的jobId與已有定時作業的作業ID不重複,系統返回輸入的jobId。否則在jobId後面添加當前日期,"000",“001”等作為後綴,直到產生唯一的作業ID。

眾所周知,執行一個函數必須提供函數需要的所有參數。在函數化編程中,一個提供了所有參數的函數,實際上就是原函數的一個特殊的部分應用(Partial Application),也即一個不帶參數的函數。在DolphinDB中,我們用花括號{}來表示部分應用。

自定義函數、內置函數、插件函數、函數視圖(Function View)和模塊中的函數等各類函數都可以作為作業函數。因此,定時作業幾乎能做任何事情。比如用自定義函數、插件函數等做計算分析,用內置函數run運行一個腳本文件,用shell函數執行操作系統管理等等。下面例子中的作業調用了一個自定義函數getMaxTemperature,用於計算前一天某個設備温度指標的最大值,參數是設備編號,創建作業時,用getMaxTemperature{1}給設備編號賦值1,定時作業在每天0點執行。

def getMaxTemperature(deviceID){
    maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
            where ID=deviceID ,ts between (today()-1).datetime():(today().datetime()-1)
    return  maxTemp
}
scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

下面的例子執行了一個腳本文件。作業函數用了run函數,並指定腳本文件monthlyJob.dos的完整路徑作為參數,作業在2020年的每月1號0點執行。

scheduleJob(`monthlyJob, "Monthly Job 1", run{"/home/DolphinDB/script/monthlyJob.dos"}, 00:00m, 2020.01.01, 2020.12.31, 'M', 1);

下面的例子執行了一個刪除日誌文件的操作系統命令。作業函數用了shell函數,並指定具體的命令“rm /home/DolphinDB/server/dolphindb.log”作為參數。作業在每週的週日1點執行。

scheduleJob(`weeklyjob, "rm log", shell{"rm /home/DolphinDB/server/dolphindb.log"}, 1:00m, 2020.01.01, 2021.12.31, 'W', 6);

在實際應用中,用函數參數、函數返回值進行輸入輸出有點不太方便,我們更常用的做法是從數據庫中取出數據,計算後把結果再存到數據庫中。下面的例子是在每日休市後,計算分鐘級的K線。自定義函數computeK中,行情數據從分佈式數據庫表trades中取出,計算後存入分佈式數據庫表OHLC中。作業的frequency為"W"、days為[1,2,3,4,5],scheduledTime為15:00m,表示作業在每週一到週五的15點執行。

def computeK(){
    barMinutes = 7
    sessionsStart=09:30:00.000 13:00:00.000
    OHLC =  select first(price) as open, max(price) as high, min(price) as low,last(price) as close, sum(volume) as volume 
        from loadTable("dfs://stock","trades")
        where time >today() and time < now()
        group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart
    append!(loadTable("dfs://stock","OHLC"),OHLC)
}
scheduleJob(`kJob, "7 Minutes", computeK, 15:00m, 2020.01.01, 2021.12.31, 'W', [1,2,3,4,5]);

1.2 查詢定時作業

查詢節點中的定時作業定義信息可以用getScheduledJobs。函數語法如下:

getScheduledJobs([jobIdPattern])

其中參數jobIdPattern是表示作業ID或作業ID模式的字符串。它支持通配符“%”和“?”。函數的返回值是表格形式的定時作業信息。若jobId沒有指定,則返回所有作業。

系統會對每次作業的執行情況進行保存,包括定時作業的運行日誌和返回值。運行日誌保存在jodId.msg 文件中,定時作業的返回值保存在jobId.object文件中。這些文件都保存在目錄<homeDir>/batchJobs下。我們可以分別使用getJobMessage和getJobReturn來查看每個作業的運行日誌和返回值。但要注意jobID的取值,一是創建作業時,如前所述,若jobId與已有定時作業的作業ID重複,系統返回的不是輸入的jobId;二是對會多次執行的作業,每次執行定時作業時,作業ID是不一樣的。因此我們需要用getRecentJobs來查看已完成的定時作業。比如我們定義如下定時作業:

def foo(){
    print "test scheduled job at"+ now()
    return now()
}
scheduleJob(`testJob, "foo", foo, 17:00m+0..2*30, today(), today(), 'D');

運行getRecentJobs()後得到如下信息:

jobId                jobDesc    startTime                endTime
------              ------- ----------------------- ----------------------
testJob                foo1    2020.02.14T17:00:23.636    2020.02.14T17:00:23.639
testJob20200214        foo1    2020.02.14T17:30:23.908    2020.02.14T17:30:23.910
testJob20200214000  foo1    2020.02.14T18:00:23.148    2020.02.14T18:00:26.749

從中我們看到,第一次執行的作業ID是“testJob”,第二次是“testJob20200214”...每次都有變化。如下所示,我們可用getJobMessagegetJobReturn查看了第3次的執行情況:

>getJobMessage(`testJob20200214000);
2020-02-14 18:00:23.148629 Start the job [testJob20200214000]: foo
2020-02-14 18:00:23.148721 test the scheduled job at 2020.02.14T18:00:23.148
2020-02-14 18:00:26.749111 The job is done.

>getJobReturn(`testJob20200214000);
2020.02.14T18:00:23.148

1.3 刪除定時作業

刪除定時作業用函數deleteScheduledJob。語法如下:

deleteScheduledJob(jobId)

參數jobId是作業ID。刪除前可用getScheduledJobs得到想要刪除作業的作業ID。

2.定時作業運行時的權限

用户創建定時作業時以什麼身份登錄,執行定時作業時就以這個身份運行。因此用户創建定時作業時,需要確保用户有權限訪問用到的資源。比如登錄用户不是授權用户,就不能訪問集羣的分佈式功能,若用到了集羣的分佈式功能,執行時就會出錯。以下例子中用户guestUser1沒有訪問DFS權限:

def foo1(){
    print "Test scheduled job "+ now()
    cnt=exec count(*) from loadTable("dfs://FuturesContract","tb")
    print "The count of table is "+cnt
    return cnt
}
login("guestUser1","123456")
scheduleJob(`guestGetDfsjob, "dfs read", foo1, [12:00m, 21:03m, 21:45m], 2020.01.01, 2021.12.31, "D");

作業執行後,用getJobMessage(`guestGetDfsjob)查詢,如下所示,定時作業沒有權限去讀取分佈式數據庫:

2020-02-14 21:03:23.193039 Start the job [guestGetDfsjob]: dfs read
2020-02-14 21:03:23.193092 Test the scheduled job at 2020.02.14T21:03:23.193
2020-02-14 21:03:23.194914 Not granted to read table dfs://FuturesContract/tb

因此,若要遠程執行控制節點的某些功能,訪問集羣中的某個分佈式表,需要先以管理員(admin)或其他授權用户身份登錄。具體可以通過login函數來完成。

從所示日誌中也可以發現,訪問分佈式表後的語句沒有執行,也就是説作業執行過程中若遇到錯誤,執行會中斷。為了防止出現異常而停止執行後續的腳本,可使用try-catch語句俘獲異常。運行過程中需要輸出運行信息,可以用print打印,輸出都會記錄在jodId.msg日誌文件中。

3.定時作業的序列化

定時作業在創建後,系統會把創建用户(userID)、作業的ID、描述信息、起始時間、作業頻率、作業的定義等持久化保存。存儲路徑為<homeDir>/sysmgmt/jobEditlog.meta。作業用一個DolphinDB的函數來表示。函數的定義包括了一系列語句,這些語句又會調用其他函數和一些全局類對象,譬如共享變量(shared variable)。共享變量序列化時用名稱來表示。反序列化時,共享變量必須存在,否則會失敗。作業函數或其依賴的函數根據是否經過編譯可以分兩類:經過編譯的函數包括內置函數和插件函數和腳本函數包括自定義函數、函數視圖和模塊中的函數等。這兩類函數的序列化方法有所不同,下面分別進行説明。

3.1 經過編譯的函數的序列化

對經過編譯的函數的序列化,只序列化函數名稱和模塊名稱。反序列化的時候,會在系統中搜索這些模塊及函數,若搜索不到,就會失敗。所以定時作業中若用到了插件函數,就需要在反序列化之前預先加載。系統與定時作業相關組件資源的初始化順序依次是:系統級初始化腳本(dolphindb.dos),函數視圖(function view)、用户級啓動腳本(startup.dos)和定時作業。定時作業在啓動腳本執行後加載。如下例所示,在作業函數jobDemo中用到了odbc插件:

use odbc
def jobDemo(){
    conn = odbc::connect("dsn=mysql_factorDBURL");
}
scheduleJob("job demo","example of init",jobDemo,15:48m, 2019.01.01, 2020.12.31, 'D')

但odbc插件在系統啓動時沒有加載,所以讀取定時作業的時候,因無法識別這個函數,輸出下列日誌後退出系統。

<ERROR>:Failed to unmarshall the job [job demo]. Failed to deserialize assign statement.. Invalid message format

在啓動腳本中加入下列代碼加載odbc插件後,系統即啓動成功。

loadPlugin("plugins/odbc/odbc.cfg")

3.2 腳本函數的序列化

腳本函數會序列化函數參數以及函數定義的每一個語句。語句中若又包含了依賴的腳本函數,也會序列化這些依賴函數的定義。

創建定時作業後,若這些腳本函數被刪除或被修改了,或它依賴的腳本函數被修改,不影響定時作業運行。若希望定時作業按新的函數執行,就需要先刪除定時作業、然後重新創建定時作業,否則會運行舊的序列化的函數。其中要注意關聯的函數也需要重新定義。下面舉例説明:

  • 例子1,作業函數在創建定時作業後被修改,如下所示,作業函數f在創建scheduleJob後被重新定義:
def f(){
    print "The old function is called " 
}
scheduleJob(`test, "f", f, 11:05m, today(), today(), 'D');
go
def f(){
    print "The new function is called " 
}

定時作業執行後,用getJobMessage(`test)得到如下信息,從中看到定時作業執行的還是舊的自定義函數。

2020-02-14 11:05:53.382225 Start the job [test]: f
2020-02-14 11:05:53.382267 The old function is called 
2020-02-14 11:05:53.382277 The job is done.
  • 例子2,作業函數在創建定時作業後依賴的函數被修改,如下所示,作業函數是函數視圖fv,fv調用了函數foo,在scheduleJob後,函數foo重新被定義,函數視圖也重新生成:
def foo(){
    print "The old function is called " 
}
def fv(){
    foo()
}
addFunctionView(fv)  

scheduleJob(`testFvJob, "fv", fv, 11:36m, today(), today(), 'D');
go
def foo(){
    print "The new function is called " 
}
dropFunctionView(`fv)
addFunctionView(fv) 

定時作業執行後,然後getJobMessage(`testFvJob)得到如下信息,從中看到定時作業執行的還是舊的函數。

2020-02-14 11:36:23.069892 Start the job [testFvJob]: fv
2020-02-14 11:36:23.069939 The old function is called 
2020-02-14 11:36:23.069951 The job is done.

用模塊函數也是如此。我們創建一個模塊printLog.dos,其內容如下:

module printLog
def printLogs(logText){
    writeLog(string(now()) + " : " + logText)
    print "The old function is called"
}

然後創建一個定時作業調用這個printLog::printLogs函數:

use printLog
def f5(){
    printLogs("test my log")
}
scheduleJob(`testModule, "f5", f5, 13:32m, today(), today(), 'D');

在運行定時作業之前修改模塊如下:

module printLog
def printLogs(logText){
    writeLog(string(now()) + " : " + logText)
    print "The new function is called"
}

定時作業執行後,然後getJobMessage(`testModule)得到如下信息,從中看到定時作業執行的還是舊的函數。

2020-02-14 13:32:22.870855 Start the job [testModule]: f5
2020-02-14 13:32:22.871097 The old function is called
2020-02-14 13:32:22.871106 The job is done.

4.定時運行腳本文件

在創建定時作業時,若作業函數是run一個腳本文件,因為序列化時只保存了文件名,沒有保存文件內容,所以需要把依賴的自定義函數都放到腳本文件中,否則會因為找不到自定義的函數而執行失敗。比如創建一個腳本文件testjob.dos,文件內容如下:

foo()

然後在DolphinDB GUI中執行下列腳本:

def foo(){
    print ("Hello world!")
}
run "/home/xjqian/testjob.dos"

結果顯示能正常執行:

2020.02.14 13:47:00.992: executing code (line 104-108)...
Hello world!

再創建定時作業run這個腳本文件,代碼如下所示:

scheduleJob(`dailyfoofile1, "Daily Job 1", run {"/home/xjqian/testjob.dos"}, 16:14m, 2020.01.01, 2020.12.31, 'D');

但運行這個作業時卻發生瞭如下異常:

Exception was raised when running the script [/home/xjqian/testjob.dos]:Syntax Error: [line #3] Cannot recognize the token foo

這是foo函數定義和定時作業執行不在同一個會話(session)中,作業執行時找不到函數定義的緣故。把foo()的定義放到腳本文件中,修改testjob.dos文件內容如下:

def foo(){
    print ("Hello world!")
}
foo()

再重新創建定時作業運行這個腳本文件,就能順利完成。

5.小結和展望

常見故障及排除

  • 作業函數引用了共享變量,但是作業加載前沒有定義該共享變量。一般建議在用户的啓動腳本中定義該共享變量。
  • 作業函數引用了插件中的函數,但是作業加載前沒有加載該插件。一般建議在用户的啓動腳本中定義加載該插件。
  • 定時運行一個腳本文件,找不到依賴的函數。腳本文件必須包含依賴的自定義函數。
  • 創建定時作業的用户沒有訪問分佈式數據庫表的權限。授權該用户訪問相應數據庫的權限。
  • 在啓動腳本中使用函數scheduleJob、 getScheduledJobs和deleteScheduledJob時拋出異常。節點啓動時,定時作業的初始化在啓動腳本之後,因此不能在啓動腳本中使用跟定時作業相關的功能

在某些罕見的情況下,可能出現在系統重啓時,發生定時作業加載失敗,甚至導致系統無法啓動的情況。尤其是版本升級的時候,內置函數、插件函數等函數接口可能會有變化從而導致作業無法加載,或者出現一些兼容性bug導致系統重啓失敗。因此,我們開發時需要保留定義定時作業的腳本。若因定時任務導致系統無法啓動,可以先刪除定時作業的序列化文件<homeDir>/sysmgmt/jobEditlog.meta,在系統重啓後再重新創建這些定時作業。

後續功能開發

  • 增加瀏覽作業函數以及依賴的函數的定義的功能。
  • 定義和實現定時作業之間的依賴關係。
user avatar motianlun_5d0766992e67a 头像
点赞 1 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.