動態

詳情 返回 返回

通過簡單的代碼,快速完成數據分析智能體方案的構建 - 動態 詳情

背景

在電商和遊戲等數據密集型行業中,業務人員經常需要快速獲取數據洞察及時應對運營策略的變化,例如轉化率,下單率,付費玩家的等級分佈變化等等問題。這些問題往往需要涉及複雜的SQL查詢。傳統方式主要依賴技術人員手動的查詢語句,或者使用固定報表,整體靈活性較差。非技術人員希望可以通過自然語言完成數庫查詢的工作,提高數據獲取的效率和靈活性。本文將介紹如何通過Bedrock AgentCore Runtime, Strands Agents SDK以及Amazon Redshift MCP Server,通過簡單的代碼,快速完成數據分析智能體方案的構建。

📢限時插播:無需管理基礎設施,利用亞馬遜技術與生態,快速集成與部署生成式AI模型能力。
✨ 精心設計,旨在引導您深入探索Amazon Bedrock的模型選擇與調用、模型自動化評估以及安全圍欄(Guardrail)等重要功能。
⏩快快點擊進入《多模一站通 —— Amazon Bedrock 上的基礎模型初體驗》實驗構建無限, 探索啓程!

Amazon Bedrock AgentCore

Amazon Bedrock AgentCore 可幫助您安全、大規模地部署和運行功能強大的人工智能體,主要解決AI智能體的雲端部署和運行挑戰。Amazon Bedrock AgentCore 服務可以組合使用,也可以單獨使用。本文將部署Bedrock AgentCore Runtime,託管數據分析智能體。Bedrock AgentCore Runtime 提供了一個安全、無服務器且專門構建的託管環境,用於部署和運行AI智能體或工具。

Strands Agents SDK

Strands Agents SDK是亞馬遜雲科技發佈的開源AI智能體SDK,可以簡化智能體開發,充分利用最新大語言模型的原生推理、規劃和工具調用能力,而不需要複雜的編排邏輯。Strands Agents SDK支持多種模型提供商,包括Amazon Bedrock、Anthropic、Ollama、Meta等,並且支持OpenAI兼容接口,在中國區也可以使用國內的模型服務商提供的模型API,Strands Agent SDK還同時提供了20多個預構建工具。本文將通過Strands Agents SDK與Amazon Redshift MCP Server的集成,實現主要的智能體功能。Strands Agents SDK通過MCP Client自動發現和加載Redshift MCP Server提供的所有數據庫操作工具,可以自動發現和管理Redshift集羣的元數據信息,包括表結構、字段類型、索引關係等,為AI代理提供完整的數據庫上下文。MCP Server還處理連接池管理、權限控制和查詢優化,生成安全高效的SQL查詢。

Amazon Redshift MCP Server

關於MCP的定義這裏不再贅述。本文主要使用Amazon Redshift MCP Server來與Redshift資源進行交互。該MCP Server提供了一套全面的工具集,包括髮現、探索和查詢Amazon Redshift集羣及無服務器工作組的功能,使AI助手能夠安全高效地操作Redshift資源。該MCP Server使用Redshift Data API完成數倉的訪問和操作,無需用户名密碼。因此,在後續的AgentCore Runtime部署過程中,請確保AgentCore Runtime附加的角色有足夠的權限訪問Redshift中的數據,同時請注意分配對應的表權限。

本文將基於模擬的遊戲用户數據完成方案演示。

方案架構

架構圖

image.png

實現邏輯

核心文件如下:

  • strands_agent.py # 主要Agent實現
  • deploy.py # 部署腳本
  • test_client.py # 測試客户端,客户端主要調用agentcore client
  • requirements.txt # 依賴管理

requirements.txt

strands-agents
strands-agents-tools
bedrock-agentcore
bedrock-agentcore-starter-toolkit
aws-opentelemetry-distro>=0.10.0
mcp

strands_agent.py

本文基於已有的strands agent代碼,通過@app.entrypoint裝飾器,將普通的Python函數轉換為AgentCore Runtime可以識別和調用的入口點。AgentCore Runtime將用户的代碼打包成Docker容器,容器啓動後通過@app.entrypoint裝飾器識別請求入口點主函數。

請在Redshift MCP Server中配置您的Redshift集羣所在的區域為默認區域。在Strands Agent中指定調用的模型,這裏使用的是Amazon Bedrock 中的Claude 3.7模型。您可以根據具體的業務需求調整系統提示詞。請注意在initialize_table_permissions函數中替換您在Redshift集羣中需要訪問的表,該函數用於初始化Redshift Data API對於表的訪問權限。請參考Amazon Redshift MCP Server文檔中涉及的權限要求。

#!/usr/bin/env python3
"""
Strands Agent with Redshift MCP Tools for AgentCore Runtime
"""

from strands import Agent
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters
from bedrock_agentcore.runtime import BedrockAgentCoreApp

app = BedrockAgentCoreApp()

async def initialize_table_permissions(mcp_client, cluster_id, database_name, tables):
    """Initialize table permissions for Redshift Data API access"""
    
    try:
        clusters_result = await mcp_client.call_tool_async("list_clusters", {})
        
        if hasattr(clusters_result, 'content') and clusters_result.content:
            cluster_info = str(clusters_result.content)
            if cluster_id not in cluster_info:
                import re
                matches = re.findall(r'identifier["\']?\s*:\s*["\']?([^"\'}\s,]+)', cluster_info)
                if matches:
                    cluster_id = matches[0]
        
        for table in tables:
            grant_sql = f"GRANT SELECT ON TABLE {table} TO PUBLIC;"
            
            try:
                result = await mcp_client.call_tool_async(
                    "execute_query",
                    {
                        "cluster_identifier": cluster_id,
                        "database_name": database_name,
                        "sql": grant_sql
                    }
                )
            except Exception as table_error:
                continue
            
    except Exception as e:
        pass

@app.entrypoint
async def strands_agent_bedrock(payload, context):
    """
    Invoke the agent with a payload
    """
    # ========== 配置參數 ==========
    # AWS配置
    AWS_REGION = "us-west-2"
    DATABASE_NAME = "testdb"
    CLUSTER_ID = "test-workgroup"
    
    # 數據表配置
    TABLES = [
        'public.activity_events',
        'public.charge_events', 
        'public.fight_events',
        'public.login_logout_events'
    ]
    
    # 模型配置
    MODEL_ID = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
    
    # MCP服務器配置
    MCP_COMMAND = "uvx"
    MCP_ARGS = ["awslabs.redshift-mcp-server@latest"]
    # ========== 配置參數結束 ==========
    
    try:
        redshift_mcp_client = MCPClient(
            lambda: stdio_client(StdioServerParameters(
                command=MCP_COMMAND, 
                args=MCP_ARGS,
                env={
                    "AWS_DEFAULT_REGION": AWS_REGION,
                    "AWS_REGION": AWS_REGION
                }
            ))
        )

        with redshift_mcp_client:
            redshift_tools = redshift_mcp_client.list_tools_sync()

            try:
                await initialize_table_permissions(redshift_mcp_client, CLUSTER_ID, DATABASE_NAME, TABLES)
            except Exception as perm_error:
                print(f"表權限初始化失敗: {perm_error}")

            agent = Agent(
                model=MODEL_ID,
                system_prompt=""""""你是一位專業的AWS Redshift數據分析師助手,具備以下核心能力:
                ## 角色定位
                - 精通Redshift數據倉庫架構、性能優化和數據分析
                - 能夠使用可用工具高效查詢和分析Redshift數據
                - 提供準確、實用的數據洞察和業務建議

                ## 分析方法論
                1. **數據探索**:首先了解數據結構、質量和分佈特徵
                2. **業務理解**:結合業務場景解讀數據含義
                3. **統計分析**:運用描述性統計、趨勢分析、異常檢測等方法
                4. **洞察提煉**:從數據中提取可操作的業務洞察
                5. **建議輸出**:提供基於數據的決策建議和後續行動方案
                
                ## SQL執行安全規範
                - 僅執行SELECT查詢,嚴禁INSERT、UPDATE、DELETE、CREATE、DROP等寫操作
                - 每個查詢必須包含LIMIT子句,避免返回過大結果集
                - 查詢前必須驗證表名和字段名的存在性
                - **重要**:如果查詢失敗,必須先執行ROLLBACK或COMMIT來結束當前事務,然後重新開始新的查詢
                - 避免在字符串字段上使用日期函數,需要先進行類型轉換
                - 如果查詢失敗,重新生成兼容的SQL而不是嘗試修復事務狀態

                ## 輸出要求
                - **語言**:全程使用中文回覆
                - **格式**:以Markdown格式組織內容,包含清晰的標題層級
                - **內容結構**:
                  - 數據概覽與質量評估
                  - 詳細分析過程和思維邏輯
                  - 關鍵發現和數據洞察
                  - 業務建議和行動建議

                ## 分析深度
                - 不僅提供查詢結果,更要解釋數據背後的業務含義
                - 識別數據趨勢、模式和異常
                - 提供預測性分析和建議
                - 考慮數據的時間序列特徵和季節性等相關的環境影響

                請始終保持專業、準確、有洞察力的分析風格。""",
                tools=redshift_tools,
            )
            
            user_input = payload.get("prompt", "No prompt found")
            
            response = agent(user_input)
            return response
            
    except Exception as e:
        error_msg = f"Agent執行錯誤: {str(e)}"
        return f"抱歉,處理請求時出現錯誤: {str(e)}"

if __name__ == "__main__":
    app.run()

接下來需要將開發好的strands_agent.py部署到Bedrock AgentCore Runtime中運行。在deploy.py中定義AgentCore部署的區域信息,指定部署的entrypoint和requirements文件。Bedrock AgentCore Runtime部署的過程中會解析strands_agent.py中的entrypoint,並生成.bedrock_agentcore.yaml、Dockerfile、.dockerignore等文件。同時在雲上自動創建ECR 用於存儲Agent的Docker鏡像,用於構建Docker鏡像並推送到ECR的CodeBuild項目。最終會在指定區域完成AgentCore Runtime的部署。請注意完成Bedrock AgentCore Runtime部署後,確保關聯的IAM角色權限允許訪問Redshift集羣,可以添加文檔中提及的權限用於集羣訪問以及Data API執行。

deploy.py

#!/usr/bin/env python3
"""
Deploy Strands Agent with Redshift MCP Tools to AgentCore Runtime
"""

from bedrock_agentcore_starter_toolkit import Runtime
import boto3
import time

def deploy():
    """Deploy Strands Agent to AgentCore Runtime"""
    
    region = 'us-west-2'
    
    agentcore_runtime = Runtime()
    
    response = agentcore_runtime.configure(
        entrypoint="strands_agent.py",
        auto_create_execution_role=True,
        auto_create_ecr=True,
        requirements_file="requirements.txt",
        region=region,
        agent_name="<替換為您所需的agentcore名稱>"
    )
    
    print(f"AgentCore Runtime configured successfully!")
    print(f"Region: {region}")
    
    launch_result = agentcore_runtime.launch()
    
    print("等待部署完成...")
    status_response = agentcore_runtime.status()
    status = status_response.endpoint['status']
    
    end_status = ['READY', 'CREATE_FAILED', 'DELETE_FAILED', 'UPDATE_FAILED']
    while status not in end_status:
        print(f"狀態: {status} - 等待中...")
        time.sleep(10)
        status_response = agentcore_runtime.status()
        status = status_response.endpoint['status']
    
    print(f"最終狀態: {status}")
    
    if status == 'READY':
        print("部署成功!")
        return {
            'region': region,
            'agent_arn': launch_result.agent_arn,
            'launch_result': launch_result
        }
    else:
        print("部署失敗")
        return None

if __name__ == "__main__":
    result = deploy()
    if result:
        print("\n" + "="*50)
        print("部署信息:")
        print("="*50)
        print(f"Region: {result['region']}")
        print(f"Agent ARN: {result['agent_arn']}")
        print("\n保存這些信息用於測試!")

test_client.py

  • 在agent_runtime_arn參數中指定您上一步部署完成的AgentCore Runtime的arn。
  • 請在prompt中替換為您的指令,根據原始表結構,本文使用的提示詞為:“幫我總結testdb中charge_events的事件情況,並且根據歷史趨勢,分析總結未來兩週用户可能的事件趨勢,在輸出中包含詳細的分析過程”
  • 本文測試客户端代碼通過正則解析輸出內容並轉寫到文件中方便閲讀,請根據實際需求獲取輸出內容。
#!/usr/bin/env python3

import boto3
import json
import uuid
import datetime
import re

def extract_agent_content(response_data):
    content = []
    if isinstance(response_data, bytes):
        text = response_data.decode('utf-8', errors='ignore')
    else:
        text = str(response_data)
    
    text = text.strip()
    if text.startswith('"') and text.endswith('"'):
        cleaned = text[1:-1].replace('\\n', '\n').replace('\\t', '\t').replace('\\"', '"').replace('\\\\', '\\')
        if len(cleaned.strip()) > 50:
            content.append(cleaned)
            return content
    
    try:
        json_pattern = r'\{[^{}]*"body"[^{}]*"output"[^{}]*"messages".*?\}'
        json_matches = re.findall(json_pattern, text, re.DOTALL)
        
        for json_str in json_matches:
            try:
                data = json.loads(json_str)
                if "body" in data and "output" in data["body"]:
                    messages = data["body"]["output"].get("messages", [])
                    for message in messages:
                        if "content" in message and "message" in message["content"]:
                            msg_content = message["content"]["message"]
                            if len(msg_content.strip()) > 20:
                                content.append(msg_content)
            except:
                continue
    except:
        pass
    
    if not content:
        message_pattern = r'"message":\s*"((?:[^"\\]|\\.|\\n|\\t)*)"'
        matches = re.findall(message_pattern, text, re.DOTALL)
        
        for match in matches:
            cleaned = match.replace('\\n', '\n').replace('\\t', '\t').replace('\\"', '"').replace('\\\\', '\\')
            if len(cleaned.strip()) > 50:
                content.append(cleaned)
    
    return content

def test_strands_agent():
    agent_runtime_arn = "<您上一步中部署完成的agentcore runtime arn>"
    session_id = str(uuid.uuid4())
    
    client = boto3.client(
        'bedrock-agentcore',
        region_name='us-west-2',
        config=boto3.session.Config(read_timeout=300, connect_timeout=60)
    )
    
    PROMPT = "幫我總結testdb表中充值的事件情況,並且根據歷史一個月內的趨勢,分析總結未來以周用户可能的事件趨勢,在輸出中包含詳細的分析過程"
    
    try:
        print(f"測試查詢: {PROMPT}")
        
        response = client.invoke_agent_runtime(
            agentRuntimeArn=agent_runtime_arn,
            qualifier="DEFAULT",
            runtimeUserId="123", 
            runtimeSessionId=session_id,
            payload=json.dumps({"prompt": PROMPT})
        )
        
        all_data = ""
        
        if "text/event-stream" in response.get("contentType", ""):
            print("處理流式響應...")
            for line in response["response"].iter_lines(chunk_size=1024):
                if line:
                    line_str = line.decode("utf-8", errors='ignore')
                    all_data += line_str + "\n"
        else:
            print("處理普通響應...")
            for event in response.get("response", []):
                event_str = event.decode('utf-8', errors='ignore')
                all_data += event_str + "\n"
        
        print(f"收到數據長度: {len(all_data)} 字符")
        
        contents = extract_agent_content(all_data)
        
        if contents:
            print("\nAgent輸出:")
            for i, content in enumerate(contents, 1):
                print(f"\n--- 片段 {i} ---")
                print(content)
        else:
            print("未提取到有效內容")
        
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"agent_output_{timestamp}.txt"
        
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(f"Agent測試結果\n")
            f.write(f"時間: {datetime.datetime.now()}\n")
            f.write(f"查詢: {PROMPT}\n")
            f.write(f"會話ID: {session_id}\n")
            f.write("="*50 + "\n\n")
            
            for i, content in enumerate(contents, 1):
                f.write(f"片段 {i}:\n{content}\n\n")
        
        print(f"\n測試完成,共提取 {len(contents)} 個內容片段")
        print(f"結果已保存到: {output_file}")
        
    except Exception as e:
        print(f"測試失敗: {e}")

if __name__ == "__main__":
    test_strands_agent()

參考輸出內容:

Agent測試結果
時間: 2025-09-24 15:31:08.648040
查詢: 幫我總結testdb表中充值的事件情況,並且根據歷史一個月內的趨勢,分析總結未來以周用户可能的事件趨勢,在輸出中包含詳細的分析過程
會話ID: xxxxxxxx
==================================================

片段 1:
# 充值事件分析報告

## 1. 數據概覽
本次分析基於`testdb`數據庫中的`charge_events`表,包括充值事件的詳細信息。

## 2. 周度充值趨勢分析

### 2.1 充值事件統計
- **總週數**:4周
- **充值周列表**:
  1. 2025-04-28:250個用户,250個充值事件,總金額638萬
  2. 2025-05-05:812個用户,842個充值事件,總金額2076萬
  3. 2025-05-12:1053個用户,1102個充值事件,總金額2787萬
  4. 2025-05-19:1035個用户,1066個充值事件,總金額2643萬

### 2.2 趨勢解讀
- **用户增長**:從250人快速增長到1000+人,增長約4倍
- **充值事件增長**:從250增長到1100,增長約4.4倍
- **總充值金額**:從638萬增長到2787萬,增長約4.3倍

## 3. 充值時間分佈 
### 3.1 Top 5充值高峯時段分析

| 排名 | 小時 | 事件數 | 總金額 | 百分比 |
|------|------|--------|--------|--------|
| 1    | 15時 | 162    | 413萬  | 4.97%  |
| 2    | 9時  | 156 
   | 394萬  | 4.79%  |
| 3    | 1時  | 156    | 405萬  | 4.79%  |
| 4    | 2時  | 146    | 358萬  | 4.48%  |
| 5    | 11時 | 144    | 344萬  | 4.42%  |

### 3.2 時間分佈特徵
- 充值高峯主要集中在15時、9時、1時
- 每個高峯時段事件數在144-162之間
- 高峯時段金額在340萬-410萬之間

## 4. 未來預測與建議

### 4.1 用户增長預測
- 根據過去4周趨勢,預計未來每週用户增長將保持在20-30%
- 下一個月可能達到1500-2000活躍充值用户

### 4.2 充值事件預測
- 預計下一個月每週充值事件將在1200-1500個
- 周充值總金額可能達到3000-3500萬

### 4.3 業務建議
1. **高峯時段運營**
   - 15時、9時、1時作為重點運營時段
   - 在這些時間段提供特殊優惠或活動

2. **用户增長策略**
   - 分析新用户轉化路徑
   - 優化新用户充值引導機制

3. **產品優化**
   - 研究用户充值行為特徵
   - 根據高峯時段調整產品功能和運
營策略

## 5. 數據質量與侷限性
- 僅基於4週數據,長期趨勢還需進一步驗證
- 建議持續監控和更新預測模型

## 結論
充值事件呈現快速增長態勢,用户基數和充值金額都有顯著提升。建議密切跟蹤用户行為,及時調整運營策略。

Redshift MCP Server分析方式

上述測試方式中顯式聲明瞭具體需要查詢數據庫的名稱,這是因為數據庫名稱為testdb,定義不明確,建議在生產環境中使用與業務緊密相關的數據庫/表名稱,或通過註釋聲明,或在系統提示詞中添加定義以減少工具調用次數並且提高準確率。同時,我們在提示詞中沒有聲明具體需要查詢的表,因為表名稱定義清晰,在下面的日誌內容分析中可以更加清楚的看到Redshift MCP Server的工具調用過程中,AI可以明確的找到所需要的charge_events表。

根據Redshift MCP Server的工作流可以知道主要的工作為:

  • 發現集羣:(主要使用list_clusters工具)自動掃描亞馬遜雲科技賬户中所有可用的Redshift集羣和Serverless工作組,獲取連接信息、狀態和配置詳情,為後續操作建立連接基礎。
  • 列出數據庫:(主要使用list_databases工具)連接到選定的集羣,查詢系統視圖發現所有可訪問的數據庫。
  • 瀏覽數據庫結構:(主要使用list_schemas, list_tables, list_columns工具)深入每個數據庫,系統性地遍歷所有schema、數據表和字段結構,構建完整的元數據映射,瞭解數據類型、約束條件和表關係。
  • 查詢數據:(主要使用 execute_query工具)將用户的自然語言問題轉換為精確的SQL語句,在只讀事務中安全執行,返回結構化結果和執行性能指標。

AgentCore Runtime日誌內容分析

您可以根據以下日誌內容分析詳細瞭解到Redshift MCP Server中的工具被調用執行的具體邏輯。

1. list_cluster

image.png

image.png

2. list_databases

image.png

image.png

3. list_schemas

image.png

image.png

4. list_tables & list_columns

image.png

image.png

根據前面的查詢結果確定charge_events表並分析充值事件,同時調用了list_columns工具查看charge_events表結構,表結構輸出不在這裏詳細展示。

image.png

5. execute_query

大語言模型根據上述輸出內容完成text2sql生成對應的sql語句,並調用execute_query工具執行,在執行過程中會根據返回的結果調整sql語句直到輸出符合預期。

image.png

image.png

最終根據查詢的結果和提示詞生成詳細的事件分析報告

image.png

總結

本項目成功構建了基於 Amazon Bedrock AgentCore Runtime 的智能數據分析系統,通過集成遊戲業務的多維度事件數據,實現了從自然語言查詢到業務洞察輸出的完整閉環。總體技術核心優勢如下:

  • Amazon AgentCore Runtime:提供企業級無服務器託管環境,內置 CloudWatch 集成和分佈式追蹤,支持Sessions > Traces > Spans 三層監控體系,便於生產環境的性能優化和問題排查,同時通過 CodeBuild 實現雲端構建和一鍵部署,支持快速迭代,讓開發者專注業務邏輯而非基礎設施運維。
  • Strands Agents SDK + Redshift MCP Server:從單純的”查詢生成”升級為”洞察發現”,具備智能化數據探索、上下文感知分析、業務導向輸出和錯誤自愈能力。並且Strands Agents SDK僅需幾行代碼即可通過集成任意 MCP Server,同時支持框架原生工具的快速開發,為開發者提供了從快速原型到生產部署的完整工具生態集成能力。
  • 通過 AgentCore Runtime 的企業級託管能力與 Strands Agents SDK+ Redshift MCP Server的智能化工具生態深度融合,本項目展現了從傳統數據查詢向 AI 驅動的業務洞察分析轉變的技術路徑,為企業級智能數據分析應用提供了完整的雲原生解決方案。

*前述特定亞馬遜雲科技生成式人工智能相關的服務目前在亞馬遜雲科技海外區域可用。亞馬遜雲科技中國區域相關雲服務由西雲數據和光環新網運營,具體信息以中國區域官網為準。

本篇作者

image.png

本期最新實驗《多模一站通 —— Amazon Bedrock 上的基礎模型初體驗》
✨ 精心設計,旨在引導您深入探索Amazon Bedrock的模型選擇與調用、模型自動化評估以及安全圍欄(Guardrail)等重要功能。無需管理基礎設施,利用亞馬遜技術與生態,快速集成與部署生成式AI模型能力。
⏩️[點擊進入實驗] 即刻開啓 AI 開發之旅
構建無限, 探索啓程!
user avatar zhidechaomian_detxs7 頭像 ting_61d6d9790dee8 頭像 u_17569005 頭像 jianweilai 頭像 huandanshendeshoushudao 頭像 nixideshatanku 頭像 explinks 頭像 qcloudcommunity 頭像 tizuqiudexiangpica 頭像
點贊 9 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.