Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 67 additions & 11 deletions src/viam/robot/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@
ShutdownRequest,
StopAllRequest,
StopExtraParameters,
TransformPCDRequest,
TransformPCDResponse,
TransformPoseRequest,
TransformPoseResponse,
)
from viam.resource.base import ResourceBase
from viam.resource.manager import ResourceManager
from viam.resource.registry import Registry
from viam.resource.rpc_client_base import ReconfigurableResourceRPCClientBase, ResourceRPCClientBase
from viam.resource.rpc_client_base import (
ReconfigurableResourceRPCClientBase,
ResourceRPCClientBase,
)
from viam.resource.types import API, RESOURCE_TYPE_COMPONENT, RESOURCE_TYPE_SERVICE
from viam.rpc.dial import DialOptions, ViamChannel, _dial_inner, dial
from viam.services.service_base import ServiceBase
Expand Down Expand Up @@ -239,7 +244,11 @@ async def connect_with_channel() -> RobotClient:

@classmethod
async def _with_channel(
cls, channel: Union[Channel, ViamChannel], options: Options, close_channel: bool, robot_addr: Optional[str] = None
cls,
channel: Union[Channel, ViamChannel],
options: Options,
close_channel: bool,
robot_addr: Optional[str] = None,
):
"""INTERNAL USE ONLY"""

Expand All @@ -261,7 +270,11 @@ async def _with_channel(
self._options = options
self._address = self._channel._path if self._channel._path else f"{self._channel._host}:{self._channel._port}"
self._sessions_client = SessionsClient(
self._channel, self._address, self._options.dial_options, disabled=self._options.disable_sessions, robot_addr=robot_addr
self._channel,
self._address,
self._options.dial_options,
disabled=self._options.disable_sessions,
robot_addr=robot_addr,
)

try:
Expand All @@ -273,12 +286,16 @@ async def _with_channel(

if options.refresh_interval > 0:
self._refresh_task = asyncio.create_task(
self._refresh_every(options.refresh_interval), name=f"{viam._TASK_PREFIX}-robot_refresh_metadata"
self._refresh_every(options.refresh_interval),
name=f"{viam._TASK_PREFIX}-robot_refresh_metadata",
)

if options.check_connection_interval > 0 or options.attempt_reconnect_interval > 0:
self._check_connection_task = asyncio.create_task(
self._check_connection(options.check_connection_interval, options.attempt_reconnect_interval),
self._check_connection(
options.check_connection_interval,
options.attempt_reconnect_interval,
),
name=f"{viam._TASK_PREFIX}-robot_check_connection",
)

Expand Down Expand Up @@ -431,7 +448,10 @@ async def _check_connection(self, check_every: int, reconnect_every: int):
LOGGER.debug("Successfully reconnected machine")
break
except Exception as e:
LOGGER.error(f"Failed to reconnect, trying again in {reconnect_every}sec", exc_info=e)
LOGGER.error(
f"Failed to reconnect, trying again in {reconnect_every}sec",
exc_info=e,
)
self._sessions_client.reset()
self._close_channel()
await asyncio.sleep(reconnect_every)
Expand Down Expand Up @@ -683,7 +703,10 @@ async def get_frame_system_config(self, additional_transforms: Optional[List[Tra
return list(response.frame_system_configs)

async def transform_pose(
self, query: PoseInFrame, destination: str, additional_transforms: Optional[List[Transform]] = None
self,
query: PoseInFrame,
destination: str,
additional_transforms: Optional[List[Transform]] = None,
) -> PoseInFrame:
"""
Transform a given source Pose from the reference frame to a new specified destination which is a reference frame.
Expand Down Expand Up @@ -719,12 +742,39 @@ async def transform_pose(

For more information, see `Machine Management API <https://docs.viam.com/appendix/apis/robot/>`_.
"""
request = TransformPoseRequest(source=query, destination=destination, supplemental_transforms=additional_transforms)
request = TransformPoseRequest(
source=query,
destination=destination,
supplemental_transforms=additional_transforms,
)
response: TransformPoseResponse = await self._client.TransformPose(request)
return response.pose

async def transform_point_cloud(self):
raise NotImplementedError()
async def transform_pcd(self, point_cloud_pcd: bytes, source: str, destination: str) -> bytes:
"""
Transform given pointcloud data from the source reference frame to a new specified destination which is a reference frame.

::

my_camera = Camera.from_robot(robot=machine, name="my_camera")
data, _ = await my_camera.get_point_cloud()

transformed_pcd = await machine.transform_pcd(pcd, "my_camera", "world")

Args:

point_cloud_pcd (bytes): The point cloud data to transform
source (str) : The name of the reference frame the point cloud data came from, i.e. camera resource.
destination (str) : The name of the reference frame to transform the given data to, i.e. world.

Returns:
bytes: The point cloud data relative to the destination reference frame

For more information, see `Machine Management API <https://docs.viam.com/appendix/apis/robot/>`_.
"""
request = TransformPCDRequest(point_cloud_pcd=point_cloud_pcd, source=source, destination=destination)
response: TransformPCDResponse = await self._client.TransformPCD(request)
return response.point_cloud_pcd

#################
# MODULE MODELS #
Expand Down Expand Up @@ -795,7 +845,13 @@ async def log(self, name: str, level: str, time: datetime, message: str, stack:

For more information, see `Machine Management API <https://docs.viam.com/appendix/apis/robot/>`_.
"""
entry = LogEntry(level=level, time=datetime_to_timestamp(time), logger_name=name, message=message, stack=stack)
entry = LogEntry(
level=level,
time=datetime_to_timestamp(time),
logger_name=name,
message=message,
stack=stack,
)
request = LogRequest(logs=[entry])
await self._client.Log(request)

Expand Down