PAI Physical AI Notebook詳解1:基於Isaac仿真的操作動作數據擴增與模仿學習
Physical AI是AI技術演進的一個熱門話題,目的是基於Transformer、Diffusion等主流大模型結構,訓練得到可以在實際物理空間中指導機器人本體完成各種任務的AI模型。
在Physical AI模型的開發過程中,需要用到遙操採集、數據合成、數據增強、模仿學習、模型測評等多個過程,也會用到Isaac Sim、Isaac Lab、Cosmos等多種工具棧和模型。
基於和NVIDIA Physical AI套件的深度整合,阿里雲PAI人工智能平台整合了上述全套工具棧,並在Notebook Gallery中上線了多套開箱即用的最佳實踐,供開發者熟悉技術、快速啓動Physical AI項目使用。
從本期開始,我們會陸續介紹5款最佳實踐:
- 基於Isaac仿真的操作動作數據擴增與模仿學習
- 基於Cosmos世界模型的操作動作數據擴增與模仿學習
- 基於X-Mobility的通用導航與運動控制
- GR00T-N1模型微調
- 基於Isaac-Cortex的軟件在環驗證
本期我們介紹基於Isaac仿真的操作動作數據擴增與模仿學習
Physical AI模型模仿學習的一般過程
和LLM一樣,Physical AI模型的訓練也是神經網絡模型,也需要大量數據訓練,才能學會特定技能。但是與LLM使用文本數據訓練不同,Physical AI模型訓練使用的數據,需要是針對特定場景、特定對象、特定任務的多模態數據,一般包含視頻、觸覺傳感信息、關節位置數據。
一般來説,遙控真實機器人完成特定任務,即可獲取上述數據。但是,由於真實機器人尚未大規模普及,為了特定任務專門搭建演示場景的成本又較高,一般使用人工演示 + 數據擴增 + 數據增強的方式來獲取足夠的訓練數據,其中:
- 人工演示:由真人在仿真環境,或真實環境中,遙控機器人完成特定任務,從而獲取完整的任務數據
- 數據擴增:基於人工演示數據,引入一定隨機化(隨機初始位置、隨機過程軌跡等),擴增出與人工演示類似但有細微不同的數據
- 數據增強:在擴增數據的基礎上,通過生成式大模型,改善視頻數據中的貼圖、紋理、光照等細節,使之更符合真實環境
- 模仿學習:使用擴增並增強後的數據對基礎模型進行訓練
- 模型測評:在仿真環境或真實環境中,使用訓練得到的模型指導機器人運行,完成任務A,記錄其成功率,以體現模型質量
在PAI的Notebook Gallery中,我們已經預置了一個最佳實踐,就是這個過程的一個具體示例:
https://gallery.pai-ml.com/#/preview/deepLearning/cv/isaac\_lab\_wf1
下面我們來詳細解讀這個示例。
人工少量演示
人工少量演示的目的是給後續的數據擴增打下樣例。人工演示可以在真實環境下通過遙控真機完成,也可以在仿真環境下通過遙控仿真環境下的本體完成。
在本最佳實踐中,我們基於MimicGen進行數據擴增,而MimicGen需要仿真環境提供的真值數據,因此我們在Isaac Lab仿真環境下采集人工演示。以下是在DSW中啓動Isaac Lab仿真環境的一個樣例:
# 使用鍵盤遙操仿真環境進行數據集標註
cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/environments/teleoperation/teleop_se3_agent.py --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --num_envs 1 --teleop_device keyboard --livestream 1"
print(f"執行命令: {cmd}")
!{cmd}
在DSW中執行此命令,稍等片刻,即可通過Isaac Sim WebRTC Streaming Client連接DSW公網IP從而啓動遙操界面
通過以下鍵盤操作可以控制機械臂的動作:
Toggle gripper (open/close): K
Move arm along x-axis: W/S
Move arm along y-axis: A/D
Move arm along z-axis: Q/E
Rotate arm along x-axis: Z/X
Rotate arm along y-axis: T/G
Rotate arm along z-axis: C/V
在完成遙控操作後,還需要使用Mimic Gen管線中的annotate\_demos.py對人工演示數據進行子任務標註,這是確保Isaac Lab Mimic功能正常運行的關鍵步驟。
cmd = f"/workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/isaaclab_mimic/annotate_demos.py \
--enable_cameras --task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Mimic-v0 --auto \
--input_file {dataset_path} --output_file {annotated_dataset_path} --headless"
print(f"執行命令: {cmd}")
!{cmd}
完成子任務標註後,可以通過以下命令,查看人工演示的數據:
# 使用LiveStream預覽已標註數據集
os.environ["ACCEPT_EULA"] = "Y"
cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /isaac-sim/python.sh /workspace/isaaclab/scripts/tools/replay_demos.py --dataset_file /mnt/data/isaac_tmp/dataset/annotated_dataset.hdf5 --task Isaac-Stack-Cube-Franka-IK-Rel-v0 --num_envs 1 --livestream 1"
print(f"執行命令: {cmd}")
!{cmd}
視頻演示 >>
數據擴增
使用Isaac Lab Mimic功能,從少量已標註的專家演示中自動擴增額外的演示數據。用户可以調節命令中的num\_envs參數,以增加或減少並行環境數量,充分利用顯存。還可以調節generation\_num\_trials,以調節希望擴增的演示數據數量。例如,可以設置num\_envs為8,generation\_num\_trials為1000,這樣可以以8份數據為一組,不斷重複,直至得到1000份數據。
# 使用Isaac Lab Mimic生成數據集
os.environ["ACCEPT_EULA"] = "Y"
cmd = f"PUBLIC_IP=$(curl -s ifconfig.me) /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/isaaclab_mimic/generate_dataset.py \
--enable_cameras --headless --num_envs 8 --generation_num_trials 1000 \
--input_file {annotated_dataset_path} --output_file {mimic_dataset_path} \
--task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0 --livestream 1"
print(f"執行命令: {cmd}")
!{cmd}
視頻演示 >>
對於規模較大的生成或者mimic任務,也可以利用DLC拉起離線仿真任務;此外,DLC也提供了對Ray的支持,我們也提供了示例,使用DLC-Ray拉起多任務,實現計算資源的編排和充分使用。
# 創建DLC作業。
create_job_resp = dlc_client.create_job(CreateJobRequest().from_map({
'WorkspaceId': workspace_id,
'DisplayName': display_name,
'JobType': 'RayJob',
'ResourceId': resource_quota_id,
'JobSpecs': [
{
#假設我們可用資源是8GPU * 128CPU * 1024G內存,可以按照如下分配head和worker
"Type": "Head",
"Image": image_uri,
"PodCount": 1,
"ResourceConfig": {
"CPU": "8", # 指定CPU核心數
"Memory": "32Gi", # 指定內存大小(單位:GB)
"GPU": "0" # Head節點無需GPU,因此設置為0
}
},
{
"Type": "Worker",
"Image": image_uri,
"PodCount": 2, #模擬兩個worker node
"ResourceConfig": {
"CPU": "56", # 指定CPU核心數
"Memory": "448Gi", # 指定內存大小(單位:GB)
"GPU": "4" # 指定GPU數量,與UserCommand中的--num_per_worker一致,以保證一個任務使用一個gpu
}
},
],
'DataSources': [
{
"DataSourceId": dataset_id,
"MountPath": "/mnt/data", # 掛載路徑
},
{
"DataSourceId": pub_dataset_id,
"MountPath": "/mnt/isaac_assets", # 掛載路徑
}
],
'UserVpc': {
"VpcId": vpc_id, # 替換為實際 VPC ID
"SwitchId": switch_id, # 替換為實際交換機 ID
"SecurityGroupId": security_groupid # 替換為實際安全組 ID
},
# 根據可用資源以及任務,示例中為每個任務分配1gpu * 14cpu * 96G內存,由於每個worker我們設置了4張卡,因此可以啓動4個任務
"UserCommand": f"""
/workspace/isaaclab/isaaclab.sh -p /mnt/data/isaac_tmp/ray_isaac_new.py \
--command "/workspace/isaaclab/isaaclab.sh -p /mnt/data/isaac_tmp/generate_dataset_ray.py \
--enable_cameras --headless --num_envs 8 --generation_num_trials 125 \
--input_file {annotated_dataset_path} --output_file {mimic_dataset_path} \
--task Isaac-Stack-Cube-Franka-IK-Rel-Visuomotor-Cosmos-Mimic-v0 \
--asset_path {asset_root_dir}" \
--gpu 1 \
--cpu 14 \
--memory 96 \
--num_per_worker 4
"""
}))
job_id = create_job_resp.body.job_id
wait_for_job_to_terminate(dlc_client, job_id)
數據增強
通過Isaac Lab提供的格式轉換功能,可以提取hdf5文件中記錄的視頻幀,將其轉換為mp4文件:
cmd = f"/workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/tools/hdf5_to_mp4.py \
--input_file {input_file} \
--output_dir {output_dir} \
--input_keys {' '.join(input_keys)}"
print(f"執行命令: {cmd}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
不過,仿真得到的mp4文件通常不會十分逼真,例如:
視頻演示 >>
我們可以通過cosmos-transer1-7b模型將其增強。
首先在Model Gallery中找到cosmos-transfer1-7b模型並將其部署:
在完成部署後,通過以下代碼可以調用cosmos服務,從而增強擴增的數據:
import json
from pathlib import Path
import os
import shutil
import requests
import gradio_client.client as gradio_client
import gradio_client.utils as gradio_utils
def load_prompts_from_file(prompts_file_path):
"""從文件中加載提示詞。"""
if not prompts_file_path.exists():
return []
with open(prompts_file_path, 'r', encoding='utf-8') as f:
return [line.strip() for line in f if line.strip()]
def create_cosmos_request(input_video_path, prompt=None, depth_control_path=None, seg_control_path=None):
"""動態創建Cosmos請求參數。"""
request = {
"vis": {
"control_weight": 0.1,
},
"edge": {
"control_weight": 0.3,
},
"depth": {
"control_weight": 0.6,
"input_control": depth_control_path,
},
"seg": {
"control_weight": 0.7,
"input_control": seg_control_path,
},
"input_video_path": input_video_path,
"negative_prompt": "The video captures a game playing, with bad crappy graphics and cartoonish frames. It represents a recording of old outdated games. The lighting looks very fake. The textures are very raw and basic. The geometries are very primitive. The images are very pixelated and of poor CG quality. There are many subtitles in the footage. Overall, the video is unrealistic at all.",
"prompt": prompt,
"sigma_max": 50.0,
}
return request
def cosmos_sync_with_upload(client, input_video_path, output_dir, prompt, depth_video_path=None, seg_video_path=None):
"""上傳文件,調用API進行增強,並下載結果。"""
def upload_file(filepath):
if not filepath or not filepath.exists():
return None
print(f" Uploading: {filepath.name}")
file_descriptor = gradio_utils.handle_file(str(filepath))
upload_result_str = client.predict(file_descriptor, api_name="/upload_file")
return json.loads(upload_result_str).get("path")
remote_main_video_path = upload_file(input_video_path)
if not remote_main_video_path:
print(f" 主視頻文件上傳失敗: {input_video_path.name}")
return False, "主視頻上傳失敗"
remote_depth_path = upload_file(depth_video_path)
remote_seg_path = upload_file(seg_video_path)
request_dict = create_cosmos_request(remote_main_video_path, prompt, remote_depth_path, remote_seg_path)
print(" Sending generation request...")
result = client.predict(json.dumps(request_dict), api_name="/generate_video")
if result and isinstance(result, tuple) and len(result) >= 2:
video_info, message = result
print(video_info, message)
if isinstance(video_info, dict) and "video" in video_info:
video_path = video_info["video"]
if os.path.exists(video_path):
output_file = Path(output_dir) / f"{Path(input_video_path).name}"
import shutil
shutil.copy2(video_path, output_file)
return True, str(output_file)
if video_path.startswith(("http://", "https://")):
import requests
resp = requests.get(video_path, stream=True)
output_file = Path(output_dir) / f"cosmos_{Path(input_video_path).name}"
with open(output_file, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return True, str(output_file)
def run_cosmos_augmentation_direct(mp4_input_dir, cosmos_output_dir, prompts_file_path):
"""
主執行函數:遍歷輸入目錄,為每個主視頻調用增強服務。
"""
input_dir = Path(mp4_input_dir)
output_dir = Path(cosmos_output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
main_video_files = sorted(list(input_dir.glob("*cam.mp4")))
if not main_video_files:
print(f"在目錄 {input_dir} 中未找到任何 '*cam.mp4' 文件。")
return 0, []
print(f"找到 {len(main_video_files)} 個主視頻文件待處理。")
prompts = load_prompts_from_file(prompts_file_path)
client = gradio_client.Client(COSMOS_SERVICE_URL, hf_token=EAS_TOKEN)
success_count = 0
failed_files = []
for idx, main_video_path in enumerate(main_video_files, 1):
print("-" * 50)
print(f"[{idx}/{len(main_video_files)}] Processing: {main_video_path.name}")
base_name = main_video_path.name.replace("_cam.mp4", "")
depth_path = main_video_path.with_name(f"{base_name}_cam_depth.mp4")
seg_path = main_video_path.with_name(f"{base_name}_cam_shaded_segmentation.mp4")
depth_path = depth_path if depth_path.exists() else None
seg_path = seg_path if seg_path.exists() else None
prompt = prompts[(idx-1) % len(prompts)]
ok, result_info = cosmos_sync_with_upload(
client, main_video_path, output_dir, prompt,
depth_video_path=depth_path, seg_video_path=seg_path
)
if ok:
success_count += 1
print(result_info)
else:
failed_files.append(main_video_path.name)
print("\n" + "="*20 + " 處理完成統計 " + "="*20)
print(f"成功: {success_count}/{len(main_video_files)}")
print(f"失敗: {len(failed_files)}")
if failed_files:
print("失敗文件列表:", failed_files)
return success_count, failed_files
# --- 程序入口 ---
if __name__ == "__main__":
if not mp4_output_dir.exists():
print(f"錯誤:輸入目錄 {mp4_output_dir} 不存在!")
else:
print("開始進行Cosmos視覺增強...")
success_count, _ = run_cosmos_augmentation_direct(
# mp4_output_dir,
"/mnt/data/isaac_tmp/dataset/mimic_dataset_1k_mp4_backup",
cosmos_output_dir,
prompts_file
)
print(f"Cosmos增強流程結束!共成功處理 {success_count} 個文件。")
完成增強後的視頻文件可以得到更逼真的視覺效果:
視頻演示 >>
模仿學習
在本最佳實踐中,使用簡單的BC-RNN模型作為模仿學習的示例:
def train_model(task_name, dataset_path, model_name):
"""
訓練視覺運動BC代理
"""
cmd = f"cd {workspace_dir} && /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/train.py \
--task {task_name} --algo bc \
--dataset {dataset_path} \
--name {model_name}"
print(f"訓練模型: {cmd}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
return f"logs/robomimic/{task_name}/{model_name}"
# 選擇要使用的數據集
if merged_dataset_path.exists():
training_dataset = merged_dataset_path
print(f"使用合併數據集進行訓練: {training_dataset}")
elif cosmos_dataset_path.exists():
training_dataset = cosmos_dataset_path
print(f"使用Cosmos數據集進行訓練: {training_dataset}")
elif mimic_dataset_path.exists():
training_dataset = mimic_dataset_path
print(f"使用Mimic數據集進行訓練: {training_dataset}")
else:
print("沒有可用的訓練數據集")
training_dataset = None
if training_dataset:
model_dir = train_model(task_name, training_dataset, model_name)
print(f"模型訓練完成,保存在: {model_dir}")
同樣,也可以使用DLC啓動分佈式任務來提高訓練效率:
# 創建DLC作業。
create_job_resp = dlc_client.create_job(CreateJobRequest().from_map({
'WorkspaceId': workspace_id,
'DisplayName': display_name,
'JobType': 'PyTorchJob',
'ResourceId': resource_quota_id,
'JobSpecs': [
{
"Type": "Master",
"Image": image_uri,
"PodCount": 1,
# "EcsSpec": ecs_spec,
"ResourceConfig": {
"CPU": "48", # 指定CPU核心數
"Memory": "256Gi", # 指定內存大小(單位:GB)
"GPU": "2" # 指定GPU數量,與UserCommand中的--num_per_worker一致,以保證一個任務使用一個gpu
}
},
],
'DataSources': [
{
"DataSourceId": dataset_id,
"MountPath": "/mnt/data", # 掛載路徑
},
{
"DataSourceId": pub_dataset_id,
"MountPath": "/mnt/isaac_assets", # 掛載路徑
}
],
'UserVpc': {
"VpcId": vpc_id, # 替換為實際 VPC ID
"SwitchId": switch_id, # 替換為實際交換機 ID
"SecurityGroupId": security_groupid # 替換為實際安全組 ID
},
"UserCommand": f"cd {workspace_dir} && \
/workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/train.py \
--task {task_name} --algo bc \
--dataset {dataset_path} \
--name {model_name} && \
sleep 30",
}))
job_id = create_job_resp.body.job_id
wait_for_job_to_terminate(dlc_client, job_id)
完成訓練後,BC-RNN模型即可獲得模仿人工演示任務的能力:
模型測評
可以使用以下代碼,在不同的環境條件變化下,測試不同模型模仿人工演示任務的成功率
import glob
def evaluate_model(task_name, model_dir, log_dir, num_rollouts=15):
"""
評估最新的訓練權重
"""
timestamp_dirs = glob.glob(f"{model_dir}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]")
timestamp_dirs.sort(key=lambda x: os.path.getmtime(x), reverse=True)
latest_dir = timestamp_dirs[0]
model_dir_path = f"{latest_dir}/models"
cmd = f"cd {workspace_dir} && /workspace/isaaclab/isaaclab.sh -p /workspace/isaaclab/scripts/imitation_learning/robomimic/robust_eval.py \
--task {task_name} \
--input_dir {model_dir_path} \
--log_dir {log_dir} \
--log_file result \
--enable_cameras \
--livestream 1 \
--seeds 0 \
--num_rollouts {num_rollouts} \
--headless"
print(f"評估模型: {cmd}")
print("\n 注意: 評估過程可能需要很長時間(超過一天),這是正常現象。")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)
# 評估設置説明
evaluation_settings = {
"Vanilla": "與Mimic數據生成時完全相同的設置",
"Light Intensity": "光照強度/亮度變化,其他方面保持不變",
"Light Color": "光照顏色變化,其他方面保持不變",
"Light Texture (Background)": "光照紋理/背景變化,其他方面保持不變",
"Table Texture": "桌面視覺紋理變化,其他方面保持不變",
"Robot Arm Texture": "機器人手臂視覺紋理變化,其他方面保持不變"
}
print("評估設置説明:")
for setting, description in evaluation_settings.items():
print(f"- {setting}: {description}")
# 執行評估(如果模型已訓練)
# 假如您希望使用預訓練的模型,請取消下列路徑其中之一的註釋
# model_name = "franka_stack_mimic_1k_table_only"
# model_name = "franka_stack_mimic_2k_table_only"
# model_name = "franka_stack_mimic_cosmos_2k_table_only"
model_dir = f"{workspace_dir}/logs/robomimic/{task_name}/{model_name}"
if Path(model_dir).exists():
log_dir = f"robust_results/{model_name}"
evaluate_model(task_name, model_dir, log_dir)
else:
print("模型目錄不存在,跳過評估步驟")
如果一切正常,可以得到以下測試結果:
測試結果顯示,在大部分環境條件變化下,使用數據擴增+數據增強訓練得到的模型,總能得到最好的結果。尤其在光照強度、光照顏色和光照紋理變化的條件下,使用數據擴增+數據增強得到的模型,成功率得到了大幅提升。
總結
在本最佳實踐中,基於阿里雲 PAI 平台的特性,我們實現了基於Isaac仿真的操作動作數據擴增與模仿學習,包含從人工少量演示、數據擴增、數據增強、模仿學習再到模型測評的端到端實現:
- 人工演示:在仿真環境下遙控機械臂本體,進行任務演示數據的記錄
- 數據擴增:使用 Isaac Lab Mimic 從少量專家演示生成大規模仿真數據集
- 數據增強:通過 Cosmos-Transfer1 對視頻數據進行增強,提升數據多樣性
- 模仿學習:基於 robomimic 訓練 BC-RNN 視覺運動策略模型
Cosmos 數據增強後的訓練模型在各個場景下的成功率均有較高提升,這一工作流程為具身智能的機器人視覺操作提供了一套完整的技術解決方案,在 sim2real 的訓練過程中提高了模型在複雜視覺環境下的泛化能力。