Skip to content

Robot Recording

RobotRecorder

A class for recording robot observation and actions.

Recording at a specified frequency on the observation and action of a robot. It leverages a queue and a worker thread to handle the recording asynchronously, ensuring that the main operations of the robot are not blocked.

The robot class must pass in the get_state, get_observation, and prepare_action methods. get_state() gets the current state/pose of the robot. get_observation() captures the observation/image of the robot. prepare_action() calculates the action between the new and old states.

Source code in mbodied/robots/robot_recording.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class RobotRecorder:
    """A class for recording robot observation and actions.

    Recording at a specified frequency on the observation and action of a robot. It leverages a queue and a worker
    thread to handle the recording asynchronously, ensuring that the main operations of the
    robot are not blocked.

    Two threads are involved:
      * a daemon worker thread, started in the constructor, that drains ``recording_queue`` and forwards each
        entry to the underlying ``Recorder``;
      * a sampling thread, started by ``start_recording`` and joined by ``stop_recording``, that polls the robot
        at ``frequency_hz``.

    Robot class must pass in the `get_state`, `get_observation`, `prepare_action` methods.
    get_state() gets the current state/pose of the robot.
    get_observation() captures the observation/image of the robot.
    prepare_action() calculates the action between the new and old states.

    Example:
        with robot_recorder.record("pick up the cup") as recorder:
            ...  # move the robot; samples are taken in the background
    """

    def __init__(
        self,
        get_state: Callable,
        get_observation: Callable,
        prepare_action: Callable,
        frequency_hz: int = 5,
        recorder_kwargs: "dict | None" = None,
        on_static: Literal["record", "omit"] = "omit",
    ) -> None:
        """Initializes the RobotRecorder.

        This constructor sets up the recording mechanism on the given robot, including the recorder instance,
        recording frequency, and the asynchronous processing queue and worker thread. It also
        initializes attributes to track the last recorded pose and the current instruction.

        Args:
            get_state: A function that returns the current state of the robot.
            get_observation: A function that captures the observation/image of the robot.
            prepare_action: A function that calculates the action between the new and old states.
            frequency_hz: Frequency at which to record pose and image data (in Hz). Must be positive.
            recorder_kwargs: Keyword arguments to pass to the Recorder constructor.
            on_static: Whether to record on static poses or not. If "record", it will record when the robot is not moving.

        Raises:
            ValueError: If ``frequency_hz`` is not positive.
        """
        # Fail fast, before any thread or recorder is created: a zero frequency would otherwise raise
        # ZeroDivisionError inside the sampling thread, and a negative one would make it spin without sleeping.
        if frequency_hz <= 0:
            raise ValueError(f"frequency_hz must be positive, got {frequency_hz!r}")
        if recorder_kwargs is None:
            recorder_kwargs = {}
        self.recorder = Recorder(**recorder_kwargs)
        self.task = None

        # Previous sample; each queued record pairs it with the action that led to the next sample.
        self.last_recorded_state = None
        self.last_image = None

        self.recording = False
        self.frequency_hz = frequency_hz
        self.record_on_static = on_static == "record"
        self.recording_queue = Queue()

        self.get_state = get_state
        self.get_observation = get_observation
        self.prepare_action = prepare_action

        # Daemon thread: it loops forever in _process_queue and must not keep the interpreter alive.
        self._worker_thread = threading.Thread(target=self._process_queue, daemon=True)
        self._worker_thread.start()

    def __enter__(self) -> "RobotRecorder":
        """Enter the context manager, starting the recording.

        Returns:
            The recorder itself, so ``with recorder.record(task) as r:`` binds ``r`` to the recorder.
        """
        self.start_recording(self.task)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Exit the context manager, stopping the recording.

        Returns ``None``, so an exception raised inside the ``with`` body is not suppressed.
        """
        self.stop_recording()

    def record(self, task: str) -> "RobotRecorder":
        """Set the task and return the context manager."""
        self.task = task
        return self

    def reset_recorder(self) -> None:
        """Reset the recorder.

        Blocks, polling every 0.1 seconds, until ``recording`` is False, then calls ``reset()`` on the
        underlying recorder.
        """
        while self.recording:
            time.sleep(0.1)
        self.recorder.reset()

    def record_from_robot(self) -> None:
        """Records the current pose and captures an image at the specified frequency.

        Loops for as long as ``recording`` is True; this is the target of the sampling thread.
        """
        while self.recording:
            start_time = time.perf_counter()
            self.record_current_state()
            elapsed_time = time.perf_counter() - start_time
            # Sleep for the remaining time to maintain the desired frequency
            sleep_time = max(0, (1.0 / self.frequency_hz) - elapsed_time)
            time.sleep(sleep_time)

    def start_recording(self, task: str = "") -> None:
        """Starts the recording of pose and image.

        Does nothing if a recording is already in progress.

        Args:
            task: Instruction stored with every sample taken during this recording.
        """
        if not self.recording:
            self.task = task
            self.recording = True
            self.recording_thread = threading.Thread(target=self.record_from_robot)
            self.recording_thread.start()

    def stop_recording(self) -> None:
        """Stops the recording of pose and image.

        Clears the ``recording`` flag and waits for the sampling thread to finish. Does nothing if no
        recording is in progress.
        """
        if self.recording:
            self.recording = False
            self.recording_thread.join()

    def _process_queue(self) -> None:
        """Processes the recording queue asynchronously.

        Runs forever on the daemon worker thread: blocks on the queue, forwards each entry to the
        recorder, and marks it done.
        """
        while True:
            image, instruction, action, state = self.recording_queue.get()
            self.recorder.record(observation={"image": image, "instruction": instruction}, action=action, state=state)
            self.recording_queue.task_done()

    def record_current_state(self) -> None:
        """Records the current pose and image if the pose has changed.

        The very first sample is only remembered. Every later sample that differs from the previous
        one (or every sample, when static recording is enabled) enqueues the *previous* image and state
        together with the action returned by ``prepare_action(previous_state, current_state)``.
        """
        state = self.get_state()
        image = self.get_observation()

        # This is the beginning of the episode
        if self.last_recorded_state is None:
            self.last_recorded_state = state
            self.last_image = image
            return

        if state != self.last_recorded_state or self.record_on_static:
            action = self.prepare_action(self.last_recorded_state, state)
            self.recording_queue.put(
                (
                    self.last_image,
                    self.task,
                    action,
                    self.last_recorded_state,
                ),
            )
            self.last_image = image
            self.last_recorded_state = state

    def record_last_state(self) -> None:
        """Records the final pose and image after the movement completes."""
        self.record_current_state()

__enter__()

Enter the context manager, starting the recording.

Source code in mbodied/robots/robot_recording.py
65
66
67
def __enter__(self):
    """Enter the context manager, starting the recording.

    Starts a recording for the task currently stored on the instance.

    Returns:
        The recorder itself, so ``with recorder.record(task) as r:`` binds ``r`` to the recorder
        rather than ``None``.
    """
    self.start_recording(self.task)
    return self

__exit__(exc_type, exc_value, traceback)

Exit the context manager, stopping the recording.

Source code in mbodied/robots/robot_recording.py
69
70
71
def __exit__(self, exc_type, exc_value, traceback) -> None:
    """Leave the ``with`` block by stopping the recording.

    The exception arguments are ignored and ``None`` is returned, so an exception raised inside
    the ``with`` body is not suppressed.
    """
    self.stop_recording()
    return None

__init__(get_state, get_observation, prepare_action, frequency_hz=5, recorder_kwargs=None, on_static='omit')

Initializes the RobotRecorder.

This constructor sets up the recording mechanism on the given robot, including the recorder instance, recording frequency, and the asynchronous processing queue and worker thread. It also initializes attributes to track the last recorded pose and the current instruction.

Parameters:

Name Type Description Default
get_state Callable

A function that returns the current state of the robot.

required
get_observation Callable

A function that captures the observation/image of the robot.

required
prepare_action Callable

A function that calculates the action between the new and old states.

required
frequency_hz int

Frequency at which to record pose and image data (in Hz).

5
recorder_kwargs dict

Keyword arguments to pass to the Recorder constructor.

None
on_static Literal['record', 'omit']

Whether to record on static poses or not. If "record", it will record when the robot is not moving.

'omit'
Source code in mbodied/robots/robot_recording.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(
    self,
    get_state: Callable,
    get_observation: Callable,
    prepare_action: Callable,
    frequency_hz: int = 5,
    recorder_kwargs: "dict | None" = None,
    on_static: Literal["record", "omit"] = "omit",
) -> None:
    """Initializes the RobotRecorder.

    This constructor sets up the recording mechanism on the given robot, including the recorder instance,
    recording frequency, and the asynchronous processing queue and worker thread. It also
    initializes attributes to track the last recorded pose and the current instruction.

    Args:
        get_state: A function that returns the current state of the robot.
        get_observation: A function that captures the observation/image of the robot.
        prepare_action: A function that calculates the action between the new and old states.
        frequency_hz: Frequency at which to record pose and image data (in Hz). Must be positive.
        recorder_kwargs: Keyword arguments to pass to the Recorder constructor.
        on_static: Whether to record on static poses or not. If "record", it will record when the robot is not moving.

    Raises:
        ValueError: If ``frequency_hz`` is not positive.
    """
    # Fail fast, before any thread or recorder is created: the sampling loop divides by
    # frequency_hz, so zero would raise there and a negative value would never sleep.
    if frequency_hz <= 0:
        raise ValueError(f"frequency_hz must be positive, got {frequency_hz!r}")
    if recorder_kwargs is None:
        recorder_kwargs = {}
    self.recorder = Recorder(**recorder_kwargs)
    self.task = None

    # Previous sample; stays None until the first sample is taken.
    self.last_recorded_state = None
    self.last_image = None

    self.recording = False
    self.frequency_hz = frequency_hz
    self.record_on_static = on_static == "record"
    self.recording_queue = Queue()

    self.get_state = get_state
    self.get_observation = get_observation
    self.prepare_action = prepare_action

    # Daemon thread so the queue consumer does not keep the interpreter alive.
    self._worker_thread = threading.Thread(target=self._process_queue, daemon=True)
    self._worker_thread.start()

record(task)

Set the task and return the context manager.

Source code in mbodied/robots/robot_recording.py
73
74
75
76
def record(self, task: str) -> "RobotRecorder":
    """Store ``task`` on the recorder and hand the recorder back.

    Intended for use as ``with recorder.record(task): ...``.

    Args:
        task: Instruction to associate with the upcoming recording.

    Returns:
        This same recorder instance.
    """
    recorder = self
    recorder.task = task
    return recorder

record_current_state()

Records the current pose and image if the pose has changed.

Source code in mbodied/robots/robot_recording.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def record_current_state(self) -> None:
    """Sample the robot once and enqueue a record when the pose moved.

    The first sample of an episode is only remembered. Afterwards, whenever the new state differs
    from the remembered one (or static recording is enabled), the remembered image and state are
    queued together with the task and the action computed by ``prepare_action``, and the new
    sample becomes the remembered one.
    """
    current_state = self.get_state()
    current_image = self.get_observation()

    # Nothing remembered yet: this sample opens the episode.
    if self.last_recorded_state is None:
        self.last_recorded_state = current_state
        self.last_image = current_image
        return

    pose_changed = current_state != self.last_recorded_state
    if not (pose_changed or self.record_on_static):
        # Unchanged pose and static samples are omitted.
        return

    step_action = self.prepare_action(self.last_recorded_state, current_state)
    entry = (self.last_image, self.task, step_action, self.last_recorded_state)
    self.recording_queue.put(entry)
    self.last_image = current_image
    self.last_recorded_state = current_state

record_from_robot()

Records the current pose and captures an image at the specified frequency.

Source code in mbodied/robots/robot_recording.py
84
85
86
87
88
89
90
91
92
def record_from_robot(self) -> None:
    """Sample the robot repeatedly at ``frequency_hz`` while ``recording`` is set.

    Each iteration takes one sample, then sleeps for whatever is left of the sampling period
    (never a negative duration).
    """
    while self.recording:
        tick_started = time.perf_counter()
        self.record_current_state()
        time_spent = time.perf_counter() - tick_started
        # Only the unused part of the period is slept, so sampling time does not add to the interval.
        period = 1.0 / self.frequency_hz
        time.sleep(max(0, period - time_spent))

record_last_state()

Records the final pose and image after the movement completes.

Source code in mbodied/robots/robot_recording.py
139
140
141
def record_last_state(self) -> None:
    """Take one more sample once the movement has finished.

    Delegates to ``record_current_state``.
    """
    self.record_current_state()
    return None

reset_recorder()

Reset the recorder.

Source code in mbodied/robots/robot_recording.py
78
79
80
81
82
def reset_recorder(self) -> None:
    """Wait for any active recording to end, then reset the underlying recorder.

    Polls the ``recording`` flag every 0.1 seconds until it is cleared.
    """
    while True:
        if not self.recording:
            break
        time.sleep(0.1)
    self.recorder.reset()

start_recording(task='')

Starts the recording of pose and image.

Source code in mbodied/robots/robot_recording.py
 94
 95
 96
 97
 98
 99
100
def start_recording(self, task: str = "") -> None:
    """Begin sampling pose and image on a background thread.

    A call made while a recording is already running is ignored.

    Args:
        task: Instruction stored on the recorder for this recording.
    """
    if self.recording:
        return

    self.task = task
    self.recording = True
    sampler = threading.Thread(target=self.record_from_robot)
    self.recording_thread = sampler
    sampler.start()

stop_recording()

Stops the recording of pose and image.

Source code in mbodied/robots/robot_recording.py
102
103
104
105
106
def stop_recording(self) -> None:
    """End the current recording and wait for the sampling thread to finish.

    A call made while no recording is running is ignored.
    """
    if not self.recording:
        return

    # Clear the flag first so the sampling loop can exit, then wait for it.
    self.recording = False
    self.recording_thread.join()