逻辑流控制

本模块提供任务脚本的逻辑流控制接口,由任务引擎在运行时自动注入到脚本执行环境中,无需显式 import 即可直接使用

Note

这些函数仅在通过 /sdk/execute_task 执行的任务脚本中可用。

Warning

执行顺序说明:任务模板采用”步骤提取”机制,以下函数均为可暂停步骤,会按脚本书写顺序依次执行:

  • navigation_to_location / navigation_to_pose / wait_for_navigation

  • run_script

  • sleep

  • pause_application / resume_application / stop_application

普通 Python 代码(赋值、import、条件判断等)在步骤执行**之前**统一运行(掩码阶段),请注意避免将业务逻辑混写在步骤之间。

子脚本调用

run_script(source, parameters=None)

同步执行子脚本,阻塞直到子脚本完成。

子脚本在父脚本 globals 的浅拷贝命名空间中执行,可访问所有已预加载的 lowlevel_skills / highlevel_skills API,但子脚本的变量写入不影响父命名空间。

Parameters:
  • source (str) – task_id 或相对于 $DAYSTAR_DATA_ROOT/scripts/ 的路径(无需 .py 后缀)。

  • parameters (dict | None) – 注入子脚本命名空间的参数字典,子脚本通过 parameters["key"] 访问。默认为 None

Raises:
  • ScriptError – 子脚本内发生异常时抛出,original_exc 属性包含原始异常。

  • FileNotFoundError – 脚本文件不存在。

  • ValueError – 路径穿越或路径不合法。

# 串行执行巡检子脚本
run_script("patrol_task")

# 传递参数给子脚本
run_script("patrol_task", parameters={"zone": "A", "speed": 0.5})

# 子脚本内通过 parameters 访问
# (patrol_task.py 内容)
# zone = parameters["zone"]
# navigation_to_location(zone)
run_parallel_script(source, parameters=None)

在后台 daemon 线程中异步执行子脚本,立即返回 ThreadHandle,不阻塞主脚本。

后台线程受协作式 tracer 控制:主任务暂停时后台线程同步暂停;线程异常只写 stderr,不影响主脚本执行。

Parameters:
  • source (str) – task_id 或相对于 $DAYSTAR_DATA_ROOT/scripts/ 的路径(无需 .py 后缀)。

  • parameters (dict | None) – 注入子脚本命名空间的参数字典。默认为 None

Returns:

后台线程句柄。

Return type:

ThreadHandle

# 启动后台监控,主脚本继续执行
handle = run_parallel_script("monitor_battery")

navigation_to_location("PointA")
navigation_to_location("PointB")

handle.join()  # 可选:等待监控线程结束

生命周期控制

pause_application()

在当前代码位置暂停任务执行,等待外部 /sdk/resume_task 信号后继续。

实现机制与外部调用 /sdk/pause_task 完全等价(均写入控制文件),C++ 引擎会感知到 PAUSED 状态。

Tip

建议在两个操作**之间**调用(如两次导航中间),避免在导航进行中调用。

navigation_to_location("room_A")
pause_application()   # 在此等待人工确认
navigation_to_location("room_B")
resume_application()

写入 resume 信号,触发当前暂停中的任务继续执行。

典型用法:从后台线程中唤醒主脚本,实现”等待条件满足后继续”的模式。

# watchdog.py(后台脚本)
# 检测到条件后唤醒主脚本
if condition_met:
    resume_application()

# 主脚本
handle = run_parallel_script("watchdog")
pause_application()   # 等待 watchdog 唤醒
navigation_to_location("next_point")
stop_application()

立即终止当前任务(含所有后台线程),干净退出。

执行流程:

  1. 向所有未完成的后台线程发送取消信号(设置 stop_event)

  2. 抛出 SystemExit(0),穿透步骤循环到达 finally 清理块

  3. 引擎标记任务为 STOPPED 状态

battery = get_battery_state()
if battery.response.percentage < 10:
    stop_application()

navigation_to_location("PointA")

数据类型

class ThreadHandle

后台脚本线程句柄,由 run_parallel_script() 返回。

join(timeout=None)

阻塞等待后台线程结束。

Parameters:

timeout (float | None) – 超时秒数,None 表示永久等待。

is_done()

返回后台线程是否已完成。

Returns:

线程已结束返回 True,否则返回 False

Return type:

bool

cancel()

发送取消信号到后台线程(设置 stop_event)。线程内需通过 stop_application() 或自行检查信号响应取消。

exception ScriptError

子脚本执行异常,由 run_script() 在子脚本发生异常时抛出。

source: str

发生异常的脚本名称(task_id)。

original_exc: Exception

子脚本内抛出的原始异常对象。

try:
    run_script("risky_task")
except ScriptError as e:
    print(f"子脚本 {e.source} 失败: {e.original_exc}")

完整示例

顺序巡检并在低电量时停止

# 后台监控电量(monitor.py)
# while True:
#     battery = get_battery_state()
#     if battery.response.percentage < 15:
#         stop_application()
#     sleep(30)

# 主脚本
handle = run_parallel_script("monitor")

for zone in ["A", "B", "C"]:
    run_script("patrol_zone", parameters={"zone": zone})
    sleep(5)

handle.join()

需要人工确认的分段任务

navigation_to_location("inspection_point_1")
pause_application()   # 等待操作员确认检查结果

navigation_to_location("inspection_point_2")
pause_application()

navigation_to_location("home")