Sensors¶
This page documents the sensor system: the abstraction MolmoSpaces uses to turn raw simulator state into the observation dicts that policies consume and that data generation writes to disk.
If you're reading this because you're writing a new task or a new robot and need to decide which sensors to attach, the short version is:
New users: start from get_core_sensors()
For almost any new task you should compose your sensor suite by starting
from get_core_sensors(exp_config)
and then extending it with the few task-specific sensors you actually
need. Do not copy or extend
get_rby1_door_opening_sensors,
get_nav_task_sensors,
or any other ad-hoc bundle — they exist for backward compatibility and
bake in legacy assumptions (e.g. robot name checks) that you almost
certainly do not want.
The rest of this page explains how the system fits together.
How sensors work¶
A sensor is a small object that, given the env and the current task, returns
one piece of an observation. Every sensor inherits from
Sensor (in
molmo_spaces/env/abstract_sensors.py):
class Sensor(ABC):
    uuid: str                     # unique identifier, used as the obs dict key
    observation_space: gym.Space  # gymnasium space describing the output
    is_dict: bool = False         # if True, output is a dict that will be JSON-encoded
    str_max_len: int = 2000       # padding length for the JSON byte buffer

    @abstractmethod
    def get_observation(self, env, task, batch_index: int = 0, ...): ...

    def reset(self) -> None: ...  # optional, override if the sensor has state
The uuid becomes the observation key¶
A sensor's uuid is the string key it occupies in the observation dict that
task.step() returns and in the HDF5 file that data generation produces.
Two sensors in the same suite are not allowed to share a uuid — the
SensorSuite constructor and
add() method both assert uniqueness.
is_dict and str_max_len¶
There are two flavors of sensor output:
- Plain array sensors (is_dict = False) return an np.ndarray whose shape and dtype match observation_space. Camera RGB, depth, TCP pose, etc. are all in this category.
- Dict sensors (is_dict = True) return a (possibly nested) Python dict of JSON-serializable values. At save time this dict is json.dumps'd, UTF-8 encoded, and packed into a fixed-length np.uint8 buffer of length str_max_len (right-padded with \x00). The corresponding observation_space is always a Box(0, 255, (str_max_len,), uint8).
The encoding round-trip happens in
save_utils.dict_to_byte_array
/ byte_array_to_string; see Saving below.
A warning is logged if your JSON exceeds str_max_len and gets truncated, so
if you're storing variable-length data (e.g. per-object dicts) pick
str_max_len generously.
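The round-trip can be sketched with the standard library alone. This is a minimal illustration, not the code in save_utils: `encode_dict` and `decode_buffer` are hypothetical names, and plain `bytes` stands in for the `np.uint8` buffer so the example has no dependencies.

```python
import json


def encode_dict(obs: dict, str_max_len: int = 2000) -> bytes:
    """JSON-encode obs into a fixed-length, NUL-padded buffer (illustrative only)."""
    raw = json.dumps(obs).encode("utf-8")
    if len(raw) > str_max_len:
        # Truncation cuts the JSON mid-stream, so the result can no longer be parsed.
        raw = raw[:str_max_len]
    return raw.ljust(str_max_len, b"\x00")


def decode_buffer(buf: bytes) -> dict:
    """Strip the NUL padding and parse the JSON payload back into a dict."""
    return json.loads(buf.rstrip(b"\x00").decode("utf-8"))


buf = encode_dict({"touching": True, "held": False}, str_max_len=64)
restored = decode_buffer(buf)
```

A payload longer than `str_max_len` comes back as invalid JSON in this sketch, which illustrates why a generous `str_max_len` matters for variable-length data.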
reset()¶
Sensors that maintain state across calls — for example
LastCommandedRelativeJointPosSensor
(needs the previous joint pos),
ObjectStartPoseSensor
(caches the first observed pose), and
GraspStateSensor (caches
gripper/object geom ids) — override reset(). The base
BaseMujocoTask.reset() calls reset() on every sensor in the suite, so
state never leaks between episodes.
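As a rough sketch of why reset() matters, the toy class below caches the first value it sees, in the spirit of a start-pose sensor. `FirstValueSensor` is a hypothetical stand-in and its `get_observation(value)` signature is simplified; the real sensors take `env`, `task`, and `batch_index`.

```python
class FirstValueSensor:
    """Stand-in for a stateful sensor: caches the first value it observes."""

    def __init__(self, uuid: str):
        self.uuid = uuid
        self._first = None  # per-episode cache

    def get_observation(self, value):
        # Cache on the first call; later calls return the cached value.
        if self._first is None:
            self._first = value
        return self._first

    def reset(self) -> None:
        # Drop the cache so the next episode starts clean.
        self._first = None


sensor = FirstValueSensor(uuid="obj_start_pose")
episode_1 = [sensor.get_observation(v) for v in (1.0, 2.0, 3.0)]
sensor.reset()  # the task's reset() would trigger this for every sensor
episode_2 = [sensor.get_observation(v) for v in (7.0, 8.0)]
```

Without the `reset()` call, the second episode would keep reporting the first episode's cached value.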
SensorSuite¶
A SensorSuite is just an
OrderedDict[str, Sensor] with a couple of helpers:
- get_observations(env, task, batch_index=...) calls every sensor's get_observation(...) and returns the resulting dict.
- add(sensor) / extend(sensors) append to the suite while enforcing the uuid uniqueness invariant.
The task's get_observations() invokes the suite once per env in the batch
(n_batch is typically 1; see the warning in
Key Concepts).
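The behaviour described in this section can be approximated in a few lines. The sketch below is a toy, not the real SensorSuite: `MiniSuite` and `ConstantSensor` are hypothetical names, and the only properties it tries to mirror are the ordered uuid-to-sensor mapping and the uniqueness assertion.

```python
from collections import OrderedDict


class ConstantSensor:
    """Hypothetical sensor that always returns the same value."""

    def __init__(self, uuid, value):
        self.uuid = uuid
        self.value = value

    def get_observation(self, env, task, batch_index=0):
        return self.value


class MiniSuite:
    """Toy suite: an ordered uuid -> sensor mapping with a uniqueness check."""

    def __init__(self, sensors=()):
        self.sensors = OrderedDict()
        self.extend(sensors)

    def add(self, sensor):
        assert sensor.uuid not in self.sensors, f"duplicate uuid: {sensor.uuid}"
        self.sensors[sensor.uuid] = sensor

    def extend(self, sensors):
        for sensor in sensors:
            self.add(sensor)

    def get_observations(self, env, task, batch_index=0):
        # One entry per sensor, keyed by uuid, in insertion order.
        return {
            uuid: s.get_observation(env, task, batch_index=batch_index)
            for uuid, s in self.sensors.items()
        }


suite = MiniSuite([ConstantSensor("qpos", [0.0, 0.1]), ConstantSensor("qvel", [0.0, 0.0])])
obs = suite.get_observations(env=None, task=None)
```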
Where sensors are registered¶
Sensor registration is split across the three layers that own the data (this was the result of a recent refactor — see commit history for details). Each layer contributes the sensors it knows about; the task composes them into the final suite at construction and policy-registration time.
1. Task — task-specific sensors¶
Each BaseMujocoTask subclass implements
_create_sensor_suite_from_config(exp_config)
(which is @abstractmethod). This is where the core sensor bundle plus any
task-specific sensors get instantiated. The recommended pattern is:
from molmo_spaces.env.abstract_sensors import SensorSuite
from molmo_spaces.env.sensors import GraspStateSensor, ObjectStartPoseSensor, get_core_sensors
from molmo_spaces.tasks.task import BaseMujocoTask


class MyTask(BaseMujocoTask):
    def _create_sensor_suite_from_config(self, config):
        # Start from the core bundle, then add only the task-specific sensors.
        sensors = get_core_sensors(config)
        sensors.extend([
            ObjectStartPoseSensor(object_name=config.task_config.pickup_obj_name,
                                  uuid="obj_start_pose"),
            GraspStateSensor(object_name=config.task_config.pickup_obj_name,
                             uuid="grasp_state_pickup_obj"),
        ])
        return SensorSuite(sensors)
PickTask and
PickAndPlaceTask
are good reference implementations.
2. Robot — robot-specific sensors¶
Each Robot subclass overrides
create_robot_sensors(),
which returns a list of sensors that only make sense for that robot. Currently:
| Robot | Adds |
|---|---|
| FrankaRobot, MobileFrankaRobot, I2rtYamRobot, FloatingRUMRobot | TCPPoseSensor(uuid="tcp_pose") |
| BimanualYamRobot | TCPPoseSensor(uuid="tcp_pose_left"), TCPPoseSensor(uuid="tcp_pose_right") |
| RBY1 | RBY1GraspStateSensor(uuid="rby1_left_grasp_state", ...), RBY1GraspStateSensor(uuid="rby1_right_grasp_state", ...) |
BaseMujocoTask.__init__ calls current_robot.create_robot_sensors() and
extends the suite with them, so tasks never have to think about
robot-specific sensors directly.
3. Policy — policy-specific sensors¶
Each BasePolicy subclass overrides
create_policy_sensors().
The defaults are:
| Policy class | Adds |
|---|---|
| BasePolicy | nothing |
| PlannerPolicy | PolicyPhaseSensor(uuid="policy_phase"), PolicyNumRetriesSensor(uuid="policy_num_retries") |
| BaseObjectManipulationPlannerPolicy | GraspPoseSensor(uuid="grasp_pose") |
These are attached when the policy is bound to the task via
task.register_policy(policy).
A task may only have one policy registered over its lifetime — re-registration
raises ValueError.
Composition order¶
The final suite for a typical episode is built up in this order:
BaseMujocoTask.__init__:
suite = task._create_sensor_suite_from_config(exp_config) # task sensors (incl. core)
suite.extend(robot.create_robot_sensors()) # robot sensors
task.register_policy(policy):
suite.extend(policy.create_policy_sensors()) # policy sensors
Putting register_policy after construction is important because, e.g.,
PolicyPhaseSensor and GraspPoseSensor reference the bound policy via
task._registered_policy and would fail otherwise.
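The ordering and the single-registration rule can be illustrated with a toy class. This is only a sketch under simplifying assumptions: `MiniTask` is hypothetical, sensors are represented by their uuid strings, and the policy sensors are passed in directly rather than obtained from `create_policy_sensors()`.

```python
class MiniTask:
    """Toy task mirroring the composition order; not the real BaseMujocoTask."""

    def __init__(self, task_sensors, robot_sensors):
        # Construction time: task sensors first, then robot sensors.
        self.suite = list(task_sensors) + list(robot_sensors)
        self._registered_policy = None

    def register_policy(self, policy, policy_sensors):
        # Only one policy may ever be bound to a task.
        if self._registered_policy is not None:
            raise ValueError("a policy is already registered")
        self._registered_policy = policy
        # Policy sensors are appended only now, once the policy they read from exists.
        self.suite.extend(policy_sensors)


task = MiniTask(task_sensors=["qpos", "obj_start_pose"], robot_sensors=["tcp_pose"])
task.register_policy("planner", policy_sensors=["policy_phase", "grasp_pose"])
```

After these two calls the toy suite holds the task, robot, and policy entries in that order, and a second `register_policy` call raises `ValueError`.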
Opting out¶
If exp_config.task_config.use_sensors = False, BaseMujocoTask skips
creation of the suite entirely (self._sensor_suite = None) and
get_observations() returns an empty dict per env. register_policy then
becomes a no-op for sensor extension. This codepath exists for cases like
unit tests or pure physics-only rollouts.
The core sensor suite¶
get_core_sensors(exp_config) is
the recommended starting point for any new task. It is task-, policy-, and
robot-agnostic — anything that would be specific to one of those should be
contributed by the corresponding create_*_sensors hook, not by extending or
copying this function.
It builds:
| Category | Sensors |
|---|---|
| Cameras (per camera_spec in exp_config.camera_config.cameras) | CameraParameterSensor (sensor_param_{name}), CameraSensor ({name}), and conditionally DepthSensor ({name}_depth) when camera_spec.record_depth is true |
| Robot proprioception | RobotJointPositionSensor (qpos), RobotJointVelocitySensor (qvel), RobotBasePoseSensor (robot_base_pose) |
| Environment state | EnvStateSensor (env_states), TaskInfoSensor (task_info) |
| Actions | LastActionSensor (actions/commanded_action), LastCommandedJointPosSensor (actions/joint_pos), LastCommandedRelativeJointPosSensor (actions/joint_pos_rel), LastCommandedEETwistSensor (actions/ee_twist), LastCommandedEEPoseSensor (actions/ee_pose) |
| Object tracking | ObjectImagePointsSensor (object_image_points) — samples in-mask pixel coordinates per camera for the task objects returned by task.get_task_objects() |
A few notes:
- The action sensors all return {} when task.is_terminal(). This is used as the sentinel done action at the tail of every trajectory; see Data Format for what consumers do with this.
- LastCommandedRelativeJointPosSensor and LastCommandedEETwistSensor need the previous joint pos / leaf pose, so they ship a dummy zero observation on the first step.
- Move groups that aren't position-commanded (e.g. velocity-controlled grippers with mismatched action/state dims) are silently dropped from the relative-action sensor output.
- ObjectImagePointsSensor falls back to the legacy task_config.pickup_obj_name / place_receptacle_name lookup if a task doesn't override get_task_objects(). For new tasks, prefer overriding get_task_objects() (see BaseMujocoTask.get_task_objects).
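To make the first two notes concrete, here is a small sketch of a relative-action sensor. It assumes a simplified signature (`commanded`, `is_terminal`) and plain lists instead of arrays; `RelativeJointPosSketch` is a hypothetical name, not the real LastCommandedRelativeJointPosSensor.

```python
class RelativeJointPosSketch:
    """Toy relative-action sensor: delta between consecutive commanded positions."""

    def __init__(self):
        self._prev = None

    def get_observation(self, commanded, is_terminal=False):
        if is_terminal:
            return {}  # sentinel "done" action at the tail of the trajectory
        if self._prev is None:
            delta = [0.0] * len(commanded)  # first step: no previous value yet
        else:
            delta = [c - p for c, p in zip(commanded, self._prev)]
        self._prev = list(commanded)
        return delta

    def reset(self):
        self._prev = None


sensor = RelativeJointPosSketch()
first = sensor.get_observation([0.5, 1.0])
second = sensor.get_observation([0.75, 1.0])
last = sensor.get_observation([0.75, 1.0], is_terminal=True)
```

The first call yields zeros, the second yields the actual delta, and the terminal call yields the empty-dict sentinel.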
Task-specific sensors¶
These are added by individual tasks on top of the core suite. Notable examples:
| Task | Adds (on top of get_core_sensors) |
|---|---|
| PickTask | ObjectStartPoseSensor(uuid="obj_start_pose"), GraspStateSensor(uuid="grasp_state_pickup_obj"), PickupObjGoalPoseSensor(uuid="obj_end_pose") |
| PickAndPlaceTask | ObjectStartPoseSensor, GraspStateSensor for both the pickup object and the place receptacle |
| OpeningTask | Inherits the PickTask suite (since opening an articulated object is structurally similar to picking) |
| DoorOpeningTask | Uses get_rby1_door_opening_sensors (see warning below) |
| NavToObjTask | Uses get_nav_task_sensors (see warning below) |
| MultiTask | Shares its child task's suite; its own _create_sensor_suite_from_config is a stub that returns SensorSuite(get_core_sensors(...)) |
Some commonly-reused task-side sensors:
- ObjectPoseSensor: pose of one or more named objects, expressed relative to the robot base. Used by navigation and door opening; output is a dict (is_dict = True).
- ObjectStartPoseSensor: caches the object's pose at the start of the episode. For pick / open / close tasks it short-circuits to task.config.task_config.pickup_obj_start_pose.
- GraspStateSensor: per-gripper {"touching": bool, "held": bool} heuristic based on MuJoCo contact pairs.
- DoorStateSensor: joint angle, opening percentage, handle position/extents (door-task only).
- PickupObjGoalPoseSensor (lives next to PickTask): the target end pose for the picked object.
Policy-specific sensors¶
These are attached when a policy is registered with the task:
- PolicyPhaseSensor (policy_phase): integer index of the current phase in policy.get_all_phases(). Added by every PlannerPolicy.
- PolicyNumRetriesSensor (policy_num_retries): policy.retry_count. Added by every PlannerPolicy.
- GraspPoseSensor (grasp_pose): the planned grasp pose in 7D. Added by BaseObjectManipulationPlannerPolicy and lives in the same module as the policy.
For learned / inference policies that don't carry phase or grasp metadata,
create_policy_sensors() returns [] and nothing is appended.
Robot-specific sensors¶
Robot-specific sensors are added in
create_robot_sensors()
on each Robot subclass. Today these are mostly TCP poses for arms; see
Where sensors are registered above for the
full table.
If you're adding a new robot, register here any sensor that is meaningless on other robots (e.g. a specific gripper's contact state). Don't add it to the core sensor suite.
How sensors relate to saving¶
The full path from "sensor returns a numpy array" to "value on disk in HDF5" runs through three stages.
1. Per-step accumulation¶
BaseMujocoTask.step() calls task.get_observations(), which in turn calls
SensorSuite.get_observations(env, task, batch_index). The resulting dict
(keyed by sensor uuid) is appended to task.observation_cache along with
rewards, terminals, etc. After the episode ends, task.get_history() packs
all of these into a single dict.
2. prepare_episode_for_saving¶
The data-generation pipeline (molmo_spaces/data_generation/pipeline.py,
function save_house_trajectories) takes the per-episode history dict and
hands it to
prepare_episode_for_saving.
This step:
- Flattens the per-timestep list of dicts (one entry per env in the batch; we only support n_batch=1 for saving).
- Saves all camera videos before batching, via save_videos_from_raw_observations. Each camera (RGB and depth) becomes its own MP4 file in the house output directory. This is a memory optimization: RGB/depth frames are ~80% of an episode's memory, and saving them out before the torch.stack step in batching avoids the giant transient tensor copies.
- Removes the camera sensor keys from the in-memory observation dicts so they're not also batched as tensors. Camera sensors are identified by is_camera_sensor, which checks the suite for CameraSensor / DepthSensor instances (with a name-based fallback).
- Calls batch_observations to transpose List[dict] to Dict[uuid, Tensor(T, ...)]. As part of this, convert_to_arr looks up is_dict and str_max_len on each sensor and JSON-encodes dict sensors into fixed-length uint8 buffers (right-padded with \x00).
- Appends rewards, terminals, truncateds, successes, and obs_scene (JSON string).
The fact that all of this works correctly — for both plain-tensor and
dict-encoded sensors — depends on the SensorSuite being available so that
is_dict and str_max_len can be looked up by uuid. This is why
prepare_episode_for_saving takes a sensor_suite argument.
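The strip-then-transpose part of this step can be sketched in plain Python. The function below is a hypothetical stand-in for batch_observations: it uses lists where the real pipeline produces tensors, skips the JSON encoding of dict sensors, and takes the camera keys as an explicit argument instead of consulting the suite. The camera name `wrist_cam` is made up for the example.

```python
def batch_observations_sketch(steps, camera_keys=()):
    """Transpose a per-step list of obs dicts into {uuid: [value_t0, value_t1, ...]}."""
    batched = {}
    for step in steps:
        for uuid, value in step.items():
            if uuid in camera_keys:
                continue  # camera frames go to MP4 beforehand and are not batched
            batched.setdefault(uuid, []).append(value)
    return batched


steps = [
    {"qpos": [0.0, 0.1], "wrist_cam": "frame0"},
    {"qpos": [0.0, 0.2], "wrist_cam": "frame1"},
]
batched = batch_observations_sketch(steps, camera_keys={"wrist_cam"})
```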
3. save_trajectories¶
save_trajectories writes
the batched episode dict to an HDF5 file with the layout documented in
Data Format. The relevant routing is:
| Episode key | Goes to HDF5 path | Notes |
|---|---|---|
| qpos, qvel | traj_{i}/obs/agent/{name} | from RobotJointPositionSensor / RobotJointVelocitySensor |
| actions/* | traj_{i}/actions/{name} | strips the actions/ prefix |
| sensor_param_{cam} | traj_{i}/obs/sensor_param/{cam}/{intrinsic_cv,extrinsic_cv,cam2world_gl} | one group per camera |
| Camera sensors ({cam}, {cam}_depth) | traj_{i}/obs/sensor_data/{cam} | dataset value is the byte-encoded MP4 filename, not the frames themselves; the file lives next to the HDF5 |
| env_states | traj_{i}/env_states/{actors,articulations} | JSON byte buffer is decoded and re-bucketed |
| Anything in the extra_sensor_mapping allowlist | traj_{i}/obs/extra/{target_name} | This is an explicit allowlist in _save_extra_data_from_batched (see warning below) |
| rewards, terminated, truncated, success, fail, obs_scene | traj_{i}/{name} | rest is metadata |
The extra_sensor_mapping allowlist
_save_extra_data_from_batched has a hard-coded dict that maps sensor
uuids to HDF5 dataset names. If you register a brand-new sensor with a
uuid that isn't in this dict and that isn't a camera / camera-param /
action / qpos / qvel / env_states / metadata sensor, your sensor's data
will not be written to disk — it'll happily flow through the in-memory
observation pipeline but be silently dropped at save time. If you add a
new task-side sensor that should be persisted, add its uuid to
extra_sensor_mapping in molmo_spaces/utils/save_utils.py.
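The routing rules and the drop-on-miss behaviour can be summarised as a small pure function. This is a sketch, not the real save code: `route_key`, `METADATA_KEYS`, the `camera_names` argument, and the example allowlist entry are all assumptions made for illustration, and sub-groups such as `actors` / `articulations` are omitted. In particular, routing a camera key to a dataset named after the key itself is a guess.

```python
METADATA_KEYS = {"rewards", "terminated", "truncated", "success", "fail", "obs_scene"}


def route_key(key, traj_index, camera_names=(), extra_sensor_mapping=None):
    """Return the HDF5 path for an episode key, or None if it would be dropped."""
    extra = extra_sensor_mapping or {}
    root = f"traj_{traj_index}"
    if key in ("qpos", "qvel"):
        return f"{root}/obs/agent/{key}"
    if key.startswith("actions/"):
        name = key[len("actions/"):]  # strip the actions/ prefix
        return f"{root}/actions/{name}"
    if key.startswith("sensor_param_"):
        cam = key[len("sensor_param_"):]
        return f"{root}/obs/sensor_param/{cam}"
    if key in camera_names:
        return f"{root}/obs/sensor_data/{key}"
    if key == "env_states":
        return f"{root}/env_states"
    if key in extra:
        return f"{root}/obs/extra/{extra[key]}"
    if key in METADATA_KEYS:
        return f"{root}/{key}"
    return None  # not routed anywhere: dropped at save time


mapping = {"obj_start_pose": "obj_start_pose"}  # made-up allowlist entry
keys = ("qpos", "actions/joint_pos", "obj_start_pose", "my_new_sensor")
paths = {k: route_key(k, 0, camera_names={"wrist_cam"}, extra_sensor_mapping=mapping)
         for k in keys}
```

In this sketch `my_new_sensor` maps to `None`, which is the silent-drop case the warning describes.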
Quick mental model¶
sensor.get_observation()                          # numpy / dict / nested dict
  └── task.get_observations()                     # dict[uuid -> obs]
        └── observation_cache (per-step list)
              └── prepare_episode_for_saving():
                    ├── save_videos_from_raw_observations()   # MP4 to disk
                    ├── strip camera keys
                    └── batch_observations()                  # → Dict[uuid, Tensor(T, ...)]
                          └── save_trajectories()             # HDF5 on disk
Adding a new sensor¶
The minimal recipe:
- Subclass Sensor and place it where appropriate.
- Pick a uuid that doesn't collide with anything in the core suite.
- Set is_dict and str_max_len if your output is a (nested) dict. Otherwise set observation_space to a gym.spaces.Box that matches the array you return.
- Implement get_observation(env, task, batch_index=0, ...). Don't fail silently on missing state: either raise loudly, or return a well-defined sentinel that downstream consumers can detect.
- Implement reset() if you cache anything across calls.
- Register it in the right place:
  - Task-specific → extend the list returned by your task's _create_sensor_suite_from_config.
  - Robot-specific → extend the list returned by your robot's create_robot_sensors.
  - Policy-specific → extend the list returned by your policy's create_policy_sensors.
- Add the uuid to extra_sensor_mapping in molmo_spaces/utils/save_utils.py if you want it persisted to the obs/extra/... group of the HDF5 file. (Camera / action / qpos sensors are routed automatically.)
When in doubt, look at how PickTask
composes its suite — it's the smallest end-to-end example that uses the
recommended get_core_sensors() + task-side extensions pattern.
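For orientation, the recipe might look roughly like this for a small dict sensor. Everything here is illustrative: `SensorStandIn` is a dependency-free stand-in for the real Sensor base class (it omits `observation_space` to avoid importing gymnasium), and `StepCountSensor` with its `step_count` uuid is a made-up example.

```python
from abc import ABC, abstractmethod


class SensorStandIn(ABC):
    """Minimal stand-in for the Sensor base class, for illustration only."""

    is_dict: bool = False
    str_max_len: int = 2000

    def __init__(self, uuid: str):
        self.uuid = uuid

    @abstractmethod
    def get_observation(self, env, task, batch_index: int = 0): ...

    def reset(self) -> None:
        pass


class StepCountSensor(SensorStandIn):
    """Hypothetical dict sensor reporting how many steps it has observed."""

    is_dict = True     # output is a dict, so it would be JSON-encoded at save time
    str_max_len = 128  # small payload, small buffer

    def __init__(self, uuid: str = "step_count"):
        super().__init__(uuid)
        self._steps = 0

    def get_observation(self, env, task, batch_index: int = 0):
        self._steps += 1
        return {"steps": self._steps}

    def reset(self) -> None:
        self._steps = 0  # cached state must not leak across episodes


sensors = []  # in a real task this would be the list from get_core_sensors(...)
sensors.append(StepCountSensor())
observations = [sensors[0].get_observation(env=None, task=None) for _ in range(3)]
sensors[0].reset()
after_reset = sensors[0].get_observation(env=None, task=None)
```

To persist such a sensor, its uuid would still need an entry in extra_sensor_mapping, as the last recipe step says.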