ridlc 开发手册

本文档描述 ridlc 的代码生成规范与通信实现,供实现者与开发者参考。RIDL 语义定义见 RFC001

RIDL 语法速览(关键词、文件规则、命名等)见 RFC001 §3。

类型引用简写import pkg/msg/Name 后,字段类型可使用短名 Name,也可使用全名 pkg/msg/Name;若多个 import 的末段相同则需用全名避免歧义。详见 RFC001 §5.1。

LLM/Agent 友好注释@desc("...") 用于接口或字段,描述用途、含义与格式。会写入 DESCRIPTOR.annotations,供 AI 理解接口、正确构造请求。详见 RFC001 §5.2。

业务逻辑补全:生成代码只提供 channel 注册、ROS 绑定和工厂函数;用户需在指定位置补全回调或业务逻辑。§5 详细说明每种原语下在哪里补全补全什么如何调用,是编写 server/client/stream 代码的核心参考。


0. 文件与接口约定

  • 一文件一接口:每个 .ridl 文件只定义一个接口。
  • 接口名 = 文件名:接口名必须与文件名(不含扩展名)一致,如 depth.ridl 定义 stream depthmove.ridl 定义 command move。类似 Java:一个文件一个 public 类,类名与文件名一致。
  • 接口语法command/stream/query 接口名 [注解...] { 字段... };字段为 方向 字段名 类型 [注解...];。注解可多个,如 @desc@frame。完整语法见 RFC001 §3.1

1. 通信实现映射

当前 ridlc 采用 ROS 2 作为后端。各原语与 ROS 2 / gRPC 的映射如下:

原语ROS 2gRPC(可选实现)
streamtopicserver/client streaming
commandaction双向流或 server streaming
queryserviceunary RPC

2. 代码生成规范

2.1 命名规律

原语生成文件Python 工厂Rust 模块
query{name}_query.pycreate_{name}_client, create_{name}_server{name}_query
stream{name}_stream.pycreate_{name}_publisher, create_{name}_subscriber{name}_stream
command{name}_command.pycreate_{name}_client, create_{name}_server{name}_command

2.2 命名空间映射

  • robonix/a/b/c → Python 包 robonix.a.b.c
  • robonix/a/b/c → Rust 模块 robonix_a_b_c
  • ROS 2 类型名:namespace 去掉 robonix/ 后按层级 PascalCase 拼接 + 接口名,如 robonix/system/debug + pingSystemDebugPing

3. 运行时与 channel

生成代码通过 gRPC meta API 完成 Register*/Resolve* 调用。channel 由 robonix-server 分配,生成代码据此创建 ROS service/topic/action 并绑定。


4. 注解

RIDL 支持的注解(如 @desc@frame@requires_interface@interruptible)见 RFC001 §5.2。ridlc 解析后传入 codegen,具体语义由生成代码与运行时实现。


5. 用户逻辑补全(Python)

生成代码提供工厂函数和基类,用户需补全「回调」或「业务逻辑」。下表说明每种通信模式下,在哪里补全补全什么

5.1 Query(请求-响应)

角色补全位置补全方式说明
Serverserver.start(handler)传入 handler 函数每次收到请求时调用
Client无回调直接调用 client.call(request)同步阻塞,返回 Response \| None

Server 补全示例

server = create_semantic_query_server(runtime_client, node_id=node_id)

def handler(request, response):
    # request: 服务 Request 类型;response: 服务 Response 类型;需返回 response
    response.objects = [...]  # 填充 response
    return response

server.start(handler)   # 传入 handler
rclpy.spin(server)

Client 补全示例

client = create_semantic_query_client(runtime_client, requester_id=..., target=...)

request = client._srv_type.Request()
request.filter.data = "room"

response = client.call(request)
if response is None:
    raise RuntimeError("call failed or timed out")
for obj in response.objects:
    print(obj.id, obj.label)

5.2 Stream(发布/订阅)

角色补全位置补全方式说明
Publisher定时器或业务逻辑调用 publisher.publish(msg)构造消息并发布
Subscribersubscriber.start(callback)传入 callback(msg) 函数每次收到消息时调用

Publisher 补全示例

publisher = create_pose_cov_publisher(runtime_client, node_id=node_id)

def publish_once():
    msg = publisher._msg_type()
    msg.header.frame_id = "map"
    msg.pose.pose.position.x = 1.25
    publisher.publish(msg)   # 调用 publish

timer = publisher.create_timer(0.5, publish_once)
rclpy.spin(publisher)

Subscriber 补全示例

subscriber = create_pose_cov_subscriber(runtime_client, requester_id=..., target=...)

def on_msg(msg):
    # msg: 消息类型(如 PoseWithCovarianceStamped)
    print(msg.pose.pose.position)

subscriber.start(on_msg)   # 传入 callback
rclpy.spin(subscriber)

5.3 Command(动作/技能)

Command 有 input/output/result 三个字段,对应 ROS2 Action 的 Goal/Feedback/Result。

角色补全位置补全方式说明
Serverserver.execute赋值 server.execute = your_execute 或子类重写处理 Goal,可选发 Feedback,返回 Result
Client无回调(简单用法)调用 client.send(request)goal_handleget_result_async()rclpy.spin_until_future_complete
Clientsend_goal_async(..., feedback_callback=...)传入 feedback_callback 接收 output接收执行过程中的 feedback

Server 补全示例

server = create_greet_server(runtime_client, node_id=node_id)

def execute(request, goal_handle):
    # request: Action.Goal;goal_handle: ServerGoalHandle | None
    # output (feedback): 可选,通过 goal_handle.publish_feedback(fb) 发送
    if goal_handle:
        fb = server._action_type.Feedback()
        fb.feedback.progress = "processing..."
        goal_handle.publish_feedback(fb)
    # result: 必须返回
    result = server._action_type.Result()
    result.response.message = f"Hello, {request.request.name}!"
    return result

server.execute = execute   # 赋值 execute
server.start()
rclpy.spin(server)

Client 补全示例(简单用法,不接收 feedback)

client = create_greet_client(runtime_client, requester_id=..., target=...)

request = client._action_type.Goal()
request.request = GreetRequest()
request.request.name = "world"

goal_handle = client.send(request)
if goal_handle is None or not goal_handle.accepted:
    raise RuntimeError("goal not accepted")

result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(client, result_future, timeout_sec=10.0)
wrapped = result_future.result()
if wrapped is None:
    raise RuntimeError("timeout waiting for result")
print(wrapped.result.response.message, wrapped.result.response.success)

Client 补全示例(接收 feedback)

client = create_greet_client(runtime_client, requester_id=..., target=...)

request = client._action_type.Goal()
request.request = GreetRequest()
request.request.name = "world"

def on_feedback(fb_msg):
    # fb_msg 为 FeedbackMessage,访问自定义 output 字段:fb_msg.feedback.feedback.<字段名>
    print("feedback:", fb_msg.feedback.feedback.progress)

goal_future = client._client.send_goal_async(request, feedback_callback=on_feedback)
rclpy.spin_until_future_complete(client, goal_future, timeout_sec=10.0)
goal_handle = goal_future.result()
if goal_handle is None or not goal_handle.accepted:
    raise RuntimeError("goal not accepted")

result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(client, result_future, timeout_sec=10.0)
wrapped = result_future.result()
# 处理 wrapped.result

Client 接收 feedback 注意feedback_callback 收到的是 FeedbackMessage(含 goal_idfeedback),不是裸 Feedback。访问自定义 output 字段需:fb_msg.feedback.feedback.<字段名>(第一个 feedback 为 FeedbackMessage 的字段,第二个为 action Feedback 中 output 字段名,如 progress)。


5.4 快速对照表

原语Server/Provider 补全Client/Consumer 补全
Queryserver.start(handler)handler(request, response) -> response构造 Requestclient.call(request) → 处理 Response \| None
Streampublisher.publish(msg)(在定时器或业务逻辑中调用)subscriber.start(callback)callback(msg) 处理消息
Commandserver.execute = fnfn(request, goal_handle) -> result;可选 goal_handle.publish_feedback(fb)构造 Goalclient.send(request)send_goal_async(..., feedback_callback=on_fb)goal_handle.get_result_async()rclpy.spin_until_future_completewrapped.result;feedback 回调内用 fb_msg.feedback.feedback.<字段名> 访问 output