Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions ros_tcp_endpoint/action_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# Copyright 2020 Unity Technologies
# Copyright 2026 gd-ros-tcp-connector contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Thin wrapper around ``rclpy.action.ActionClient`` that bridges the
ROS-TCP-Endpoint wire protocol to real ROS 2 Action servers.

A plain ``rclpy.create_client()`` cannot discover Action endpoints
because they use a different DDS endpoint kind / QoS profile than
regular services. This module solves the problem by creating a real
``ActionClient`` and exposing ``send_goal`` / ``get_result`` /
``cancel_goal`` as methods the TCP server can call with serialized
CDR payloads.
"""

import re
import threading

import rclpy
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import rclpy is unused in this module and will fail the repo's ament_flake8 check. Please remove it (or use it if needed).

Suggested change
import rclpy

Copilot uses AI. Check for mistakes.
from rclpy.action import ActionClient
from rclpy.serialization import deserialize_message, serialize_message

from .communication import RosSender


class RosActionClient(RosSender):
"""
Manages one ``rclpy.action.ActionClient`` for a single action name.
"""

def __init__(self, action_name, action_class, tcp_server):
stripped = re.sub("[^A-Za-z0-9_]+", "", action_name)
node_name = f"{stripped}_RosActionClient"
RosSender.__init__(self, node_name)

self.action_name = action_name
self.action_class = action_class
self.tcp_server = tcp_server

self._action_client = ActionClient(self, action_class, action_name)
self._goal_handles = {} # goal_uuid_bytes -> ClientGoalHandle
self._goal_handles_lock = threading.Lock()

def send_goal(self, goal_data):
"""Send a goal. *goal_data* is CDR-serialized Goal body bytes.

Returns the CDR-serialized SendGoal_Response (accepted + stamp),
or None on failure.
"""
import time

goal_msg = deserialize_message(
goal_data, self.action_class.Goal)

# The node is already added to the TcpServer's
# MultiThreadedExecutor, so DDS discovery proceeds on the
# executor thread. We just poll server_is_ready() here; do NOT
# call rclpy.spin_once(self) because the executor already owns
# this node and double-spinning causes deadlocks.
server_ready = False
for _ in range(100): # 100 x 0.1s = 10s max
if self._action_client.server_is_ready():
server_ready = True
break
time.sleep(0.1)

if not server_ready:
self.get_logger().error(
f"Action server {self.action_name} not available "
f"(waited 10s — is the server running?)")
return None

# Do NOT pass feedback_callback here. The client subscribes to the
# feedback topic via __subscribe, which creates a RosSubscriber
# that forwards feedback through the normal topic path. If we
# ALSO forwarded feedback from _on_feedback, every feedback
# would arrive twice on the client side.
future = self._action_client.send_goal_async(goal_msg)

# Wait for the send_goal future. Again, the executor is
# spinning on another thread, so we just poll the future.
for _ in range(100): # 10s max
if future.done():
break
time.sleep(0.1)
goal_handle = future.result()
if goal_handle is None:
return None

Comment on lines +90 to +101
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In send_goal, you poll the future for up to 10s but then call future.result() unconditionally. If the future never completed, result() will block indefinitely and defeats the intended timeout. Please guard with if not future.done(): and return an error/None (or cancel) on timeout before calling result().

Copilot uses AI. Check for mistakes.
# Build a SendGoal_Response to send back to the client.
# rclpy's ActionClient generates its own UUID for the goal
# (ignoring any UUID we might have set), so we need to tell
# the client which UUID was actually used. We prepend the 16-byte
# UUID to the CDR-serialized SendGoal_Response; the client strips
# the first 16 bytes before deserializing.
response_class = self.action_class.Impl.SendGoalService.Response
resp = response_class()
resp.accepted = goal_handle.accepted
resp.stamp = goal_handle.stamp if hasattr(goal_handle, 'stamp') else resp.stamp

goal_uuid = bytes(goal_handle.goal_id.uuid)
if goal_handle.accepted:
with self._goal_handles_lock:
self._goal_handles[goal_uuid] = goal_handle

return goal_uuid + serialize_message(resp)

def get_result(self, goal_id_data):
"""Request the result for a goal. *goal_id_data* is CDR-serialized
GetResult_Request bytes (contains the goal UUID).

Returns CDR-serialized GetResult_Response, or None on failure.
"""
request_class = self.action_class.Impl.GetResultService.Request
req = deserialize_message(goal_id_data, request_class)

key = bytes(req.goal_id.uuid)
with self._goal_handles_lock:
goal_handle = self._goal_handles.get(key)

if goal_handle is None:
self.get_logger().error(
f"get_result: no goal handle for UUID {key.hex()}")
return None

import time
result_future = goal_handle.get_result_async()
# Poll instead of spin_until_future_complete — executor owns us.
for _ in range(3000): # 300s max
if result_future.done():
break
time.sleep(0.1)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In get_result, after polling for up to 300s you still call result_future.result() even if the future is not done, which can block forever. Add an explicit timeout check (if not result_future.done(): ...) before calling result() and decide how to report a timeout to the TCP client.

Suggested change
time.sleep(0.1)
time.sleep(0.1)
if not result_future.done():
self.get_logger().error(
f"get_result: timed out waiting for result for UUID {key.hex()}")
with self._goal_handles_lock:
self._goal_handles.pop(key, None)
return None

Copilot uses AI. Check for mistakes.
result = result_future.result()

# Clean up the handle.
with self._goal_handles_lock:
self._goal_handles.pop(key, None)

if result is None:
return None

# Build GetResult_Response
response_class = self.action_class.Impl.GetResultService.Response
resp = response_class()
resp.status = result.status
resp.result = result.result
return serialize_message(resp)

def cancel_goal(self, goal_info_data):
"""Cancel a goal. *goal_info_data* is CDR-serialized
CancelGoal_Request bytes.

Returns CDR-serialized CancelGoal_Response, or None.
"""
from action_msgs.srv import CancelGoal
req = deserialize_message(goal_info_data, CancelGoal.Request)

key = bytes(req.goal_info.goal_id.uuid)
with self._goal_handles_lock:
goal_handle = self._goal_handles.get(key)

if goal_handle is None:
self.get_logger().error(
f"cancel_goal: no goal handle for UUID {key.hex()}")
return None

import time
cancel_future = goal_handle.cancel_goal_async()
for _ in range(100): # 10s max
if cancel_future.done():
break
time.sleep(0.1)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In cancel_goal, you poll cancel_future for up to 10s but then call cancel_future.result() regardless of completion. If it didn't complete, this will block indefinitely. Please check cancel_future.done() and handle the timeout path before calling result().

Suggested change
time.sleep(0.1)
time.sleep(0.1)
if not cancel_future.done():
self.get_logger().error(
f"cancel_goal: timed out waiting for cancel response for UUID {key.hex()}")
return None

Copilot uses AI. Check for mistakes.
cancel_response = cancel_future.result()

if cancel_response is None:
return None
return serialize_message(cancel_response)

def unregister(self):
self._action_client.destroy()
self.destroy_node()
150 changes: 150 additions & 0 deletions ros_tcp_endpoint/action_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright 2020 Unity Technologies
# Copyright 2026 gd-ros-tcp-connector contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Bridge a client-implemented Action server through ROS-TCP-Endpoint.

When a ROS 2 action client sends a goal, the execute_callback
forwards it to the client using the same __request/__response two-frame
pair that regular unity_service uses. The client computes and sends the
result back; feedback is pushed from the client via a separate
__action_publish_feedback syscommand.
"""

import re
import threading
import time

from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.serialization import deserialize_message, serialize_message

Comment on lines +26 to +32
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time is imported but unused in this module, which will fail ament_flake8 (F401). Please remove the unused import (or use it).

Copilot uses AI. Check for mistakes.
from .communication import RosSender


class RosActionServer(RosSender):

def __init__(self, action_name, action_class, tcp_server):
stripped = re.sub("[^A-Za-z0-9_]+", "", action_name)
node_name = f"{stripped}_RosActionServer"
RosSender.__init__(self, node_name)

self.action_name = action_name
self.action_class = action_class
self.tcp_server = tcp_server

# goal_uuid bytes -> ServerGoalHandle (for feedback publishing)
self._goal_handles = {}
self._lock = threading.Lock()

self._action_server = ActionServer(
self,
action_class,
action_name,
execute_callback=self._execute_callback,
goal_callback=lambda _: GoalResponse.ACCEPT,
cancel_callback=lambda _: CancelResponse.ACCEPT,
)

def _execute_callback(self, goal_handle):
"""Forward the goal to the client and wait for the result."""
goal_uuid = bytes(goal_handle.goal_id.uuid)

with self._lock:
self._goal_handles[goal_uuid] = goal_handle

# Serialize the Goal body.
goal_msg = goal_handle.request
goal_cdr = serialize_message(goal_msg)

# Send the goal to the client using the __request / __response
# mechanism. send_unity_service_request blocks until the client
# responds. We use the action_name as the topic/destination
# so the client.s action server can route it.
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment has a typo: client.s should be client's.

Suggested change
# so the client.s action server can route it.
# so the client's action server can route it.

Copilot uses AI. Check for mistakes.
#
# BUT: send_unity_service_request calls deserialize_message
# on the response, which needs a rclpy type. We need raw CDR
# bytes instead. So we use a raw version.
result_cdr = self._send_goal_to_client(goal_uuid, goal_cdr)

# Clean up.
with self._lock:
self._goal_handles.pop(goal_uuid, None)

if result_cdr is None:
goal_handle.abort()
return self.action_class.Result()

result_msg = deserialize_message(result_cdr, self.action_class.Result)
goal_handle.succeed()
return result_msg

def _send_goal_to_client(self, goal_uuid, goal_cdr):
"""Send a goal to the client and wait for the result CDR bytes.

Uses the same ThreadPauser mechanism as send_unity_service_request
but works with raw CDR bytes.
"""
from .tcp_sender import ThreadPauser, SysCommand_Service

sender = self.tcp_server.unity_tcp_sender
if sender.queue is None:
return None

thread_pauser = ThreadPauser()
with sender.srv_lock:
srv_id = sender.next_srv_id
sender.next_srv_id += 1
sender.services_waiting[srv_id] = thread_pauser

# Build __request{srv_id} header.
from .client import ClientThread
command = SysCommand_Service()
command.srv_id = srv_id
serialized_header = ClientThread.serialize_command("__request", command)

# Build the data frame: destination = action_name, payload =
# 16-byte goal UUID + CDR goal body. the client strips the UUID
# to identify which goal this is for.
import struct
dest_bytes = self.action_name.encode("utf-8")
dest_len = len(dest_bytes)
payload = goal_uuid + goal_cdr
dest_info = struct.pack("<I%ss" % dest_len, dest_len, dest_bytes)
msg_length = struct.pack("<I", len(payload))
serialized_message = dest_info + msg_length + payload

sender.queue.put(b"".join([serialized_header, serialized_message]))

# Block until the client sends __response{srv_id} with the result.
thread_pauser.sleep_until_resumed()

return thread_pauser.result # raw CDR bytes

def publish_feedback(self, goal_uuid_bytes, feedback_cdr):
"""Called when the client sends feedback for an in-progress goal."""
with self._lock:
goal_handle = self._goal_handles.get(goal_uuid_bytes)

if goal_handle is None:
self.get_logger().warning(
f"publish_feedback: no goal handle for {goal_uuid_bytes.hex()}")
return

fb_msg = deserialize_message(feedback_cdr, self.action_class.Feedback)
goal_handle.publish_feedback(fb_msg)

def unregister(self):
self._action_server.destroy()
self.destroy_node()
Loading
Loading