动态

详情 返回 返回

基於 DMS Dify+Notebook+Airflow 實現 Agent 的一站式開發 - 动态 详情

本文作者:阿里雲數據庫開發專家 陳樞華

背景與挑戰

Dify 作為一款低代碼 AI 應用開發平台,憑藉其直觀的可視化工作流編排能力,極大降低了大模型應用的開發門檻。然而,在實際企業級落地過程中,我們發現其原生能力仍存在兩個關鍵瓶頸:

  • 代碼執行能力受限:Dify 內置的 Sandbox 節點雖支持基礎 Python 代碼執行,但無法安裝自定義 Python 包,難以支撐複雜的業務邏輯、數據處理或算法調用;
  • 缺乏自動化調度機制:Dify 原生架構不支持 Agent 或 Agentic Workflow 的定時觸發、週期性運行與依賴編排,導致其難以融入企業級自動化運維體系。
    這兩個問題嚴重製約了 Dify 在生產環境中的深度應用——尤其當我們希望構建一個具備“感知-決策-執行-反饋”閉環能力的智能 Agent 時,僅靠 Dify 自身往往力不從心。
    為突破這些限制,我們在實踐中探索出一套 “Dify + DMS Notebook + DMS Airflow”三位一體的一站式增強開發架構,有效補齊了 Dify 在執行能力與調度能力上的短板:
    ✅ DMS Notebook:提供完整、可定製的 Python 運行環境,支持第三方庫安裝、交互式開發與複雜邏輯實現,作為 Dify Sandbox 的強力補充;
    ✅ DMS Airflow:作為統一調度引擎,實現對 Dify 工作流、Notebook 腳本或 Agent 任務的定時觸發、依賴管理與可靠執行;
    ✅ DMS 平台集成:實現從開發、調試、部署到調度、監控的全鏈路閉環管理,顯著提升 Dify 在企業場景下的工程化落地能力。
    本文將以一個銷售數據分析機器人的完整開發案例,詳細介紹如何基於 DMS 平台,構建一個可調度、可擴展、可運維的 Agent 系統。

使用 DMS Notebook 擴展 Dify 的代碼執行能力

為什麼需要 Notebook?

在 Dify 中,若需調用 Pandas 進行數據清洗、使用 Prophet 做時間序列預測,或集成企業內部 SDK,Sandbox 節點往往無能為力。而 DMS Notebook 提供了完整的 Python 環境,支持:

  • 自定義 pip 包安裝;
  • 環境變量配置(如 AK/SK、API Key);
  • 異步服務開發(如 FastAPI);
  • 與 VPC 內其他服務安全互通。
    這使其成為 Dify 外部能力擴展的理想“執行單元”。

步驟詳解:構建一個銷售數據分析 API 服務

1.創建 DMS Notebook 會話

  • 進入 DMS 控制枱 > Notebook 會話 > 創建會話;
    圖片
  • 各參數的定義如下:
    image.png
  • 選擇合適的 Python 鏡像版本;
    圖片
  • 在 配置 > 編輯設置 中:
    圖片
  • PyPI 包管理:按 requirements.txt 格式填入依賴(如 pandas, fastapi, uvicorn, nest-asyncio);
  • 環境變量:設置 ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET、大模型 API Key 等;
  • 關鍵配置:
  • fastapi、uvicorn、nest-asyncio庫是必須要安裝的;
  • 設置 資源釋放時間 = 0(防止服務被自動釋放);
  • 設置環境變量 DMS_KERNEL_IDLE_TIMEOUT=0(避免 Jupyter Kernel 因空閒被 kill)。

💡 踩坑提示:若未設置 DMS_KERNEL_IDLE_TIMEOUT=0,長時間運行的 API 服務可能在空閒數分鐘後被系統回收,導致後續調用失敗。

  • 創建完成後,在notebook會話窗口中點擊啓動即可
    圖片

2.編寫並啓動 FastAPI 服務

  • 點擊文件夾圖標,右鍵點擊default(默認庫),再點擊新建Notebook文件
    圖片
  • 在代碼塊中編寫相關的python代碼,可以參考以下模板構建你的API服務,更多FastAPI相關的使用方法請查看官方文檔https://fastapi.tiangolo.com/
import os
from fastapi import FastAPI, HTTPException, Request, File, UploadFile, Path, Query, Form, Header
from fastapi.staticfiles import StaticFiles
from typing import Optional
import nest_asyncio
import asyncio
import httpx
import io
'''
注意,Jupyter本身就在一個asyncio事件循環中運行。我們不能在已有循環中直接運行另一個循環,但nest_asyncio這個庫可以“打補丁”,允許我們這樣做。
'''
nest_asyncio.apply()
app = FastAPI(title="Your Service Name", description="Description of Your Service")
# 創建static目錄(如果不存在)
static_dir = "static"
if not os.path.exists(static_dir):
    os.makedirs(static_dir)
# 掛載靜態文件服務
app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.get("/")
async def root():
    """
    根節點,返回服務基本信息
    # --- 如何使用 curl 調用 ---
    curl -X GET "http://127.0.0.1:8000/"
    
    """
    return {
        "message": "Service is running",
        "documentation": "/docs",
        "note": "..."
    }
@app.post("/process-data/{item_id}")
async def process_data(
    request: Request, # 使用 Request 對象來接收原始請求
    
    # 路徑參數
    item_id: int = Path(..., title="物品ID", ge=1),
    
    # 查詢參數
    is_premium: bool = Query(False, description="是否為高級物品"),
    
    # 請求頭參數
    x_token: Optional[str] = Header(None, description="自定義的認證Token")
):
    """
    接收 JSON 請求體、路徑參數、查詢參數和請求頭。
    # --- 如何使用 curl 調用 ---
    # -X POST: 指定請求方法
    # URL: 包含路徑參數 {item_id} 和查詢參數 ?is_premium=true
    # -H: 添加請求頭 (Header)
    # -d: 發送請求體 (Body),這裏是 JSON 字符串
    curl -X POST "http://127.0.0.1:8000/process-data/101?is_premium=true" \
         -H "Content-Type: application/json" \
         -H "X-Token: my-secret-token" \
         -d '{"name": "筆記本電腦", "price": 7999.9, "tags": ["electronics", "office"]}'
    """
    if x_token != "my-secret-token":
        raise HTTPException(status_code=401, detail="X-Token 無效")
    
    try:
        # 手動解析 JSON 請求體
        json_body = await request.json()
        
        name = json_body.get("name")
        price = json_body.get("price")
        
        # 你的業務邏輯代碼
        if not name or not price:
            raise HTTPException(status_code=400, detail="請求體中缺少 'name' 或 'price'")
        return {
            "message": "數據處理成功",
            "received_data": {
                "item_id": item_id,
                "is_premium": is_premium,
                "x_token": x_token,
                "body": json_body
            }
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服務執行錯誤: {str(e)}")
@app.post("/upload-file")
async def upload_file(
    # 表單數據
    token: str = Form(...),
    # 上傳文件
    file: UploadFile = File(...)
):
    """
    通過表單(form-data)上傳文件和附帶的文本信息。
    # --- 如何使用 curl 調用 ---
    # -F: 用於發送 multipart/form-data
    # -F "file=@/path/to/your/file.txt": @符號表示後面是文件路徑,curl會讀取該文件內容作為上傳數據
    # -F "token=user123": 發送一個名為 token 的表單字段
    # 注意: 請將 /path/to/your/file.txt 替換為你的本地文件真實路徑
    curl -X POST "http://127.0.0.1:8000/upload-file" \
         -F "file=@./test_upload.txt" \
         -F "token=my-form-token"
    """
    # 為了讓curl示例能工作,我們先創建一個示例文件
    if not os.path.exists("test_upload.txt"):
        with open("test_upload.txt", "w") as f:
            f.write("This is a test file for curl upload.")
    try:
        contents = await file.read()
        file_location = os.path.join(static_dir, file.filename)
        with open(file_location, "wb") as f:
            f.write(contents)
            
        return {
            "message": "文件上傳成功!",
            "token": token,
            "filename": file.filename,
            "file_size": len(contents),
            "file_url": f"/static/{file.filename}"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"文件處理錯誤: {str(e)}")
@app.get("/status")
async def get_server_status():
    """
    獲取服務器狀態。
    # --- 如何使用 curl 調用 ---
    curl -X GET "http://127.0.0.1:8000/status"
    """
    return {"status": "running"}
async def run_server(host="127.0.0.1", port=8000):
    """在後台運行uvicorn服務器"""
    import uvicorn
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    # uvicorn.run() 是一個阻塞調用,所以我們用更底層的Server.serve()
    await server.serve()
task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))
# 等待服務啓動
await asyncio.sleep(2)
# 創建一個異步HTTP客户端
async with httpx.AsyncClient() as client:
    print("正在向 http://127.0.0.1:8000/status/ 發送請求...")
    
    # 發送POST請求
    response = await client.get("http://127.0.0.1:8000/status")
    
    # 打印結果
    if response.status_code == 200:
        print("服務啓動成功")
    else:
        print("服務啓動失敗,請檢查報錯信息")

• 接下來我們以構建一個單日銷售數據分析的API為例,代碼內容如下:

import os
import pandas as pd
from fastapi import FastAPI, HTTPException, Request, File, UploadFile
from fastapi.staticfiles import StaticFiles
import nest_asyncio
import asyncio
import httpx
import io
'''
注意,Jupyter本身就在一個asyncio事件循環中運行。我們不能在已有循環中直接運行另一個循環,但nest_asyncio這個庫可以“打補丁”,允許我們這樣做。
'''
nest_asyncio.apply()
app = FastAPI(title="Sales Data Analysis Service", description="Provides data analysis and chart generation capabilities for Dify")
# 創建static目錄(如果不存在)
static_dir = "static"
if not os.path.exists(static_dir):
    os.makedirs(static_dir)
# 掛載靜態文件服務
app.mount("/static", StaticFiles(directory=static_dir), name="static")
def load_sales_data_from_file(file_content: bytes):
    """從上傳的文件內容加載銷售數據"""
    try:
        # 將字節內容轉換為StringIO對象
        csv_string = file_content.decode('utf-8')
        df = pd.read_csv(io.StringIO(csv_string))
        
        # 驗證必要的列是否存在
        required_columns = ['Date', 'Product', 'Price', 'Amount', 'Region']
        if not all(col in df.columns for col in required_columns):
            raise ValueError(f"CSV file must contain columns: {', '.join(required_columns)}")
        
        # 轉換數據類型
        df['Date'] = pd.to_datetime(df['Date'])
        df['Price'] = pd.to_numeric(df['Price'], errors='coerce')
        df['Amount'] = pd.to_numeric(df['Amount'], errors='coerce')
        
        # 計算銷售額 (Sales = Price × Amount)
        df['Sales'] = df['Price'] * df['Amount']
        
        return df
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing CSV file: {str(e)}")
@app.get("/")
async def root():
    """Root endpoint, returns service information"""
    return {
        "message": "Sales Data Analysis Service is running",
        "documentation": "/docs",
        "endpoints": [
            "POST /analysis/daily_sale_analysis"
        ],
        "note": "Require a CSV file upload with columns: Date, Product, Price, Amount, Region"
    }
@app.post("/analysis/daily_sale_analysis")
async def daily_sale_analysis(
    file: UploadFile = File(...)
):
    """當日銷售數據分析 - 分析上傳文件中的銷售數據"""
    try:
        # 驗證文件類型
        if not file.filename.endswith('.csv'):
            raise HTTPException(status_code=400, detail="文件必須是CSV格式")
        
        # 讀取上傳的文件
        file_content = await file.read()
        df = load_sales_data_from_file(file_content)
        
        # 獲取數據中的日期範圍
        df['Date'] = pd.to_datetime(df['Date']).dt.date
        unique_dates = sorted(df['Date'].unique())
        
        if len(unique_dates) == 0:
            raise HTTPException(status_code=400, detail="數據文件中沒有有效的日期數據")
        
        # 如果有多個日期,取最新的日期作為分析目標
        target_date = unique_dates[-1] if len(unique_dates) > 1 else unique_dates[0]
        
        # 篩選目標日期的數據
        daily_data = df[df['Date'] == target_date].copy()
        
        if daily_data.empty:
            raise HTTPException(status_code=400, detail=f"沒有找到日期 {target_date} 的銷售數據")
        
        # 基礎統計
        total_sales = daily_data['Sales'].sum()
        total_orders = len(daily_data)
        total_quantity = daily_data['Amount'].sum()
        avg_order_value = total_sales / total_orders if total_orders > 0 else 0
        
        # 產品分析
        product_analysis = daily_data.groupby('Product').agg({
            'Sales': 'sum',
            'Amount': 'sum',
            'Price': 'mean'
        }).round(2)
        
        # 按銷售額排序,取前5名產品
        top_products = product_analysis.sort_values('Sales', ascending=False).head(5)
        top_products_list = []
        for product, row in top_products.iterrows():
            top_products_list.append({
                "product": product,
                "sales": float(row['Sales']),
                "quantity": int(row['Amount']),
                "avg_price": float(row['Price'])
            })
        
        # 地區分析
        region_analysis = daily_data.groupby('Region').agg({
            'Sales': 'sum',
            'Amount': 'sum'
        }).round(2)
        
        # 按銷售額排序
        top_regions = region_analysis.sort_values('Sales', ascending=False)
        region_list = []
        for region, row in top_regions.iterrows():
            region_list.append({
                "region": region,
                "sales": float(row['Sales']),
                "quantity": int(row['Amount']),
                "percentage": round(float(row['Sales']) / total_sales * 100, 2)
            })
        
        # 價格區間分析
        daily_data['price_range'] = pd.cut(daily_data['Price'], 
                                         bins=[0, 100, 500, 1000, 5000, float('inf')], 
                                         labels=['0-100', '100-500', '500-1000', '1000-5000', '5000+'])
        
        price_range_analysis = daily_data.groupby('price_range').agg({
            'Sales': 'sum',
            'Amount': 'sum'
        }).round(2)
        
        price_ranges = []
        for price_range, row in price_range_analysis.iterrows():
            if not pd.isna(row['Sales']) and row['Sales'] > 0:
                price_ranges.append({
                    "range": str(price_range),
                    "sales": float(row['Sales']),
                    "quantity": int(row['Amount'])
                })
        
        # 生成洞察分析
        insights = []
        
        # 銷售額洞察
        if total_sales > 100000:
            insights.append(f"當日銷售表現優秀,總銷售額達到 {total_sales:,.2f} 元")
        elif total_sales > 50000:
            insights.append(f"當日銷售表現良好,總銷售額為 {total_sales:,.2f} 元")
        else:
            insights.append(f"當日銷售額為 {total_sales:,.2f} 元,可能需要關注銷售策略")
        
        # 產品洞察
        if len(top_products_list) > 0:
            best_product = top_products_list[0]
            insights.append(f"最佳銷售產品是 {best_product['product']},銷售額 {best_product['sales']:,.2f} 元")
        
        # 地區洞察
        if len(region_list) > 0:
            best_region = region_list[0]
            insights.append(f"銷售表現最佳的地區是 {best_region['region']},佔總銷售額的 {best_region['percentage']}%")
        
        # 訂單洞察
        if avg_order_value > 1000:
            insights.append(f"平均訂單價值較高,為 {avg_order_value:,.2f} 元,顯示客户購買力強")
        
        return {
            "analysis_date": str(target_date),
            "summary": {
                "total_sales": round(float(total_sales), 2),
                "total_orders": int(total_orders),
                "total_quantity": int(total_quantity),
                "average_order_value": round(float(avg_order_value), 2)
            },
            "top_products": top_products_list,
            "region_analysis": region_list,
            "price_range_analysis": price_ranges,
            "insights": insights,
            "data_info": {
                "date_range": f"{unique_dates[0]} 到 {unique_dates[-1]}" if len(unique_dates) > 1 else str(unique_dates[0]),
                "total_records": len(daily_data),
                "unique_products": len(daily_data['Product'].unique()),
                "unique_regions": len(daily_data['Region'].unique())
            }
        }
        
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"當日銷售數據分析錯誤: {str(e)}")
@app.get("/status")
async def get_server_status():
    """獲取服務器狀態"""
    try:
        return {
            "status": "running"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error getting server status: {str(e)}")
async def run_server(host="127.0.0.1", port=8000):
    """在後台運行uvicorn服務器"""
    import uvicorn
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    server = uvicorn.Server(config)
    # uvicorn.run() 是一個阻塞調用,所以我們用更底層的Server.serve()
    await server.serve()
task = asyncio.create_task(run_server(host="0.0.0.0", port=8000))
# 等待服務啓動
await asyncio.sleep(2)
# 創建一個異步HTTP客户端
async with httpx.AsyncClient() as client:
    print("正在向 http://127.0.0.1:8000/status/ 發送請求...")
    
    # 發送POST請求
    response = await client.get("http://127.0.0.1:8000/status")
    
    # 打印結果
    if response.status_code == 200:
        print("服務啓動成功")
    else:
        print("服務啓動失敗,請檢查報錯信息")
✅ 異步支持:Jupyter 內置 asyncio 事件循環,可直接使用 async/await。
  • 運行相關代碼塊,可以在輸出部分看到API服務啓動成功
    圖片

圖片

3.查看IP地址

  • 在notebook代碼塊中,可以使用英文感嘆號+終端命令的形式來執行命令,你也可以使用!pip install xxx來安裝額外需要的python包。接下來新建一個代碼塊,在代碼塊中輸入!ifconfig並點擊運行查看該 Notebook 會話在VPC中的IP地址,圖中172.16.0.252即為所需的IP地址,API服務地址即為:http://172.16.0.252:8080/analyze_sales
    圖片

在Dify on DMS實例中訪問服務

現在我們使用這份模擬銷售數據文件來訪問API服務:

  • 在工作流中添加HTTP請求節點
    圖片
  • 通過http://:<端口>/xxx 訪問創建的API的服務,並在BODY中傳入相應的參數
    圖片
  • 測試運行可以看到該請求成功返回了響應的輸出
    圖片
  • 你也可以在Notebook會話中看到相應的服務被調用
    圖片
  • 接下來以該Dify工作流為例子進行完整的服務調用
    銷售數據分析.yml
    圖片
    在釘釘羣組中添加一個自定義的機器人,並參考https://open.dingtalk.com/document/orgapp/robot-overview 瞭解如何獲取釘釘機器人的access_token和sign_secret
    圖片
    在填寫完你的釘釘機器人蔘數之後,點擊右上角的運行-》從本地上傳上傳示例的銷售數據,並點擊開始運行
    圖片
    釘釘羣中的機器人成功發送了對該銷售數據的分析報告
    圖片
    點擊右上角的發佈-》發佈更新發布工作流用於後面的定時調用
    圖片

使用 DMS Airflow 實現定時調度

創建DMS Airflow實例

參照以下鏈接在DMS中創建Airflow實例
https://help.aliyun.com/zh/dms/purchase-airflow-resources
https://help.aliyun.com/zh/dms/create-and-manage-an-airflow-i...

更多關於Airflow的操作,參照https://airflow.apache.org/docs/apache-airflow/stable/index.html

編寫 DAG:每日自動觸發銷售分析

  • 以下是示例的python代碼,用於定時調用Dify工作流的API
import pendulum
import requests
import json
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
CSV_FILE_PATH = "/yourpath/daily_sale_data.csv" 
DIFY_API_URL = "https://dify-dms.aliyuncs.com/v1" # 替換成你的 Dify Workflow API URL
# 從 Airflow Variable 中安全地獲取 API Key
DIFY_API_KEY = Variable.get("dify_api_key")
APP_API_KEY= Variable.get("app_api_key")
def call_dify_workflow_with_csv(**kwargs):
    """
    讀取 CSV 文件內容,並將其作為文件上傳調用 Dify 工作流。
    """
    print(f"準備從 '{CSV_FILE_PATH}' 讀取文件...")
    try:
        with open(CSV_FILE_PATH, 'rb') as f:
            files_to_upload = {
                'file': ('daily_sale_data.csv', f, 'document/csv') 
            }
            # 準備 API 請求的 headers 和 body
            headers = {
                'Authorization': f'Bearer {APP_API_KEY}',
                'DifyApiKey': f'{DIFY_API_KEY}',
            }
            
            
            file_upload_response=requests.post(
                DIFY_API_URL+'/files/upload',
                headers=headers, 
                data={'user': 'airflow-user-demo'}, 
                files=files_to_upload,
            )
            print(file_upload_response.json())
                
            file_id=file_upload_response.json().get('id')
            headers.update({'Content-Type': 'application/json'})
            
            # 'inputs' 通常是json字符串
            # 'user' 是必須的,代表最終用户的標識符
            input_data = {
                'sales_data':  {
                        "type": "document",
                        "transfer_method": "local_file",
                        "upload_file_id": file_id
                    }
            }
            data = {
                'inputs': input_data,  
                'user': 'airflow-user-demo',
                'response_mode': 'blocking',
            }
            print("開始調用 Dify API...")
            print(f"URL: {DIFY_API_URL}")
            
            response = requests.post(
                DIFY_API_URL+'/workflows/run', 
                headers=headers, 
                json=data,
            )
            
            # 檢查請求是否成功
            response.raise_for_status() 
            
            print(f"API 調用成功!狀態碼: {response.status_code}")
            
            # 打印響應內容
            print("--- Dify API Response ---")
            
            print(response.json()["data"]["outputs"]["answer"])
                     
            print("\n--- End of Response ---")
            
            # 你也可以將完整的響應推送到 XComs,以便下游任務使用
            # ti = kwargs['ti']
            # ti.xcom_push(key='dify_response', value=full_response)
    except FileNotFoundError:
        print(f"錯誤:文件未找到於 '{CSV_FILE_PATH}'")
        raise
    except requests.exceptions.RequestException as e:
        print(f"API 調用失敗: {e}")
        raise
with DAG(
    dag_id="dify_workflow",
    start_date=pendulum.datetime(2023, 10, 27, tz="Asia/Shanghai"),
    # '0 8 * * *' 代表每天早上8:00 (UTC+8)
    # Airflow 默認使用 UTC,但 Cron 表達式本身不帶時區,調度器會根據 DAG 的時區設置來解釋
    schedule="0 8 * * *", 
    catchup=False,
    tags=["dify", "api", "example"],
    doc_md="""
    ### Dify 工作流調用 DAG
    此 DAG 每天早上8點執行,它會:
    1. 從本地文件系統讀取一個 CSV 文件。
    2. 將該 CSV 文件作為附件,調用一個 Dify 工作流。
    3. 打印出 Dify API 的響應。
    """
) as dag:
    run_dify_workflow = PythonOperator(
        task_id="call_dify",
        python_callable=call_dify_workflow_with_csv,
    )
📌 注意:通過API調用Dify工作流中想要上傳本地文件,需要先通過/files/upload接口傳入相應的文件獲取文件id,再將文件id傳入工作流中。
  • 創建完成後打開Airflow實例,可以看到創建的定時任務
    圖片
  • 每日 8:00,系統自動調用 Dify 工作流,最終由釘釘機器人推送分析報告
    圖片

圖片

總結與思考

通過 DMS Notebook + DMS Airflow 對 Dify 的能力擴展,我們成功構建了一個具備以下特性的企業級 Agent 開發範式:
image.png

這套方案不僅解決了 Dify 當前的侷限性,更重要的是,它保留了 Dify 的低代碼優勢,同時通過與成熟數據基礎設施(Notebook + Airflow)的深度集成,實現了 “敏捷開發”與“工程可靠”的平衡。

🌟 核心理念:Agent 的價值不在於“全自動”,而在於“可擴展、可調度、可運維”。真正的生產級智能系統,一定是平台能力與工程實踐的結合體。

Add a new 评论

Some HTML is okay.