-
Notifications
You must be signed in to change notification settings - Fork 190
Add ROS2 Action support (client + server) #178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main-ros2
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,193 @@ | ||||||||||||||||||||
| # Copyright 2020 Unity Technologies | ||||||||||||||||||||
| # Copyright 2026 gd-ros-tcp-connector contributors | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||
| # you may not use this file except in compliance with the License. | ||||||||||||||||||||
| # You may obtain a copy of the License at | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||
| # See the License for the specific language governing permissions and | ||||||||||||||||||||
| # limitations under the License. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| """ | ||||||||||||||||||||
| Thin wrapper around ``rclpy.action.ActionClient`` that bridges the | ||||||||||||||||||||
| ROS-TCP-Endpoint wire protocol to real ROS 2 Action servers. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| A plain ``rclpy.create_client()`` cannot discover Action endpoints | ||||||||||||||||||||
| because they use a different DDS endpoint kind / QoS profile than | ||||||||||||||||||||
| regular services. This module solves the problem by creating a real | ||||||||||||||||||||
| ``ActionClient`` and exposing ``send_goal`` / ``get_result`` / | ||||||||||||||||||||
| ``cancel_goal`` as methods the TCP server can call with serialized | ||||||||||||||||||||
| CDR payloads. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
|
|
||||||||||||||||||||
| import re | ||||||||||||||||||||
| import threading | ||||||||||||||||||||
|
|
||||||||||||||||||||
| import rclpy | ||||||||||||||||||||
| from rclpy.action import ActionClient | ||||||||||||||||||||
| from rclpy.serialization import deserialize_message, serialize_message | ||||||||||||||||||||
|
|
||||||||||||||||||||
| from .communication import RosSender | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| class RosActionClient(RosSender): | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| Manages one ``rclpy.action.ActionClient`` for a single action name. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def __init__(self, action_name, action_class, tcp_server): | ||||||||||||||||||||
| stripped = re.sub("[^A-Za-z0-9_]+", "", action_name) | ||||||||||||||||||||
| node_name = f"{stripped}_RosActionClient" | ||||||||||||||||||||
| RosSender.__init__(self, node_name) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| self.action_name = action_name | ||||||||||||||||||||
| self.action_class = action_class | ||||||||||||||||||||
| self.tcp_server = tcp_server | ||||||||||||||||||||
|
|
||||||||||||||||||||
| self._action_client = ActionClient(self, action_class, action_name) | ||||||||||||||||||||
| self._goal_handles = {} # goal_uuid_bytes -> ClientGoalHandle | ||||||||||||||||||||
| self._goal_handles_lock = threading.Lock() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def send_goal(self, goal_data): | ||||||||||||||||||||
| """Send a goal. *goal_data* is CDR-serialized Goal body bytes. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Returns the CDR-serialized SendGoal_Response (accepted + stamp), | ||||||||||||||||||||
| or None on failure. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| import time | ||||||||||||||||||||
|
|
||||||||||||||||||||
| goal_msg = deserialize_message( | ||||||||||||||||||||
| goal_data, self.action_class.Goal) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # The node is already added to the TcpServer's | ||||||||||||||||||||
| # MultiThreadedExecutor, so DDS discovery proceeds on the | ||||||||||||||||||||
| # executor thread. We just poll server_is_ready() here; do NOT | ||||||||||||||||||||
| # call rclpy.spin_once(self) because the executor already owns | ||||||||||||||||||||
| # this node and double-spinning causes deadlocks. | ||||||||||||||||||||
| server_ready = False | ||||||||||||||||||||
| for _ in range(100): # 100 x 0.1s = 10s max | ||||||||||||||||||||
| if self._action_client.server_is_ready(): | ||||||||||||||||||||
| server_ready = True | ||||||||||||||||||||
| break | ||||||||||||||||||||
| time.sleep(0.1) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if not server_ready: | ||||||||||||||||||||
| self.get_logger().error( | ||||||||||||||||||||
| f"Action server {self.action_name} not available " | ||||||||||||||||||||
| f"(waited 10s — is the server running?)") | ||||||||||||||||||||
| return None | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Do NOT pass feedback_callback here. The client subscribes to the | ||||||||||||||||||||
| # feedback topic via __subscribe, which creates a RosSubscriber | ||||||||||||||||||||
| # that forwards feedback through the normal topic path. If we | ||||||||||||||||||||
| # ALSO forwarded feedback from _on_feedback, every feedback | ||||||||||||||||||||
| # would arrive twice on the client side. | ||||||||||||||||||||
| future = self._action_client.send_goal_async(goal_msg) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Wait for the send_goal future. Again, the executor is | ||||||||||||||||||||
| # spinning on another thread, so we just poll the future. | ||||||||||||||||||||
| for _ in range(100): # 10s max | ||||||||||||||||||||
| if future.done(): | ||||||||||||||||||||
| break | ||||||||||||||||||||
| time.sleep(0.1) | ||||||||||||||||||||
| goal_handle = future.result() | ||||||||||||||||||||
| if goal_handle is None: | ||||||||||||||||||||
| return None | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
Comment on lines
+90
to
+101
|
||||||||||||||||||||
| # Build a SendGoal_Response to send back to the client. | ||||||||||||||||||||
| # rclpy's ActionClient generates its own UUID for the goal | ||||||||||||||||||||
| # (ignoring any UUID we might have set), so we need to tell | ||||||||||||||||||||
| # the client which UUID was actually used. We prepend the 16-byte | ||||||||||||||||||||
| # UUID to the CDR-serialized SendGoal_Response; the client strips | ||||||||||||||||||||
| # the first 16 bytes before deserializing. | ||||||||||||||||||||
| response_class = self.action_class.Impl.SendGoalService.Response | ||||||||||||||||||||
| resp = response_class() | ||||||||||||||||||||
| resp.accepted = goal_handle.accepted | ||||||||||||||||||||
| resp.stamp = goal_handle.stamp if hasattr(goal_handle, 'stamp') else resp.stamp | ||||||||||||||||||||
|
|
||||||||||||||||||||
| goal_uuid = bytes(goal_handle.goal_id.uuid) | ||||||||||||||||||||
| if goal_handle.accepted: | ||||||||||||||||||||
| with self._goal_handles_lock: | ||||||||||||||||||||
| self._goal_handles[goal_uuid] = goal_handle | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return goal_uuid + serialize_message(resp) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def get_result(self, goal_id_data): | ||||||||||||||||||||
| """Request the result for a goal. *goal_id_data* is CDR-serialized | ||||||||||||||||||||
| GetResult_Request bytes (contains the goal UUID). | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Returns CDR-serialized GetResult_Response, or None on failure. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| request_class = self.action_class.Impl.GetResultService.Request | ||||||||||||||||||||
| req = deserialize_message(goal_id_data, request_class) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| key = bytes(req.goal_id.uuid) | ||||||||||||||||||||
| with self._goal_handles_lock: | ||||||||||||||||||||
| goal_handle = self._goal_handles.get(key) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if goal_handle is None: | ||||||||||||||||||||
| self.get_logger().error( | ||||||||||||||||||||
| f"get_result: no goal handle for UUID {key.hex()}") | ||||||||||||||||||||
| return None | ||||||||||||||||||||
|
|
||||||||||||||||||||
| import time | ||||||||||||||||||||
| result_future = goal_handle.get_result_async() | ||||||||||||||||||||
| # Poll instead of spin_until_future_complete — executor owns us. | ||||||||||||||||||||
| for _ in range(3000): # 300s max | ||||||||||||||||||||
| if result_future.done(): | ||||||||||||||||||||
| break | ||||||||||||||||||||
| time.sleep(0.1) | ||||||||||||||||||||
|
||||||||||||||||||||
| time.sleep(0.1) | |
| time.sleep(0.1) | |
| if not result_future.done(): | |
| self.get_logger().error( | |
| f"get_result: timed out waiting for result for UUID {key.hex()}") | |
| with self._goal_handles_lock: | |
| self._goal_handles.pop(key, None) | |
| return None |
Copilot
AI
Apr 10, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In cancel_goal, you poll cancel_future for up to 10s but then call cancel_future.result() regardless of completion. If it didn't complete, this will block indefinitely. Please check cancel_future.done() and handle the timeout path before calling result().
| time.sleep(0.1) | |
| time.sleep(0.1) | |
| if not cancel_future.done(): | |
| self.get_logger().error( | |
| f"cancel_goal: timed out waiting for cancel response for UUID {key.hex()}") | |
| return None |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,150 @@ | ||||||
| # Copyright 2020 Unity Technologies | ||||||
| # Copyright 2026 gd-ros-tcp-connector contributors | ||||||
| # | ||||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
| # you may not use this file except in compliance with the License. | ||||||
| # You may obtain a copy of the License at | ||||||
| # | ||||||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| # | ||||||
| # Unless required by applicable law or agreed to in writing, software | ||||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| # See the License for the specific language governing permissions and | ||||||
| # limitations under the License. | ||||||
|
|
||||||
| """ | ||||||
| Bridge a client-implemented Action server through ROS-TCP-Endpoint. | ||||||
|
|
||||||
| When a ROS 2 action client sends a goal, the execute_callback | ||||||
| forwards it to the client using the same __request/__response two-frame | ||||||
| pair that regular unity_service uses. The client computes and sends the | ||||||
| result back; feedback is pushed from the client via a separate | ||||||
| __action_publish_feedback syscommand. | ||||||
| """ | ||||||
|
|
||||||
| import re | ||||||
| import threading | ||||||
| import time | ||||||
|
|
||||||
| from rclpy.action import ActionServer, CancelResponse, GoalResponse | ||||||
| from rclpy.serialization import deserialize_message, serialize_message | ||||||
|
|
||||||
|
Comment on lines
+26
to
+32
|
||||||
| from .communication import RosSender | ||||||
|
|
||||||
|
|
||||||
| class RosActionServer(RosSender): | ||||||
|
|
||||||
| def __init__(self, action_name, action_class, tcp_server): | ||||||
| stripped = re.sub("[^A-Za-z0-9_]+", "", action_name) | ||||||
| node_name = f"{stripped}_RosActionServer" | ||||||
| RosSender.__init__(self, node_name) | ||||||
|
|
||||||
| self.action_name = action_name | ||||||
| self.action_class = action_class | ||||||
| self.tcp_server = tcp_server | ||||||
|
|
||||||
| # goal_uuid bytes -> ServerGoalHandle (for feedback publishing) | ||||||
| self._goal_handles = {} | ||||||
| self._lock = threading.Lock() | ||||||
|
|
||||||
| self._action_server = ActionServer( | ||||||
| self, | ||||||
| action_class, | ||||||
| action_name, | ||||||
| execute_callback=self._execute_callback, | ||||||
| goal_callback=lambda _: GoalResponse.ACCEPT, | ||||||
| cancel_callback=lambda _: CancelResponse.ACCEPT, | ||||||
| ) | ||||||
|
|
||||||
| def _execute_callback(self, goal_handle): | ||||||
| """Forward the goal to the client and wait for the result.""" | ||||||
| goal_uuid = bytes(goal_handle.goal_id.uuid) | ||||||
|
|
||||||
| with self._lock: | ||||||
| self._goal_handles[goal_uuid] = goal_handle | ||||||
|
|
||||||
| # Serialize the Goal body. | ||||||
| goal_msg = goal_handle.request | ||||||
| goal_cdr = serialize_message(goal_msg) | ||||||
|
|
||||||
| # Send the goal to the client using the __request / __response | ||||||
| # mechanism. send_unity_service_request blocks until the client | ||||||
| # responds. We use the action_name as the topic/destination | ||||||
| # so the client.s action server can route it. | ||||||
|
||||||
| # so the client.s action server can route it. | |
| # so the client's action server can route it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import rclpyis unused in this module and will fail the repo'sament_flake8check. Please remove it (or use it if needed).