硬件/服务厂商接入指南
本文档说明相机、机械臂等硬件厂商以及地图等系统服务如何接入 Robonix RIDL 接口。设计参考 Android HAL:厂商实现接口子集,按需注册,无需实现全部。
各硬件类型的接口形态详见 抽象硬件原语。
1. 接入流程概览
- 查阅 RIDL 接口定义(robonix-interfaces/ridl/prm/*)
- 选择本硬件/服务支持的接口子集
- 创建 package,编写 manifest 与 entry
- 实现所选接口的 server/publisher,向 meta API 注册
- rbnx build + rbnx start 构建与运行
代码分工:ridlc 生成 create_*_server、create_*_publisher、RobonixRuntimeStub 等;你只写 main 里的连接、回调实现、挂载与启动。
必备流程:rclpy.init() → grpc.insecure_channel(endpoint) → RobonixRuntimeStub(channel) 得到 runtime_client,用于向 meta 注册 channel。所有 server/publisher 都依赖此 client。
业务逻辑补全:command 用 server.execute = callback 挂载,callback 内处理 Goal、可选 goal_handle.publish_feedback()、返回 Result;query 用 server.start(handler) 传入,handler 内根据 request 填 response;stream 在定时器/循环中 publish(msg),订阅方用 subscriber.start(callback)。详见 ridlc 开发手册 §5。
运行方式:必须通过 rbnx start 启动,不支持直接 python -m 运行。
2. 核心原则:按需实现,无需全量
与 Android HAL 类似:RIDL 的 prm/ 目录下定义了大量原语接口,厂商只需实现本硬件实际支持的接口,不必实现整个目录。
| 场景 | 需实现的接口 | 不必实现 |
|---|---|---|
| 仅 RGB 相机 | rgb, intrinsics | depth, rgbd, ir |
| 纯深度相机 | depth(+ 可选 intrinsics) | rgb, rgbd, ir |
| RGB-D 相机 | rgb, depth, rgbd, intrinsics | ir(若硬件不支持) |
| 机械臂(无夹爪) | move_ee, move_joint, state_joint, joint_trajectory | gripper 全部 |
| 机械臂 + 夹爪 | 上述 + gripper.close, gripper.open 等 | 按硬件能力 |
| 语义地图服务 | robonix/system/map/semantic_query | 其他 system 接口 |
运行时行为:node 启动时,仅对实际实现的接口调用 create_*_server / create_*_publisher 并注册。未实现的接口不会被注册,调用方解析时若目标 node 未提供该接口则失败。
可选:manifest 声明:为便于文档与工具检查,可在 manifest 中可选声明 provides(见 §6)。
3. 相机厂商接入示例
3.1 场景
某厂商提供 RGB-D 相机,支持 rgb、depth、rgbd、intrinsics,不支持 ir。
3.2 目录结构
prm_camera_vendor/
├── robonix_manifest.yaml
├── package.xml # 可选:自定义 ROS2 依赖(如 sensor_msgs)
└── prm_camera_vendor/
├── __init__.py
└── camera_node.py # entry: prm_camera_vendor.camera_node:main
3.3 manifest
manifestVersion: 1
package:
id: com.vendor.camera.rgbd
name: prm_camera_vendor
version: 0.1.0
vendor: Example Camera Co.
description: RGB-D camera driver implementing prm::camera (rgb, depth, rgbd, intrinsics)
license: MulanPSL-2.0
# 可选:声明本 package 提供的接口,便于文档与工具
# provides:
# - robonix/prm/camera/rgb
# - robonix/prm/camera/depth
# - robonix/prm/camera/rgbd
# - robonix/prm/camera/intrinsics
nodes:
- id: com.vendor.camera.rgbd
type: python
entry: prm_camera_vendor.camera_node:main
3.4 实现要点
代码分工:create_*_publisher 等由 ridlc 生成;你只需写 main 里的连接、定时/循环 publish。
必备流程:rclpy.init() → grpc.insecure_channel(endpoint) → RobonixRuntimeStub(channel) 得到 runtime_client。
stream 无挂载:直接 publisher.publish(msg),在 timer 或循环里调用即可。
多节点 spin:每个 create_*_publisher 返回独立的 rclpy Node,需用 MultiThreadedExecutor 将全部 publisher 加入 executor 后统一 spin,否则只 spin 一个节点会导致其他 publisher 无法正常工作。
# prm_camera_vendor/camera_node.py
import rclpy
from rclpy.executors import MultiThreadedExecutor
from robonix.prm.camera.rgb_stream import create_rgb_publisher
from robonix.prm.camera.depth_stream import create_depth_publisher
from robonix.prm.camera.rgbd_stream import create_rgbd_publisher
from robonix.prm.camera.intrinsics_stream import create_intrinsics_publisher
def main():
rclpy.init()
runtime_client = ... # gRPC stub
node_id = "com.vendor.camera.rgbd"
# 仅注册本硬件支持的 4 个 stream
rgb_pub = create_rgb_publisher(runtime_client, node_id)
depth_pub = create_depth_publisher(runtime_client, node_id)
rgbd_pub = create_rgbd_publisher(runtime_client, node_id)
intrinsics_pub = create_intrinsics_publisher(runtime_client, node_id)
def publish_frame():
rgb, depth = camera.capture()
rgb_pub.publish(rgb)
depth_pub.publish(depth)
rgbd_pub.publish(RGBD(rgb=rgb, depth=depth))
intrinsics_pub.publish(camera.get_intrinsics())
timer = rgb_pub.create_timer(0.5, publish_frame)
executor = MultiThreadedExecutor()
for node in (rgb_pub, depth_pub, rgbd_pub, intrinsics_pub):
executor.add_node(node)
try:
executor.spin()
finally:
timer.cancel()
executor.shutdown()
for node in (rgb_pub, depth_pub, rgbd_pub, intrinsics_pub):
node.destroy_node()
if rclpy.ok():
rclpy.shutdown()
4. 机械臂厂商接入示例
4.1 场景
某厂商提供机械臂 + 夹爪,实现 move_ee、move_joint、state_joint、joint_trajectory、gripper.close、gripper.open。
4.2 目录结构
prm_arm_vendor/
├── robonix_manifest.yaml
├── package.xml # 可选:自定义 ROS2 依赖(如 std_msgs、trajectory_msgs)
└── prm_arm_vendor/
├── __init__.py
├── arm_node.py # 机械臂 command/stream
└── gripper_node.py # 夹爪 command(可选:与 arm 同进程或分 node)
4.3 manifest(单 node 同时提供 arm + gripper)
manifestVersion: 1
package:
id: com.vendor.arm.robot_arm
name: prm_arm_vendor
version: 0.1.0
vendor: Example Arm Co.
description: Robot arm + gripper implementing prm::arm and prm::gripper
license: MulanPSL-2.0
nodes:
- id: com.vendor.arm.robot_arm
type: python
entry: prm_arm_vendor.arm_node:main
4.4 实现要点
代码分工:create_*_server、RobonixRuntimeStub 等由 ridlc 生成;你只需写 main 里的连接、回调实现、挂载与启动。
必备流程:rclpy.init() → grpc.insecure_channel(endpoint) → RobonixRuntimeStub(channel) 得到 runtime_client,用于向 meta 注册 channel。
回调挂载:command server 通过 server.execute = your_callback 挂载;start() 后收到 action 请求时会调用该回调。
多节点 spin:每个 create_*_server 返回独立的 rclpy Node,需用 MultiThreadedExecutor 将全部 server 加入 executor 后统一 spin。若只 rclpy.spin(某一个),其他 server 的 action 请求将无法被处理。
# prm_arm_vendor/arm_node.py
import rclpy
from rclpy.executors import MultiThreadedExecutor
from robonix.prm.arm.move_ee_command import create_move_ee_server
from robonix.prm.arm.joint_trajectory_command import create_joint_trajectory_server
from robonix.prm.gripper.close_command import create_close_server
from robonix.prm.gripper.open_command import create_open_server
def main():
rclpy.init()
runtime_client = ...
node_id = "com.vendor.arm.robot_arm"
# 注册 command servers
move_ee_srv = create_move_ee_server(runtime_client, node_id)
joint_traj_srv = create_joint_trajectory_server(runtime_client, node_id)
close_srv = create_close_server(runtime_client, node_id)
open_srv = create_open_server(runtime_client, node_id)
# 实现 execute 回调,调用真实硬件驱动
move_ee_srv.execute = lambda req: arm_driver.move_to_pose(req.pose)
joint_traj_srv.execute = lambda req: arm_driver.execute_trajectory(req.trajectory)
close_srv.execute = lambda req: gripper_driver.close()
open_srv.execute = lambda req: gripper_driver.open()
move_ee_srv.start()
joint_traj_srv.start()
close_srv.start()
open_srv.start()
# 可选:state_joint stream 发布关节状态,若实现则需加入 executor
# state_pub = create_state_joint_publisher(runtime_client, node_id)
executor = MultiThreadedExecutor()
nodes = (move_ee_srv, joint_traj_srv, close_srv, open_srv)
for node in nodes:
executor.add_node(node)
try:
executor.spin()
finally:
executor.shutdown()
for node in nodes:
node.destroy_node()
if rclpy.ok():
rclpy.shutdown()
5. 地图服务接入示例
5.1 场景
实现 robonix/system/map/semantic_query 的 query server,提供语义地图查询。
5.2 目录结构
map_semantic_service/
├── robonix_manifest.yaml
├── package.xml # 可选:自定义 ROS2 依赖
└── map_semantic_service/
├── __init__.py
└── semantic_server.py
5.3 manifest
manifestVersion: 1
package:
id: com.robonix.service.map_semantic
name: map_semantic_service
version: 0.1.0
vendor: robonix
description: Semantic map query service implementing robonix/system/map/semantic_query
license: MulanPSL-2.0
nodes:
- id: com.robonix.map.semantic
type: python
entry: map_semantic_service.semantic_server:main
5.4 实现要点
代码分工:create_semantic_query_server 由 ridlc 生成;你只需写 main 里的连接、handler 实现、start(handler)。
必备流程:同上(rclpy.init、grpc channel、runtime_client)。
handler 接法:query server 通过 server.start(handler) 传入,收到请求时调用 handler(request, response)。
# map_semantic_service/semantic_server.py
from robonix.system.map.semantic_query_query import create_semantic_query_server
def main():
runtime_client = ...
node_id = "com.robonix.map.semantic"
server = create_semantic_query_server(runtime_client, node_id)
def handler(request, response):
filter_str = request.filter.data
# 查地图、过滤,填充 response.objects (Object[])
response.objects = map_backend.query(filter_str)
return response
server.start(handler)
rclpy.spin(server)
6. Manifest 可选字段:provides
为便于文档与工具(如列出某 package 提供的接口),可在 manifest 中可选声明:
provides:
- robonix/prm/camera/rgb
- robonix/prm/camera/depth
- robonix/prm/camera/rgbd
- robonix/prm/camera/intrinsics
- 非强制:运行时以实际注册为准;未在 provides 中声明但已注册的接口仍可用。
- 建议:厂商填写 provides,与实现保持一致,便于集成方查阅。
7. 与 Android HAL 的类比
| Android | Robonix |
|---|---|
| HIDL 接口定义 | RIDL 接口定义(ridl/prm/, ridl/system/) |
| 厂商实现 HAL | 厂商实现 package,提供 server/publisher |
| 按能力实现(LEGACY/LIMITED/FULL) | 按硬件能力实现接口子集 |
| 通过 hw_module_t 加载 | 通过 rbnx start 启动 node,向 meta API 注册 |
| 框架通过 Binder 调用 HAL | 调用方通过 Resolve* 解析 channel 后 ROS 调用 |
8. 参考
- 示例 package:
rust/examples/prm_camera_vendor/、rust/examples/prm_arm_vendor/、rust/examples/map_semantic_service/ - Package 开发指南
- ridlc 开发手册
- RFC001: RIDL
- RFC002: Package