逻辑流控制
本模块提供任务脚本的逻辑流控制接口,由任务引擎在运行时自动注入到脚本执行环境中,无需显式 import 即可直接使用。
Note
这些函数仅在通过 /sdk/execute_task 执行的任务脚本中可用。
Warning
执行顺序说明:任务模板采用”步骤提取”机制,以下函数均为可暂停步骤,会按脚本书写顺序依次执行:
navigation_to_location/navigation_to_pose/wait_for_navigationrun_scriptsleeppause_application/resume_application/stop_application
普通 Python 代码(赋值、import、条件判断等)在步骤执行**之前**统一运行(掩码阶段),请注意避免将业务逻辑混写在步骤之间。
—
子脚本调用
- run_script(source, parameters=None)
同步执行子脚本,阻塞直到子脚本完成。
子脚本在父脚本 globals 的浅拷贝命名空间中执行,可访问所有已预加载的 lowlevel_skills / highlevel_skills API,但子脚本的变量写入不影响父命名空间。
- Parameters:
- Raises:
ScriptError – 子脚本内发生异常时抛出,
original_exc属性包含原始异常。FileNotFoundError – 脚本文件不存在。
ValueError – 路径穿越或路径不合法。
# 串行执行巡检子脚本 run_script("patrol_task") # 传递参数给子脚本 run_script("patrol_task", parameters={"zone": "A", "speed": 0.5}) # 子脚本内通过 parameters 访问 # (patrol_task.py 内容) # zone = parameters["zone"] # navigation_to_location(zone)
- run_parallel_script(source, parameters=None)
在后台 daemon 线程中异步执行子脚本,立即返回
ThreadHandle,不阻塞主脚本。后台线程受协作式 tracer 控制:主任务暂停时后台线程同步暂停;线程异常只写 stderr,不影响主脚本执行。
- Parameters:
- Returns:
后台线程句柄。
- Return type:
# 启动后台监控,主脚本继续执行 handle = run_parallel_script("monitor_battery") navigation_to_location("PointA") navigation_to_location("PointB") handle.join() # 可选:等待监控线程结束
—
生命周期控制
- pause_application()
在当前代码位置暂停任务执行,等待外部
/sdk/resume_task信号后继续。实现机制与外部调用
/sdk/pause_task完全等价(均写入控制文件),C++ 引擎会感知到 PAUSED 状态。Tip
建议在两个操作**之间**调用(如两次导航中间),避免在导航进行中调用。
navigation_to_location("room_A") pause_application() # 在此等待人工确认 navigation_to_location("room_B")
- resume_application()
写入 resume 信号,触发当前暂停中的任务继续执行。
典型用法:从后台线程中唤醒主脚本,实现”等待条件满足后继续”的模式。
# watchdog.py(后台脚本) # 检测到条件后唤醒主脚本 if condition_met: resume_application() # 主脚本 handle = run_parallel_script("watchdog") pause_application() # 等待 watchdog 唤醒 navigation_to_location("next_point")
- stop_application()
立即终止当前任务(含所有后台线程),干净退出。
执行流程:
向所有未完成的后台线程发送取消信号(设置 stop_event)
抛出
SystemExit(0),穿透步骤循环到达 finally 清理块引擎标记任务为 STOPPED 状态
battery = get_battery_state() if battery.response.percentage < 10: stop_application() navigation_to_location("PointA")
—
数据类型
- class ThreadHandle
后台脚本线程句柄,由
run_parallel_script()返回。- cancel()
发送取消信号到后台线程(设置 stop_event)。线程内需通过
stop_application()或自行检查信号响应取消。
- exception ScriptError
子脚本执行异常,由
run_script()在子脚本发生异常时抛出。try: run_script("risky_task") except ScriptError as e: print(f"子脚本 {e.source} 失败: {e.original_exc}")
—
完整示例
顺序巡检并在低电量时停止:
# 后台监控电量(monitor.py)
# while True:
# battery = get_battery_state()
# if battery.response.percentage < 15:
# stop_application()
# sleep(30)
# 主脚本
handle = run_parallel_script("monitor")
for zone in ["A", "B", "C"]:
run_script("patrol_zone", parameters={"zone": zone})
sleep(5)
handle.join()
需要人工确认的分段任务:
navigation_to_location("inspection_point_1")
pause_application() # 等待操作员确认检查结果
navigation_to_location("inspection_point_2")
pause_application()
navigation_to_location("home")