Conversation
```
.coverage.*

# Created from simulation
MUJOCO_LOG.TXT
```
Yeah we've been missing this for a long time
dimos/record/record_replay.py
Outdated
```python
def start_recording(
    self,
    pubsubs: Collection[LCMPubSubBase],
```
This needs to be generic; idk what the best interface to specify topics is, but we should be able to take a subscription from any transport at this layer.
At this stage it doesn't have to be a glob-supporting pubsub: this is for discovery, and all topics you want to record are known at this stage, so ROS transport and shm should work here.
Yes, I was just looking at updating this annotation. The complication is that something like AllPubSub doesn't have a `.start()` method.
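One possible way to make the annotation transport-generic without requiring `.start()` is a structural `Protocol` that captures only what the recorder actually calls. This is a hypothetical sketch: `Subscribable`, `DummyTransport`, and `start_recording` are illustrative names, not the real dimos API.

```python
from typing import Any, Callable, Protocol, runtime_checkable

@runtime_checkable
class Subscribable(Protocol):
    """Hypothetical minimal surface the recorder needs from a transport."""

    def subscribe(self, topic: str, callback: Callable[[bytes], Any]) -> Callable[[], None]:
        """Subscribe to one topic; return an unsubscribe callable."""
        ...

class DummyTransport:
    """Toy transport; note it never inherits from Subscribable."""

    def __init__(self) -> None:
        self.subs: dict[str, Callable[[bytes], Any]] = {}

    def subscribe(self, topic: str, callback: Callable[[bytes], Any]) -> Callable[[], None]:
        self.subs[topic] = callback
        return lambda: self.subs.pop(topic, None)

def start_recording(transports: list[Subscribable], topics: list[str]) -> None:
    # The recorder only relies on subscribe(); a transport without a
    # .start() method (like AllPubSub) still satisfies the Protocol.
    for transport in transports:
        for topic in topics:
            transport.subscribe(topic, lambda msg: None)

dummy = DummyTransport()
assert isinstance(dummy, Subscribable)  # structural match, no inheritance needed
start_recording([dummy], ["camera/rgb"])
```

Because the match is structural, existing transports qualify without touching their class hierarchies.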
dimos/record/record_replay.py
Outdated
```python
def _on_message(self, msg: bytes, topic: Topic) -> None:
    stream_name = topic_to_stream_name(topic.pattern)

    if self._topic_filter is not None and stream_name not in self._topic_filter:
```
You are doing `subscribe_all` and then filtering in Python, which is too costly: by this point the Python code has already decoded the LCM message and built a class for you, just to throw it away. The recorder will be used in actual robot modules with predefined topics, so we need this to be efficient.
This also makes me realize we might want to extend our pubsub base definition to support just watching for topics without subscribing/parsing, so the actual CLI recorder can be efficient, but that can be a follow-up.
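To make the suggestion concrete, here is a minimal sketch of the per-topic alternative: subscribe once per predefined topic so nothing else is ever decoded. `FakePubSub` and `TopicRecorder` are illustrative stand-ins, not the real dimos classes.

```python
from typing import Callable

class FakePubSub:
    """Hypothetical stand-in for an LCM/ROS/shm transport."""

    def __init__(self) -> None:
        self.subscriptions: dict[str, Callable[[bytes], None]] = {}

    def subscribe(self, topic: str, callback: Callable[[bytes], None]) -> Callable[[], None]:
        self.subscriptions[topic] = callback
        return lambda: self.subscriptions.pop(topic, None)

class TopicRecorder:
    """Subscribes only to the predefined topics, so messages on other
    topics are never decoded or dispatched into Python at all."""

    def __init__(self, pubsub: FakePubSub, topics: list[str]) -> None:
        self.received: list[tuple[str, bytes]] = []
        # One subscription per wanted topic, instead of subscribe_all
        # followed by filtering after the message is already decoded.
        self._unsubs = [
            pubsub.subscribe(t, lambda msg, t=t: self.received.append((t, msg)))
            for t in topics
        ]

pubsub = FakePubSub()
recorder = TopicRecorder(pubsub, ["camera/rgb", "lidar"])
pubsub.subscriptions["lidar"](b"\x01")
assert set(pubsub.subscriptions) == {"camera/rgb", "lidar"}
assert recorder.received == [("lidar", b"\x01")]
```

The `t=t` default binds the topic name at definition time, avoiding the classic late-binding closure bug in the loop.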
btw sharing in case it's helpful, this is the current module I use to hardcode implicit recording for specific robots:

```python
class Recorder(Module[RecorderConfig]):
    """Records all ``In`` ports to a memory2 SQLite database.

    Subclass with the topics you want to record::

        class MyRecorder(Recorder):
            color_image: In[Image]
            lidar: In[PointCloud2]

        blueprint.add(MyRecorder, db_path="session.db")
    """

    default_config = RecorderConfig

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._store: SqliteStore | None = None

    @rpc
    def start(self) -> None:
        super().start()
        self._store = self.register_disposable(SqliteStore(path=self.config.db_path))
        self._store.start()
        if not self.inputs:
            logger.warning("Recorder has no In ports — nothing to record, subclass the Recorder")
            return
        for name, port in self.inputs.items():
            stream: Stream[Any] = self._store.stream(name, port.type)
            unsub = port.subscribe(lambda msg, s=stream: s.append(msg))
            self.register_disposable(Disposable(unsub))
            logger.info("Recording %s (%s)", name, port.type.__name__)

    @rpc
    def stop(self) -> None:
        super().stop()
```
Can we get docs on how this is used? How do I test full functionality? I see the streams to record can be set via the CLI?
Problem
We want a recorder tool with the ability to record a selection of streams for modules using the new memory2 module, and the ability to replay those files.
Closes #1575
Solution
Added a RecordReplay class to handle the main functionality, a separate recorder UI, and appropriate adjustments to link it all together.
Also made the run() entrypoint async, so it is possible to migrate gradually to asyncio (this probably needs some discussion).
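For context on the gradual-migration point, here is a hedged sketch of the pattern an async entrypoint enables: legacy blocking code can keep running in a worker thread while new code awaits. `legacy_blocking_step` and this `run()` body are illustrative, not the actual dimos implementation.

```python
import asyncio

def legacy_blocking_step() -> str:
    # Placeholder for existing synchronous code that has not been
    # migrated to asyncio yet.
    return "done"

async def run() -> str:
    # An async entrypoint can still call blocking legacy code by
    # pushing it to the default thread-pool executor, allowing a
    # gradual migration instead of a big-bang rewrite.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, legacy_blocking_step)

result = asyncio.run(run())
assert result == "done"
```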
How to Test
Run a simulation or similar, while also running `dimos recorder`. A selection of streams should appear in the recorder. Select some streams with space, then use r to start/stop a recording. Replay with `--replay --replay-file=...`.