Skip to content

Robot

Robot

Bases: ABC

Abstract base class for robot hardware interfaces.

This class serves as a blueprint for creating interfaces to control robot hardware or other devices. It provides essential methods and guidelines to implement functionalities such as executing motions, capturing observations, and recording robot data, which can be used for training models.

Key Features: - organize your hardware interface in a modular fashion - support asynchronous dataset creation

Recording Capabilities: To enable data recording for model training, the following methods need implementation: - get_observation(): Captures the current observation/image of the robot. - get_state(): Retrieves the current state (pose) of the robot. - prepare_action(): Computes the action performed between two robot states.

Example Usage:

robot = MyRobot()
robot.init_recorder(frequency_hz=5, recorder_kwargs={...})
with robot.record("pick up the remote"):
    robot.do(motion1)
    robot.do(motion2)
    ...
When with robot.record is called, it starts recording the robot's observation and actions at the desired frequency. It stops when the context manager exits.

Alternatively, you can manage recordings with start_recording() and stop_recording() methods.

Source code in mbodied/robots/robot.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class Robot(ABC):
    """Abstract base class for robot hardware interfaces.

    This class serves as a blueprint for creating interfaces to control robot hardware
    or other devices. It provides essential methods and guidelines to implement
    functionalities such as executing motions, capturing observations, and recording
    robot data, which can be used for training models.

    Key Features:
    - organize your hardware interface in a modular fashion
    - support asynchronous dataset creation

    **Recording Capabilities:**
    To enable data recording for model training, the following methods need implementation:
    - `get_observation()`: Captures the current observation/image of the robot.
    - `get_state()`: Retrieves the current state (pose) of the robot.
    - `prepare_action()`: Computes the action performed between two robot states.

    **Example Usage:**
    ```python
    robot = MyRobot()
    robot.init_recorder(frequency_hz=5, recorder_kwargs={...})
    with robot.record("pick up the remote"):
        robot.do(motion1)
        robot.do(motion2)
        ...
    ```
    When ``with robot.record`` is called, it starts recording the robot's observation and actions
    at the desired frequency. It stops when the context manager exits.

    Alternatively, you can manage recordings with `start_recording()` and `stop_recording()` methods.
    """

    @abstractmethod
    def __init__(self, **kwargs):
        """Initializes the robot hardware interface.

        Args:
            kwargs: Additional arguments to pass to the robot hardware interface.
        """
        raise NotImplementedError

    @abstractmethod
    def do(self, *args, **kwargs) -> None:  # noqa
        """Executes motion.

        Args:
            args: Arguments to pass to the robot hardware interface.
            kwargs: Additional arguments to pass to the robot hardware interface.
        """
        raise NotImplementedError

    async def async_do(self, *args, **kwargs) -> None:
        """Asynchronously executes motion.

        Args:
            args: Arguments to pass to the robot hardware interface.
            kwargs: Additional arguments to pass to the robot hardware interface.
        """
        return await asyncio.to_thread(self.do, *args, **kwargs)

    def fetch(self, *args, **kwargs) -> None:
        """Fetches data from the hardware.

        Args:
            args: Arguments to pass to the robot hardware interface.
            kwargs: Additional arguments to pass to the robot hardware interface.
        """
        raise NotImplementedError

    def capture(self, *args, **kwargs) -> None:
        """Captures continuous data from the hardware.

        Args:
            args: Arguments to pass to the robot hardware interface.
            kwargs: Additional arguments to pass to the robot hardware interface.
        """
        raise NotImplementedError

    def get_observation(self) -> Sample:
        """(Optional for robot recorder): Captures the observation/image of the robot.

        This will be used by the robot recorder to record the current observation/image of the robot.
        """
        raise NotImplementedError

    def get_state(self) -> Sample:
        """(Optional for robot recorder): Gets the current state (pose) of the robot.

        This will be used by the robot recorder to record the current state of the robot.
        """
        raise NotImplementedError

    def prepare_action(self, old_state: Sample, new_state: Sample) -> Sample:
        """(Optional for robot recorder): Prepare the the action between two robot states.

        This is what you are recording as the action. For example, substract old from new hand position
        and use absolute value for grasp, etc.

        Args:
            old_state: The old state (pose) of the robot.
            new_state: The new state (pose) of the robot.
        """
        raise NotImplementedError

    def init_recorder(
        self,
        frequency_hz: int = 5,
        recorder_kwargs: dict = None,
        on_static: str = "omit",
    ) -> None:
        """Initializes the recorder for the robot."""
        self.robot_recorder = RobotRecorder(
            self.get_state,
            self.get_observation,
            self.prepare_action,
            frequency_hz,
            recorder_kwargs,
            on_static=on_static,
        )

    def record(self, task: str) -> RobotRecorder:
        """Start recording with the given task with context manager.

        Usage:
        ```python
        with robot.record("pick up the remote"):
            robot.do(motion1)
            robot.do(motion2)
            ...
        ```
        """
        return self.robot_recorder.record(task)

    def start_recording(self, task: str) -> None:
        """Start recording with the given task."""
        self.robot_recorder.start_recording(task)

    def stop_recording(self) -> None:
        """Stop recording."""
        self.robot_recorder.stop_recording()

__init__(**kwargs) abstractmethod

Initializes the robot hardware interface.

Parameters:

Name Type Description Default
kwargs

Additional arguments to pass to the robot hardware interface.

{}
Source code in mbodied/robots/robot.py
55
56
57
58
59
60
61
62
@abstractmethod
def __init__(self, **kwargs):
    """Initializes the robot hardware interface.

    Args:
        kwargs: Additional arguments to pass to the robot hardware interface.
    """
    raise NotImplementedError

async_do(*args, **kwargs) async

Asynchronously executes motion.

Parameters:

Name Type Description Default
args

Arguments to pass to the robot hardware interface.

()
kwargs

Additional arguments to pass to the robot hardware interface.

{}
Source code in mbodied/robots/robot.py
74
75
76
77
78
79
80
81
async def async_do(self, *args, **kwargs) -> None:
    """Asynchronously executes motion.

    Args:
        args: Arguments to pass to the robot hardware interface.
        kwargs: Additional arguments to pass to the robot hardware interface.
    """
    return await asyncio.to_thread(self.do, *args, **kwargs)

capture(*args, **kwargs)

Captures continuous data from the hardware.

Parameters:

Name Type Description Default
args

Arguments to pass to the robot hardware interface.

()
kwargs

Additional arguments to pass to the robot hardware interface.

{}
Source code in mbodied/robots/robot.py
92
93
94
95
96
97
98
99
def capture(self, *args, **kwargs) -> None:
    """Captures continuous data from the hardware.

    Args:
        args: Arguments to pass to the robot hardware interface.
        kwargs: Additional arguments to pass to the robot hardware interface.
    """
    raise NotImplementedError

do(*args, **kwargs) abstractmethod

Executes motion.

Parameters:

Name Type Description Default
args

Arguments to pass to the robot hardware interface.

()
kwargs

Additional arguments to pass to the robot hardware interface.

{}
Source code in mbodied/robots/robot.py
64
65
66
67
68
69
70
71
72
@abstractmethod
def do(self, *args, **kwargs) -> None:  # noqa
    """Executes motion.

    Args:
        args: Arguments to pass to the robot hardware interface.
        kwargs: Additional arguments to pass to the robot hardware interface.
    """
    raise NotImplementedError

fetch(*args, **kwargs)

Fetches data from the hardware.

Parameters:

Name Type Description Default
args

Arguments to pass to the robot hardware interface.

()
kwargs

Additional arguments to pass to the robot hardware interface.

{}
Source code in mbodied/robots/robot.py
83
84
85
86
87
88
89
90
def fetch(self, *args, **kwargs) -> None:
    """Fetches data from the hardware.

    Args:
        args: Arguments to pass to the robot hardware interface.
        kwargs: Additional arguments to pass to the robot hardware interface.
    """
    raise NotImplementedError

get_observation()

(Optional for robot recorder): Captures the observation/image of the robot.

This will be used by the robot recorder to record the current observation/image of the robot.

Source code in mbodied/robots/robot.py
101
102
103
104
105
106
def get_observation(self) -> Sample:
    """(Optional for robot recorder): Captures the observation/image of the robot.

    This will be used by the robot recorder to record the current observation/image of the robot.
    """
    raise NotImplementedError

get_state()

(Optional for robot recorder): Gets the current state (pose) of the robot.

This will be used by the robot recorder to record the current state of the robot.

Source code in mbodied/robots/robot.py
108
109
110
111
112
113
def get_state(self) -> Sample:
    """(Optional for robot recorder): Gets the current state (pose) of the robot.

    This will be used by the robot recorder to record the current state of the robot.
    """
    raise NotImplementedError

init_recorder(frequency_hz=5, recorder_kwargs=None, on_static='omit')

Initializes the recorder for the robot.

Source code in mbodied/robots/robot.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def init_recorder(
    self,
    frequency_hz: int = 5,
    recorder_kwargs: dict = None,
    on_static: str = "omit",
) -> None:
    """Initializes the recorder for the robot."""
    self.robot_recorder = RobotRecorder(
        self.get_state,
        self.get_observation,
        self.prepare_action,
        frequency_hz,
        recorder_kwargs,
        on_static=on_static,
    )

prepare_action(old_state, new_state)

(Optional for robot recorder): Prepare the the action between two robot states.

This is what you are recording as the action. For example, substract old from new hand position and use absolute value for grasp, etc.

Parameters:

Name Type Description Default
old_state Sample

The old state (pose) of the robot.

required
new_state Sample

The new state (pose) of the robot.

required
Source code in mbodied/robots/robot.py
115
116
117
118
119
120
121
122
123
124
125
def prepare_action(self, old_state: Sample, new_state: Sample) -> Sample:
    """(Optional for robot recorder): Prepare the the action between two robot states.

    This is what you are recording as the action. For example, substract old from new hand position
    and use absolute value for grasp, etc.

    Args:
        old_state: The old state (pose) of the robot.
        new_state: The new state (pose) of the robot.
    """
    raise NotImplementedError

record(task)

Start recording with the given task with context manager.

Usage:

with robot.record("pick up the remote"):
    robot.do(motion1)
    robot.do(motion2)
    ...

Source code in mbodied/robots/robot.py
143
144
145
146
147
148
149
150
151
152
153
154
def record(self, task: str) -> RobotRecorder:
    """Start recording with the given task with context manager.

    Usage:
    ```python
    with robot.record("pick up the remote"):
        robot.do(motion1)
        robot.do(motion2)
        ...
    ```
    """
    return self.robot_recorder.record(task)

start_recording(task)

Start recording with the given task.

Source code in mbodied/robots/robot.py
156
157
158
def start_recording(self, task: str) -> None:
    """Start recording with the given task."""
    self.robot_recorder.start_recording(task)

stop_recording()

Stop recording.

Source code in mbodied/robots/robot.py
160
161
162
def stop_recording(self) -> None:
    """Stop recording."""
    self.robot_recorder.stop_recording()