Skip to content

evaluation

evaluation

Evaluation utilities for MolmoSpaces benchmarks.

Programmatic usage

from molmo_spaces.evaluation import run_evaluation

results = run_evaluation(
    eval_config_cls=MyEvalConfig,
    benchmark_dir="/path/to/benchmark",
    checkpoint_path="/path/to/checkpoint",
)
print(f"Success rate: {results.success_rate:.1%}")

See run_evaluation() for full documentation.

Modules:

Name Description
benchmark_schema

JSON-based benchmark schema definitions.

configs
eval_main

Evaluation entrypoint for learned policies on JSON-based benchmarks.

json_eval_runner

JSON-based benchmark evaluation runner.

policy_server

Modified from: https://github.com/Physical-Intelligence/openpi/blob/main/src/openpi/serving/websocket_policy_server.py

robot_eval_overrides

Classes:

Name Description
BaseTaskSpec

Base task specification with fields common to all task types.

BenchmarkMetadata

Optional metadata for a benchmark directory.

EpisodeSpec

Complete specification for a single benchmark episode.

EvaluationResults

Results from running an evaluation on a benchmark.

ExocentricCameraSpec

Specification for an exocentric (fixed) camera.

JsonEvalRunner

Evaluation runner for JSON-based benchmarks.

LanguageSpec

Natural language task specification.

NavToObjTaskSpec

Task-specific parameters for navigation to object tasks.

OpenCloseTaskSpec

Task-specific parameters for open/close tasks.

PickAndPlaceTaskSpec

Task-specific parameters for pick and place tasks.

PickTaskSpec

Task-specific parameters for pick tasks.

RobotMountedCameraSpec

Specification for a camera mounted on the robot.

RobotSpec

Robot initialization specification.

SceneModificationsSpec

Scene modifications required for this episode.

SourceSpec

Provenance information for this episode.

Functions:

Name Description
load_all_episodes

Load all episodes from a benchmark directory as a flat list.

load_benchmark

Load a benchmark directory.

run_evaluation

Run evaluation on a JSON benchmark programmatically.

Attributes:

Name Type Description
CameraSpec
TaskSpec

CameraSpec module-attribute

__all__ module-attribute

__all__ = ['run_evaluation', 'EvaluationResults', 'JsonEvalRunner', 'BaseTaskSpec', 'BenchmarkMetadata', 'CameraSpec', 'EpisodeSpec', 'ExocentricCameraSpec', 'LanguageSpec', 'NavToObjTaskSpec', 'OpenCloseTaskSpec', 'PickAndPlaceTaskSpec', 'PickTaskSpec', 'RobotMountedCameraSpec', 'RobotSpec', 'SceneModificationsSpec', 'SourceSpec', 'TaskSpec', 'load_all_episodes', 'load_benchmark']

BaseTaskSpec

Bases: BaseModel

Base task specification with fields common to all task types.

robot_base_pose is the authoritative field for robot world placement. This comes from task_config in the codebase, not robot_config.

task_cls is the authoritative identifier for the task type. The eval task sampler is responsible for interpreting task_cls and creating the appropriate task. task_type is optional and for human convenience only.

Attributes:

Name Type Description
robot_base_pose list[float]
task_cls str
task_type str | None

robot_base_pose class-attribute instance-attribute

robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)

task_cls instance-attribute

task_cls: str

task_type class-attribute instance-attribute

task_type: str | None = None

BenchmarkMetadata

Bases: BaseModel

Optional metadata for a benchmark directory.

This is NOT required - each episode is fully self-contained. This file provides optional human-readable metadata about the benchmark.

Classes:

Name Description
Config

Methods:

Name Description
from_json_file

Load benchmark metadata from a JSON file.

to_json_file

Save the benchmark metadata to a JSON file.

Attributes:

Name Type Description
benchmark_created_date str | None
camera_system_class str | None
created_at str | None
description str | None
episode_length_stats dict[str, float] | None
house_counts dict[int, int] | None
num_episodes int | None
num_houses int | None
object_category_counts dict[str, int] | None
robot_counts dict[str, int] | None
source_data_date str | None
source_datagen_path str | None
task_cls_counts dict[str, int] | None

benchmark_created_date class-attribute instance-attribute

benchmark_created_date: str | None = None

camera_system_class class-attribute instance-attribute

camera_system_class: str | None = None

created_at class-attribute instance-attribute

created_at: str | None = None

description class-attribute instance-attribute

description: str | None = None

episode_length_stats class-attribute instance-attribute

episode_length_stats: dict[str, float] | None = None

house_counts class-attribute instance-attribute

house_counts: dict[int, int] | None = None

num_episodes class-attribute instance-attribute

num_episodes: int | None = None

num_houses class-attribute instance-attribute

num_houses: int | None = None

object_category_counts class-attribute instance-attribute

object_category_counts: dict[str, int] | None = None

robot_counts class-attribute instance-attribute

robot_counts: dict[str, int] | None = None

source_data_date class-attribute instance-attribute

source_data_date: str | None = None

source_datagen_path class-attribute instance-attribute

source_datagen_path: str | None = None

task_cls_counts class-attribute instance-attribute

task_cls_counts: dict[str, int] | None = None

Config

Attributes:

Name Type Description
extra
extra class-attribute instance-attribute
extra = 'allow'

from_json_file classmethod

from_json_file(path: str | Path) -> BenchmarkMetadata

Load benchmark metadata from a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
@classmethod
def from_json_file(cls, path: str | Path) -> "BenchmarkMetadata":
    """Load benchmark metadata from a JSON file."""
    path = Path(path)
    with open(path) as f:
        import json

        data = json.load(f)
    return cls.model_validate(data)

to_json_file

to_json_file(path: str | Path) -> None

Save the benchmark metadata to a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
def to_json_file(self, path: str | Path) -> None:
    """Save the benchmark metadata to a JSON file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        import json

        json.dump(self.model_dump(), f, indent=2)

EpisodeSpec

Bases: BaseModel

Complete specification for a single benchmark episode.

This is a FULLY SELF-CONTAINED specification - no external config needed. Contains all information needed to recreate the exact initial conditions for an episode: scene, robot, cameras, and task parameters.

NOTE: Timing/execution parameters (policy_dt_ms, ctrl_dt_ms, sim_dt_ms, task_horizon) are NOT stored per-episode. They come from the evaluation config or command line.

A benchmark is simply a list of EpisodeSpec objects in a single JSON file.

Classes:

Name Description
Config

Methods:

Name Description
from_json_file

Load an episode spec from a JSON file.

get_task_cls

Get fully qualified task class name from task dict (authoritative identifier).

get_task_type

Get optional human-readable task type from task dict.

to_json_file

Save the episode spec to a JSON file.

Attributes:

Name Type Description
cameras list[CameraSpec]
data_split str
house_index int
img_resolution tuple[int, int]
language LanguageSpec
robot RobotSpec
scene_dataset str
scene_modifications SceneModificationsSpec
seed int | None
source SourceSpec | None
task dict
task_relevant_objects list[str]

cameras class-attribute instance-attribute

cameras: list[CameraSpec] = Field(default_factory=list)

data_split class-attribute instance-attribute

data_split: str = 'val'

house_index instance-attribute

house_index: int

img_resolution instance-attribute

img_resolution: tuple[int, int]

language instance-attribute

language: LanguageSpec

robot instance-attribute

robot: RobotSpec

scene_dataset instance-attribute

scene_dataset: str

scene_modifications class-attribute instance-attribute

scene_modifications: SceneModificationsSpec = Field(default_factory=SceneModificationsSpec)

seed class-attribute instance-attribute

seed: int | None = None

source class-attribute instance-attribute

source: SourceSpec | None = None

task instance-attribute

task: dict

task_relevant_objects class-attribute instance-attribute

task_relevant_objects: list[str] = Field(default_factory=list)

Config

Attributes:

Name Type Description
extra
extra class-attribute instance-attribute
extra = 'allow'

from_json_file classmethod

from_json_file(path: str | Path) -> EpisodeSpec

Load an episode spec from a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
@classmethod
def from_json_file(cls, path: str | Path) -> "EpisodeSpec":
    """Load an episode spec from a JSON file."""
    path = Path(path)
    with open(path) as f:
        import json

        data = json.load(f)
    return cls.model_validate(data)

get_task_cls

get_task_cls() -> str

Get fully qualified task class name from task dict (authoritative identifier).

Source code in molmo_spaces/evaluation/benchmark_schema.py
def get_task_cls(self) -> str:
    """Get fully qualified task class name from task dict (authoritative identifier).

    Returns:
        The non-empty 'task_cls' string from the task dict.

    Raises:
        ValueError: If the task dict has no non-empty 'task_cls' entry.
    """
    # Removed stale commented-out molmo_spaces->mujoco_thor patching code
    # that was explicitly marked "TODO(max): XXX remove this".
    task_cls = self.task.get("task_cls")
    if not task_cls:
        raise ValueError("task dict missing required 'task_cls' field")
    return task_cls

get_task_type

get_task_type() -> str | None

Get optional human-readable task type from task dict.

Source code in molmo_spaces/evaluation/benchmark_schema.py
def get_task_type(self) -> str | None:
    """Return the optional human-readable task type from the task dict, if any."""
    task_info = self.task
    return task_info.get("task_type", None)

to_json_file

to_json_file(path: str | Path) -> None

Save the episode spec to a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
def to_json_file(self, path: str | Path) -> None:
    """Save the episode spec to a JSON file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        import json

        json.dump(self.model_dump(), f, indent=2)

EvaluationResults dataclass

EvaluationResults(success_count: int, total_count: int, output_dir: Path, episode_results: list[EpisodeResult] = list(), exp_config: MlSpacesExpConfig | None = None)

Results from running an evaluation on a benchmark.

Attributes:

Name Type Description
success_count int

Number of successful episodes

total_count int

Total number of episodes evaluated

output_dir Path

Path where evaluation outputs were saved

episode_results list[EpisodeResult]

Per-episode results with details

exp_config MlSpacesExpConfig | None

The experiment config used for evaluation

episode_results class-attribute instance-attribute

episode_results: list[EpisodeResult] = field(default_factory=list)

exp_config class-attribute instance-attribute

exp_config: MlSpacesExpConfig | None = None

output_dir instance-attribute

output_dir: Path

success_count instance-attribute

success_count: int

success_rate property

success_rate: float

Compute success rate as a fraction.

total_count instance-attribute

total_count: int

ExocentricCameraSpec

Bases: BaseModel

Specification for an exocentric (fixed) camera.

Attributes:

Name Type Description
forward list[float]
fov float
name str
pos list[float]
record_depth bool
type Literal['exocentric']
up list[float]

forward class-attribute instance-attribute

forward: list[float] = Field(..., min_length=3, max_length=3)

fov instance-attribute

fov: float

name instance-attribute

name: str

pos class-attribute instance-attribute

pos: list[float] = Field(..., min_length=3, max_length=3)

record_depth class-attribute instance-attribute

record_depth: bool = False

type class-attribute instance-attribute

type: Literal['exocentric'] = 'exocentric'

up class-attribute instance-attribute

up: list[float] = Field(..., min_length=3, max_length=3)

JsonEvalRunner

JsonEvalRunner(exp_config: MlSpacesExpConfig, benchmark_dir: Path)

Bases: ParallelRolloutRunner

Evaluation runner for JSON-based benchmarks.

This runner differs from the standard ParallelRolloutRunner in several ways:

1. Episodes are loaded from JSON files, not from H5 frozen configs
2. Each episode is fully self-contained (timing, cameras, task config)
3. Task samplers are created per-episode to support mixed task types
4. Uses patch_config to add evaluation-specific runtime parameters

The runner inherits process_single_house from ParallelRolloutRunner and customizes behavior by overriding hook methods.

Initialize the JSON eval runner.

The benchmark is authoritative - all episode data comes from the JSON files. No fallbacks or defaults; missing data is an error.

Parameters:

Name Type Description Default
exp_config MlSpacesExpConfig

Base experiment config (provides robot_config, policy_config)

required
benchmark_dir Path

Path to benchmark directory containing benchmark.json

required

Methods:

Name Description
adjust_robot

Apply robot-specific evaluation overrides if configured.

get_episode_seed

Get seed from episode spec, falling back to index.

get_episode_spec_at_index

Get episode specification at given index.

get_episode_task_sampler

Create per-episode JsonEvalTaskSampler.

get_episodes_for_house

Get all episode specs for a given house.

get_max_episode_attempts

Process all episodes in the benchmark - no retry multiplier.

load_episodes_for_house

Load episode specifications from JSON benchmark.

patch_config

Patch evaluation config with runtime evaluation-specific parameters.

prepare_episode_config

Prepare episode-specific config from JSON spec.

process_single_house

Process all episodes for a single house using customizable hooks.

run

Run house-by-house rollouts using multiprocessing workers.

run_single_rollout

Execute a single rollout with the given task and policy.

sample_task_from_spec

Sample task - episode spec is already in the JsonEvalTaskSampler.

should_close_episode_task_sampler

Close task sampler after each episode - we create per-episode.

should_stop_early

Stop early if evaluating a single episode (--idx provided) and it's been collected.

Attributes:

Name Type Description
benchmark_dir
completed_houses
config
counter_lock
house_counter
house_indices
logger
max_allowed_sequential_irrecoverable_failures
max_allowed_sequential_rollout_failures
max_allowed_sequential_task_sampler_failures
profiler
samples_per_house
shutdown_event
skipped_houses
success_count
total_count
total_houses
wandb_enabled
Source code in molmo_spaces/evaluation/json_eval_runner.py
def __init__(
    self,
    exp_config: MlSpacesExpConfig,
    benchmark_dir: Path,
) -> None:
    """
    Initialize the JSON eval runner.

    The benchmark is authoritative - all episode data comes from the JSON files.
    No fallbacks or defaults; missing data is an error.

    Args:
        exp_config: Base experiment config (provides robot_config, policy_config).
            Mutated in place: task_sampler_config.house_inds,
            task_sampler_config.samples_per_house, and benchmark_path are set here.
        benchmark_dir: Path to benchmark directory containing benchmark.json

    Raises:
        ValueError: If the benchmark contains no episodes, or if
            eval_runtime_params.episode_idx is outside the episode range.
    """
    self.benchmark_dir = benchmark_dir.resolve()

    all_episodes = load_all_episodes(self.benchmark_dir)
    if not all_episodes:
        raise ValueError(
            f"No episodes found in benchmark at {self.benchmark_dir}. "
            f"Expected benchmark.json file with list of episode specs."
        )

    # Truncate BEFORE grouping by house so the house/episode bookkeeping
    # below only reflects episodes that will actually be evaluated.
    eval_params = exp_config.eval_runtime_params
    if eval_params.max_episodes is not None and len(all_episodes) > eval_params.max_episodes:
        log.info(
            f"Limiting to first {eval_params.max_episodes} of {len(all_episodes)} episodes"
        )
        all_episodes = all_episodes[: eval_params.max_episodes]

    # Group episodes by house; the parent runner processes one house at a time.
    self._episodes_by_house: dict[int, list[EpisodeSpec]] = defaultdict(list)
    for ep in all_episodes:
        self._episodes_by_house[ep.house_index].append(ep)
    # Freeze to a plain dict so later lookups of unknown houses raise
    # instead of silently creating empty entries.
    self._episodes_by_house = dict(self._episodes_by_house)

    # If episode_idx is specified, only process the house containing that episode
    episode_idx = eval_params.episode_idx
    if episode_idx is not None:
        if episode_idx < 0 or episode_idx >= len(all_episodes):
            raise ValueError(
                f"Episode index {episode_idx} is out of range. "
                f"Benchmark has {len(all_episodes)} episodes (indices 0-{len(all_episodes) - 1})"
            )
        target_episode = all_episodes[episode_idx]
        # Only process the house containing the target episode
        exp_config.task_sampler_config.house_inds = [target_episode.house_index]
        exp_config.task_sampler_config.samples_per_house = 1
    else:
        exp_config.task_sampler_config.house_inds = sorted(self._episodes_by_house.keys())
        # samples_per_house must cover the busiest house; houses with fewer
        # episodes just run out of specs earlier.
        max_episodes = max(len(eps) for eps in self._episodes_by_house.values())
        exp_config.task_sampler_config.samples_per_house = max_episodes
    # Worker processes re-load episodes from this path (see load_episodes_for_house).
    exp_config.benchmark_path = self.benchmark_dir

    super().__init__(exp_config)

    total_episodes = sum(len(eps) for eps in self._episodes_by_house.values())
    log.info(
        f"JsonEvalRunner initialized: {len(self._episodes_by_house)} houses, "
        f"{total_episodes} episodes from {self.benchmark_dir}"
    )

benchmark_dir instance-attribute

benchmark_dir = resolve()

completed_houses instance-attribute

completed_houses = Value('i', 0)

config instance-attribute

config = exp_config

counter_lock instance-attribute

counter_lock = Lock()

house_counter instance-attribute

house_counter = Value('i', 0)

house_indices instance-attribute

house_indices = house_inds

logger instance-attribute

logger = get_logger()

max_allowed_sequential_irrecoverable_failures instance-attribute

max_allowed_sequential_irrecoverable_failures = max_allowed_sequential_irrecoverable_failures

max_allowed_sequential_rollout_failures instance-attribute

max_allowed_sequential_rollout_failures = max_allowed_sequential_rollout_failures

max_allowed_sequential_task_sampler_failures instance-attribute

max_allowed_sequential_task_sampler_failures = max_allowed_sequential_task_sampler_failures

profiler instance-attribute

profiler = profiler

samples_per_house instance-attribute

samples_per_house = samples_per_house

shutdown_event instance-attribute

shutdown_event = Event()

skipped_houses instance-attribute

skipped_houses = Value('i', 0)

success_count instance-attribute

success_count = Value('i', 0)

total_count instance-attribute

total_count = Value('i', 0)

total_houses instance-attribute

total_houses = len(house_indices)

wandb_enabled instance-attribute

wandb_enabled = True

adjust_robot staticmethod

adjust_robot(exp_config: MlSpacesExpConfig) -> None

Apply robot-specific evaluation overrides if configured.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def adjust_robot(exp_config: MlSpacesExpConfig) -> None:
    """Attach a robot-specific evaluation override to the config, when one exists."""
    override = get_robot_override(exp_config.robot_config)
    if override is None:
        return
    exp_config._robot_eval_override = override

get_episode_seed staticmethod

get_episode_seed(episode_idx: int, episode_spec: EpisodeSpec, task_sampler: JsonEvalTaskSampler) -> int

Get seed from episode spec, falling back to index.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_episode_seed(
    episode_idx: int,
    episode_spec: EpisodeSpec,
    task_sampler: JsonEvalTaskSampler,
) -> int:
    """Get seed from episode spec, falling back to index."""
    return episode_spec.seed if episode_spec.seed is not None else episode_idx

get_episode_spec_at_index staticmethod

get_episode_spec_at_index(episode_specs: list[EpisodeSpec], idx: int) -> EpisodeSpec

Get episode specification at given index.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_episode_spec_at_index(episode_specs: list[EpisodeSpec], idx: int) -> EpisodeSpec:
    """Get episode specification at given index."""
    return episode_specs[idx]

get_episode_task_sampler staticmethod

get_episode_task_sampler(exp_config: MlSpacesExpConfig, episode_spec: EpisodeSpec, shared_task_sampler, datagen_profiler: DatagenProfiler | None) -> JsonEvalTaskSampler

Create per-episode JsonEvalTaskSampler.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_episode_task_sampler(
    exp_config: MlSpacesExpConfig,
    episode_spec: EpisodeSpec,
    shared_task_sampler,
    datagen_profiler: DatagenProfiler | None,
) -> JsonEvalTaskSampler:
    """Build a fresh JsonEvalTaskSampler for this episode, wiring in the profiler if given."""
    per_episode_sampler = JsonEvalTaskSampler(exp_config, episode_spec)
    if datagen_profiler is None:
        return per_episode_sampler
    per_episode_sampler.set_datagen_profiler(datagen_profiler)
    return per_episode_sampler

get_episodes_for_house

get_episodes_for_house(house_id: int) -> list[EpisodeSpec]

Get all episode specs for a given house.

Source code in molmo_spaces/evaluation/json_eval_runner.py
def get_episodes_for_house(self, house_id: int) -> list[EpisodeSpec]:
    """Return all episode specs for ``house_id``; KeyError if the house is unknown."""
    episodes_by_house = self._episodes_by_house
    if house_id in episodes_by_house:
        return episodes_by_house[house_id]
    raise KeyError(
        f"House {house_id} not found in benchmark. "
        f"Available houses: {sorted(episodes_by_house.keys())}"
    )

get_max_episode_attempts staticmethod

get_max_episode_attempts(episode_specs: list[EpisodeSpec], samples_per_house: int, exp_config: MlSpacesExpConfig) -> int

Process all episodes in the benchmark - no retry multiplier.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_max_episode_attempts(
    episode_specs: list[EpisodeSpec],
    samples_per_house: int,
    exp_config: MlSpacesExpConfig,
) -> int:
    """Process all episodes in the benchmark - no retry multiplier."""
    return len(episode_specs)

load_episodes_for_house staticmethod

load_episodes_for_house(exp_config: MlSpacesExpConfig, house_id: int, batch_suffix: str, worker_task_sampler, worker_logger) -> tuple[list[EpisodeSpec], None]

Load episode specifications from JSON benchmark.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def load_episodes_for_house(
    exp_config: MlSpacesExpConfig,
    house_id: int,
    batch_suffix: str,
    worker_task_sampler,
    worker_logger,
) -> tuple[list[EpisodeSpec], None]:
    """Load episode specifications from JSON benchmark.

    Episodes are re-loaded from ``exp_config.benchmark_path`` (presumably
    per worker process -- this is a staticmethod taking worker_logger/
    worker_task_sampler; verify against the caller). Every error path
    (empty benchmark, bad episode index, unknown house) logs through
    worker_logger and returns ``([], None)`` rather than raising. The
    second tuple element is always None (no auxiliary data for JSON
    benchmarks).

    Order matters below: truncation to max_episodes happens BEFORE the
    episode_idx filter and the per-house filter, mirroring the truncation
    applied at runner construction time.
    """
    benchmark_path = exp_config.benchmark_path
    all_episodes = load_all_episodes(benchmark_path)

    if not all_episodes:
        worker_logger.error(
            f"No episodes found in benchmark at {benchmark_path}. Expected benchmark.json file."
        )
        return [], None

    eval_params = exp_config.eval_runtime_params

    # Truncate to max_episodes before any filtering
    if eval_params.max_episodes is not None and len(all_episodes) > eval_params.max_episodes:
        all_episodes = all_episodes[: eval_params.max_episodes]

    # Filter by episode index if specified
    episode_idx = eval_params.episode_idx
    if episode_idx is not None:
        if episode_idx < 0 or episode_idx >= len(all_episodes):
            worker_logger.error(
                f"Episode index {episode_idx} is out of range. "
                f"Benchmark has {len(all_episodes)} episodes (indices 0-{len(all_episodes) - 1})"
            )
            return [], None
        # Filter to only the specified episode, but still need to check house_id
        target_episode = all_episodes[episode_idx]
        if target_episode.house_index != house_id:
            # This house doesn't contain the target episode, return empty list
            # (not an error: other houses legitimately have nothing to do).
            return [], None
        all_episodes = [target_episode]

    house_episodes = [ep for ep in all_episodes if ep.house_index == house_id]

    if not house_episodes:
        available_houses = sorted(set(ep.house_index for ep in all_episodes))
        worker_logger.error(
            f"House {house_id} not found in benchmark. Available houses: {available_houses}"
        )
        return [], None

    # Apply custom object replacement if requested
    add_custom_object = eval_params.add_custom_object
    custom_object_path = eval_params.custom_object_path
    custom_object_name = eval_params.custom_object_name
    if add_custom_object and custom_object_path is not None:
        from pathlib import Path

        from molmo_spaces.evaluation.benchmark_schema import replace_target_object_with_custom

        custom_object_path = Path(custom_object_path)
        worker_logger.info(f"Replacing target objects with custom object: {custom_object_path}")
        if custom_object_name:
            worker_logger.info(f"Using custom object name: '{custom_object_name}'")
        house_episodes = [
            replace_target_object_with_custom(ep, custom_object_path, custom_object_name)
            for ep in house_episodes
        ]

    worker_logger.info(
        f"Loaded {len(house_episodes)} episodes for house {house_id} from {benchmark_path}"
    )
    return house_episodes, None

patch_config staticmethod

patch_config(exp_config: MlSpacesExpConfig, episode_idx: int | None = None, max_episodes: int | None = None, add_custom_object: bool = False, custom_object_path: str | Path | None = None, custom_object_name: str | None = None) -> MlSpacesExpConfig

Patch evaluation config with runtime evaluation-specific parameters.

This method modifies the config object to store evaluation-specific runtime parameters that are not part of the base config schema. These parameters are used by the evaluation runner to customize episode processing.

Parameters:

Name Type Description Default
exp_config MlSpacesExpConfig

The experiment config to patch

required
episode_idx int | None

Optional index of a specific episode to evaluate. If provided, only that episode will be evaluated and the process will stop after it.

None
max_episodes int | None

Optional maximum number of episodes to evaluate. If provided, only the episodes for the houses used in the first N episodes will be evaluated. Note that the final number of episodes can differ from N if more than one episode is sampled for any of the houses among the first N episodes.

None
add_custom_object bool

Whether to replace the target object with a custom object.

False
custom_object_path str | Path | None

Path to the custom object XML file. Required if add_custom_object is True.

None
custom_object_name str | None

Natural language name for the custom object (e.g., 'lemon', 'cup').

None

Returns:

Type Description
MlSpacesExpConfig

The patched config (same object, modified in place)

Note

These parameters are stored in an EvalRuntimeParams dataclass attached to the config object as exp_config.eval_runtime_params for access by worker processes. They are not part of the base MlSpacesExpConfig schema but are necessary for runtime evaluation customization.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def patch_config(
    exp_config: MlSpacesExpConfig,
    episode_idx: int | None = None,
    max_episodes: int | None = None,
    add_custom_object: bool = False,
    custom_object_path: str | Path | None = None,
    custom_object_name: str | None = None,
) -> MlSpacesExpConfig:
    """Attach runtime evaluation parameters to the experiment config.

    The parameters are bundled into an EvalRuntimeParams instance stored
    as ``exp_config.eval_runtime_params``, where worker processes read
    them. They customize episode processing at evaluation time and are
    not part of the base config schema.

    Args:
        exp_config: The experiment config to patch.
        episode_idx: Optional index of a single episode to evaluate;
            when set, evaluation stops after that episode.
        max_episodes: Optional cap on the number of episodes. The final
            count can differ from N when houses among the first N
            episodes contribute more than one episode each.
        add_custom_object: Whether to swap the target object for a
            custom one.
        custom_object_path: Path to the custom object XML file; required
            when add_custom_object is True.
        custom_object_name: Natural language name for the custom object
            (e.g., 'lemon', 'cup').

    Returns:
        The same config object, modified in place.
    """
    # Imported here to avoid a circular dependency with eval_main.
    from molmo_spaces.evaluation.eval_main import EvalRuntimeParams

    runtime_params = EvalRuntimeParams(
        episode_idx=episode_idx,
        max_episodes=max_episodes,
        add_custom_object=add_custom_object,
        custom_object_path=custom_object_path,
        custom_object_name=custom_object_name,
    )
    # eval_runtime_params is a proper field on MlSpacesExpConfig, so a
    # plain attribute assignment suffices.
    exp_config.eval_runtime_params = runtime_params
    return exp_config

prepare_episode_config staticmethod

prepare_episode_config(exp_config: MlSpacesExpConfig, episode_spec: EpisodeSpec, episode_idx: int) -> MlSpacesExpConfig

Prepare episode-specific config from JSON spec.

Note: task_horizon is NOT read from episode_spec. It's an evaluation parameter that comes from exp_config (set via command line or defaults).

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def prepare_episode_config(
    exp_config: MlSpacesExpConfig,
    episode_spec: EpisodeSpec,
    episode_idx: int,
) -> MlSpacesExpConfig:
    """Prepare episode-specific config from JSON spec.

    Note: task_horizon is NOT read from episode_spec. It's an evaluation
    parameter that comes from exp_config (set via command line or defaults).
    """
    episode_config = exp_config.model_copy(deep=True)
    episode_config.scene_dataset = episode_spec.scene_dataset
    episode_config.data_split = episode_spec.data_split
    # task_horizon comes from exp_config, not episode_spec
    return episode_config

process_single_house staticmethod

process_single_house(worker_id: int, worker_logger, house_id: int, exp_config: MlSpacesExpConfig, samples_per_house: int, shutdown_event, task_sampler, preloaded_policy: BasePolicy | None = None, max_allowed_sequential_task_sampler_failures: int = 10, max_allowed_sequential_rollout_failures: int = 10, filter_for_successful_trajectories: bool = False, runner_class=None, batch_num: int | None = None, total_batches: int | None = None, datagen_profiler: DatagenProfiler | None = None) -> tuple[int, int, bool]

Process all episodes for a single house using customizable hooks.

This method uses a while loop to iterate over episodes, calling hook methods via runner_class to allow subclasses to customize behavior without duplicating the entire method.

Hooks called (override in subclass to customize):

- load_episodes_for_house: Load episode specs from source (JSON, etc.)
- get_max_episode_attempts: Maximum iterations of the episode loop
- should_stop_early: Whether to stop before max attempts (e.g., enough successes)
- prepare_episode_config: Modify config per-episode
- get_episode_task_sampler: Get/create task sampler for episode
- sample_task_from_spec: Sample task from specification
- get_episode_seed: Get seed for episode
- should_close_episode_task_sampler: Whether to close sampler per-episode

Parameters:

Name Type Description Default
worker_id int

ID of the worker thread/process

required
worker_logger

Logger instance for this worker

required
house_id int

Index of the house to process

required
exp_config MlSpacesExpConfig

Experiment configuration

required
samples_per_house int

Number of episodes to collect for this house

required
shutdown_event

Event to signal shutdown

required
task_sampler

Task sampler instance (shared across houses for this worker)

required
preloaded_policy BasePolicy | None

Optional pre-initialized policy instance

None
max_allowed_sequential_task_sampler_failures int

Max consecutive task sampling failures

10
max_allowed_sequential_rollout_failures int

Max consecutive rollout failures

10
filter_for_successful_trajectories bool

Whether to filter for successful trajectories only

False
runner_class

Runner class with hook methods to call

None
batch_num int | None

Batch number for this house (for batched processing)

None
total_batches int | None

Total number of batches for this house

None
datagen_profiler DatagenProfiler | None

DatagenProfiler for per-worker timing (optional)

None

Returns:

Name Type Description
tuple tuple[int, int, bool]

(house_success_count, house_total_count, irrecoverable_failure_flag)

Source code in molmo_spaces/data_generation/pipeline.py
(Line-number gutter of the rendered source listing — lines 784–1170 of molmo_spaces/data_generation/pipeline.py — omitted; it carried no content.)
@staticmethod
def process_single_house(
    worker_id: int,
    worker_logger,
    house_id: int,
    exp_config: MlSpacesExpConfig,
    samples_per_house: int,
    shutdown_event,
    task_sampler,
    preloaded_policy: BasePolicy | None = None,
    max_allowed_sequential_task_sampler_failures: int = 10,
    max_allowed_sequential_rollout_failures: int = 10,
    filter_for_successful_trajectories: bool = False,
    runner_class=None,
    batch_num: int | None = None,
    total_batches: int | None = None,
    datagen_profiler: DatagenProfiler | None = None,
) -> tuple[int, int, bool]:
    """
    Process all episodes for a single house using customizable hooks.

    This method uses a while loop to iterate over episodes, calling hook methods
    via runner_class to allow subclasses to customize behavior without duplicating
    the entire method.

    Hooks called (override in subclass to customize):
    - load_episodes_for_house: Load episode specs from source (JSON, etc.)
    - get_max_episode_attempts: Maximum iterations of the episode loop
    - should_stop_early: Whether to stop before max attempts (e.g., enough successes)
    - prepare_episode_config: Modify config per-episode
    - get_episode_task_sampler: Get/create task sampler for episode
    - sample_task_from_spec: Sample task from specification
    - get_episode_seed: Get seed for episode
    - should_close_episode_task_sampler: Whether to close sampler per-episode

    Args:
        worker_id: ID of the worker thread/process
        worker_logger: Logger instance for this worker
        house_id: Index of the house to process
        exp_config: Experiment configuration
        samples_per_house: Number of episodes to collect for this house
        shutdown_event: Event to signal shutdown
        task_sampler: Task sampler instance (shared across houses for this worker)
        preloaded_policy: Optional pre-initialized policy instance
        max_allowed_sequential_task_sampler_failures: Max consecutive task sampling failures
        max_allowed_sequential_rollout_failures: Max consecutive rollout failures
        filter_for_successful_trajectories: Whether to filter for successful trajectories only
        runner_class: Runner class with hook methods to call
        batch_num: Batch number for this house (for batched processing)
        total_batches: Total number of batches for this house
        datagen_profiler: DatagenProfiler for per-worker timing (optional)

    Returns:
        tuple: (house_success_count, house_total_count, irrecoverable_failure_flag)
    """
    house_success_count = 0
    house_total_count = 0
    irrecoverable_failure_in_house = False

    # Setup directories and check for existing output
    house_output_dir, house_debug_dir, batch_suffix, should_skip = setup_house_dirs(
        exp_config, house_id, batch_num, total_batches
    )
    if should_skip:
        worker_logger.info(
            f"SKIPPING HOUSE {house_id} BATCH {batch_num}/{total_batches}: "
            f"Output already exists at {house_output_dir / f'trajectories{batch_suffix}.h5'}"
        )
        return 0, 0, False

    # Load episodes using hook - allows subclasses to load from different scene datasets
    episode_specs, shared_task_sampler = runner_class.load_episodes_for_house(
        exp_config, house_id, batch_suffix, task_sampler, worker_logger
    )

    if not episode_specs:
        worker_logger.warning(f"No episodes to process for house {house_id}")
        return 0, 0, False

    max_attempts = runner_class.get_max_episode_attempts(
        episode_specs, samples_per_house, exp_config
    )

    # Collect raw history data for this house
    house_raw_histories = []
    house_debug_raw_histories = []

    # Sequential failure tracking
    num_sequential_task_sampler_failures = 0
    num_sequential_rollout_failures = 0
    viewer = None

    # While loop with explicit index - allows subclasses to customize iteration
    episode_idx = 0
    while episode_idx < max_attempts:
        # Check early stop condition (e.g., enough successes for datagen)
        should_stop = runner_class.should_stop_early(
            len(house_raw_histories), samples_per_house, exp_config=exp_config
        )
        if should_stop:
            break

        # Check for shutdown signal
        if shutdown_event.is_set():
            worker_logger.info(f"Worker {worker_id} house {house_id} received shutdown signal")
            irrecoverable_failure_in_house = True
            break

        # Check for too many consecutive task sampling failures
        if num_sequential_task_sampler_failures >= max_allowed_sequential_task_sampler_failures:
            worker_logger.error(
                f"Worker {worker_id} house {house_id} encountered "
                f"{num_sequential_task_sampler_failures} consecutive task sampling failures. "
                "This is unrecoverable."
            )
            irrecoverable_failure_in_house = True
            break

        # Check for too many consecutive rollout failures
        if num_sequential_rollout_failures >= max_allowed_sequential_rollout_failures:
            worker_logger.error(
                f"Worker {worker_id} house {house_id} rollout failed across "
                f"{num_sequential_rollout_failures} retries. This is irrecoverable."
            )
            irrecoverable_failure_in_house = True
            break

        # Get episode spec for this iteration
        episode_spec = runner_class.get_episode_spec_at_index(episode_specs, episode_idx)

        # Track state for this episode
        task = None
        policy = None
        episode_task_sampler = None
        success = False
        task_sampling_failed = False
        house_invalid = False

        if datagen_profiler is not None:
            datagen_profiler.start("episode_total")

        # Prepare episode-specific config
        episode_config = runner_class.prepare_episode_config(
            exp_config, episode_spec, episode_idx
        )

        with cleanup_context():
            if viewer is not None:
                viewer.close()
                viewer = None

            # Task sampling phase
            task_sampling_start = time.perf_counter()

            try:
                # Get task sampler for this episode (shared or per-episode)
                episode_task_sampler = runner_class.get_episode_task_sampler(
                    episode_config, episode_spec, shared_task_sampler, datagen_profiler
                )

                # Sample task
                task = runner_class.sample_task_from_spec(
                    episode_task_sampler, house_id, episode_spec, episode_idx
                )

                if task is None:
                    worker_logger.info(
                        f"Worker {worker_id} house {house_id} episode {episode_idx}: "
                        "task sampling returned None"
                    )
                    house_invalid = True
                else:
                    # Record successful sampling time
                    if datagen_profiler is not None:
                        datagen_profiler.record(
                            "task_sampling", time.perf_counter() - task_sampling_start
                        )
                        task.set_datagen_profiler(datagen_profiler)

                    num_sequential_task_sampler_failures = 0

                    worker_logger.info(
                        f"Worker {worker_id} house {house_id} episode {episode_idx}/{max_attempts} "
                        f"collected={len(house_raw_histories)}/{samples_per_house}"
                    )

            except HouseInvalidForTask as e:
                traceback.print_exc()
                worker_logger.warning(
                    f"Worker {worker_id} house {house_id} episode {episode_idx} "
                    f"HouseInvalidForTask: {e.reason}"
                )
                house_invalid = True
                if datagen_profiler is not None:
                    datagen_profiler.record(
                        "task_sampling_failed", time.perf_counter() - task_sampling_start
                    )

            except Exception as e:
                traceback.print_exc()
                worker_logger.error(
                    f"Worker {worker_id} house {house_id} episode {episode_idx} "
                    f"task sampling error: {str(e)}"
                )
                num_sequential_task_sampler_failures += 1
                task_sampling_failed = True
                if datagen_profiler is not None:
                    datagen_profiler.record(
                        "task_sampling_failed", time.perf_counter() - task_sampling_start
                    )

            # Rollout phase (only if task sampling succeeded)
            if task is not None and not house_invalid and not task_sampling_failed:
                try:
                    # Setup policy and viewer
                    policy = setup_policy(
                        episode_config, task, preloaded_policy, datagen_profiler
                    )
                    viewer = setup_viewer(episode_config, task, policy, viewer)

                    # Get episode seed
                    episode_seed = runner_class.get_episode_seed(
                        episode_idx, episode_spec, episode_task_sampler
                    )

                    # Run the rollout
                    success = runner_class.run_single_rollout(
                        episode_seed=episode_seed,
                        task=task,
                        policy=policy,
                        profiler=episode_config.profiler,
                        viewer=viewer,
                        shutdown_event=shutdown_event,
                        datagen_profiler=datagen_profiler,
                        end_on_success=exp_config.end_on_success,
                    )

                    num_sequential_rollout_failures = 0

                    # Extract object name for logging if available
                    object_name = "unknown"
                    if hasattr(task, "config") and hasattr(task.config, "task_config"):
                        if hasattr(task.config.task_config, "pickup_obj_name"):
                            object_name = task.config.task_config.pickup_obj_name

                    worker_logger.info(
                        f"Worker {worker_id} house {house_id} episode {episode_idx} "
                        f"object {object_name} completed with success={success}"
                    )

                    # Collect trajectory
                    should_save = success or not filter_for_successful_trajectories
                    history = task.get_history()
                    # Keep ~1% of otherwise-discarded failed trajectories for offline debugging.
                    should_save_debug = not should_save and random.random() < 0.01

                    if should_save or should_save_debug:
                        episode_info = {
                            "history": history,
                            "sensor_suite": task.sensor_suite,
                            "success": success,
                            "seed": episode_seed,
                        }
                        if should_save:
                            house_raw_histories.append(episode_info)
                        elif should_save_debug:
                            house_debug_raw_histories.append(episode_info)
                            worker_logger.info(
                                f"Queueing failed trajectory for debug (seed: {episode_seed})"
                            )
                    else:
                        del history

                    # Update house counters
                    house_total_count += 1
                    if success:
                        house_success_count += 1
                    else:
                        # Report failure for this asset (may lead to dynamic blacklisting)
                        # NOTE: reporting goes through the shared `task_sampler` argument,
                        # not the per-episode `episode_task_sampler`.
                        asset_uid = task_sampler.get_asset_uid_from_object(
                            task.env, object_name
                        )
                        if asset_uid:
                            task_sampler.report_asset_failure(
                                asset_uid, "rollout failed (e.g., IK failure)"
                            )

                    if datagen_profiler is not None:
                        datagen_profiler.end("episode_total")
                        datagen_profiler.log_episode_summary(
                            episode_idx=episode_idx,
                            house_id=house_id,
                            success=success,
                        )

                except Exception as e:
                    worker_logger.error(
                        f"Worker {worker_id} house {house_id} episode {episode_idx} rollout error: {str(e)}"
                    )
                    traceback.print_exc()
                    num_sequential_rollout_failures += 1

                    # Report failure for this asset (may lead to dynamic blacklisting)
                    # NOTE(review): `object_name` may be unbound here if the exception was
                    # raised before its assignment above; the broad except below absorbs that.
                    try:
                        asset_uid = task_sampler.get_asset_uid_from_object(
                            task.env, object_name
                        )
                        if asset_uid:
                            task_sampler.report_asset_failure(
                                asset_uid, f"rollout exception: {e}"
                            )
                    except Exception:
                        pass  # Don't let failure tracking break the error handling

                    if datagen_profiler is not None:
                        datagen_profiler.end("episode_total")

            else:
                # Task sampling failed or house invalid
                if datagen_profiler is not None:
                    datagen_profiler.end("episode_total")

            # Cleanup resources
            cleanup_episode_resources(
                task=task,
                policy=policy,
                task_sampler=episode_task_sampler,
                preloaded_policy=preloaded_policy,
                close_task_sampler=runner_class.should_close_episode_task_sampler(),
            )

        # Handle house invalid - break after cleanup
        if house_invalid:
            irrecoverable_failure_in_house = True
            break

        # Always increment episode index
        episode_idx += 1

    # Cleanup viewer
    if viewer is not None:
        viewer.close()
        viewer = None

    # Check shutdown signal before saving
    if shutdown_event.is_set():
        worker_logger.info(
            f"Worker {worker_id} house {house_id} shutdown requested, skipping save"
        )
        return house_success_count, house_total_count, True

    # Save trajectories
    save_house_trajectories(
        worker_logger,
        house_raw_histories,
        house_output_dir,
        exp_config,
        batch_suffix,
        datagen_profiler,
        batch_num,
        total_batches,
    )

    # Save debug trajectories
    save_house_trajectories(
        worker_logger,
        house_debug_raw_histories,
        house_debug_dir,
        exp_config,
        batch_suffix,
        datagen_profiler=None,
        batch_num=batch_num,
        total_batches=total_batches,
    )

    worker_logger.info(
        f"Worker {worker_id} completed house {house_id}: "
        f"{house_success_count}/{house_total_count} successful episodes"
    )

    if datagen_profiler is not None:
        datagen_profiler.log_house_summary(
            house_id=house_id,
            success_count=house_success_count,
            total_count=house_total_count,
        )

    return house_success_count, house_total_count, irrecoverable_failure_in_house

run

run(preloaded_policy: BasePolicy | None = None) -> tuple[int, int]

Run house-by-house rollouts using multiprocessing workers.

Parameters:

Name Type Description Default
preloaded_policy BasePolicy | None

Optional pre-initialized policy instance to use for rollouts. If None, a new policy will be created for each rollout.

None

Returns:

Name Type Description
tuple tuple[int, int]

(success_count, total_count)

Source code in molmo_spaces/data_generation/pipeline.py
def run(self, preloaded_policy: BasePolicy | None = None) -> tuple[int, int]:
    """
    Run house-by-house rollouts using multiprocessing workers.

    Args:
        preloaded_policy: Optional pre-initialized policy instance to use for rollouts.
            If None, a new policy will be created for each rollout.

    Returns:
        tuple: (success_count, total_count)
    """
    total_expected_episodes = self.total_houses * self.samples_per_house
    self.logger.info(
        f"Starting house-by-house rollout of {self.total_houses} houses "
        f"with {self.samples_per_house} episodes each ({total_expected_episodes} total episodes) "
        f"using {self.config.num_workers} worker processes"
    )

    # make a copy of the config in the output directory
    self.logger.info("Evaluation configuration:")
    self.logger.info(pprint.pformat(self.config.model_dump()))
    self.config.save_config(output_dir=Path(self.config.output_dir))

    # Start timing for WandB metrics
    start_time = time.time()

    # Launch worker processes
    if self.config.num_workers > 1:
        # NOTE(review): `mp_context` appears to be a module-level multiprocessing
        # context defined elsewhere in this file — confirm when editing.
        processes = []
        for worker_id in range(self.config.num_workers):
            p = mp_context.Process(
                target=house_processing_worker,
                # Positional args: keep in sync with house_processing_worker's
                # signature (cf. the keyword-argument call in the single-worker
                # branch below).
                args=(
                    worker_id,
                    self.config,
                    self.house_indices,
                    self.samples_per_house,
                    self.shutdown_event,
                    self.counter_lock,
                    self.house_counter,
                    self.success_count,
                    self.total_count,
                    self.completed_houses,
                    self.skipped_houses,
                    self.max_allowed_sequential_task_sampler_failures,
                    self.max_allowed_sequential_rollout_failures,
                    self.max_allowed_sequential_irrecoverable_failures,
                    preloaded_policy,
                    self.config.filter_for_successful_trajectories,
                    type(self),  # Pass the runner class to enable customization via subclassing
                ),
            )
            p.start()
            processes.append(p)

        # Periodic logging loop that monitors progress while workers run
        last_log_time = start_time
        log_interval = 60  # Log every 60 seconds

        while any(p.is_alive() for p in processes):
            # Check if it's time to log
            current_time = time.time()
            if self.wandb_enabled and (current_time - last_log_time) >= log_interval:
                try:
                    # Read current progress from shared counters
                    elapsed_time = current_time - start_time
                    completed = self.completed_houses.value
                    skipped = self.skipped_houses.value
                    success = self.success_count.value
                    total = self.total_count.value
                    active = sum(1 for p in processes if p.is_alive())

                    # Calculate metrics
                    success_rate = success / total if total > 0 else 0.0
                    episodes_per_second = total / elapsed_time if elapsed_time > 0 else 0.0
                    completion_percentage = (completed + skipped) / self.total_houses * 100

                    # Log to WandB
                    wandb.log(
                        {
                            "elapsed_time_seconds": elapsed_time,
                            "elapsed_time_hours": elapsed_time / 3600,
                            "completed_houses": completed,
                            "skipped_houses": skipped,
                            "success_count": success,
                            "total_count": total,
                            "success_rate": success_rate,
                            "episodes_per_second": episodes_per_second,
                            "active_workers": active,
                            "completion_percentage": completion_percentage,
                        }
                    )
                    self.logger.info(
                        f"Progress: {completed}/{self.total_houses} houses completed "
                        f"({completion_percentage:.1f}%), {success}/{total} successful episodes "
                        f"({success_rate * 100:.1f}%), {active} workers active"
                    )
                    last_log_time = current_time
                except Exception as e:
                    # Monitoring must never kill the run; log and keep polling.
                    self.logger.warning(f"WandB periodic logging failed: {e}")

            # Sleep briefly before checking again

            time.sleep(5)

        # Wait for all processes to complete
        for p in processes:
            p.join()
            p.close()

    else:
        # Single-worker mode runs in the main process
        house_processing_worker(
            worker_id=0,
            exp_config=self.config,
            house_indices=self.house_indices,
            samples_per_house=self.samples_per_house,
            shutdown_event=self.shutdown_event,
            counter_lock=self.counter_lock,
            house_counter=self.house_counter,
            success_count=self.success_count,
            total_count=self.total_count,
            completed_houses=self.completed_houses,
            skipped_houses=self.skipped_houses,
            max_allowed_sequential_task_sampler_failures=self.max_allowed_sequential_task_sampler_failures,
            max_allowed_sequential_rollout_failures=self.max_allowed_sequential_rollout_failures,
            max_allowed_sequential_irrecoverable_failures=self.max_allowed_sequential_irrecoverable_failures,
            preloaded_policy=preloaded_policy,
            filter_for_successful_trajectories=self.config.filter_for_successful_trajectories,
            runner_class=type(
                self
            ),  # Pass the runner class to enable customization via subclassing
        )

    # Extract final values from shared multiprocessing state
    success_count_val = self.success_count.value
    total_count_val = self.total_count.value
    completed_houses_val = self.completed_houses.value
    skipped_houses_val = self.skipped_houses.value

    success_rate = success_count_val / total_count_val if total_count_val > 0 else 0.0
    self.logger.info(
        f"Completed {completed_houses_val} houses, skipped {skipped_houses_val} houses"
    )
    self.logger.info(f"Success count: {success_count_val}, Total count: {total_count_val}")
    self.logger.info(f"Success rate: {success_rate * 100:.2f}%")

    # Log final metrics to WandB
    if self.wandb_enabled:
        try:
            final_elapsed_time = time.time() - start_time
            wandb.log(
                {
                    "final_success_count": success_count_val,
                    "final_total_count": total_count_val,
                    "final_success_rate": success_rate,
                    "final_completed_houses": completed_houses_val,
                    "final_skipped_houses": skipped_houses_val,
                    "final_elapsed_time_seconds": final_elapsed_time,
                    "final_elapsed_time_hours": final_elapsed_time / 3600,
                }
            )
            wandb.finish()
            self.logger.info("WandB logging finished")
        except Exception as e:
            self.logger.warning(f"WandB final logging failed: {e}")

    return success_count_val, total_count_val

run_single_rollout staticmethod

run_single_rollout(episode_seed: int, task: BaseMujocoTask, policy: Any, profiler: Profiler | None = None, viewer=None, shutdown_event=None, datagen_profiler: DatagenProfiler | None = None, end_on_success: bool = False) -> bool

Execute a single rollout with the given task and policy.

Parameters:

Name Type Description Default
episode_seed int

Seed for this episode

required
task BaseMujocoTask

The task to run

required
policy Any

Policy to use for action selection

required
profiler Profiler | None

Legacy Profiler instance (optional)

None
viewer

MuJoCo viewer for visualization (optional)

None
shutdown_event

Event to signal shutdown (optional)

None
datagen_profiler DatagenProfiler | None

DatagenProfiler for per-worker timing (optional)

None

Returns:

Name Type Description
bool bool

Whether the episode was successful

Source code in molmo_spaces/data_generation/pipeline.py
@staticmethod
def run_single_rollout(
    episode_seed: int,
    task: BaseMujocoTask,
    policy: Any,
    profiler: Profiler | None = None,
    viewer=None,
    shutdown_event=None,
    datagen_profiler: DatagenProfiler | None = None,
    end_on_success: bool = False,
) -> bool:
    """Execute a single rollout with the given task and policy.

    Args:
        episode_seed: Seed for this episode
        task: The task to run
        policy: Policy to use for action selection
        profiler: Legacy Profiler instance (optional)
        viewer: MuJoCo viewer for visualization (optional)
        shutdown_event: Event to signal shutdown (optional)
        datagen_profiler: DatagenProfiler for per-worker timing (optional)

    Returns:
        bool: Whether the episode was successful
    """
    if profiler is not None:
        profiler.start("rollout")
    if datagen_profiler is not None:
        datagen_profiler.start("rollout_total")
        datagen_profiler.start("rollout_reset")

    observation, _info = task.reset()

    if datagen_profiler is not None:
        datagen_profiler.end("rollout_reset")

    if viewer is not None:
        viewer.sync()

    try:
        task.env.current_model.opt.enableflags |= int(mujoco.mjtEnableBit.mjENBL_SLEEP)
    except AttributeError:
        print("Not setting mujoco sleep. Needs version >=mujoco-3.8")

    step_count = 0
    while not task.is_done():
        # Check for shutdown signal
        if shutdown_event is not None and shutdown_event.is_set():
            if datagen_profiler is not None:
                datagen_profiler.end("rollout_total")
            return False

        # Step with policy
        if profiler is not None:
            profiler.start("policy_get_action")
        if datagen_profiler is not None:
            datagen_profiler.start("policy_get_action")
        action_cmd = policy.get_action(observation)
        if profiler is not None:
            profiler.end("policy_get_action")
        if datagen_profiler is not None:
            datagen_profiler.end("policy_get_action")

        # Step the task
        if profiler is not None:
            profiler.start("task_step")
        if datagen_profiler is not None:
            datagen_profiler.start("task_step")
        if action_cmd is None:
            print("Policy returned None action, ending episode")
            break
        observation, reward, terminal, truncated, infos = task.step(action_cmd)
        if profiler is not None:
            profiler.end("task_step")
        if datagen_profiler is not None:
            datagen_profiler.end("task_step")

        step_count += 1
        # Add termination if succ
        if end_on_success and "success" in infos[0] and infos[0]["success"]:
            success = True
            break

        if viewer is not None:
            viewer.sync()

    try:
        task.env.current_model.opt.enableflags &= ~int(mujoco.mjtEnableBit.mjENBL_SLEEP)
    except AttributeError:
        print("Not setting mujoco sleep. Needs version >=mujoco-3.8")

    # Save profiler summary
    if profiler is not None:
        profiler.end("rollout")
    if datagen_profiler is not None:
        datagen_profiler.end("rollout_total")
        # Record step count for reference
        datagen_profiler.record(
            "step_count_indicator", step_count / 1000.0
        )  # Scale down to avoid confusion

    # Check success if method exists
    success = task.judge_success() if hasattr(task, "judge_success") else False

    return success

sample_task_from_spec staticmethod

sample_task_from_spec(task_sampler: JsonEvalTaskSampler, house_id: int, episode_spec: EpisodeSpec, episode_idx: int) -> BaseMujocoTask | None

Sample task - episode spec is already in the JsonEvalTaskSampler.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def sample_task_from_spec(
    task_sampler: JsonEvalTaskSampler,
    house_id: int,
    episode_spec: EpisodeSpec,
    episode_idx: int,
) -> BaseMujocoTask | None:
    """Sample task - episode spec is already in the JsonEvalTaskSampler."""
    return task_sampler.sample_task(house_index=house_id)

should_close_episode_task_sampler staticmethod

should_close_episode_task_sampler() -> bool

Close task sampler after each episode - we create per-episode.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def should_close_episode_task_sampler() -> bool:
    """Close task sampler after each episode - we create per-episode."""
    return True

should_stop_early staticmethod

should_stop_early(num_collected: int, samples_per_house: int, exp_config: MlSpacesExpConfig | None = None) -> bool

Stop early if evaluating a single episode (--idx provided) and it's been collected.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def should_stop_early(
    num_collected: int, samples_per_house: int, exp_config: MlSpacesExpConfig | None = None
) -> bool:
    """Stop early if evaluating a single episode (--idx provided) and it's been collected."""
    if exp_config is not None:
        eval_params = exp_config.eval_runtime_params
        if eval_params.episode_idx is not None:
            # Stop after collecting the single requested episode
            return num_collected >= 1
    return False

LanguageSpec

Bases: BaseModel

Natural language task specification.

Attributes:

Name Type Description
referral_expressions dict[str, str]
referral_expressions_priority dict[str, list[list[float | str]]]
task_description str

referral_expressions class-attribute instance-attribute

referral_expressions: dict[str, str] = Field(default_factory=dict)

referral_expressions_priority class-attribute instance-attribute

referral_expressions_priority: dict[str, list[list[float | str]]] = Field(default_factory=dict)

task_description instance-attribute

task_description: str

NavToObjTaskSpec

Bases: BaseTaskSpec

Task-specific parameters for navigation to object tasks.

Attributes:

Name Type Description
pickup_obj_candidates list[str] | None
pickup_obj_name str
pickup_obj_start_pose list[float] | None
receptacle_name str | None
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None

pickup_obj_candidates class-attribute instance-attribute

pickup_obj_candidates: list[str] | None = None

pickup_obj_name instance-attribute

pickup_obj_name: str

pickup_obj_start_pose class-attribute instance-attribute

pickup_obj_start_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)

receptacle_name class-attribute instance-attribute

receptacle_name: str | None = None

robot_base_pose class-attribute instance-attribute

robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)

succ_pos_threshold class-attribute instance-attribute

succ_pos_threshold: float = 1.5

task_cls instance-attribute

task_cls: str

task_type class-attribute instance-attribute

task_type: str | None = None

OpenCloseTaskSpec

Bases: BaseTaskSpec

Task-specific parameters for open/close tasks.

Attributes:

Name Type Description
any_inst_of_category bool
articulation_object_name str | None
joint_goal_position float | None
joint_index int
joint_name str
joint_start_position float | list[float]
pickup_obj_name str
pickup_obj_start_pose list[float]
robot_base_pose list[float]
task_cls str
task_success_threshold float
task_type str | None

any_inst_of_category class-attribute instance-attribute

any_inst_of_category: bool = False

articulation_object_name class-attribute instance-attribute

articulation_object_name: str | None = None

joint_goal_position class-attribute instance-attribute

joint_goal_position: float | None = None

joint_index class-attribute instance-attribute

joint_index: int = 0

joint_name instance-attribute

joint_name: str

joint_start_position instance-attribute

joint_start_position: float | list[float]

pickup_obj_name instance-attribute

pickup_obj_name: str

pickup_obj_start_pose class-attribute instance-attribute

pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)

robot_base_pose class-attribute instance-attribute

robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)

task_cls instance-attribute

task_cls: str

task_success_threshold class-attribute instance-attribute

task_success_threshold: float = 0.2

task_type class-attribute instance-attribute

task_type: str | None = None

PickAndPlaceTaskSpec

Bases: PickTaskSpec

Task-specific parameters for pick and place tasks.

Attributes:

Name Type Description
max_place_receptacle_pos_displacement float
max_place_receptacle_rot_displacement float
pickup_obj_goal_pose list[float] | None
pickup_obj_name str
pickup_obj_start_pose list[float]
place_receptacle_name str
place_receptacle_start_pose list[float]
receptacle_name str | None
receptacle_supported_weight_frac float
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None

max_place_receptacle_pos_displacement class-attribute instance-attribute

max_place_receptacle_pos_displacement: float = 0.15

max_place_receptacle_rot_displacement class-attribute instance-attribute

max_place_receptacle_rot_displacement: float = deg2rad(60)

pickup_obj_goal_pose class-attribute instance-attribute

pickup_obj_goal_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)

pickup_obj_name instance-attribute

pickup_obj_name: str

pickup_obj_start_pose class-attribute instance-attribute

pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)

place_receptacle_name instance-attribute

place_receptacle_name: str

place_receptacle_start_pose class-attribute instance-attribute

place_receptacle_start_pose: list[float] = Field(..., min_length=7, max_length=7)

receptacle_name class-attribute instance-attribute

receptacle_name: str | None = None

receptacle_supported_weight_frac class-attribute instance-attribute

receptacle_supported_weight_frac: float = 0.5

robot_base_pose class-attribute instance-attribute

robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)

succ_pos_threshold class-attribute instance-attribute

succ_pos_threshold: float = 0.01

task_cls instance-attribute

task_cls: str

task_type class-attribute instance-attribute

task_type: str | None = None

PickTaskSpec

Bases: BaseTaskSpec

Task-specific parameters for pick tasks.

Attributes:

Name Type Description
pickup_obj_goal_pose list[float] | None
pickup_obj_name str
pickup_obj_start_pose list[float]
receptacle_name str | None
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None

pickup_obj_goal_pose class-attribute instance-attribute

pickup_obj_goal_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)

pickup_obj_name instance-attribute

pickup_obj_name: str

pickup_obj_start_pose class-attribute instance-attribute

pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)

receptacle_name class-attribute instance-attribute

receptacle_name: str | None = None

robot_base_pose class-attribute instance-attribute

robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)

succ_pos_threshold class-attribute instance-attribute

succ_pos_threshold: float = 0.01

task_cls instance-attribute

task_cls: str

task_type class-attribute instance-attribute

task_type: str | None = None

RobotMountedCameraSpec

Bases: BaseModel

Specification for a camera mounted on the robot.

Attributes:

Name Type Description
camera_offset list[float]
camera_quaternion list[float]
fov float
lookat_offset list[float]
name str
record_depth bool
reference_body_names list[str]
type Literal['robot_mounted']

camera_offset class-attribute instance-attribute

camera_offset: list[float] = Field(..., min_length=3, max_length=3)

camera_quaternion class-attribute instance-attribute

camera_quaternion: list[float] = Field(..., min_length=4, max_length=4)

fov instance-attribute

fov: float

lookat_offset class-attribute instance-attribute

lookat_offset: list[float] = Field(..., min_length=3, max_length=3)

name instance-attribute

name: str

record_depth class-attribute instance-attribute

record_depth: bool = False

reference_body_names instance-attribute

reference_body_names: list[str]

type class-attribute instance-attribute

type: Literal['robot_mounted'] = 'robot_mounted'

RobotSpec

Bases: BaseModel

Robot initialization specification.

Note: Robot world placement is in task.robot_base_pose, not here. This spec only contains robot-intrinsic state (joint positions).

Attributes:

Name Type Description
init_qpos dict[str, list[float]]
robot_name str

init_qpos instance-attribute

init_qpos: dict[str, list[float]]

robot_name instance-attribute

robot_name: str

SceneModificationsSpec

Bases: BaseModel

Scene modifications required for this episode.

This captures objects that need to be added to the base scene XML and their initial poses.

Attributes:

Name Type Description
added_objects dict[str, str]
object_poses dict[str, list[float]]
removed_objects list[str]

added_objects class-attribute instance-attribute

added_objects: dict[str, str] = Field(default_factory=dict)

object_poses class-attribute instance-attribute

object_poses: dict[str, list[float]] = Field(default_factory=dict)

removed_objects class-attribute instance-attribute

removed_objects: list[str] = Field(default_factory=list)

SourceSpec

Bases: BaseModel

Provenance information for this episode.

Tracks where this episode specification came from (which H5 file and trajectory).

Attributes:

Name Type Description
benchmark_created_date str | None
camera_system_class str | None
episode_length int | None
h5_file str
source_data_date str | None
traj_key str

benchmark_created_date class-attribute instance-attribute

benchmark_created_date: str | None = None

camera_system_class class-attribute instance-attribute

camera_system_class: str | None = None

episode_length class-attribute instance-attribute

episode_length: int | None = None

h5_file instance-attribute

h5_file: str

source_data_date class-attribute instance-attribute

source_data_date: str | None = None

traj_key instance-attribute

traj_key: str

load_all_episodes

load_all_episodes(benchmark_dir: Path) -> list[EpisodeSpec]

Load all episodes from a benchmark directory as a flat list.

Supports two formats: 1. Single benchmark.json file (preferred): List of EpisodeSpec dicts 2. Legacy house_*/episode_*.json structure

Parameters:

Name Type Description Default
benchmark_dir Path

Path to benchmark directory

required

Returns:

Type Description
list[EpisodeSpec]

List of EpisodeSpec objects

Source code in molmo_spaces/evaluation/benchmark_schema.py
def load_all_episodes(benchmark_dir: Path) -> list[EpisodeSpec]:
    """Load all episodes from a benchmark directory as a flat list.

    Supports two formats:
    1. Single benchmark.json file (preferred): List of EpisodeSpec dicts
    2. Legacy house_*/episode_*.json structure

    Args:
        benchmark_dir: Path to benchmark directory

    Returns:
        List of EpisodeSpec objects
    """
    import json

    # Preferred format: one benchmark.json holding a list of episode dicts.
    benchmark_file = benchmark_dir / "benchmark.json"
    if benchmark_file.exists():
        with open(benchmark_file) as f:
            data = json.load(f)
        return [EpisodeSpec.model_validate(ep) for ep in data]

    # Fall back to the legacy house_*/episode_*.json directory structure.
    _, episodes_by_house = load_benchmark(benchmark_dir)
    # Flatten the house -> [episode paths] mapping into one ordered list
    # (comprehension replaces the former nested append loop).
    return [
        EpisodeSpec.from_json_file(path)
        for episode_paths in episodes_by_house.values()
        for path in episode_paths
    ]

load_benchmark

load_benchmark(benchmark_dir: Path) -> tuple[BenchmarkMetadata | None, dict[int, list[Path]]]

Load a benchmark directory.

A benchmark is simply a directory of episode JSON files. Each episode is fully self-contained. An optional benchmark_metadata.json provides human-readable info but is not required.

Parameters:

Name Type Description Default
benchmark_dir Path

Path to benchmark directory containing house_* subdirectories with episode JSON files. May optionally contain benchmark_metadata.json.

required

Returns:

Type Description
tuple[BenchmarkMetadata | None, dict[int, list[Path]]]

Tuple of (BenchmarkMetadata or None, dict mapping house_id -> list of episode JSON paths)

Source code in molmo_spaces/evaluation/benchmark_schema.py
def load_benchmark(
    benchmark_dir: Path,
) -> tuple[BenchmarkMetadata | None, dict[int, list[Path]]]:
    """Load a benchmark directory.

    A benchmark is simply a directory of episode JSON files. Each episode is
    fully self-contained. An optional benchmark_metadata.json provides human-readable
    info but is not required.

    Args:
        benchmark_dir: Path to benchmark directory containing house_* subdirectories
            with episode JSON files. May optionally contain benchmark_metadata.json.

    Returns:
        Tuple of (BenchmarkMetadata or None, dict mapping house_id -> list of episode JSON paths)
    """

    # Load optional metadata (not required)
    metadata: BenchmarkMetadata | None = None
    metadata_path = benchmark_dir / "benchmark_metadata.json"
    if metadata_path.exists():
        metadata = BenchmarkMetadata.from_json_file(metadata_path)

    # Discover episode files organized by house
    episodes_by_house: dict[int, list[Path]] = {}
    for house_dir in sorted(benchmark_dir.glob("house_*")):
        if not house_dir.is_dir():
            continue
        # removeprefix only strips the leading "house_" marker; the previous
        # str.replace would also delete "house_" occurrences elsewhere in
        # the name and mis-parse e.g. "house_1_house_2".
        try:
            house_id = int(house_dir.name.removeprefix("house_"))
        except ValueError:
            # Skip non-numeric directories like "house_backup" instead of
            # crashing the whole benchmark load.
            continue
        episode_files = sorted(house_dir.glob("episode_*.json"))
        if episode_files:
            episodes_by_house[house_id] = episode_files

    return metadata, episodes_by_house

run_evaluation

run_evaluation(eval_config_cls: type[MlSpacesExpConfig] | str, benchmark_dir: Path, checkpoint_path: str | None = None, task_horizon_steps: int | None = None, task_horizon_sec: float | None = None, output_dir: str | Path | None = None, num_workers: int = 1, use_wandb: bool = False, wandb_project: str = 'mlspaces-online-eval', preloaded_policy: BasePolicy | None = None, max_episodes: int | None = None, camera_config_override: Any | None = None, camera_names_override: list[str] | None = None, environment_light_intensity: float | None = None, episode_idx: int | None = None, add_custom_object: bool = False, custom_object_path: str | Path | None = None, custom_object_name: str | None = None) -> EvaluationResults

Run evaluation on a JSON benchmark programmatically.

This is the primary entry point for running evaluations from external code. It can be imported and called directly without using command-line arguments.

Parameters:

Name Type Description Default
eval_config_cls type[MlSpacesExpConfig] | str

Either an MlSpacesExpConfig subclass, or a string in the format "module.path:ClassName" (e.g., "myrepo.configs:MyEvalConfig").

required
benchmark_dir Path

Path to JSON benchmark directory containing benchmark.json.

required
checkpoint_path str | None

Path to model checkpoint. Overrides the checkpoint in policy_config.

None
task_horizon_steps int | None

Max steps per episode. If None, uses default for the task class.

None
task_horizon_sec float | None

Max seconds per episode, used to calculate horizon in steps. Cannot be used with task_horizon_steps.

None
output_dir str | Path | None

Output directory for results. Defaults to eval_output/&lt;config&gt;/&lt;timestamp&gt;.

None
num_workers int

Number of parallel worker processes.

1
use_wandb bool

Whether to log results to Weights & Biases.

False
wandb_project str

W&B project name (only used if use_wandb=True).

'mlspaces-online-eval'
preloaded_policy BasePolicy | None

Optional pre-initialized policy instance. If provided, skips policy creation from config.

None
max_episodes int | None

Maximum number of episodes to evaluate from benchmark. If None, evaluates all episodes.

None
camera_config_override Any | None

Optional camera system config (e.g. FrankaEvalCameraSystem) to replace the default camera_config on the experiment config.

None
camera_names_override list[str] | None

Optional list of camera names to override policy_config.camera_names (e.g. ["randomized_zed2_analogue_1", "wrist_camera"]).

None
episode_idx int | None

Index of a specific episode to evaluate. If None, evaluates all episodes.

None
add_custom_object bool

Whether to replace the target object with a custom object.

False
custom_object_path str | Path | None

Path to the custom object XML file. Required if add_custom_object is True.

None
custom_object_name str | None

Natural language name for the custom object (e.g., 'lemon', 'cup'). If not provided, will attempt to extract from the object path.

None

Returns:

Type Description
EvaluationResults

EvaluationResults containing success counts, output paths, and per-episode details.

Raises:

Type Description
FileNotFoundError

If benchmark_dir doesn't exist.

ValueError

If no episodes found in benchmark or config class not found.

Example

from molmo_spaces.evaluation import run_evaluation from my_repo.configs import MyEvalConfig

results = run_evaluation( eval_config_cls=MyEvalConfig, benchmark_dir="/path/to/benchmark", checkpoint_path="/path/to/checkpoint.pt", task_horizon_steps=500, ) print(f"Success rate: {results.success_rate:.1%}")

Source code in molmo_spaces/evaluation/eval_main.py
def run_evaluation(
    eval_config_cls: type[MlSpacesExpConfig] | str,
    benchmark_dir: Path,
    checkpoint_path: str | None = None,
    task_horizon_steps: int | None = None,
    task_horizon_sec: float | None = None,
    output_dir: str | Path | None = None,
    num_workers: int = 1,
    use_wandb: bool = False,
    wandb_project: str = "mlspaces-online-eval",
    preloaded_policy: BasePolicy | None = None,
    max_episodes: int | None = None,
    camera_config_override: Any | None = None,
    camera_names_override: list[str] | None = None,
    environment_light_intensity: float | None = None,
    episode_idx: int | None = None,
    add_custom_object: bool = False,
    custom_object_path: str | Path | None = None,
    custom_object_name: str | None = None,
) -> EvaluationResults:
    """Run evaluation on a JSON benchmark programmatically.

    This is the primary entry point for running evaluations from external code.
    It can be imported and called directly without using command-line arguments.

    Args:
        eval_config_cls: Either an MlSpacesExpConfig subclass, or a string in the format
            "module.path:ClassName" (e.g., "myrepo.configs:MyEvalConfig").
        benchmark_dir: Path to JSON benchmark directory containing benchmark.json.
        checkpoint_path: Path to model checkpoint. Overrides the checkpoint in policy_config.
        task_horizon_steps: Max steps per episode. If None, uses default for the task class.
        task_horizon_sec: Max seconds per episode, used to calculate horizon in steps. Cannot be used with task_horizon_steps.
        output_dir: Output directory for results. Defaults to eval_output/<config>/<timestamp>.
        num_workers: Number of parallel worker processes.
        use_wandb: Whether to log results to Weights & Biases.
        wandb_project: W&B project name (only used if use_wandb=True).
        preloaded_policy: Optional pre-initialized policy instance. If provided, skips
            policy creation from config.
        max_episodes: Maximum number of episodes to evaluate from benchmark. If None, evaluates all episodes.
        camera_config_override: Optional camera system config (e.g. FrankaEvalCameraSystem) to
            replace the default camera_config on the experiment config.
        camera_names_override: Optional list of camera names to override
            policy_config.camera_names (e.g. ["randomized_zed2_analogue_1", "wrist_camera"]).
        environment_light_intensity: Optional override for the environment light
            intensity on the experiment config. If None, the config default is kept.
        episode_idx: Index of a specific episode to evaluate. If None, evaluates all episodes.
        add_custom_object: Whether to replace the target object with a custom object.
        custom_object_path: Path to the custom object XML file. Required if add_custom_object is True.
        custom_object_name: Natural language name for the custom object (e.g., 'lemon', 'cup').
            If not provided, will attempt to extract from the object path.

    Returns:
        EvaluationResults containing success counts, output paths, and per-episode details.

    Raises:
        FileNotFoundError: If benchmark_dir doesn't exist.
        ValueError: If no episodes found in benchmark, config class not found, or
            mutually exclusive horizon arguments are both given.

    Example:
        from molmo_spaces.evaluation import run_evaluation
        from my_repo.configs import MyEvalConfig

        results = run_evaluation(
            eval_config_cls=MyEvalConfig,
            benchmark_dir="/path/to/benchmark",
            checkpoint_path="/path/to/checkpoint.pt",
            task_horizon_steps=500,
        )
        print(f"Success rate: {results.success_rate:.1%}")
    """
    # Resolve config class if provided as string
    # Preserve the original string for config_name in case the registered name
    # differs from the class __name__ (e.g., a custom registry name)
    config_name_from_str: str | None = None
    if isinstance(eval_config_cls, str):
        config_name_from_str = eval_config_cls
        if ":" in eval_config_cls:
            # Full module path provided - import and get class directly.
            # maxsplit=1 so a stray ":" later in the string cannot cause an
            # unpacking error here (it will instead fail in getattr with a
            # clearer message).
            module_path, class_name = eval_config_cls.split(":", 1)
            module = importlib.import_module(module_path)
            eval_config_cls = getattr(module, class_name)
        else:
            # Just a class name - look up in registry
            class_name = eval_config_cls
            eval_config_cls = get_config_class(class_name)

    # Validate benchmark directory
    benchmark_dir = benchmark_dir.resolve()
    if not benchmark_dir.exists():
        raise FileNotFoundError(f"Benchmark directory not found: {benchmark_dir}")

    # Load benchmark episodes (for summary info and validation)
    episodes = load_all_episodes(benchmark_dir)

    # Validate episode index if specified
    if episode_idx is not None:
        if episode_idx < 0 or episode_idx >= len(episodes):
            raise ValueError(
                f"Episode index {episode_idx} is out of range. "
                f"Benchmark has {len(episodes)} episodes (indices 0-{len(episodes) - 1})"
            )
        log.info(f"Will evaluate single episode at index {episode_idx}")

    # Validate custom object path if requested
    if add_custom_object:
        if custom_object_path is None:
            raise ValueError(
                "--custom_object_path must be provided when --add_custom_object is set"
            )
        custom_object_path = Path(custom_object_path)
        if not custom_object_path.exists():
            raise FileNotFoundError(f"Custom object path does not exist: {custom_object_path}")
        log.info(f"Will replace target objects with custom object: {custom_object_path}")
        if custom_object_name is None:
            custom_object_name = custom_object_path.stem
            log.warning(f"No custom object name provided, using path stem: {custom_object_name}")
        else:
            log.info(f"Using provided custom object name: {custom_object_name}")

    if max_episodes is not None and len(episodes) > max_episodes:
        log.info(f"Evaluating the first {max_episodes} episodes of {len(episodes)} total episodes")
        episodes = episodes[:max_episodes]
    if not episodes:
        raise ValueError(
            f"No episodes found in benchmark at {benchmark_dir}. "
            f"Expected benchmark.json file with list of episode specs."
        )

    total_episodes = len(episodes)
    num_houses = len(set(ep.house_index for ep in episodes))

    # Create timestamp and output directory
    # Use the original string if eval_config_cls was passed as a string, otherwise use __name__.
    # This handles cases where the registry name differs from the class name.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    if config_name_from_str:
        # If "module:ClassName" format, use just the class name for the output dir
        config_name = (
            config_name_from_str.split(":")[-1]
            if ":" in config_name_from_str
            else config_name_from_str
        )
    else:
        config_name = eval_config_cls.__name__

    if output_dir is not None:
        resolved_output_dir = Path(output_dir) / config_name / timestamp
    else:
        resolved_output_dir = Path("eval_output") / config_name / timestamp
    os.makedirs(resolved_output_dir, exist_ok=True)

    # Determine task horizon. Raise (not assert) for mutually exclusive args:
    # asserts are stripped under `python -O`, and this is caller-input
    # validation documented as ValueError.
    if task_horizon_steps is not None and task_horizon_sec is not None:
        raise ValueError("Cannot use both task_horizon_steps and task_horizon_sec")
    # Look up the config's default policy dt once; it is reused below for
    # horizon resolution.
    policy_dt_ms = eval_config_cls.model_fields["policy_dt_ms"].get_default()
    task_horizon: int | None = None
    if task_horizon_steps is not None:
        task_horizon = task_horizon_steps
    elif task_horizon_sec is not None:
        assert isinstance(policy_dt_ms, float | int), (
            f"policy_dt_ms must be a float or int, got {type(policy_dt_ms)}"
        )
        task_horizon = round(task_horizon_sec * 1000.0 / policy_dt_ms)
    resolved_task_horizon = determine_task_horizon(
        episodes, task_horizon, policy_dt_ms=policy_dt_ms
    )

    # Create experiment config
    exp_config = create_eval_config(
        eval_config_cls=eval_config_cls,
        benchmark_dir=benchmark_dir,
        output_dir=resolved_output_dir,
        checkpoint_path=checkpoint_path,
        task_horizon=resolved_task_horizon,
        num_workers=num_workers,
        camera_config_override=camera_config_override,
    )

    # Custom filament settings to overwrite by the user. Explicit None check
    # so a caller-provided 0.0 (lights off) is honored — the previous
    # `x or default` pattern silently discarded falsy overrides.
    if environment_light_intensity is not None:
        exp_config.environment_light_intensity = environment_light_intensity

    # Override policy camera names if requested
    if camera_names_override is not None:
        log.info(f"Overriding policy_config.camera_names: {camera_names_override}")
        exp_config.policy_config.camera_names = camera_names_override

    # Patch config with evaluation-specific runtime parameters
    exp_config = JsonEvalRunner.patch_config(
        exp_config=exp_config,
        episode_idx=episode_idx,
        max_episodes=max_episodes,
        add_custom_object=add_custom_object,
        custom_object_path=custom_object_path,
        custom_object_name=custom_object_name,
    )
    JsonEvalRunner.adjust_robot(exp_config)

    # Resolve checkpoint path for logging
    resolved_checkpoint = checkpoint_path or getattr(
        exp_config.policy_config, "checkpoint_path", None
    )

    # Initialize wandb if requested
    if use_wandb:
        import wandb

        if resolved_checkpoint:
            path_parts = Path(resolved_checkpoint).parts
            ckpt_name_parts = [p for p in path_parts[-2:] if p and p != "/"]
            ckpt_name = "_".join(ckpt_name_parts)
        else:
            ckpt_name = "no_ckpt"

        wandb_run_name = f"{ckpt_name}_{timestamp}"
        wandb.init(project=wandb_project, name=wandb_run_name)
        wandb.config.update(
            {
                "checkpoint_path": resolved_checkpoint,
                "benchmark_dir": str(benchmark_dir),
                "task_horizon_steps": exp_config.task_horizon,
                "task_horizon_sec": exp_config.task_horizon / exp_config.fps,
                "exp_config_cls": config_name,
                "num_episodes": total_episodes,
                "num_houses": num_houses,
            }
        )

    # Create or use provided policy
    if preloaded_policy is not None:
        policy = preloaded_policy
    else:
        policy = exp_config.policy_config.policy_cls(exp_config, exp_config.task_type)

    # Run evaluation.
    # NOTE(review): an earlier comment claimed the preloaded policy should
    # only be passed in single-worker mode (WebSocket/msgpack connections
    # can't be pickled), but the policy is passed unconditionally here —
    # confirm that multi-worker runs recreate per-worker connections.
    runner = JsonEvalRunner(exp_config, benchmark_dir)
    success_count, total_count = runner.run(preloaded_policy=policy)

    # Collect per-episode results
    episode_results = collect_episode_results(resolved_output_dir)

    # Log to wandb if enabled
    if use_wandb:
        import wandb

        camera_names = getattr(exp_config.policy_config, "camera_names", [])
        if camera_names:
            success_status = build_success_status_map(episode_results)
            composed_videos = compose_episode_videos(
                eval_dir=resolved_output_dir,
                camera_names=camera_names,
                success_status=success_status,
            )
        else:
            composed_videos = {}

        log_eval_results_to_wandb(
            results=episode_results,
            composed_videos=composed_videos,
        )
        wandb.finish()

    return EvaluationResults(
        success_count=success_count,
        total_count=total_count,
        output_dir=resolved_output_dir,
        episode_results=episode_results,
        exp_config=exp_config,
    )

benchmark_schema

JSON-based benchmark schema definitions.

This module defines Pydantic models for JSON benchmark files that fully specify episode initialization without relying on pickle serialization. Each episode is fully self-contained and can be loaded/inspected independently.

Design principles
  • Each episode JSON is fully self-contained (no external config dependencies)
  • A benchmark is simply a list/directory of episode JSONs (can mix task types)
  • All fields needed to recreate exact initial conditions are explicit
  • Task horizon is NOT stored per-episode - it's an evaluation parameter
Benchmark directory structure

benchmark_dir/ ├── house_5/ │ ├── episode_00000000.json # Fully self-contained │ └── ... └── ...

Key fields for robot placement
  • robot.init_qpos: Initial joint positions per move group
  • task.robot_base_pose: Robot base pose in world frame (NOT robot.default_world_pose)

The actual robot world placement comes from task.robot_base_pose, which is set by the task sampler and frozen into the episode. The robot_config.default_world_pose field in the codebase is just a default that gets overridden.

Task horizons

Task horizon (max steps per episode) is an EVALUATION parameter, not a task specification. Use DEFAULT_TASK_HORIZONS for sensible defaults per task class, and override via command line for specific eval runs.

Classes:

Name Description
BaseTaskSpec

Base task specification with fields common to all task types.

BenchmarkMetadata

Optional metadata for a benchmark directory.

DoorOpeningTaskSpec

Task-specific parameters for door opening tasks.

EpisodeSpec

Complete specification for a single benchmark episode.

ExocentricCameraSpec

Specification for an exocentric (fixed) camera.

LanguageSpec

Natural language task specification.

NavToObjTaskSpec

Task-specific parameters for navigation to object tasks.

OpenCloseTaskSpec

Task-specific parameters for open/close tasks.

PickAndPlaceColorTaskSpec

Task-specific parameters for pick and place color tasks.

PickAndPlaceNextToTaskSpec

Task-specific parameters for pick and place next-to tasks.

PickAndPlaceTaskSpec

Task-specific parameters for pick and place tasks.

PickTaskSpec

Task-specific parameters for pick tasks.

RobotMountedCameraSpec

Specification for a camera mounted on the robot.

RobotSpec

Robot initialization specification.

SceneModificationsSpec

Scene modifications required for this episode.

SourceSpec

Provenance information for this episode.

Functions:

Name Description
get_task_spec_field_names

Get all field names from TaskSpec models that should be copied to task_config.

load_all_episodes

Load all episodes from a benchmark directory as a flat list.

load_benchmark

Load a benchmark directory.

replace_target_object_with_custom

Replace the target object in an episode with a custom object.

Attributes:

Name Type Description
ALL_TASK_SPEC_CLASSES list[type[BaseTaskSpec]]
CameraSpec
TaskSpec

CameraSpec module-attribute

BaseTaskSpec

Bases: BaseModel

Base task specification with fields common to all task types.

robot_base_pose is the authoritative field for robot world placement. This comes from task_config in the codebase, not robot_config.

task_cls is the authoritative identifier for the task type. The eval task sampler is responsible for interpreting task_cls and creating the appropriate task. task_type is optional and for human convenience only.

Attributes:

Name Type Description
robot_base_pose list[float]
task_cls str
task_type str | None
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
task_cls instance-attribute
task_cls: str
task_type class-attribute instance-attribute
task_type: str | None = None

BenchmarkMetadata

Bases: BaseModel

Optional metadata for a benchmark directory.

This is NOT required - each episode is fully self-contained. This file provides optional human-readable metadata about the benchmark.

Classes:

Name Description
Config

Methods:

Name Description
from_json_file

Load benchmark metadata from a JSON file.

to_json_file

Save the benchmark metadata to a JSON file.

Attributes:

Name Type Description
benchmark_created_date str | None
camera_system_class str | None
created_at str | None
description str | None
episode_length_stats dict[str, float] | None
house_counts dict[int, int] | None
num_episodes int | None
num_houses int | None
object_category_counts dict[str, int] | None
robot_counts dict[str, int] | None
source_data_date str | None
source_datagen_path str | None
task_cls_counts dict[str, int] | None
benchmark_created_date class-attribute instance-attribute
benchmark_created_date: str | None = None
camera_system_class class-attribute instance-attribute
camera_system_class: str | None = None
created_at class-attribute instance-attribute
created_at: str | None = None
description class-attribute instance-attribute
description: str | None = None
episode_length_stats class-attribute instance-attribute
episode_length_stats: dict[str, float] | None = None
house_counts class-attribute instance-attribute
house_counts: dict[int, int] | None = None
num_episodes class-attribute instance-attribute
num_episodes: int | None = None
num_houses class-attribute instance-attribute
num_houses: int | None = None
object_category_counts class-attribute instance-attribute
object_category_counts: dict[str, int] | None = None
robot_counts class-attribute instance-attribute
robot_counts: dict[str, int] | None = None
source_data_date class-attribute instance-attribute
source_data_date: str | None = None
source_datagen_path class-attribute instance-attribute
source_datagen_path: str | None = None
task_cls_counts class-attribute instance-attribute
task_cls_counts: dict[str, int] | None = None
Config

Attributes:

Name Type Description
extra
extra class-attribute instance-attribute
extra = 'allow'
from_json_file classmethod
from_json_file(path: str | Path) -> BenchmarkMetadata

Load benchmark metadata from a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
@classmethod
def from_json_file(cls, path: str | Path) -> "BenchmarkMetadata":
    """Load benchmark metadata from a JSON file."""
    path = Path(path)
    with open(path) as f:
        import json

        data = json.load(f)
    return cls.model_validate(data)
to_json_file
to_json_file(path: str | Path) -> None

Save the benchmark metadata to a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
def to_json_file(self, path: str | Path) -> None:
    """Save the benchmark metadata to a JSON file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        import json

        json.dump(self.model_dump(), f, indent=2)

DoorOpeningTaskSpec

Bases: BaseTaskSpec

Task-specific parameters for door opening tasks.

Attributes:

Name Type Description
articulated_joint_range list[float] | None
articulated_joint_reset_state list[float] | None
door_body_name str
door_openness_threshold float
robot_base_pose list[float]
task_cls str
task_type str | None
articulated_joint_range class-attribute instance-attribute
articulated_joint_range: list[float] | None = None
articulated_joint_reset_state class-attribute instance-attribute
articulated_joint_reset_state: list[float] | None = None
door_body_name instance-attribute
door_body_name: str
door_openness_threshold class-attribute instance-attribute
door_openness_threshold: float = 0.67
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
task_cls instance-attribute
task_cls: str
task_type class-attribute instance-attribute
task_type: str | None = None

EpisodeSpec

Bases: BaseModel

Complete specification for a single benchmark episode.

This is a FULLY SELF-CONTAINED specification - no external config needed. Contains all information needed to recreate the exact initial conditions for an episode: scene, robot, cameras, and task parameters.

NOTE: Timing/execution parameters (policy_dt_ms, ctrl_dt_ms, sim_dt_ms, task_horizon) are NOT stored per-episode. They come from the evaluation config or command line.

A benchmark is simply a list of EpisodeSpec objects in a single JSON file.

Classes:

Name Description
Config

Methods:

Name Description
from_json_file

Load an episode spec from a JSON file.

get_task_cls

Get fully qualified task class name from task dict (authoritative identifier).

get_task_type

Get optional human-readable task type from task dict.

to_json_file

Save the episode spec to a JSON file.

Attributes:

Name Type Description
cameras list[CameraSpec]
data_split str
house_index int
img_resolution tuple[int, int]
language LanguageSpec
robot RobotSpec
scene_dataset str
scene_modifications SceneModificationsSpec
seed int | None
source SourceSpec | None
task dict
task_relevant_objects list[str]
cameras class-attribute instance-attribute
cameras: list[CameraSpec] = Field(default_factory=list)
data_split class-attribute instance-attribute
data_split: str = 'val'
house_index instance-attribute
house_index: int
img_resolution instance-attribute
img_resolution: tuple[int, int]
language instance-attribute
language: LanguageSpec
robot instance-attribute
robot: RobotSpec
scene_dataset instance-attribute
scene_dataset: str
scene_modifications class-attribute instance-attribute
scene_modifications: SceneModificationsSpec = Field(default_factory=SceneModificationsSpec)
seed class-attribute instance-attribute
seed: int | None = None
source class-attribute instance-attribute
source: SourceSpec | None = None
task instance-attribute
task: dict
task_relevant_objects class-attribute instance-attribute
task_relevant_objects: list[str] = Field(default_factory=list)
Config

Attributes:

Name Type Description
extra
extra class-attribute instance-attribute
extra = 'allow'
from_json_file classmethod
from_json_file(path: str | Path) -> EpisodeSpec

Load an episode spec from a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
@classmethod
def from_json_file(cls, path: str | Path) -> "EpisodeSpec":
    """Load an episode spec from a JSON file."""
    path = Path(path)
    with open(path) as f:
        import json

        data = json.load(f)
    return cls.model_validate(data)
get_task_cls
get_task_cls() -> str

Get fully qualified task class name from task dict (authoritative identifier).

Source code in molmo_spaces/evaluation/benchmark_schema.py
def get_task_cls(self) -> str:
    """Get fully qualified task class name from task dict (authoritative identifier).

    Returns:
        The dotted import path stored under the ``task_cls`` key.

    Raises:
        ValueError: If the task dict has no non-empty ``task_cls`` entry.
    """
    task_cls = self.task.get("task_cls")
    if not task_cls:
        raise ValueError("task dict missing required 'task_cls' field")
    return task_cls
get_task_type
get_task_type() -> str | None

Get optional human-readable task type from task dict.

Source code in molmo_spaces/evaluation/benchmark_schema.py
def get_task_type(self) -> str | None:
    """Get optional human-readable task type from task dict."""
    return self.task.get("task_type")
to_json_file
to_json_file(path: str | Path) -> None

Save the episode spec to a JSON file.

Source code in molmo_spaces/evaluation/benchmark_schema.py
def to_json_file(self, path: str | Path) -> None:
    """Save the episode spec to a JSON file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        import json

        json.dump(self.model_dump(), f, indent=2)

ExocentricCameraSpec

Bases: BaseModel

Specification for an exocentric (fixed) camera.

Attributes:

Name Type Description
forward list[float]
fov float
name str
pos list[float]
record_depth bool
type Literal['exocentric']
up list[float]
forward class-attribute instance-attribute
forward: list[float] = Field(..., min_length=3, max_length=3)
fov instance-attribute
fov: float
name instance-attribute
name: str
pos class-attribute instance-attribute
pos: list[float] = Field(..., min_length=3, max_length=3)
record_depth class-attribute instance-attribute
record_depth: bool = False
type class-attribute instance-attribute
type: Literal['exocentric'] = 'exocentric'
up class-attribute instance-attribute
up: list[float] = Field(..., min_length=3, max_length=3)

LanguageSpec

Bases: BaseModel

Natural language task specification.

Attributes:

Name Type Description
referral_expressions dict[str, str]
referral_expressions_priority dict[str, list[list[float | str]]]
task_description str
referral_expressions class-attribute instance-attribute
referral_expressions: dict[str, str] = Field(default_factory=dict)
referral_expressions_priority class-attribute instance-attribute
referral_expressions_priority: dict[str, list[list[float | str]]] = Field(default_factory=dict)
task_description instance-attribute
task_description: str

NavToObjTaskSpec

Bases: BaseTaskSpec

Task-specific parameters for navigation to object tasks.

Attributes:

Name Type Description
pickup_obj_candidates list[str] | None
pickup_obj_name str
pickup_obj_start_pose list[float] | None
receptacle_name str | None
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None
pickup_obj_candidates class-attribute instance-attribute
pickup_obj_candidates: list[str] | None = None
pickup_obj_name instance-attribute
pickup_obj_name: str
pickup_obj_start_pose class-attribute instance-attribute
pickup_obj_start_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)
receptacle_name class-attribute instance-attribute
receptacle_name: str | None = None
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
succ_pos_threshold class-attribute instance-attribute
succ_pos_threshold: float = 1.5
task_cls instance-attribute
task_cls: str
task_type class-attribute instance-attribute
task_type: str | None = None

OpenCloseTaskSpec

Bases: BaseTaskSpec

Task-specific parameters for open/close tasks.

Attributes:

Name Type Description
any_inst_of_category bool
articulation_object_name str | None
joint_goal_position float | None
joint_index int
joint_name str
joint_start_position float | list[float]
pickup_obj_name str
pickup_obj_start_pose list[float]
robot_base_pose list[float]
task_cls str
task_success_threshold float
task_type str | None
any_inst_of_category class-attribute instance-attribute
any_inst_of_category: bool = False
articulation_object_name class-attribute instance-attribute
articulation_object_name: str | None = None
joint_goal_position class-attribute instance-attribute
joint_goal_position: float | None = None
joint_index class-attribute instance-attribute
joint_index: int = 0
joint_name instance-attribute
joint_name: str
joint_start_position instance-attribute
joint_start_position: float | list[float]
pickup_obj_name instance-attribute
pickup_obj_name: str
pickup_obj_start_pose class-attribute instance-attribute
pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
task_cls instance-attribute
task_cls: str
task_success_threshold class-attribute instance-attribute
task_success_threshold: float = 0.2
task_type class-attribute instance-attribute
task_type: str | None = None

PickAndPlaceColorTaskSpec

Bases: PickAndPlaceTaskSpec

Task-specific parameters for pick and place color tasks.

Attributes:

Name Type Description
max_place_receptacle_pos_displacement float
max_place_receptacle_rot_displacement float
object_colors dict[str, list[float]] | None
other_receptacle_names list[str] | None
other_receptacle_start_poses dict[str, list[float]] | None
pickup_obj_goal_pose list[float] | None
pickup_obj_name str
pickup_obj_start_pose list[float]
place_receptacle_name str
place_receptacle_start_pose list[float]
receptacle_name str | None
receptacle_supported_weight_frac float
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None
max_place_receptacle_pos_displacement class-attribute instance-attribute
max_place_receptacle_pos_displacement: float = 0.15
max_place_receptacle_rot_displacement class-attribute instance-attribute
max_place_receptacle_rot_displacement: float = deg2rad(60)
object_colors class-attribute instance-attribute
object_colors: dict[str, list[float]] | None = None
other_receptacle_names class-attribute instance-attribute
other_receptacle_names: list[str] | None = None
other_receptacle_start_poses class-attribute instance-attribute
other_receptacle_start_poses: dict[str, list[float]] | None = None
pickup_obj_goal_pose class-attribute instance-attribute
pickup_obj_goal_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)
pickup_obj_name instance-attribute
pickup_obj_name: str
pickup_obj_start_pose class-attribute instance-attribute
pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)
place_receptacle_name instance-attribute
place_receptacle_name: str
place_receptacle_start_pose class-attribute instance-attribute
place_receptacle_start_pose: list[float] = Field(..., min_length=7, max_length=7)
receptacle_name class-attribute instance-attribute
receptacle_name: str | None = None
receptacle_supported_weight_frac class-attribute instance-attribute
receptacle_supported_weight_frac: float = 0.5
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
succ_pos_threshold class-attribute instance-attribute
succ_pos_threshold: float = 0.01
task_cls instance-attribute
task_cls: str
task_type class-attribute instance-attribute
task_type: str | None = None

PickAndPlaceNextToTaskSpec

Bases: PickAndPlaceTaskSpec

Task-specific parameters for pick and place next-to tasks.

Attributes:

Name Type Description
max_place_receptacle_pos_displacement float
max_place_receptacle_rot_displacement float
max_surface_to_surface_gap float | None
min_surface_to_surface_gap float | None
pickup_obj_goal_pose list[float] | None
pickup_obj_name str
pickup_obj_start_pose list[float]
place_receptacle_name str
place_receptacle_start_pose list[float]
receptacle_name str | None
receptacle_supported_weight_frac float
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None
max_place_receptacle_pos_displacement class-attribute instance-attribute
max_place_receptacle_pos_displacement: float = 0.15
max_place_receptacle_rot_displacement class-attribute instance-attribute
max_place_receptacle_rot_displacement: float = deg2rad(60)
max_surface_to_surface_gap class-attribute instance-attribute
max_surface_to_surface_gap: float | None = None
min_surface_to_surface_gap class-attribute instance-attribute
min_surface_to_surface_gap: float | None = None
pickup_obj_goal_pose class-attribute instance-attribute
pickup_obj_goal_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)
pickup_obj_name instance-attribute
pickup_obj_name: str
pickup_obj_start_pose class-attribute instance-attribute
pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)
place_receptacle_name instance-attribute
place_receptacle_name: str
place_receptacle_start_pose class-attribute instance-attribute
place_receptacle_start_pose: list[float] = Field(..., min_length=7, max_length=7)
receptacle_name class-attribute instance-attribute
receptacle_name: str | None = None
receptacle_supported_weight_frac class-attribute instance-attribute
receptacle_supported_weight_frac: float = 0.5
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
succ_pos_threshold class-attribute instance-attribute
succ_pos_threshold: float = 0.01
task_cls instance-attribute
task_cls: str
task_type class-attribute instance-attribute
task_type: str | None = None

PickAndPlaceTaskSpec

Bases: PickTaskSpec

Task-specific parameters for pick and place tasks.

Attributes:

Name Type Description
max_place_receptacle_pos_displacement float
max_place_receptacle_rot_displacement float
pickup_obj_goal_pose list[float] | None
pickup_obj_name str
pickup_obj_start_pose list[float]
place_receptacle_name str
place_receptacle_start_pose list[float]
receptacle_name str | None
receptacle_supported_weight_frac float
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None
max_place_receptacle_pos_displacement class-attribute instance-attribute
max_place_receptacle_pos_displacement: float = 0.15
max_place_receptacle_rot_displacement class-attribute instance-attribute
max_place_receptacle_rot_displacement: float = deg2rad(60)
pickup_obj_goal_pose class-attribute instance-attribute
pickup_obj_goal_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)
pickup_obj_name instance-attribute
pickup_obj_name: str
pickup_obj_start_pose class-attribute instance-attribute
pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)
place_receptacle_name instance-attribute
place_receptacle_name: str
place_receptacle_start_pose class-attribute instance-attribute
place_receptacle_start_pose: list[float] = Field(..., min_length=7, max_length=7)
receptacle_name class-attribute instance-attribute
receptacle_name: str | None = None
receptacle_supported_weight_frac class-attribute instance-attribute
receptacle_supported_weight_frac: float = 0.5
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
succ_pos_threshold class-attribute instance-attribute
succ_pos_threshold: float = 0.01
task_cls instance-attribute
task_cls: str
task_type class-attribute instance-attribute
task_type: str | None = None

PickTaskSpec

Bases: BaseTaskSpec

Task-specific parameters for pick tasks.

Attributes:

Name Type Description
pickup_obj_goal_pose list[float] | None
pickup_obj_name str
pickup_obj_start_pose list[float]
receptacle_name str | None
robot_base_pose list[float]
succ_pos_threshold float
task_cls str
task_type str | None
pickup_obj_goal_pose class-attribute instance-attribute
pickup_obj_goal_pose: list[float] | None = Field(default=None, min_length=7, max_length=7)
pickup_obj_name instance-attribute
pickup_obj_name: str
pickup_obj_start_pose class-attribute instance-attribute
pickup_obj_start_pose: list[float] = Field(..., min_length=7, max_length=7)
receptacle_name class-attribute instance-attribute
receptacle_name: str | None = None
robot_base_pose class-attribute instance-attribute
robot_base_pose: list[float] = Field(..., min_length=7, max_length=7)
succ_pos_threshold class-attribute instance-attribute
succ_pos_threshold: float = 0.01
task_cls instance-attribute
task_cls: str
task_type class-attribute instance-attribute
task_type: str | None = None

RobotMountedCameraSpec

Bases: BaseModel

Specification for a camera mounted on the robot.

Attributes:

Name Type Description
camera_offset list[float]
camera_quaternion list[float]
fov float
lookat_offset list[float]
name str
record_depth bool
reference_body_names list[str]
type Literal['robot_mounted']
camera_offset class-attribute instance-attribute
camera_offset: list[float] = Field(..., min_length=3, max_length=3)
camera_quaternion class-attribute instance-attribute
camera_quaternion: list[float] = Field(..., min_length=4, max_length=4)
fov instance-attribute
fov: float
lookat_offset class-attribute instance-attribute
lookat_offset: list[float] = Field(..., min_length=3, max_length=3)
name instance-attribute
name: str
record_depth class-attribute instance-attribute
record_depth: bool = False
reference_body_names instance-attribute
reference_body_names: list[str]
type class-attribute instance-attribute
type: Literal['robot_mounted'] = 'robot_mounted'

RobotSpec

Bases: BaseModel

Robot initialization specification.

Note: Robot world placement is in task.robot_base_pose, not here. This spec only contains robot-intrinsic state (joint positions).

Attributes:

Name Type Description
init_qpos dict[str, list[float]]
robot_name str
init_qpos instance-attribute
init_qpos: dict[str, list[float]]
robot_name instance-attribute
robot_name: str

SceneModificationsSpec

Bases: BaseModel

Scene modifications required for this episode.

This captures objects that need to be added to the base scene XML and their initial poses.

Attributes:

Name Type Description
added_objects dict[str, str]
object_poses dict[str, list[float]]
removed_objects list[str]
added_objects class-attribute instance-attribute
added_objects: dict[str, str] = Field(default_factory=dict)
object_poses class-attribute instance-attribute
object_poses: dict[str, list[float]] = Field(default_factory=dict)
removed_objects class-attribute instance-attribute
removed_objects: list[str] = Field(default_factory=list)

SourceSpec

Bases: BaseModel

Provenance information for this episode.

Tracks where this episode specification came from (which H5 file and trajectory).

Attributes:

Name Type Description
benchmark_created_date str | None
camera_system_class str | None
episode_length int | None
h5_file str
source_data_date str | None
traj_key str
benchmark_created_date class-attribute instance-attribute
benchmark_created_date: str | None = None
camera_system_class class-attribute instance-attribute
camera_system_class: str | None = None
episode_length class-attribute instance-attribute
episode_length: int | None = None
h5_file instance-attribute
h5_file: str
source_data_date class-attribute instance-attribute
source_data_date: str | None = None
traj_key instance-attribute
traj_key: str

get_task_spec_field_names

get_task_spec_field_names() -> set[str]

Get all field names from TaskSpec models that should be copied to task_config.

Returns the union of all fields from all TaskSpec subclasses, excluding metadata fields (task_cls, task_type) which identify the task but aren't configuration values.

This is derived from the Pydantic models to stay in sync automatically.

Source code in molmo_spaces/evaluation/benchmark_schema.py
def get_task_spec_field_names() -> set[str]:
    """Get all field names from TaskSpec models that should be copied to task_config.

    Returns the union of all fields from all TaskSpec subclasses, excluding
    metadata fields (task_cls, task_type) which identify the task but aren't
    configuration values.

    This is derived from the Pydantic models to stay in sync automatically.
    """
    # Union every spec class's declared fields, then drop the identity fields.
    all_fields: set[str] = set().union(
        *(set(spec_cls.model_fields) for spec_cls in ALL_TASK_SPEC_CLASSES)
    )
    return all_fields - _TASK_METADATA_FIELDS

load_all_episodes

load_all_episodes(benchmark_dir: Path) -> list[EpisodeSpec]

Load all episodes from a benchmark directory as a flat list.

Supports two formats: 1. Single benchmark.json file (preferred): List of EpisodeSpec dicts 2. Legacy house_*/episode_*.json structure

Parameters:

Name Type Description Default
benchmark_dir Path

Path to benchmark directory

required

Returns:

Type Description
list[EpisodeSpec]

List of EpisodeSpec objects

Source code in molmo_spaces/evaluation/benchmark_schema.py
def load_all_episodes(benchmark_dir: Path) -> list[EpisodeSpec]:
    """Load all episodes from a benchmark directory as a flat list.

    Supports two formats:
    1. Single benchmark.json file (preferred): List of EpisodeSpec dicts
    2. Legacy house_*/episode_*.json structure

    Args:
        benchmark_dir: Path to benchmark directory

    Returns:
        List of EpisodeSpec objects
    """
    import json

    # Preferred layout: one self-contained benchmark.json holding every episode.
    single_file = benchmark_dir / "benchmark.json"
    if single_file.exists():
        raw_episodes = json.loads(single_file.read_text())
        return [EpisodeSpec.model_validate(entry) for entry in raw_episodes]

    # Legacy layout: one JSON file per episode, grouped under house_* dirs.
    _, episodes_by_house = load_benchmark(benchmark_dir)
    return [
        EpisodeSpec.from_json_file(episode_path)
        for house_paths in episodes_by_house.values()
        for episode_path in house_paths
    ]

load_benchmark

load_benchmark(benchmark_dir: Path) -> tuple[BenchmarkMetadata | None, dict[int, list[Path]]]

Load a benchmark directory.

A benchmark is simply a directory of episode JSON files. Each episode is fully self-contained. An optional benchmark_metadata.json provides human-readable info but is not required.

Parameters:

Name Type Description Default
benchmark_dir Path

Path to benchmark directory containing house_* subdirectories with episode JSON files. May optionally contain benchmark_metadata.json.

required

Returns:

Type Description
tuple[BenchmarkMetadata | None, dict[int, list[Path]]]

Tuple of (BenchmarkMetadata or None, dict mapping house_id -> list of episode JSON paths)

Source code in molmo_spaces/evaluation/benchmark_schema.py
def load_benchmark(
    benchmark_dir: Path,
) -> tuple[BenchmarkMetadata | None, dict[int, list[Path]]]:
    """Load a benchmark directory.

    A benchmark is simply a directory of episode JSON files. Each episode is
    fully self-contained. An optional benchmark_metadata.json provides human-readable
    info but is not required.

    Args:
        benchmark_dir: Path to benchmark directory containing house_* subdirectories
            with episode JSON files. May optionally contain benchmark_metadata.json.

    Returns:
        Tuple of (BenchmarkMetadata or None, dict mapping house_id -> list of episode JSON paths)

    Raises:
        ValueError: If a house_* directory name does not end in an integer id.
    """

    # Load optional metadata (not required)
    metadata: BenchmarkMetadata | None = None
    metadata_path = benchmark_dir / "benchmark_metadata.json"
    if metadata_path.exists():
        metadata = BenchmarkMetadata.from_json_file(metadata_path)

    # Discover episode files organized by house
    episodes_by_house: dict[int, list[Path]] = {}
    for house_dir in sorted(benchmark_dir.glob("house_*")):
        if not house_dir.is_dir():
            continue
        # Use removeprefix (not str.replace): replace() strips *every*
        # occurrence of "house_", which would mangle a name such as
        # "house_house_3"; only the leading prefix should be removed.
        house_id = int(house_dir.name.removeprefix("house_"))
        episode_files = sorted(house_dir.glob("episode_*.json"))
        if episode_files:  # houses with no episode files are omitted entirely
            episodes_by_house[house_id] = episode_files

    return metadata, episodes_by_house

replace_target_object_with_custom

replace_target_object_with_custom(episode: EpisodeSpec, custom_object_path: str | Path, custom_object_name: str | None = None) -> EpisodeSpec

Replace the target object in an episode with a custom object.

This function: 1. Identifies the target object from the task specification (e.g., pickup_obj_name) 2. Gets the target object's pose from task or scene_modifications 3. Removes the target object from scene_modifications if it's an added object 4. Adds the custom object to scene_modifications with the same pose 5. Updates the task specification to reference the new custom object

Parameters:

Name Type Description Default
episode EpisodeSpec

The episode specification to modify

required
custom_object_path str | Path

Path to the custom object XML file (relative to ASSETS_DIR or absolute)

required
custom_object_name str | None

Optional natural language name for the custom object (e.g., 'lemon'). If not provided, will extract from the XML body name.

None

Returns:

Type Description
EpisodeSpec

A new EpisodeSpec with the target object replaced by the custom object

Raises:

Type Description
ValueError

If the episode doesn't have a target object or if required fields are missing

Source code in molmo_spaces/evaluation/benchmark_schema.py
def replace_target_object_with_custom(
    episode: EpisodeSpec,
    custom_object_path: str | Path,
    custom_object_name: str | None = None,
) -> EpisodeSpec:
    """Replace the target object in an episode with a custom object.

    This function:
    1. Identifies the target object from the task specification (e.g., pickup_obj_name)
    2. Gets the target object's pose from task or scene_modifications
    3. Removes the target object from scene_modifications if it's an added object
    4. Adds the custom object to scene_modifications with the same pose
    5. Updates the task specification to reference the new custom object

    Args:
        episode: The episode specification to modify
        custom_object_path: Path to the custom object XML file (relative to ASSETS_DIR or absolute)
        custom_object_name: Optional natural language name for the custom object (e.g., 'lemon').
            If not provided, will extract from the XML body name.

    Returns:
        A new EpisodeSpec with the target object replaced by the custom object

    Raises:
        ValueError: If the episode doesn't have a target object or if required fields are missing
    """

    log = logging.getLogger(__name__)

    # Create a deep copy to avoid modifying the original
    modified_episode = copy.deepcopy(episode)

    # Get target object name from task - most tasks use pickup_obj_name
    task = modified_episode.task
    target_obj_name = task.get("pickup_obj_name")

    if not target_obj_name:
        raise ValueError(
            f"Episode task does not have a pickup_obj_name field. "
            f"Task type: {task.get('task_cls', 'unknown')}"
        )

    # Get target object pose - prefer pickup_obj_start_pose from task, fall back
    # to object_poses. NOTE: some task specs (e.g. NavToObjTaskSpec) allow
    # pickup_obj_start_pose to be None, so a present-but-None entry must fall
    # through to object_poses instead of being treated as a valid pose.
    target_obj_pose = task.get("pickup_obj_start_pose")
    if target_obj_pose is None:
        target_obj_pose = modified_episode.scene_modifications.object_poses.get(target_obj_name)
    if target_obj_pose is None:
        raise ValueError(
            f"Could not find pose for target object '{target_obj_name}'. "
            f"Expected either pickup_obj_start_pose in task or object_poses entry."
        )

    # Ensure we have a valid pose (7 elements: x, y, z, qw, qx, qy, qz)
    if len(target_obj_pose) != 7:
        raise ValueError(
            f"Target object pose must have 7 elements [x, y, z, qw, qx, qy, qz], "
            f"got {len(target_obj_pose)} elements"
        )

    # Convert custom_object_path to string and ensure it's relative to ASSETS_DIR if needed
    custom_obj_path_str = str(custom_object_path)

    # Determine the custom object body name to use
    if custom_object_name:
        # Use the provided custom object name
        custom_obj_body_name = custom_object_name
        log.info(f"Using provided custom object name: '{custom_obj_body_name}'")
    else:
        raise ValueError(
            "No custom object name provided. "
            "Please provide a custom object name using --custom_object_name."
        )

    # Generate a new name for the custom object
    # Use a prefix to avoid conflicts, and use the body name (either provided or from XML)
    custom_obj_name = f"custom_object/{custom_obj_body_name}"

    # Remove the target object from scene_modifications if it's an added object
    # Also add it to removed_objects to ensure it's removed from the base scene if present
    if target_obj_name in modified_episode.scene_modifications.added_objects:
        del modified_episode.scene_modifications.added_objects[target_obj_name]
        log.info(f"Removed target object '{target_obj_name}' from added_objects")

    if target_obj_name in modified_episode.scene_modifications.object_poses:
        del modified_episode.scene_modifications.object_poses[target_obj_name]
        log.info(f"Removed target object '{target_obj_name}' from object_poses")

    # Add to removed_objects to ensure it's removed from base scene if it exists there
    if target_obj_name not in modified_episode.scene_modifications.removed_objects:
        modified_episode.scene_modifications.removed_objects.append(target_obj_name)
        log.info(
            f"Added target object '{target_obj_name}' to removed_objects for base scene removal"
        )

    # Add the custom object to scene_modifications; copy the pose so the task
    # and object_poses entries do not alias the same mutable list.
    modified_episode.scene_modifications.added_objects[custom_obj_name] = custom_obj_path_str
    modified_episode.scene_modifications.object_poses[custom_obj_name] = target_obj_pose.copy()

    # Update task to reference the new custom object
    task["pickup_obj_name"] = custom_obj_name

    # Also update pickup_obj_start_pose to match (in case task uses it)
    task["pickup_obj_start_pose"] = target_obj_pose.copy()

    log.info(
        f"Replaced target object '{target_obj_name}' with custom object '{custom_obj_name}' "
        f"at path '{custom_obj_path_str}'"
    )

    return modified_episode

configs

Modules:

Name Description
evaluation_configs

These configs are EXAMPLES of how to set up evaluation configs for use

evaluation_configs

These configs are EXAMPLES of how to set up evaluation configs for use with JSON benchmarks via molmo_spaces.evaluation.run_evaluation(). The anticipated pattern is that users will create their own eval configs in their own repositories, import run_evaluation from molmo_spaces.evaluation, and pass their config to it.

Example usage from an external repo

from molmo_spaces.evaluation import run_evaluation from my_repo.configs import MyPolicyEvalConfig

results = run_evaluation( eval_config_cls=MyPolicyEvalConfig, benchmark_dir="/path/to/benchmark", checkpoint_path="/path/to/checkpoint", )

Eval configs provide: - Robot config (factories for instantiation, gravcomp settings) - Policy config (checkpoint path, camera names, action spec) - Timing parameters (policy_dt_ms, ctrl_dt_ms, sim_dt_ms)

Episode-specific data (init_qpos, robot_base_pose, cameras, object_poses, task config) comes from the JSON benchmark files, not from these configs. The benchmark JSON is strictly authoritative for episode initialization.

Classes:

Name Description
BrownianMotionPickPlaceColorEvalConfig
BrownianMotionPickPlaceEvalConfig

Evaluation config for Dummy pick and place.

CAPPolicyEvalConfig
DreamZeroPolicyEvalConfig
DummyBenchmarkEvalConfig

Test config that inherits from JsonBenchmarkEvalConfig.

DummyPickPlaceEvalConfig

Evaluation config for Dummy pick and place.

JsonBenchmarkEvalConfig

Minimal base config for JSON benchmark evaluation.

PiPolicyEvalConfig
TeleopPolicyEvalConfig

Attributes:

Name Type Description
TIMESTAMP
TIMESTAMP module-attribute
TIMESTAMP = strftime('%Y%m%d_%H%M%S')
BrownianMotionPickPlaceColorEvalConfig

Bases: BrownianMotionPickPlaceEvalConfig

Classes:

Name Description
Config
SavedEpisode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config FrankaRandomizedDroidCameraSystem
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config BrownianMotionPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config BaseRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config PickAndPlaceColorTaskConfig
task_config_preset PickTaskConfig | None
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config PickAndPlaceColorTaskSamplerConfig
task_type str
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
viewer_camera None
wandb_name str
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: FrankaRandomizedDroidCameraSystem = FrankaRandomizedDroidCameraSystem()
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'train'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output') / f'brownian_motion_{TIMESTAMP}'
policy_config class-attribute instance-attribute
policy_config: BrownianMotionPolicyConfig = BrownianMotionPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 66.0
profile class-attribute instance-attribute
profile: bool = True
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig = FrankaRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: PickAndPlaceColorTaskConfig = PickAndPlaceColorTaskConfig(task_cls=PickAndPlaceColorTask)
task_config_preset class-attribute instance-attribute
task_config_preset: PickTaskConfig | None = None
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 600
task_sampler_config class-attribute instance-attribute
task_sampler_config: PickAndPlaceColorTaskSamplerConfig = PickAndPlaceColorTaskSamplerConfig(task_sampler_class=PickAndPlaceColorTaskSampler, house_inds=[5, 15, 25, 35, 45, 55, 65, 75, 85, 95, 105, 115, 125, 135, 145], samples_per_house=3)
task_type class-attribute instance-attribute
task_type: str = 'pick_and_place_color'
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
viewer_camera class-attribute instance-attribute
viewer_camera: None = None
wandb_name class-attribute instance-attribute
wandb_name: str = f'brownian_motion_pick_place_color_eval_{TIMESTAMP}'
wandb_project class-attribute instance-attribute
wandb_project: str = 'brownian-motion-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config FrankaRobotConfig | None
task_cls_str str | None
task_config PickAndPlaceTaskConfig | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: PickAndPlaceTaskConfig | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context) -> None
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context) -> None:
    super().model_post_init(__context)
    self.robot_config.action_noise_config.enabled = False
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
BrownianMotionPickPlaceEvalConfig

Bases: FrankaPickAndPlaceDataGenConfig

Evaluation config for Dummy pick and place.

Classes:

Name Description
Config
SavedEpisode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config FrankaRandomizedDroidCameraSystem
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config BrownianMotionPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config BaseRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config PickAndPlaceTaskConfig
task_config_preset PickTaskConfig | None
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config PickAndPlaceTaskSamplerConfig
task_type str
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
viewer_camera None
wandb_name str
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: FrankaRandomizedDroidCameraSystem = FrankaRandomizedDroidCameraSystem()
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'train'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output') / f'brownian_motion_{TIMESTAMP}'
policy_config class-attribute instance-attribute
policy_config: BrownianMotionPolicyConfig = BrownianMotionPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 66.0
profile class-attribute instance-attribute
profile: bool = True
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig = FrankaRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: PickAndPlaceTaskConfig = PickAndPlaceTaskConfig(task_cls=PickAndPlaceTask)
task_config_preset class-attribute instance-attribute
task_config_preset: PickTaskConfig | None = None
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 600
task_sampler_config class-attribute instance-attribute
task_sampler_config: PickAndPlaceTaskSamplerConfig = PickAndPlaceTaskSamplerConfig(task_sampler_class=PickAndPlaceTaskSampler, house_inds=[5, 15, 25, 35, 45, 55, 65, 75, 85, 95, 105, 115, 125, 135, 145], samples_per_house=3)
task_type class-attribute instance-attribute
task_type: str = 'pick_and_place'
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
viewer_camera class-attribute instance-attribute
viewer_camera: None = None
wandb_name class-attribute instance-attribute
wandb_name: str = f'brownian_motion_pick_place_eval_{TIMESTAMP}'
wandb_project class-attribute instance-attribute
wandb_project: str = 'brownian-motion-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config FrankaRobotConfig | None
task_cls_str str | None
task_config PickAndPlaceTaskConfig | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: PickAndPlaceTaskConfig | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context) -> None
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context) -> None:
    super().model_post_init(__context)
    self.robot_config.action_noise_config.enabled = False
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
CAPPolicyEvalConfig

Bases: JsonBenchmarkEvalConfig

Classes:

Name Description
Config
SavedEpisode

Config information describing a single episode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config None
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config CAPPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config FrankaCAPRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config BaseMujocoTaskConfig
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config BaseMujocoTaskSamplerConfig
task_type str
terminate_upon_success bool
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
wandb_name str | None
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: None = None
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'val'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output')
policy_config class-attribute instance-attribute
policy_config: CAPPolicyConfig = CAPPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 500.0
profile class-attribute instance-attribute
profile: bool = False
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaCAPRobotConfig = FrankaCAPRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: BaseMujocoTaskConfig = BaseMujocoTaskConfig(task_cls=None)
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 500
task_sampler_config class-attribute instance-attribute
task_sampler_config: BaseMujocoTaskSamplerConfig = BaseMujocoTaskSamplerConfig(task_sampler_class=BaseMujocoTaskSampler, house_inds=[0], samples_per_house=1, task_batch_size=1, max_tasks=10000, load_robot_from_file=True)
task_type class-attribute instance-attribute
task_type: str = 'pick'
terminate_upon_success class-attribute instance-attribute
terminate_upon_success: bool = False
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
wandb_name class-attribute instance-attribute
wandb_name: str | None = None
wandb_project class-attribute instance-attribute
wandb_project: str = 'mlspaces-benchmark-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Config information describing a single episode

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config BaseRobotConfig | None
task_cls_str str | None
task_config AllTaskConfigs | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: AllTaskConfigs | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context)
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context):
    super().model_post_init(__context)
    self.robot_config.action_noise_config.enabled = False
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
DreamZeroPolicyEvalConfig

Bases: JsonBenchmarkEvalConfig

Classes:

Name Description
Config
SavedEpisode

Config information describing a single episode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config None
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config DreamZeroPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config FrankaRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config BaseMujocoTaskConfig
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config BaseMujocoTaskSamplerConfig
task_type str
terminate_upon_success bool
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
wandb_name str | None
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: None = None
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'val'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output')
policy_config class-attribute instance-attribute
policy_config: DreamZeroPolicyConfig = DreamZeroPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 66.0
profile class-attribute instance-attribute
profile: bool = False
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaRobotConfig = FrankaRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: BaseMujocoTaskConfig = BaseMujocoTaskConfig(task_cls=None)
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 500
task_sampler_config class-attribute instance-attribute
task_sampler_config: BaseMujocoTaskSamplerConfig = BaseMujocoTaskSamplerConfig(task_sampler_class=BaseMujocoTaskSampler, house_inds=[0], samples_per_house=1, task_batch_size=1, max_tasks=10000, load_robot_from_file=True)
task_type class-attribute instance-attribute
task_type: str = 'pick'
terminate_upon_success class-attribute instance-attribute
terminate_upon_success: bool = False
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
wandb_name class-attribute instance-attribute
wandb_name: str | None = None
wandb_project class-attribute instance-attribute
wandb_project: str = 'mlspaces-benchmark-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Config information describing a single episode

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config BaseRobotConfig | None
task_cls_str str | None
task_config AllTaskConfigs | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: AllTaskConfigs | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context)
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context):
    super().model_post_init(__context)
    self.robot_config.action_noise_config.enabled = False
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
DummyBenchmarkEvalConfig

Bases: JsonBenchmarkEvalConfig

Test config that inherits from JsonBenchmarkEvalConfig.

This tests the recommended pattern from evaluation/README.md: external repos should inherit from JsonBenchmarkEvalConfig and provide their robot_config and policy_config. The benchmark JSON provides all episode-specific data (cameras, poses, task params).

Note: Prefixed with underscore to avoid pytest collection warning since this inherits from a class with `__init__`.

Classes:

Name Description
Config
SavedEpisode

Config information describing a single episode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config None
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config DummyPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config FrankaRobotConfig
scene_dataset str
seed int
sim_dt_ms float
tag str
task_config BaseMujocoTaskConfig
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config BaseMujocoTaskSamplerConfig
task_type str
terminate_upon_success bool
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
wandb_name str | None
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: None = None
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'val'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output')
policy_config class-attribute instance-attribute
policy_config: DummyPolicyConfig = DummyPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 200.0
profile class-attribute instance-attribute
profile: bool = False
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaRobotConfig = FrankaRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int = 42
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: BaseMujocoTaskConfig = BaseMujocoTaskConfig(task_cls=None)
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 10
task_sampler_config class-attribute instance-attribute
task_sampler_config: BaseMujocoTaskSamplerConfig = BaseMujocoTaskSamplerConfig(task_sampler_class=BaseMujocoTaskSampler, house_inds=[0], samples_per_house=1, task_batch_size=1, max_tasks=10000, load_robot_from_file=True)
task_type class-attribute instance-attribute
task_type: str = 'pick'
terminate_upon_success class-attribute instance-attribute
terminate_upon_success: bool = False
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
wandb_name class-attribute instance-attribute
wandb_name: str | None = None
wandb_project class-attribute instance-attribute
wandb_project: str = 'mlspaces-benchmark-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Config information describing a single episode

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config BaseRobotConfig | None
task_cls_str str | None
task_config AllTaskConfigs | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: AllTaskConfigs | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context) -> None
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context) -> None:
    super().model_post_init(__context)
    # Disable action noise for deterministic testing
    self.robot_config.action_noise_config = ActionNoiseConfig(enabled=False)
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
DummyPickPlaceEvalConfig

Bases: FrankaPickAndPlaceDataGenConfig

Evaluation config for Dummy pick and place.

Classes:

Name Description
Config
SavedEpisode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config FrankaRandomizedDroidCameraSystem
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config DummyPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config BaseRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config PickAndPlaceTaskConfig
task_config_preset PickTaskConfig | None
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config PickAndPlaceTaskSamplerConfig
task_type str
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
viewer_camera None
wandb_name str
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: FrankaRandomizedDroidCameraSystem = FrankaRandomizedDroidCameraSystem()
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'train'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output') / f'dummy_{TIMESTAMP}'
policy_config class-attribute instance-attribute
policy_config: DummyPolicyConfig = DummyPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 66.0
profile class-attribute instance-attribute
profile: bool = True
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig = FrankaRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: PickAndPlaceTaskConfig = PickAndPlaceTaskConfig(task_cls=PickAndPlaceTask)
task_config_preset class-attribute instance-attribute
task_config_preset: PickTaskConfig | None = None
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 600
task_sampler_config class-attribute instance-attribute
task_sampler_config: PickAndPlaceTaskSamplerConfig = PickAndPlaceTaskSamplerConfig(task_sampler_class=PickAndPlaceTaskSampler, house_inds=[5, 15, 25, 35, 45, 55, 65, 75, 85, 95, 105, 115, 125, 135, 145], samples_per_house=3)
task_type class-attribute instance-attribute
task_type: str = 'pick_and_place'
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
viewer_camera class-attribute instance-attribute
viewer_camera: None = None
wandb_name class-attribute instance-attribute
wandb_name: str = f'dummy_pick_place_eval_{TIMESTAMP}'
wandb_project class-attribute instance-attribute
wandb_project: str = 'dummy-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config FrankaRobotConfig | None
task_cls_str str | None
task_config PickAndPlaceTaskConfig | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: PickAndPlaceTaskConfig | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context) -> None
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context) -> None:
    super().model_post_init(__context)
    self.robot_config.action_noise_config.enabled = False
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
JsonBenchmarkEvalConfig

Bases: MlSpacesExpConfig

Minimal base config for JSON benchmark evaluation.

This config is designed for use ONLY with JSON benchmarks. It provides the minimal infrastructure needed to run a learned policy against a benchmark where all episode-specific data (task type, cameras, robot poses, object poses, etc.) comes from the benchmark JSON.

Subclass this and provide: - robot_config: Robot configuration for instantiation - policy_config: Your learned policy configuration

DO NOT provide task_sampler_config or task_config - those are placeholders that will be overridden by the benchmark. If you accidentally try to use this config for data generation (not evaluation), it will fail because the task sampler/config are minimal stubs.

Example

class MyPolicyBenchmarkEvalConfig(JsonBenchmarkEvalConfig): robot_config = FrankaRobotConfig() policy_config = MyPolicyConfig(checkpoint_path="/path/to/ckpt")

Classes:

Name Description
Config
SavedEpisode

Config information describing a single episode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init

This serves as the init() called after internal validation of config parameters

save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config None
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config BasePolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config BaseRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config BaseMujocoTaskConfig
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config BaseMujocoTaskSamplerConfig
task_type str
terminate_upon_success bool
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
wandb_name str | None
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: None = None
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'val'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output')
policy_config instance-attribute
policy_config: BasePolicyConfig
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 66.0
profile class-attribute instance-attribute
profile: bool = False
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config instance-attribute
robot_config: BaseRobotConfig
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: BaseMujocoTaskConfig = BaseMujocoTaskConfig(task_cls=None)
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 500
task_sampler_config class-attribute instance-attribute
task_sampler_config: BaseMujocoTaskSamplerConfig = BaseMujocoTaskSamplerConfig(task_sampler_class=BaseMujocoTaskSampler, house_inds=[0], samples_per_house=1, task_batch_size=1, max_tasks=10000, load_robot_from_file=True)
task_type class-attribute instance-attribute
task_type: str = 'pick'
terminate_upon_success class-attribute instance-attribute
terminate_upon_success: bool = False
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
wandb_name class-attribute instance-attribute
wandb_name: str | None = None
wandb_project class-attribute instance-attribute
wandb_project: str = 'mlspaces-benchmark-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Config information describing a single episode

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config BaseRobotConfig | None
task_cls_str str | None
task_config AllTaskConfigs | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: AllTaskConfigs | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(_context) -> None

This serves as the init() called after internal validation of config parameters

Source code in molmo_spaces/configs/abstract_exp_config.py
def model_post_init(self, _context) -> None:
    """This serves as the __init__() called after internal validation of config parameters"""
    assert (self.policy_dt_ms / self.ctrl_dt_ms).is_integer(), (
        "policy_dt_ms must be a multiple of ctrl_dt_ms"
    )
    assert (self.ctrl_dt_ms / self.sim_dt_ms).is_integer(), (
        "ctrl_dt_ms must be a multiple of sim_dt"
    )

    # Initialize eval_runtime_params if not set (for backward compatibility)
    # This ensures it's always available, even for configs created outside evaluation
    if self.eval_runtime_params is None:
        # Import here to avoid circular dependency
        from molmo_spaces.evaluation.eval_main import EvalRuntimeParams

        self.eval_runtime_params = EvalRuntimeParams()
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
PiPolicyEvalConfig

Bases: JsonBenchmarkEvalConfig

Classes:

Name Description
Config
SavedEpisode

Config information describing a single episode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config None
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config PiPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config FrankaRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config BaseMujocoTaskConfig
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config BaseMujocoTaskSamplerConfig
task_type str
terminate_upon_success bool
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
wandb_name str | None
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: None = None
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'val'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = True
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output')
policy_config class-attribute instance-attribute
policy_config: PiPolicyConfig = PiPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 66.0
profile class-attribute instance-attribute
profile: bool = False
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaRobotConfig = FrankaRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: BaseMujocoTaskConfig = BaseMujocoTaskConfig(task_cls=None)
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 500
task_sampler_config class-attribute instance-attribute
task_sampler_config: BaseMujocoTaskSamplerConfig = BaseMujocoTaskSamplerConfig(task_sampler_class=BaseMujocoTaskSampler, house_inds=[0], samples_per_house=1, task_batch_size=1, max_tasks=10000, load_robot_from_file=True)
task_type class-attribute instance-attribute
task_type: str = 'pick'
terminate_upon_success class-attribute instance-attribute
terminate_upon_success: bool = False
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
wandb_name class-attribute instance-attribute
wandb_name: str | None = None
wandb_project class-attribute instance-attribute
wandb_project: str = 'mlspaces-benchmark-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Config information describing a single episode

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config BaseRobotConfig | None
task_cls_str str | None
task_config AllTaskConfigs | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: AllTaskConfigs | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context)
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context):
    super().model_post_init(__context)
    self.robot_config.action_noise_config.enabled = False
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
TeleopPolicyEvalConfig

Bases: JsonBenchmarkEvalConfig

Classes:

Name Description
Config
SavedEpisode

Config information describing a single episode

Methods:

Name Description
freeze_task_config

Saves the state of a sampled task i.e. an episode

from_dict

Create a configuration instance from a dictionary.

load_config

Loads a configuration from a file

load_from_json

Load the configuration from a JSON file.

model_post_init
save_config

Saves the current configuration to the output directory

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
CameraConfig type
PolicyConfig type
RobotConfig type
benchmark_path Path | None
camera_config None
collision_free_pose_limit int
config_version str
ctrl_dt_ms float
data_split str
datagen_profiler bool
end_on_success bool
environment_light_intensity float
eval_runtime_params Any
filter_for_successful_trajectories bool
fps float
log_level str
num_envs int
num_workers int
output_dir Path
policy_config TeleopPolicyConfig
policy_dt_ms float
profile bool
profiler Profiler | None
robot_config FrankaRobotConfig
scene_dataset str
seed int | None
sim_dt_ms float
tag str
task_config BaseMujocoTaskConfig
task_config_preset_exp AllTaskConfigs | None
task_config_preset_scn AllTaskConfigs | None
task_horizon int
task_sampler_config BaseMujocoTaskSamplerConfig
task_type str
terminate_upon_success bool
use_passive_viewer bool
use_wandb bool
viewer_cam_dict dict
wandb_name str | None
wandb_project str
CameraConfig class-attribute
CameraConfig: type = CameraSystemConfig
PolicyConfig class-attribute
PolicyConfig: type = BasePolicyConfig
RobotConfig class-attribute
RobotConfig: type = BaseRobotConfig
benchmark_path class-attribute instance-attribute
benchmark_path: Path | None = None
camera_config class-attribute instance-attribute
camera_config: None = None
collision_free_pose_limit class-attribute instance-attribute
collision_free_pose_limit: int = 3
config_version class-attribute instance-attribute
config_version: str = '0.1'
ctrl_dt_ms class-attribute instance-attribute
ctrl_dt_ms: float = 2.0
data_split class-attribute instance-attribute
data_split: str = 'val'
datagen_profiler class-attribute instance-attribute
datagen_profiler: bool = True
end_on_success class-attribute instance-attribute
end_on_success: bool = False
environment_light_intensity class-attribute instance-attribute
environment_light_intensity: float = 15000.0
eval_runtime_params class-attribute instance-attribute
eval_runtime_params: Any = None
filter_for_successful_trajectories class-attribute instance-attribute
filter_for_successful_trajectories: bool = False
fps property
fps: float
log_level class-attribute instance-attribute
log_level: str = 'info'
num_envs class-attribute instance-attribute
num_envs: int = 1
num_workers class-attribute instance-attribute
num_workers: int = 1
output_dir class-attribute instance-attribute
output_dir: Path = Path('eval_output')
policy_config class-attribute instance-attribute
policy_config: TeleopPolicyConfig = TeleopPolicyConfig()
policy_dt_ms class-attribute instance-attribute
policy_dt_ms: float = 40
profile class-attribute instance-attribute
profile: bool = False
profiler class-attribute instance-attribute
profiler: Profiler | None = None
robot_config class-attribute instance-attribute
robot_config: FrankaRobotConfig = FrankaRobotConfig()
scene_dataset class-attribute instance-attribute
scene_dataset: str = 'procthor-10k'
seed class-attribute instance-attribute
seed: int | None = None
sim_dt_ms class-attribute instance-attribute
sim_dt_ms: float = 2.0
tag property
tag: str
task_config class-attribute instance-attribute
task_config: BaseMujocoTaskConfig = BaseMujocoTaskConfig(task_cls=None)
task_config_preset_exp class-attribute instance-attribute
task_config_preset_exp: AllTaskConfigs | None = None
task_config_preset_scn class-attribute instance-attribute
task_config_preset_scn: AllTaskConfigs | None = None
task_horizon class-attribute instance-attribute
task_horizon: int = 500
task_sampler_config class-attribute instance-attribute
task_sampler_config: BaseMujocoTaskSamplerConfig = BaseMujocoTaskSamplerConfig(task_sampler_class=BaseMujocoTaskSampler, house_inds=[0], samples_per_house=1, task_batch_size=1, max_tasks=10000, load_robot_from_file=True)
task_type class-attribute instance-attribute
task_type: str = 'pick'
terminate_upon_success class-attribute instance-attribute
terminate_upon_success: bool = False
use_passive_viewer class-attribute instance-attribute
use_passive_viewer: bool = False
use_wandb class-attribute instance-attribute
use_wandb: bool = False
viewer_cam_dict class-attribute instance-attribute
viewer_cam_dict: dict = {'distance': 5.0, 'azimuth': 45.0, 'elevation': -30.0, 'lookat': [0.0, 0.0, 0.5]}
wandb_name class-attribute instance-attribute
wandb_name: str | None = None
wandb_project class-attribute instance-attribute
wandb_project: str = 'mlspaces-benchmark-eval'
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
SavedEpisode

Bases: Config

Config information describing a single episode

Classes:

Name Description
Config

Methods:

Name Description
from_dict

Create a configuration instance from a dictionary.

load_from_json

Load the configuration from a JSON file.

save_to_json

Save the configuration to a JSON file.

to_dict

Convert the configuration to a dictionary.

to_json

Attributes:

Name Type Description
camera_config AllCameraSystems | None
robot_config BaseRobotConfig | None
task_cls_str str | None
task_config AllTaskConfigs | None
camera_config class-attribute instance-attribute
camera_config: AllCameraSystems | None = None
robot_config class-attribute instance-attribute
robot_config: BaseRobotConfig | None = None
task_cls_str class-attribute instance-attribute
task_cls_str: str | None = None
task_config class-attribute instance-attribute
task_config: AllTaskConfigs | None = None
Config

Attributes:

Name Type Description
arbitrary_types_allowed
arbitrary_types_allowed class-attribute instance-attribute
arbitrary_types_allowed = True
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    # TODO(max): this can cause errors, try printing out the config to see missmatches w/ print(self)
    return self.model_dump_json(warnings="error")
freeze_task_config
freeze_task_config(observation, task: BaseMujocoTask = None) -> None

Saves the state of a sampled task i.e. an episode

Source code in molmo_spaces/configs/abstract_exp_config.py
def freeze_task_config(self, observation, task: BaseMujocoTask = None) -> None:
    """Saves the state of a sampled task i.e. an episode"""
    sc = self.SavedEpisode()

    # RMH: deep argument VERY IMPORTANT. Mutates config for future episodes otherwise
    sc.robot_config = self.robot_config.model_copy(deep=True)
    # remove un-serializable
    sc.robot_config.robot_cls = None
    sc.robot_config.robot_factory = None
    sc.robot_config.robot_view_factory = None
    # save state
    sc.robot_config.init_qpos_noise_range = None  # remove ranges
    sc.robot_config.init_qpos = observation[0]["qpos"]
    sc.camera_config = self.camera_config.model_copy(deep=True)
    for i, camera in enumerate(sc.camera_config.cameras):
        # Some cameras can contain random sampling, e.g. of positions
        # Read the camera's positions and convert them to fixed cameras
        if isinstance(camera, MjcfCameraConfig | RobotMountedCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = RobotMountedCameraConfig(
                name=cam.name,
                reference_body_names=list(cam.reference_body_names),
                camera_offset=list(cam.camera_offset),
                lookat_offset=list(cam.lookat_offset),
                camera_quaternion=list(cam.camera_quaternion),
                fov=cam.fov,
            )
            sc.camera_config.cameras[i] = new_camera

        elif isinstance(camera, RandomizedExocentricCameraConfig | FixedExocentricCameraConfig):
            cam = task.env.camera_manager.registry[camera.name]
            new_camera = FixedExocentricCameraConfig(
                name=cam.name,
                fov=cam.fov,
                pos=list(cam.pos),
                up=list(cam.up),
                forward=list(cam.forward),
            )
            sc.camera_config.cameras[i] = new_camera
        else:
            raise NotImplementedError(f"Cannot freeze camera of type {type(camera).__name__}")

    # for all task relevant objects, save the poses
    # assert task.config.task_config.object_poses is None
    obj_poses = {}
    om = task.env.object_managers[task.env.current_batch_index]
    task_objects = om.get_mobile_objects()
    for task_object in task_objects:
        obj_poses[task_object.name] = pose_mat_to_7d(task_object.pose).tolist()
    task.config.task_config.object_poses = obj_poses

    sc.task_config = self.task_config.model_copy(deep=True)
    # remove un-serializable
    sc.task_config.task_cls = None
    # save the name of the task class
    sc.task_cls_str = (
        self.task_config.task_cls.__module__ + "." + self.task_config.task_cls.__name__
    )

    assert sc.task_config.robot_base_pose is not None

    sc_bytes = pickle.dumps(sc)
    sc_b64 = base64.b64encode(sc_bytes).decode("utf-8")
    return sc_b64
from_dict classmethod
from_dict(data: dict) -> Config

Create a configuration instance from a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Create a configuration instance from a dictionary."""
    return cls.model_validate(data)
load_config staticmethod
load_config(output_dir: Path) -> MlSpacesExpConfig

Loads a configuration from a file

Source code in molmo_spaces/configs/abstract_exp_config.py
@staticmethod
def load_config(output_dir: Path) -> MlSpacesExpConfig:
    """Loads a configuration from a file"""
    config_path = output_dir / "experiment_config.pkl"
    with open(config_path, "rb") as f:
        config = pickle.load(f)
    log.info(f"Loaded experiment configuration from {output_dir}")
    return config
load_from_json classmethod
load_from_json(file_path: str) -> Config

Load the configuration from a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
@classmethod
def load_from_json(cls, file_path: str) -> "Config":
    """Load the configuration from a JSON file."""
    with open(file_path, "r") as f:
        data = f.read()
    return cls.model_validate_json(data)
model_post_init
model_post_init(__context)
Source code in molmo_spaces/evaluation/configs/evaluation_configs.py
def model_post_init(self, __context):
    super().model_post_init(__context)
    self.robot_config.action_noise_config.enabled = False
save_config
save_config(output_dir=None) -> None

Saves the current configuration to the output directory

Source code in molmo_spaces/configs/abstract_exp_config.py
def save_config(self, output_dir=None) -> None:
    """Saves the current configuration to the output directory"""
    if output_dir is None:
        output_dir = self.output_dir
    output_dir = Path(output_dir)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir.mkdir(parents=True, exist_ok=True)
    config_path = output_dir / f"experiment_config_{timestamp}.pkl"
    with open(config_path, "wb") as f:
        pickle.dump(self, f)
    log.info(f"Saved experiment configuration to {output_dir}")
save_to_json
save_to_json(file_path: str) -> None

Save the configuration to a JSON file.

Source code in molmo_spaces/configs/abstract_config.py
def save_to_json(self, file_path: str) -> None:
    """Save the configuration to a JSON file."""
    with open(file_path, "w") as f:
        f.write(self.to_json())
to_dict
to_dict() -> dict

Convert the configuration to a dictionary.

Source code in molmo_spaces/configs/abstract_config.py
def to_dict(self) -> dict:
    """Convert the configuration to a dictionary."""
    return self.model_dump()
to_json
to_json() -> str
Source code in molmo_spaces/configs/abstract_config.py
def to_json(self) -> str:
    """Serialize the configuration to a JSON string.

    Serialization warnings are escalated to errors so unrepresentable
    fields fail loudly instead of being silently coerced.
    """
    # TODO(max): this can raise; printing the config (print(self)) helps spot mismatches.
    return self.model_dump_json(warnings="error")

eval_main

Evaluation entrypoint for learned policies on JSON-based benchmarks.

This module evaluates policies on JSON benchmark files where each episode is fully self-contained. Unlike the pickle-based frozen config approach, JSON benchmarks are human-readable, version-independent, and support mixed task types.

Key differences from run_benchmark_with_learned_policy.py: - Uses JsonEvalRunner instead of PatchyRunner - No patch_config needed - JSON episode specs are authoritative - Timing parameters (policy_dt_ms, ctrl_dt_ms, sim_dt_ms) come from the eval config, NOT from individual episodes. This allows the same benchmark to be run at different control rates. - Supports mixed task types in the same benchmark

Programmatic usage (from external repo): from molmo_spaces.evaluation import run_evaluation

results = run_evaluation(
    eval_config_cls=MyEvalConfig,
    benchmark_dir="/path/to/benchmark",
    checkpoint_path="/path/to/checkpoint",
)
print(f"Success rate: {results.success_count}/{results.total_count}")

Environment setup (MacOS): export PYTHONPATH="${PYTHONPATH}:." export MUJOCO_GL=egl export PYOPENGL_PLATFORM=egl

Classes:

Name Description
EvalRuntimeParams

Runtime parameters for evaluation that are not part of the base config schema.

EvaluationResults

Results from running an evaluation on a benchmark.

Functions:

Name Description
build_success_status_map

Build a map of episode keys to success status for video naming.

create_eval_config

Create an MlSpacesExpConfig experiment config from a JSON benchmark for evaluation.

determine_task_horizon

Determine task horizon from command line override or benchmark.

get_args
main

Command-line entry point for evaluation.

run_evaluation

Run evaluation on a JSON benchmark programmatically.

Attributes:

Name Type Description
log

log module-attribute

log = getLogger(__name__)

EvalRuntimeParams dataclass

EvalRuntimeParams(episode_idx: int | None = None, max_episodes: int | None = None, add_custom_object: bool = False, custom_object_path: str | Path | None = None, custom_object_name: str | None = None)

Runtime parameters for evaluation that are not part of the base config schema.

These parameters are set during evaluation initialization and used by the evaluation runner to customize episode processing.

Attributes:

Name Type Description
add_custom_object bool
custom_object_name str | None
custom_object_path str | Path | None
episode_idx int | None
max_episodes int | None
add_custom_object class-attribute instance-attribute
add_custom_object: bool = False
custom_object_name class-attribute instance-attribute
custom_object_name: str | None = None
custom_object_path class-attribute instance-attribute
custom_object_path: str | Path | None = None
episode_idx class-attribute instance-attribute
episode_idx: int | None = None
max_episodes class-attribute instance-attribute
max_episodes: int | None = None

EvaluationResults dataclass

EvaluationResults(success_count: int, total_count: int, output_dir: Path, episode_results: list[EpisodeResult] = list(), exp_config: MlSpacesExpConfig | None = None)

Results from running an evaluation on a benchmark.

Attributes:

Name Type Description
success_count int

Number of successful episodes

total_count int

Total number of episodes evaluated

output_dir Path

Path where evaluation outputs were saved

episode_results list[EpisodeResult]

Per-episode results with details

exp_config MlSpacesExpConfig | None

The experiment config used for evaluation

episode_results class-attribute instance-attribute
episode_results: list[EpisodeResult] = field(default_factory=list)
exp_config class-attribute instance-attribute
exp_config: MlSpacesExpConfig | None = None
output_dir instance-attribute
output_dir: Path
success_count instance-attribute
success_count: int
success_rate property
success_rate: float

Compute success rate as a fraction.

total_count instance-attribute
total_count: int

build_success_status_map

build_success_status_map(results: list[EpisodeResult]) -> dict[str, bool]

Build a map of episode keys to success status for video naming.

Parameters:

Name Type Description Default
results list[EpisodeResult]

List of episode results

required

Returns:

Type Description
dict[str, bool]

Dict mapping episode keys (e.g., "house_5/episode_00000000") to success status

Source code in molmo_spaces/evaluation/eval_main.py
def build_success_status_map(results: list[EpisodeResult]) -> dict[str, bool]:
    """Build a map of episode keys to success status for video naming.

    Args:
        results: List of episode results

    Returns:
        Dict mapping episode keys (e.g., "house_5/episode_00000000") to success status
    """
    status_by_key: dict[str, bool] = {}
    for result in results:
        # Key format matches the per-episode output layout: <house>/<zero-padded idx>.
        key = f"{result.house_id}/episode_{result.episode_idx:08d}"
        status_by_key[key] = result.success
    return status_by_key

create_eval_config

create_eval_config(eval_config_cls: type[MlSpacesExpConfig], benchmark_dir: Path, output_dir: Path, checkpoint_path: str | None, task_horizon: int, num_workers: int, camera_config_override: Any | None = None) -> MlSpacesExpConfig

Create an MlSpacesExpConfig experiment config from a JSON benchmark for evaluation.

The eval config class provides: - policy_config: Policy configuration (checkpoint, camera names, etc.) - robot_config: Robot configuration - Timing parameters: policy_dt_ms, ctrl_dt_ms, sim_dt_ms

The benchmark provides: - Scene/task configuration (per-episode)

Parameters:

Name Type Description Default
eval_config_cls type[MlSpacesExpConfig]

The eval config class to instantiate

required
benchmark_dir Path

Path to JSON benchmark directory

required
output_dir Path

Output directory for results

required
checkpoint_path str | None

Optional override for checkpoint path

required
task_horizon int

Task horizon (already resolved from defaults or override)

required
num_workers int

Number of worker processes

required

Returns:

Type Description
MlSpacesExpConfig

Configured MlSpacesExpConfig

Source code in molmo_spaces/evaluation/eval_main.py
def create_eval_config(
    eval_config_cls: type[MlSpacesExpConfig],
    benchmark_dir: Path,
    output_dir: Path,
    checkpoint_path: str | None,
    task_horizon: int,
    num_workers: int,
    camera_config_override: Any | None = None,
) -> MlSpacesExpConfig:
    """Build an MlSpacesExpConfig for evaluating a JSON benchmark.

    The eval config class supplies the policy/robot configuration and the
    timing parameters (policy_dt_ms, ctrl_dt_ms, sim_dt_ms); the benchmark
    supplies the per-episode scene/task configuration.

    Args:
        eval_config_cls: The eval config class to instantiate
        benchmark_dir: Path to JSON benchmark directory
        output_dir: Output directory for results
        checkpoint_path: Optional override for checkpoint path
        task_horizon: Task horizon (already resolved from defaults or override)
        num_workers: Number of worker processes
        camera_config_override: Optional camera system config that replaces
            the default camera_config on the instantiated config

    Returns:
        Configured MlSpacesExpConfig
    """
    cfg = eval_config_cls()

    # A caller-supplied checkpoint wins over whatever the config class declares.
    if checkpoint_path is not None:
        cfg.policy_config.checkpoint_path = checkpoint_path

    cfg.output_dir = output_dir
    cfg.num_workers = num_workers
    cfg.task_horizon = task_horizon

    # Evaluation should be deterministic and complete: no injected action
    # noise, no profiling overhead, and keep every trajectory for analysis.
    cfg.robot_config.action_noise_config = ActionNoiseConfig(enabled=False)
    cfg.datagen_profiler = False
    cfg.profile = False
    cfg.filter_for_successful_trajectories = False

    # Fixed eval-mode seed.
    cfg.seed = 42

    if camera_config_override is not None:
        cfg.camera_config = camera_config_override

    # eval_runtime_params is a proper field on MlSpacesExpConfig; guarantee it
    # exists so downstream code never has to None-check it.
    if cfg.eval_runtime_params is None:
        cfg.eval_runtime_params = EvalRuntimeParams()

    return cfg

determine_task_horizon

determine_task_horizon(episodes: list[EpisodeSpec], task_horizon_override: int | None, policy_dt_ms: float | None = None) -> int

Determine task horizon from command line override or benchmark.

Priority: 1. Explicit override (from CLI --task_horizon_steps or --task_horizon_sec) 2. Benchmark's per-episode task_horizon_sec (converted to steps via policy_dt_ms)

Fails loudly if the benchmark does not contain task_horizon_sec and no explicit override was provided.

Parameters:

Name Type Description Default
episodes list[EpisodeSpec]

List of episode specs from the benchmark

required
task_horizon_override int | None

Optional override from command line

required
policy_dt_ms float | None

Policy timestep in milliseconds, required when reading task_horizon_sec from the benchmark.

None

Returns:

Type Description
int

Task horizon (in steps) to use for all episodes

Source code in molmo_spaces/evaluation/eval_main.py
def determine_task_horizon(
    episodes: list[EpisodeSpec],
    task_horizon_override: int | None,
    policy_dt_ms: float | None = None,
) -> int:
    """Determine task horizon from command line override or benchmark.

    Priority:
    1. Explicit override (from CLI --task_horizon_steps or --task_horizon_sec)
    2. Benchmark's per-episode task_horizon_sec (converted to steps via policy_dt_ms)

    Fails loudly if the benchmark does not contain task_horizon_sec and no
    explicit override was provided.

    Args:
        episodes: List of episode specs from the benchmark
        task_horizon_override: Optional override from command line
        policy_dt_ms: Policy timestep in milliseconds, required when reading
            task_horizon_sec from the benchmark.

    Returns:
        Task horizon (in steps) to use for all episodes

    Raises:
        ValueError: If no override is given and the benchmark is empty, is
            missing task_horizon_sec, or has inconsistent task_horizon_sec
            values; also if policy_dt_ms is needed but not provided.
    """
    if task_horizon_override is not None:
        log.info(f"Using explicit task_horizon override: {task_horizon_override} steps")
        return task_horizon_override

    # Fix: guard against an empty episode list. Previously this fell through
    # to set().pop() below, which raises an opaque KeyError.
    if not episodes:
        raise ValueError(
            "No explicit task_horizon override provided and the benchmark "
            "contains no episodes to read task_horizon_sec from."
        )

    # Read task_horizon_sec from the benchmark episodes
    horizon_sec_values = {ep.task.get("task_horizon_sec") for ep in episodes}

    if None in horizon_sec_values:
        missing_count = sum(1 for ep in episodes if ep.task.get("task_horizon_sec") is None)
        raise ValueError(
            f"No explicit task_horizon override provided and {missing_count}/{len(episodes)} "
            f"benchmark episodes are missing 'task_horizon_sec' in their task dict. "
            f"Either add task_horizon_sec to the benchmark JSON or pass "
            f"--task_horizon_steps / --task_horizon_sec explicitly."
        )

    if len(horizon_sec_values) > 1:
        raise ValueError(
            f"Benchmark has inconsistent task_horizon_sec values: {horizon_sec_values}. "
            f"All episodes must have the same task_horizon_sec, or pass an explicit override."
        )

    task_horizon_sec = horizon_sec_values.pop()

    if policy_dt_ms is None:
        raise ValueError(
            "policy_dt_ms is required to convert benchmark task_horizon_sec to steps. "
            "This is a bug in the caller."
        )

    task_horizon_steps = round(task_horizon_sec * 1000.0 / policy_dt_ms)

    log.info("=" * 70)
    log.info("TASK HORIZON RESOLVED FROM BENCHMARK")
    log.info(f"  task_horizon_sec (from benchmark JSON): {task_horizon_sec}")
    log.info(f"  policy_dt_ms (from eval config): {policy_dt_ms}")
    log.info(
        f"  task_horizon_steps = {task_horizon_sec} * 1000 / {policy_dt_ms} = {task_horizon_steps}"
    )
    log.info("=" * 70)

    return task_horizon_steps

get_args

get_args()
Source code in molmo_spaces/evaluation/eval_main.py
def get_args():
    """Parse command-line arguments for the JSON-benchmark evaluation CLI.

    Returns:
        argparse.Namespace with the parsed arguments. Note that
        --task_horizon_steps and --task_horizon_sec are mutually exclusive.
    """
    parser = argparse.ArgumentParser(
        description="Evaluation pipeline for learned policies on JSON benchmarks",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Positional: which eval config class to run (plain name, or "module.path:ClassName").
    parser.add_argument(
        "exp_config_cls",
        type=str,
        help="Name of the eval config class. Can include module path with colon.",
    )
    parser.add_argument(
        "--benchmark_dir",
        type=str,
        required=True,
        help="Path to JSON benchmark directory containing benchmark.json or house_*/episode_*.json files.",
    )
    parser.add_argument(
        "--checkpoint_path",
        type=str,
        default=None,
        help="Path to a model checkpoint. Overrides the checkpoint in the policy_config.",
    )
    # Horizon overrides: at most one of steps/seconds may be given.
    horizon_group = parser.add_mutually_exclusive_group()
    horizon_group.add_argument(
        "--task_horizon_steps",
        type=int,
        default=None,
        help="Override task horizon (max steps per episode). If None, uses value from episode specs. Cannot be used with --task_horizon_sec.",
    )
    horizon_group.add_argument(
        "--task_horizon_sec",
        type=float,
        default=None,
        help="Override task horizon (max seconds per episode). If None, uses value from episode specs. Cannot be used with --task_horizon_steps.",
    )
    # Logging / output destinations.
    parser.add_argument(
        "--wandb_project",
        type=str,
        default="mlspaces-json-eval",
        help="Wandb project name.",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default=None,
        help="Output directory for evaluation results. Defaults to eval_output/<config>/<timestamp>.",
    )
    parser.add_argument(
        "--num_workers",
        type=int,
        default=1,
        help="Number of parallel worker processes.",
    )
    parser.add_argument(
        "--no_wandb",
        action="store_true",
        help="Disable wandb logging.",
    )
    # Rendering options.
    parser.add_argument(
        "--use-filament",
        action="store_true",
        help="Whether or not to use the filament renderer instead of the legacy opengl one (requires installing custom wheel for now)",
    )
    parser.add_argument(
        "--environment-light-intensity",
        type=float,
        default=None,
        help="The default value for the intensity of the default environmental light (when using filament)",
    )
    parser.add_argument(
        "--max_episodes",
        type=int,
        default=None,
        help="Limit number of episodes to evaluate from benchmark. If None, evaluates all episodes, else, evaluates only the episodes for the houses used in the first `max_episodes`. Note that the final number of episodes can differ from `max_episodes` if more than one episode is sampled for any of the houses among the first `max_episodes` episodes.",
    )
    parser.add_argument(
        "--camera_names",
        type=str,
        nargs="+",
        default=None,
        help="Override policy_config.camera_names (e.g. --camera_names randomized_zed2_analogue_1 wrist_camera).",
    )

    # Eval camera randomization flags (shared across all JSON eval entry points)
    from molmo_spaces.utils.eval_camera_randomization_utils import add_eval_camera_args

    add_eval_camera_args(parser)

    # Single-episode selection and custom-object substitution.
    parser.add_argument(
        "--idx",
        type=int,
        default=None,
        help="The index of the episode to evaluate. If None, evaluates all episodes.",
    )
    parser.add_argument(
        "--add_custom_object",
        action="store_true",
        help="Add a custom object to the episode.",
    )
    parser.add_argument(
        "--custom_object_path",
        type=str,
        default=None,
        help="The path to the custom object to add to the episode.",
    )
    parser.add_argument(
        "--custom_object_name",
        type=str,
        default=None,
        help="The natural language name for the custom object (e.g., 'lemon', 'cup'). "
        "If not provided, will attempt to extract from the object path but could be incorrect.",
    )
    return parser.parse_args()

main

main() -> None

Command-line entry point for evaluation.

Source code in molmo_spaces/evaluation/eval_main.py
def main() -> None:
    """Command-line entry point for evaluation."""
    args = get_args()

    # Translate the eval-camera CLI flags into a camera config
    # (None when --use_eval_cameras was not passed).
    from molmo_spaces.utils.eval_camera_randomization_utils import (
        build_eval_camera_config_from_args,
    )

    camera_cfg = build_eval_camera_config_from_args(args)

    # Load the benchmark up front purely to log a short summary.
    bench_dir = Path(args.benchmark_dir).resolve()
    episodes = load_all_episodes(bench_dir)
    if episodes:
        house_count = len({ep.house_index for ep in episodes})
        log.info(f"Loaded benchmark: {bench_dir}")
        log.info(f"  Houses: {house_count}")
        log.info(f"  Total episodes: {len(episodes)}")
        log.info(f"  First episode task_cls: {episodes[0].get_task_cls()}")

    # Delegate all actual work to the programmatic API.
    results = run_evaluation(
        eval_config_cls=args.exp_config_cls,
        benchmark_dir=bench_dir,
        checkpoint_path=args.checkpoint_path,
        task_horizon_steps=args.task_horizon_steps,
        task_horizon_sec=args.task_horizon_sec,
        output_dir=args.output_dir,
        num_workers=args.num_workers,
        use_wandb=not args.no_wandb,
        wandb_project=args.wandb_project,
        max_episodes=args.max_episodes,
        environment_light_intensity=args.environment_light_intensity,
        camera_config_override=camera_cfg,
        camera_names_override=args.camera_names,
        episode_idx=args.idx,
        add_custom_object=args.add_custom_object,
        custom_object_path=args.custom_object_path,
        custom_object_name=args.custom_object_name,
    )

    log.info(f"Evaluation complete: {results.success_count}/{results.total_count} successful")
    log.info(f"Success rate: {results.success_rate:.1%}")
    log.info(f"Output directory: {results.output_dir}")

run_evaluation

run_evaluation(eval_config_cls: type[MlSpacesExpConfig] | str, benchmark_dir: Path, checkpoint_path: str | None = None, task_horizon_steps: int | None = None, task_horizon_sec: float | None = None, output_dir: str | Path | None = None, num_workers: int = 1, use_wandb: bool = False, wandb_project: str = 'mlspaces-online-eval', preloaded_policy: BasePolicy | None = None, max_episodes: int | None = None, camera_config_override: Any | None = None, camera_names_override: list[str] | None = None, environment_light_intensity: float | None = None, episode_idx: int | None = None, add_custom_object: bool = False, custom_object_path: str | Path | None = None, custom_object_name: str | None = None) -> EvaluationResults

Run evaluation on a JSON benchmark programmatically.

This is the primary entry point for running evaluations from external code. It can be imported and called directly without using command-line arguments.

Parameters:

Name Type Description Default
eval_config_cls type[MlSpacesExpConfig] | str

Either an MlSpacesExpConfig subclass, or a string in the format "module.path:ClassName" (e.g., "myrepo.configs:MyEvalConfig").

required
benchmark_dir Path

Path to JSON benchmark directory containing benchmark.json.

required
checkpoint_path str | None

Path to model checkpoint. Overrides the checkpoint in policy_config.

None
task_horizon_steps int | None

Max steps per episode. If None, uses default for the task class.

None
task_horizon_sec float | None

Max seconds per episode, used to calculate horizon in steps. Cannot be used with task_horizon_steps.

None
output_dir str | Path | None

Output directory for results. Defaults to eval_output/&lt;config&gt;/&lt;timestamp&gt;.

None
num_workers int

Number of parallel worker processes.

1
use_wandb bool

Whether to log results to Weights & Biases.

False
wandb_project str

W&B project name (only used if use_wandb=True).

'mlspaces-online-eval'
preloaded_policy BasePolicy | None

Optional pre-initialized policy instance. If provided, skips policy creation from config.

None
max_episodes int | None

Maximum number of episodes to evaluate from benchmark. If None, evaluates all episodes.

None
camera_config_override Any | None

Optional camera system config (e.g. FrankaEvalCameraSystem) to replace the default camera_config on the experiment config.

None
camera_names_override list[str] | None

Optional list of camera names to override policy_config.camera_names (e.g. ["randomized_zed2_analogue_1", "wrist_camera"]).

None
episode_idx int | None

Index of a specific episode to evaluate. If None, evaluates all episodes.

None
add_custom_object bool

Whether to replace the target object with a custom object.

False
custom_object_path str | Path | None

Path to the custom object XML file. Required if add_custom_object is True.

None
custom_object_name str | None

Natural language name for the custom object (e.g., 'lemon', 'cup'). If not provided, will attempt to extract from the object path.

None

Returns:

Type Description
EvaluationResults

EvaluationResults containing success counts, output paths, and per-episode details.

Raises:

Type Description
FileNotFoundError

If benchmark_dir doesn't exist.

ValueError

If no episodes found in benchmark or config class not found.

Example

from molmo_spaces.evaluation import run_evaluation from my_repo.configs import MyEvalConfig

results = run_evaluation( eval_config_cls=MyEvalConfig, benchmark_dir="/path/to/benchmark", checkpoint_path="/path/to/checkpoint.pt", task_horizon_steps=500, ) print(f"Success rate: {results.success_rate:.1%}")

Source code in molmo_spaces/evaluation/eval_main.py
def run_evaluation(
    eval_config_cls: type[MlSpacesExpConfig] | str,
    benchmark_dir: Path,
    checkpoint_path: str | None = None,
    task_horizon_steps: int | None = None,
    task_horizon_sec: float | None = None,
    output_dir: str | Path | None = None,
    num_workers: int = 1,
    use_wandb: bool = False,
    wandb_project: str = "mlspaces-online-eval",
    preloaded_policy: BasePolicy | None = None,
    max_episodes: int | None = None,
    camera_config_override: Any | None = None,
    camera_names_override: list[str] | None = None,
    environment_light_intensity: float | None = None,
    episode_idx: int | None = None,
    add_custom_object: bool = False,
    custom_object_path: str | Path | None = None,
    custom_object_name: str | None = None,
) -> EvaluationResults:
    """Run evaluation on a JSON benchmark programmatically.

    This is the primary entry point for running evaluations from external code.
    It can be imported and called directly without using command-line arguments.

    Args:
        eval_config_cls: Either an MlSpacesExpConfig subclass, or a string in the format
            "module.path:ClassName" (e.g., "myrepo.configs:MyEvalConfig").
        benchmark_dir: Path to JSON benchmark directory containing benchmark.json.
        checkpoint_path: Path to model checkpoint. Overrides the checkpoint in policy_config.
        task_horizon_steps: Max steps per episode. If None, uses default for the task class.
        task_horizon_sec: Max seconds per episode, used to calculate horizon in steps. Cannot be used with task_horizon_steps.
        output_dir: Output directory for results. Defaults to eval_output/<config>/<timestamp>.
        num_workers: Number of parallel worker processes.
        use_wandb: Whether to log results to Weights & Biases.
        wandb_project: W&B project name (only used if use_wandb=True).
        preloaded_policy: Optional pre-initialized policy instance. If provided, skips
            policy creation from config.
        max_episodes: Maximum number of episodes to evaluate from benchmark. If None, evaluates all episodes.
        camera_config_override: Optional camera system config (e.g. FrankaEvalCameraSystem) to
            replace the default camera_config on the experiment config.
        camera_names_override: Optional list of camera names to override
            policy_config.camera_names (e.g. ["randomized_zed2_analogue_1", "wrist_camera"]).
        environment_light_intensity: Optional intensity for the default
            environment light; overrides the config's value when not None.
        episode_idx: Index of a specific episode to evaluate. If None, evaluates all episodes.
        add_custom_object: Whether to replace the target object with a custom object.
        custom_object_path: Path to the custom object XML file. Required if add_custom_object is True.
        custom_object_name: Natural language name for the custom object (e.g., 'lemon', 'cup').
            If not provided, will attempt to extract from the object path.

    Returns:
        EvaluationResults containing success counts, output paths, and per-episode details.

    Raises:
        FileNotFoundError: If benchmark_dir doesn't exist.
        ValueError: If no episodes found in benchmark, config class not found,
            or both task_horizon_steps and task_horizon_sec are provided.

    Example:
        from molmo_spaces.evaluation import run_evaluation
        from my_repo.configs import MyEvalConfig

        results = run_evaluation(
            eval_config_cls=MyEvalConfig,
            benchmark_dir="/path/to/benchmark",
            checkpoint_path="/path/to/checkpoint.pt",
            task_horizon_steps=500,
        )
        print(f"Success rate: {results.success_rate:.1%}")
    """
    # Resolve config class if provided as string
    # Preserve the original string for config_name in case the registered name
    # differs from the class __name__ (e.g., a custom registry name)
    config_name_from_str: str | None = None
    if isinstance(eval_config_cls, str):
        config_name_from_str = eval_config_cls
        if ":" in eval_config_cls:
            # Full module path provided - import and get class directly
            module_path, class_name = eval_config_cls.split(":")
            module = importlib.import_module(module_path)
            eval_config_cls = getattr(module, class_name)
        else:
            # Just a class name - look up in registry
            class_name = eval_config_cls
            eval_config_cls = get_config_class(class_name)

    # Validate benchmark directory
    benchmark_dir = benchmark_dir.resolve()
    if not benchmark_dir.exists():
        raise FileNotFoundError(f"Benchmark directory not found: {benchmark_dir}")

    # Load benchmark episodes (for summary info and validation)
    episodes = load_all_episodes(benchmark_dir)

    # Validate episode index if specified
    if episode_idx is not None:
        if episode_idx < 0 or episode_idx >= len(episodes):
            raise ValueError(
                f"Episode index {episode_idx} is out of range. "
                f"Benchmark has {len(episodes)} episodes (indices 0-{len(episodes) - 1})"
            )
        log.info(f"Will evaluate single episode at index {episode_idx}")

    # Validate custom object path if requested
    if add_custom_object:
        if custom_object_path is None:
            raise ValueError(
                "--custom_object_path must be provided when --add_custom_object is set"
            )
        custom_object_path = Path(custom_object_path)
        if not custom_object_path.exists():
            raise FileNotFoundError(f"Custom object path does not exist: {custom_object_path}")
        log.info(f"Will replace target objects with custom object: {custom_object_path}")
        if custom_object_name is None:
            custom_object_name = custom_object_path.stem
            log.warning(f"No custom object name provided, using path stem: {custom_object_name}")
        else:
            log.info(f"Using provided custom object name: {custom_object_name}")

    if max_episodes is not None and len(episodes) > max_episodes:
        log.info(f"Evaluating the first {max_episodes} episodes of {len(episodes)} total episodes")
        episodes = episodes[:max_episodes]
    if not episodes:
        raise ValueError(
            f"No episodes found in benchmark at {benchmark_dir}. "
            f"Expected benchmark.json file with list of episode specs."
        )

    total_episodes = len(episodes)
    num_houses = len(set(ep.house_index for ep in episodes))

    # Create timestamp and output directory
    # Use the original string if eval_config_cls was passed as a string, otherwise use __name__.
    # This handles cases where the registry name differs from the class name.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    if config_name_from_str:
        # If "module:ClassName" format, use just the class name for the output dir
        config_name = (
            config_name_from_str.split(":")[-1]
            if ":" in config_name_from_str
            else config_name_from_str
        )
    else:
        config_name = eval_config_cls.__name__

    if output_dir is not None:
        resolved_output_dir = Path(output_dir) / config_name / timestamp
    else:
        resolved_output_dir = Path("eval_output") / config_name / timestamp
    os.makedirs(resolved_output_dir, exist_ok=True)

    # Determine task horizon.
    # Fix: raise a real exception instead of `assert`, which is stripped under
    # `python -O`; ValueError also matches this function's documented Raises.
    if task_horizon_steps is not None and task_horizon_sec is not None:
        raise ValueError("Cannot use both task_horizon_steps and task_horizon_sec")
    # Fix: fetch the config class's policy_dt_ms default once; it is needed
    # both for the sec->steps conversion and for benchmark-derived horizons.
    config_policy_dt_ms = eval_config_cls.model_fields["policy_dt_ms"].get_default()
    task_horizon: int | None = None
    if task_horizon_steps is not None:
        task_horizon = task_horizon_steps
    elif task_horizon_sec is not None:
        if not isinstance(config_policy_dt_ms, float | int):
            raise ValueError(
                f"policy_dt_ms must be a float or int, got {type(config_policy_dt_ms)}"
            )
        task_horizon = round(task_horizon_sec * 1000.0 / config_policy_dt_ms)
    resolved_task_horizon = determine_task_horizon(
        episodes, task_horizon, policy_dt_ms=config_policy_dt_ms
    )

    # Create experiment config
    exp_config = create_eval_config(
        eval_config_cls=eval_config_cls,
        benchmark_dir=benchmark_dir,
        output_dir=resolved_output_dir,
        checkpoint_path=checkpoint_path,
        task_horizon=resolved_task_horizon,
        num_workers=num_workers,
        camera_config_override=camera_config_override,
    )

    # Custom filament settings to overwrite by the user.
    # Fix: use an explicit None check so a caller-supplied intensity of 0.0 is
    # honored (`or` silently discarded falsy values).
    if environment_light_intensity is not None:
        exp_config.environment_light_intensity = environment_light_intensity

    # Override policy camera names if requested
    if camera_names_override is not None:
        log.info(f"Overriding policy_config.camera_names: {camera_names_override}")
        exp_config.policy_config.camera_names = camera_names_override

    # Patch config with evaluation-specific runtime parameters
    exp_config = JsonEvalRunner.patch_config(
        exp_config=exp_config,
        episode_idx=episode_idx,
        max_episodes=max_episodes,
        add_custom_object=add_custom_object,
        custom_object_path=custom_object_path,
        custom_object_name=custom_object_name,
    )
    JsonEvalRunner.adjust_robot(exp_config)

    # Resolve checkpoint path for logging
    resolved_checkpoint = checkpoint_path or getattr(
        exp_config.policy_config, "checkpoint_path", None
    )

    # Initialize wandb if requested
    if use_wandb:
        import wandb

        if resolved_checkpoint:
            # Name the run after the last two path components of the checkpoint.
            path_parts = Path(resolved_checkpoint).parts
            ckpt_name_parts = [p for p in path_parts[-2:] if p and p != "/"]
            ckpt_name = "_".join(ckpt_name_parts)
        else:
            ckpt_name = "no_ckpt"

        wandb_run_name = f"{ckpt_name}_{timestamp}"
        wandb.init(project=wandb_project, name=wandb_run_name)
        wandb.config.update(
            {
                "checkpoint_path": resolved_checkpoint,
                "benchmark_dir": str(benchmark_dir),
                "task_horizon_steps": exp_config.task_horizon,
                "task_horizon_sec": exp_config.task_horizon / exp_config.fps,
                "exp_config_cls": config_name,
                "num_episodes": total_episodes,
                "num_houses": num_houses,
            }
        )

    # Create or use provided policy
    if preloaded_policy is not None:
        policy = preloaded_policy
    else:
        policy = exp_config.policy_config.policy_cls(exp_config, exp_config.task_type)

    # Run evaluation.
    # NOTE(review): the policy above is always passed to the runner, even when
    # num_workers > 1. If worker processes cannot pickle the policy's
    # connection state (WebSocket/msgpack), this should be gated on
    # num_workers == 1 -- confirm against JsonEvalRunner.run().
    runner = JsonEvalRunner(exp_config, benchmark_dir)
    success_count, total_count = runner.run(preloaded_policy=policy)

    # Collect per-episode results
    episode_results = collect_episode_results(resolved_output_dir)

    # Log to wandb if enabled
    if use_wandb:
        import wandb

        camera_names = getattr(exp_config.policy_config, "camera_names", [])
        if camera_names:
            success_status = build_success_status_map(episode_results)
            composed_videos = compose_episode_videos(
                eval_dir=resolved_output_dir,
                camera_names=camera_names,
                success_status=success_status,
            )
        else:
            composed_videos = {}

        log_eval_results_to_wandb(
            results=episode_results,
            composed_videos=composed_videos,
        )
        wandb.finish()

    return EvaluationResults(
        success_count=success_count,
        total_count=total_count,
        output_dir=resolved_output_dir,
        episode_results=episode_results,
        exp_config=exp_config,
    )

json_eval_runner

JSON-based benchmark evaluation runner.

This runner loads episode specifications from JSON benchmark files and runs policy evaluations against them. Unlike the pickle-based frozen config approach, JSON specs are fully self-contained and human-readable.

Key design principles: - Each episode is fully self-contained in JSON (no external config dependencies) - Timing parameters (policy_dt_ms, ctrl_dt_ms, sim_dt_ms) come from the eval config, NOT from individual episodes. This allows the same benchmark to be run at different control rates without modifying the benchmark files. - Task type can vary per episode (mixed task types in same benchmark) - No patch_config needed - JSON is authoritative

Usage

from molmo_spaces.evaluation import JsonEvalRunner, load_benchmark

Load benchmark and create config

metadata, episodes_by_house = load_benchmark(benchmark_dir)
runner = JsonEvalRunner(exp_config, benchmark_dir)
success_count, total_count = runner.run(preloaded_policy=policy)

Classes:

Name Description
JsonEvalRunner

Evaluation runner for JSON-based benchmarks.

Attributes:

Name Type Description
log

log module-attribute

log = getLogger(__name__)

JsonEvalRunner

JsonEvalRunner(exp_config: MlSpacesExpConfig, benchmark_dir: Path)

Bases: ParallelRolloutRunner

Evaluation runner for JSON-based benchmarks.

This runner differs from the standard ParallelRolloutRunner in several ways: 1. Episodes are loaded from JSON files, not from H5 frozen configs 2. Each episode is fully self-contained (timing, cameras, task config) 3. Task samplers are created per-episode to support mixed task types 4. Uses patch_config to add evaluation-specific runtime parameters

The runner inherits process_single_house from ParallelRolloutRunner and customizes behavior by overriding hook methods.

Initialize the JSON eval runner.

The benchmark is authoritative - all episode data comes from the JSON files. No fallbacks or defaults; missing data is an error.

Parameters:

Name Type Description Default
exp_config MlSpacesExpConfig

Base experiment config (provides robot_config, policy_config)

required
benchmark_dir Path

Path to benchmark directory containing benchmark.json

required

Methods:

Name Description
adjust_robot

Apply robot-specific evaluation overrides if configured.

get_episode_seed

Get seed from episode spec, falling back to index.

get_episode_spec_at_index

Get episode specification at given index.

get_episode_task_sampler

Create per-episode JsonEvalTaskSampler.

get_episodes_for_house

Get all episode specs for a given house.

get_max_episode_attempts

Process all episodes in the benchmark - no retry multiplier.

load_episodes_for_house

Load episode specifications from JSON benchmark.

patch_config

Patch evaluation config with runtime evaluation-specific parameters.

prepare_episode_config

Prepare episode-specific config from JSON spec.

process_single_house

Process all episodes for a single house using customizable hooks.

run

Run house-by-house rollouts using multiprocessing workers.

run_single_rollout

Execute a single rollout with the given task and policy.

sample_task_from_spec

Sample task - episode spec is already in the JsonEvalTaskSampler.

should_close_episode_task_sampler

Close task sampler after each episode - we create per-episode.

should_stop_early

Stop early if evaluating a single episode (--idx provided) and it's been collected.

Attributes:

Name Type Description
benchmark_dir
completed_houses
config
counter_lock
house_counter
house_indices
logger
max_allowed_sequential_irrecoverable_failures
max_allowed_sequential_rollout_failures
max_allowed_sequential_task_sampler_failures
profiler
samples_per_house
shutdown_event
skipped_houses
success_count
total_count
total_houses
wandb_enabled
Source code in molmo_spaces/evaluation/json_eval_runner.py
def __init__(
    self,
    exp_config: MlSpacesExpConfig,
    benchmark_dir: Path,
) -> None:
    """
    Set up the runner from a JSON benchmark directory.

    The JSON benchmark is the single source of truth: every episode comes
    from the benchmark files, and missing data is an error rather than a
    silently applied default.

    Args:
        exp_config: Base experiment config (provides robot_config, policy_config)
        benchmark_dir: Path to benchmark directory containing benchmark.json
    """
    self.benchmark_dir = benchmark_dir.resolve()

    episodes = load_all_episodes(self.benchmark_dir)
    if not episodes:
        raise ValueError(
            f"No episodes found in benchmark at {self.benchmark_dir}. "
            f"Expected benchmark.json file with list of episode specs."
        )

    runtime_params = exp_config.eval_runtime_params
    limit = runtime_params.max_episodes
    if limit is not None and len(episodes) > limit:
        log.info(f"Limiting to first {limit} of {len(episodes)} episodes")
        episodes = episodes[:limit]

    # Group episodes by house index (plain insertion-ordered dict).
    grouped: dict[int, list[EpisodeSpec]] = {}
    for spec in episodes:
        grouped.setdefault(spec.house_index, []).append(spec)
    self._episodes_by_house = grouped

    # A single requested episode restricts processing to its house only.
    single_idx = runtime_params.episode_idx
    if single_idx is not None:
        if not (0 <= single_idx < len(episodes)):
            raise ValueError(
                f"Episode index {single_idx} is out of range. "
                f"Benchmark has {len(episodes)} episodes (indices 0-{len(episodes) - 1})"
            )
        target = episodes[single_idx]
        exp_config.task_sampler_config.house_inds = [target.house_index]
        exp_config.task_sampler_config.samples_per_house = 1
    else:
        exp_config.task_sampler_config.house_inds = sorted(grouped.keys())
        exp_config.task_sampler_config.samples_per_house = max(
            len(specs) for specs in grouped.values()
        )
    exp_config.benchmark_path = self.benchmark_dir

    super().__init__(exp_config)

    episode_total = sum(len(specs) for specs in grouped.values())
    log.info(
        f"JsonEvalRunner initialized: {len(grouped)} houses, "
        f"{episode_total} episodes from {self.benchmark_dir}"
    )
benchmark_dir instance-attribute
benchmark_dir = benchmark_dir.resolve()
completed_houses instance-attribute
completed_houses = Value('i', 0)
config instance-attribute
config = exp_config
counter_lock instance-attribute
counter_lock = Lock()
house_counter instance-attribute
house_counter = Value('i', 0)
house_indices instance-attribute
house_indices = house_inds
logger instance-attribute
logger = get_logger()
max_allowed_sequential_irrecoverable_failures instance-attribute
max_allowed_sequential_irrecoverable_failures = max_allowed_sequential_irrecoverable_failures
max_allowed_sequential_rollout_failures instance-attribute
max_allowed_sequential_rollout_failures = max_allowed_sequential_rollout_failures
max_allowed_sequential_task_sampler_failures instance-attribute
max_allowed_sequential_task_sampler_failures = max_allowed_sequential_task_sampler_failures
profiler instance-attribute
profiler = profiler
samples_per_house instance-attribute
samples_per_house = samples_per_house
shutdown_event instance-attribute
shutdown_event = Event()
skipped_houses instance-attribute
skipped_houses = Value('i', 0)
success_count instance-attribute
success_count = Value('i', 0)
total_count instance-attribute
total_count = Value('i', 0)
total_houses instance-attribute
total_houses = len(house_indices)
wandb_enabled instance-attribute
wandb_enabled = True
adjust_robot staticmethod
adjust_robot(exp_config: MlSpacesExpConfig) -> None

Apply robot-specific evaluation overrides if configured.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def adjust_robot(exp_config: MlSpacesExpConfig) -> None:
    """Attach a robot-specific evaluation override to the config, when one exists."""
    override = get_robot_override(exp_config.robot_config)
    if override is None:
        return
    exp_config._robot_eval_override = override
get_episode_seed staticmethod
get_episode_seed(episode_idx: int, episode_spec: EpisodeSpec, task_sampler: JsonEvalTaskSampler) -> int

Get seed from episode spec, falling back to index.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_episode_seed(
    episode_idx: int,
    episode_spec: EpisodeSpec,
    task_sampler: JsonEvalTaskSampler,
) -> int:
    """Return the spec's explicit seed, or the episode index when none is set."""
    spec_seed = episode_spec.seed
    if spec_seed is None:
        return episode_idx
    return spec_seed
get_episode_spec_at_index staticmethod
get_episode_spec_at_index(episode_specs: list[EpisodeSpec], idx: int) -> EpisodeSpec

Get episode specification at given index.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_episode_spec_at_index(episode_specs: list[EpisodeSpec], idx: int) -> EpisodeSpec:
    """Look up the episode specification stored at position ``idx``."""
    spec = episode_specs[idx]
    return spec
get_episode_task_sampler staticmethod
get_episode_task_sampler(exp_config: MlSpacesExpConfig, episode_spec: EpisodeSpec, shared_task_sampler, datagen_profiler: DatagenProfiler | None) -> JsonEvalTaskSampler

Create per-episode JsonEvalTaskSampler.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_episode_task_sampler(
    exp_config: MlSpacesExpConfig,
    episode_spec: EpisodeSpec,
    shared_task_sampler,
    datagen_profiler: DatagenProfiler | None,
) -> JsonEvalTaskSampler:
    """Build a fresh JsonEvalTaskSampler for this episode.

    The profiler is wired in only when one was supplied; the shared sampler
    argument is ignored because JSON evaluation creates a sampler per episode.
    """
    episode_sampler = JsonEvalTaskSampler(exp_config, episode_spec)
    if datagen_profiler is None:
        return episode_sampler
    episode_sampler.set_datagen_profiler(datagen_profiler)
    return episode_sampler
get_episodes_for_house
get_episodes_for_house(house_id: int) -> list[EpisodeSpec]

Get all episode specs for a given house.

Source code in molmo_spaces/evaluation/json_eval_runner.py
def get_episodes_for_house(self, house_id: int) -> list[EpisodeSpec]:
    """Return every episode spec that belongs to the given house."""
    episodes = self._episodes_by_house.get(house_id)
    if episodes is None:
        raise KeyError(
            f"House {house_id} not found in benchmark. "
            f"Available houses: {sorted(self._episodes_by_house.keys())}"
        )
    return episodes
get_max_episode_attempts staticmethod
get_max_episode_attempts(episode_specs: list[EpisodeSpec], samples_per_house: int, exp_config: MlSpacesExpConfig) -> int

Process all episodes in the benchmark - no retry multiplier.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def get_max_episode_attempts(
    episode_specs: list[EpisodeSpec],
    samples_per_house: int,
    exp_config: MlSpacesExpConfig,
) -> int:
    """Attempt every benchmark episode exactly once; no retry multiplier applies."""
    attempts = len(episode_specs)
    return attempts
load_episodes_for_house staticmethod
load_episodes_for_house(exp_config: MlSpacesExpConfig, house_id: int, batch_suffix: str, worker_task_sampler, worker_logger) -> tuple[list[EpisodeSpec], None]

Load episode specifications from JSON benchmark.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def load_episodes_for_house(
    exp_config: MlSpacesExpConfig,
    house_id: int,
    batch_suffix: str,
    worker_task_sampler,
    worker_logger,
) -> tuple[list[EpisodeSpec], None]:
    """Load episode specifications for one house from the JSON benchmark.

    Applies the same runtime filters as the runner init (max_episodes
    truncation, optional single-episode selection), then keeps only the
    episodes belonging to ``house_id``. Optionally replaces each episode's
    target object with a user-supplied custom object.

    Args:
        exp_config: Config carrying benchmark_path and eval_runtime_params.
        house_id: Index of the house whose episodes should be returned.
        batch_suffix: Unused here; part of the shared hook signature.
        worker_task_sampler: Unused here; part of the shared hook signature.
        worker_logger: Logger for this worker.

    Returns:
        Tuple of (episode specs for the house, None). The second element is
        the shared-task-sampler slot, unused by the JSON runner. An empty
        list signals "nothing to process"; errors are logged, not raised.
    """
    benchmark_path = exp_config.benchmark_path
    all_episodes = load_all_episodes(benchmark_path)

    if not all_episodes:
        worker_logger.error(
            f"No episodes found in benchmark at {benchmark_path}. Expected benchmark.json file."
        )
        return [], None

    eval_params = exp_config.eval_runtime_params

    # Truncate to max_episodes before any filtering
    if eval_params.max_episodes is not None and len(all_episodes) > eval_params.max_episodes:
        all_episodes = all_episodes[: eval_params.max_episodes]

    # Filter by episode index if specified
    episode_idx = eval_params.episode_idx
    if episode_idx is not None:
        if episode_idx < 0 or episode_idx >= len(all_episodes):
            worker_logger.error(
                f"Episode index {episode_idx} is out of range. "
                f"Benchmark has {len(all_episodes)} episodes (indices 0-{len(all_episodes) - 1})"
            )
            return [], None
        # Filter to only the specified episode, but still need to check house_id
        target_episode = all_episodes[episode_idx]
        if target_episode.house_index != house_id:
            # This house doesn't contain the target episode, return empty list
            return [], None
        all_episodes = [target_episode]

    house_episodes = [ep for ep in all_episodes if ep.house_index == house_id]

    if not house_episodes:
        available_houses = sorted({ep.house_index for ep in all_episodes})
        worker_logger.error(
            f"House {house_id} not found in benchmark. Available houses: {available_houses}"
        )
        return [], None

    # Apply custom object replacement if requested
    add_custom_object = eval_params.add_custom_object
    custom_object_path = eval_params.custom_object_path
    custom_object_name = eval_params.custom_object_name
    if add_custom_object and custom_object_path is None:
        # Previously this misconfiguration was silently ignored; surface it
        # so users know why no replacement happened.
        worker_logger.warning(
            "add_custom_object is set but custom_object_path is None; "
            "skipping custom object replacement."
        )
    if add_custom_object and custom_object_path is not None:
        from pathlib import Path

        from molmo_spaces.evaluation.benchmark_schema import replace_target_object_with_custom

        custom_object_path = Path(custom_object_path)
        worker_logger.info(f"Replacing target objects with custom object: {custom_object_path}")
        if custom_object_name:
            worker_logger.info(f"Using custom object name: '{custom_object_name}'")
        house_episodes = [
            replace_target_object_with_custom(ep, custom_object_path, custom_object_name)
            for ep in house_episodes
        ]

    worker_logger.info(
        f"Loaded {len(house_episodes)} episodes for house {house_id} from {benchmark_path}"
    )
    return house_episodes, None
patch_config staticmethod
patch_config(exp_config: MlSpacesExpConfig, episode_idx: int | None = None, max_episodes: int | None = None, add_custom_object: bool = False, custom_object_path: str | Path | None = None, custom_object_name: str | None = None) -> MlSpacesExpConfig

Patch evaluation config with runtime evaluation-specific parameters.

This method modifies the config object to store evaluation-specific runtime parameters that are not part of the base config schema. These parameters are used by the evaluation runner to customize episode processing.

Parameters:

Name Type Description Default
exp_config MlSpacesExpConfig

The experiment config to patch

required
episode_idx int | None

Optional index of a specific episode to evaluate. If provided, only that episode will be evaluated and the process will stop after it.

None
max_episodes int | None

Optional maximum number of episodes to evaluate. If provided, only the episodes for the houses used in the first N episodes will be evaluated. Note that the final number of episodes can differ from N if more than one episode is sampled for any of the houses among the first N episodes.

None
add_custom_object bool

Whether to replace the target object with a custom object.

False
custom_object_path str | Path | None

Path to the custom object XML file. Required if add_custom_object is True.

None
custom_object_name str | None

Natural language name for the custom object (e.g., 'lemon', 'cup').

None

Returns:

Type Description
MlSpacesExpConfig

The patched config (same object, modified in place)

Note

These parameters are stored in an EvalRuntimeParams dataclass attached to the config object as exp_config.eval_runtime_params for access by worker processes. They are not part of the base MlSpacesExpConfig schema but are necessary for runtime evaluation customization.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def patch_config(
    exp_config: MlSpacesExpConfig,
    episode_idx: int | None = None,
    max_episodes: int | None = None,
    add_custom_object: bool = False,
    custom_object_path: str | Path | None = None,
    custom_object_name: str | None = None,
) -> MlSpacesExpConfig:
    """Store runtime evaluation parameters on the experiment config.

    The parameters are bundled into an ``EvalRuntimeParams`` dataclass and
    assigned to ``exp_config.eval_runtime_params``, where worker processes
    read them during episode processing.

    Args:
        exp_config: The experiment config to patch.
        episode_idx: Optional index of a single episode to evaluate; when
            given, only that episode runs and the process stops afterwards.
        max_episodes: Optional cap on the number of episodes. Only the
            houses touched by the first N episodes are evaluated, so the
            final count may exceed N when a house contributes more than
            one of those episodes.
        add_custom_object: Whether to swap the target object for a custom
            object.
        custom_object_path: Path to the custom object XML file; required
            when add_custom_object is True.
        custom_object_name: Natural language name for the custom object
            (e.g., 'lemon', 'cup').

    Returns:
        The same config object, modified in place.
    """
    # Deferred import breaks a circular dependency with eval_main.
    from molmo_spaces.evaluation.eval_main import EvalRuntimeParams

    # eval_runtime_params is a declared field on MlSpacesExpConfig, so a
    # plain attribute assignment is all that is needed.
    runtime_params = EvalRuntimeParams(
        episode_idx=episode_idx,
        max_episodes=max_episodes,
        add_custom_object=add_custom_object,
        custom_object_path=custom_object_path,
        custom_object_name=custom_object_name,
    )
    exp_config.eval_runtime_params = runtime_params
    return exp_config
prepare_episode_config staticmethod
prepare_episode_config(exp_config: MlSpacesExpConfig, episode_spec: EpisodeSpec, episode_idx: int) -> MlSpacesExpConfig

Prepare episode-specific config from JSON spec.

Note: task_horizon is NOT read from episode_spec. It's an evaluation parameter that comes from exp_config (set via command line or defaults).

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def prepare_episode_config(
    exp_config: MlSpacesExpConfig,
    episode_spec: EpisodeSpec,
    episode_idx: int,
) -> MlSpacesExpConfig:
    """Deep-copy the base config and stamp in this episode's scene identity.

    Only scene_dataset and data_split come from the JSON spec; task_horizon
    is an evaluation parameter and stays with exp_config (command line or
    defaults).
    """
    cfg = exp_config.model_copy(deep=True)
    cfg.scene_dataset = episode_spec.scene_dataset
    cfg.data_split = episode_spec.data_split
    return cfg
process_single_house staticmethod
process_single_house(worker_id: int, worker_logger, house_id: int, exp_config: MlSpacesExpConfig, samples_per_house: int, shutdown_event, task_sampler, preloaded_policy: BasePolicy | None = None, max_allowed_sequential_task_sampler_failures: int = 10, max_allowed_sequential_rollout_failures: int = 10, filter_for_successful_trajectories: bool = False, runner_class=None, batch_num: int | None = None, total_batches: int | None = None, datagen_profiler: DatagenProfiler | None = None) -> tuple[int, int, bool]

Process all episodes for a single house using customizable hooks.

This method uses a while loop to iterate over episodes, calling hook methods via runner_class to allow subclasses to customize behavior without duplicating the entire method.

Hooks called (override in subclass to customize): - load_episodes_for_house: Load episode specs from source (JSON, etc.) - get_max_episode_attempts: Maximum iterations of the episode loop - should_stop_early: Whether to stop before max attempts (e.g., enough successes) - prepare_episode_config: Modify config per-episode - get_episode_task_sampler: Get/create task sampler for episode - sample_task_from_spec: Sample task from specification - get_episode_seed: Get seed for episode - should_close_episode_task_sampler: Whether to close sampler per-episode

Parameters:

Name Type Description Default
worker_id int

ID of the worker thread/process

required
worker_logger

Logger instance for this worker

required
house_id int

Index of the house to process

required
exp_config MlSpacesExpConfig

Experiment configuration

required
samples_per_house int

Number of episodes to collect for this house

required
shutdown_event

Event to signal shutdown

required
task_sampler

Task sampler instance (shared across houses for this worker)

required
preloaded_policy BasePolicy | None

Optional pre-initialized policy instance

None
max_allowed_sequential_task_sampler_failures int

Max consecutive task sampling failures

10
max_allowed_sequential_rollout_failures int

Max consecutive rollout failures

10
filter_for_successful_trajectories bool

Whether to filter for successful trajectories only

False
runner_class

Runner class with hook methods to call

None
batch_num int | None

Batch number for this house (for batched processing)

None
total_batches int | None

Total number of batches for this house

None
datagen_profiler DatagenProfiler | None

DatagenProfiler for per-worker timing (optional)

None

Returns:

Name Type Description
tuple tuple[int, int, bool]

(house_success_count, house_total_count, irrecoverable_failure_flag)

Source code in molmo_spaces/data_generation/pipeline.py
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
@staticmethod
def process_single_house(
    worker_id: int,
    worker_logger,
    house_id: int,
    exp_config: MlSpacesExpConfig,
    samples_per_house: int,
    shutdown_event,
    task_sampler,
    preloaded_policy: BasePolicy | None = None,
    max_allowed_sequential_task_sampler_failures: int = 10,
    max_allowed_sequential_rollout_failures: int = 10,
    filter_for_successful_trajectories: bool = False,
    runner_class=None,
    batch_num: int | None = None,
    total_batches: int | None = None,
    datagen_profiler: DatagenProfiler | None = None,
) -> tuple[int, int, bool]:
    """
    Process all episodes for a single house using customizable hooks.

    This method uses a while loop to iterate over episodes, calling hook methods
    via runner_class to allow subclasses to customize behavior without duplicating
    the entire method.

    Hooks called (override in subclass to customize):
    - load_episodes_for_house: Load episode specs from source (JSON, etc.)
    - get_max_episode_attempts: Maximum iterations of the episode loop
    - should_stop_early: Whether to stop before max attempts (e.g., enough successes)
    - prepare_episode_config: Modify config per-episode
    - get_episode_task_sampler: Get/create task sampler for episode
    - sample_task_from_spec: Sample task from specification
    - get_episode_seed: Get seed for episode
    - should_close_episode_task_sampler: Whether to close sampler per-episode

    Args:
        worker_id: ID of the worker thread/process
        worker_logger: Logger instance for this worker
        house_id: Index of the house to process
        exp_config: Experiment configuration
        samples_per_house: Number of episodes to collect for this house
        shutdown_event: Event to signal shutdown
        task_sampler: Task sampler instance (shared across houses for this worker)
        preloaded_policy: Optional pre-initialized policy instance
        max_allowed_sequential_task_sampler_failures: Max consecutive task sampling failures
        max_allowed_sequential_rollout_failures: Max consecutive rollout failures
        filter_for_successful_trajectories: Whether to filter for successful trajectories only
        runner_class: Runner class with hook methods to call
        batch_num: Batch number for this house (for batched processing)
        total_batches: Total number of batches for this house
        datagen_profiler: DatagenProfiler for per-worker timing (optional)

    Returns:
        tuple: (house_success_count, house_total_count, irrecoverable_failure_flag)
    """
    house_success_count = 0
    house_total_count = 0
    irrecoverable_failure_in_house = False

    # Setup directories and check for existing output
    house_output_dir, house_debug_dir, batch_suffix, should_skip = setup_house_dirs(
        exp_config, house_id, batch_num, total_batches
    )
    if should_skip:
        worker_logger.info(
            f"SKIPPING HOUSE {house_id} BATCH {batch_num}/{total_batches}: "
            f"Output already exists at {house_output_dir / f'trajectories{batch_suffix}.h5'}"
        )
        return 0, 0, False

    # Load episodes using hook - allows subclasses to load from different scene datasets
    episode_specs, shared_task_sampler = runner_class.load_episodes_for_house(
        exp_config, house_id, batch_suffix, task_sampler, worker_logger
    )

    if not episode_specs:
        worker_logger.warning(f"No episodes to process for house {house_id}")
        return 0, 0, False

    max_attempts = runner_class.get_max_episode_attempts(
        episode_specs, samples_per_house, exp_config
    )

    # Collect raw history data for this house
    house_raw_histories = []
    house_debug_raw_histories = []

    # Sequential failure tracking
    num_sequential_task_sampler_failures = 0
    num_sequential_rollout_failures = 0
    viewer = None

    # While loop with explicit index - allows subclasses to customize iteration
    episode_idx = 0
    while episode_idx < max_attempts:
        # Check early stop condition (e.g., enough successes for datagen)
        should_stop = runner_class.should_stop_early(
            len(house_raw_histories), samples_per_house, exp_config=exp_config
        )
        if should_stop:
            break

        # Check for shutdown signal
        if shutdown_event.is_set():
            worker_logger.info(f"Worker {worker_id} house {house_id} received shutdown signal")
            irrecoverable_failure_in_house = True
            break

        # Check for too many consecutive task sampling failures
        if num_sequential_task_sampler_failures >= max_allowed_sequential_task_sampler_failures:
            worker_logger.error(
                f"Worker {worker_id} house {house_id} encountered "
                f"{num_sequential_task_sampler_failures} consecutive task sampling failures. "
                "This is unrecoverable."
            )
            irrecoverable_failure_in_house = True
            break

        # Check for too many consecutive rollout failures
        if num_sequential_rollout_failures >= max_allowed_sequential_rollout_failures:
            worker_logger.error(
                f"Worker {worker_id} house {house_id} rollout failed across "
                f"{num_sequential_rollout_failures} retries. This is irrecoverable."
            )
            irrecoverable_failure_in_house = True
            break

        # Get episode spec for this iteration
        episode_spec = runner_class.get_episode_spec_at_index(episode_specs, episode_idx)

        # Track state for this episode
        task = None
        policy = None
        episode_task_sampler = None
        success = False
        task_sampling_failed = False
        house_invalid = False

        if datagen_profiler is not None:
            datagen_profiler.start("episode_total")

        # Prepare episode-specific config
        episode_config = runner_class.prepare_episode_config(
            exp_config, episode_spec, episode_idx
        )

        with cleanup_context():
            if viewer is not None:
                viewer.close()
                viewer = None

            # Task sampling phase
            task_sampling_start = time.perf_counter()

            try:
                # Get task sampler for this episode (shared or per-episode)
                episode_task_sampler = runner_class.get_episode_task_sampler(
                    episode_config, episode_spec, shared_task_sampler, datagen_profiler
                )

                # Sample task
                task = runner_class.sample_task_from_spec(
                    episode_task_sampler, house_id, episode_spec, episode_idx
                )

                if task is None:
                    worker_logger.info(
                        f"Worker {worker_id} house {house_id} episode {episode_idx}: "
                        "task sampling returned None"
                    )
                    house_invalid = True
                else:
                    # Record successful sampling time
                    if datagen_profiler is not None:
                        datagen_profiler.record(
                            "task_sampling", time.perf_counter() - task_sampling_start
                        )
                        task.set_datagen_profiler(datagen_profiler)

                    num_sequential_task_sampler_failures = 0

                    worker_logger.info(
                        f"Worker {worker_id} house {house_id} episode {episode_idx}/{max_attempts} "
                        f"collected={len(house_raw_histories)}/{samples_per_house}"
                    )

            except HouseInvalidForTask as e:
                traceback.print_exc()
                worker_logger.warning(
                    f"Worker {worker_id} house {house_id} episode {episode_idx} "
                    f"HouseInvalidForTask: {e.reason}"
                )
                house_invalid = True
                if datagen_profiler is not None:
                    datagen_profiler.record(
                        "task_sampling_failed", time.perf_counter() - task_sampling_start
                    )

            except Exception as e:
                traceback.print_exc()
                worker_logger.error(
                    f"Worker {worker_id} house {house_id} episode {episode_idx} "
                    f"task sampling error: {str(e)}"
                )
                num_sequential_task_sampler_failures += 1
                task_sampling_failed = True
                if datagen_profiler is not None:
                    datagen_profiler.record(
                        "task_sampling_failed", time.perf_counter() - task_sampling_start
                    )

            # Rollout phase (only if task sampling succeeded)
            if task is not None and not house_invalid and not task_sampling_failed:
                try:
                    # Setup policy and viewer
                    policy = setup_policy(
                        episode_config, task, preloaded_policy, datagen_profiler
                    )
                    viewer = setup_viewer(episode_config, task, policy, viewer)

                    # Get episode seed
                    episode_seed = runner_class.get_episode_seed(
                        episode_idx, episode_spec, episode_task_sampler
                    )

                    # Run the rollout
                    success = runner_class.run_single_rollout(
                        episode_seed=episode_seed,
                        task=task,
                        policy=policy,
                        profiler=episode_config.profiler,
                        viewer=viewer,
                        shutdown_event=shutdown_event,
                        datagen_profiler=datagen_profiler,
                        end_on_success=exp_config.end_on_success,
                    )

                    num_sequential_rollout_failures = 0

                    # Extract object name for logging if available
                    object_name = "unknown"
                    if hasattr(task, "config") and hasattr(task.config, "task_config"):
                        if hasattr(task.config.task_config, "pickup_obj_name"):
                            object_name = task.config.task_config.pickup_obj_name

                    worker_logger.info(
                        f"Worker {worker_id} house {house_id} episode {episode_idx} "
                        f"object {object_name} completed with success={success}"
                    )

                    # Collect trajectory
                    should_save = success or not filter_for_successful_trajectories
                    history = task.get_history()
                    should_save_debug = not should_save and random.random() < 0.01

                    if should_save or should_save_debug:
                        episode_info = {
                            "history": history,
                            "sensor_suite": task.sensor_suite,
                            "success": success,
                            "seed": episode_seed,
                        }
                        if should_save:
                            house_raw_histories.append(episode_info)
                        elif should_save_debug:
                            house_debug_raw_histories.append(episode_info)
                            worker_logger.info(
                                f"Queueing failed trajectory for debug (seed: {episode_seed})"
                            )
                    else:
                        del history

                    # Update house counters
                    house_total_count += 1
                    if success:
                        house_success_count += 1
                    else:
                        # Report failure for this asset (may lead to dynamic blacklisting)
                        asset_uid = task_sampler.get_asset_uid_from_object(
                            task.env, object_name
                        )
                        if asset_uid:
                            task_sampler.report_asset_failure(
                                asset_uid, "rollout failed (e.g., IK failure)"
                            )

                    if datagen_profiler is not None:
                        datagen_profiler.end("episode_total")
                        datagen_profiler.log_episode_summary(
                            episode_idx=episode_idx,
                            house_id=house_id,
                            success=success,
                        )

                except Exception as e:
                    worker_logger.error(
                        f"Worker {worker_id} house {house_id} episode {episode_idx} rollout error: {str(e)}"
                    )
                    traceback.print_exc()
                    num_sequential_rollout_failures += 1

                    # Report failure for this asset (may lead to dynamic blacklisting)
                    try:
                        asset_uid = task_sampler.get_asset_uid_from_object(
                            task.env, object_name
                        )
                        if asset_uid:
                            task_sampler.report_asset_failure(
                                asset_uid, f"rollout exception: {e}"
                            )
                    except Exception:
                        pass  # Don't let failure tracking break the error handling

                    if datagen_profiler is not None:
                        datagen_profiler.end("episode_total")

            else:
                # Task sampling failed or house invalid
                if datagen_profiler is not None:
                    datagen_profiler.end("episode_total")

            # Cleanup resources
            cleanup_episode_resources(
                task=task,
                policy=policy,
                task_sampler=episode_task_sampler,
                preloaded_policy=preloaded_policy,
                close_task_sampler=runner_class.should_close_episode_task_sampler(),
            )

        # Handle house invalid - break after cleanup
        if house_invalid:
            irrecoverable_failure_in_house = True
            break

        # Always increment episode index
        episode_idx += 1

    # Cleanup viewer
    if viewer is not None:
        viewer.close()
        viewer = None

    # Check shutdown signal before saving
    if shutdown_event.is_set():
        worker_logger.info(
            f"Worker {worker_id} house {house_id} shutdown requested, skipping save"
        )
        return house_success_count, house_total_count, True

    # Save trajectories
    save_house_trajectories(
        worker_logger,
        house_raw_histories,
        house_output_dir,
        exp_config,
        batch_suffix,
        datagen_profiler,
        batch_num,
        total_batches,
    )

    # Save debug trajectories
    save_house_trajectories(
        worker_logger,
        house_debug_raw_histories,
        house_debug_dir,
        exp_config,
        batch_suffix,
        datagen_profiler=None,
        batch_num=batch_num,
        total_batches=total_batches,
    )

    worker_logger.info(
        f"Worker {worker_id} completed house {house_id}: "
        f"{house_success_count}/{house_total_count} successful episodes"
    )

    if datagen_profiler is not None:
        datagen_profiler.log_house_summary(
            house_id=house_id,
            success_count=house_success_count,
            total_count=house_total_count,
        )

    return house_success_count, house_total_count, irrecoverable_failure_in_house
run
run(preloaded_policy: BasePolicy | None = None) -> tuple[int, int]

Run house-by-house rollouts using multiprocessing workers.

Parameters:

Name Type Description Default
preloaded_policy BasePolicy | None

Optional pre-initialized policy instance to use for rollouts. If None, a new policy will be created for each rollout.

None

Returns:

Name Type Description
tuple tuple[int, int]

(success_count, total_count)

Source code in molmo_spaces/data_generation/pipeline.py
def run(self, preloaded_policy: BasePolicy | None = None) -> tuple[int, int]:
    """
    Run house-by-house rollouts using multiprocessing workers.

    Spawns ``self.config.num_workers`` worker processes (or runs a single
    worker inline in the main process when ``num_workers`` is 1), each
    executing ``house_processing_worker`` against shared multiprocessing
    counters. While workers are alive, progress is read from those counters
    and periodically logged (and sent to WandB when enabled).

    Args:
        preloaded_policy: Optional pre-initialized policy instance to use for rollouts.
            If None, a new policy will be created for each rollout.
            NOTE(review): in multi-worker mode this object is passed as a
            Process arg, so it presumably must be picklable — confirm.

    Returns:
        tuple: (success_count, total_count) aggregated across all workers.
    """
    total_expected_episodes = self.total_houses * self.samples_per_house
    self.logger.info(
        f"Starting house-by-house rollout of {self.total_houses} houses "
        f"with {self.samples_per_house} episodes each ({total_expected_episodes} total episodes) "
        f"using {self.config.num_workers} worker processes"
    )

    # make a copy of the config in the output directory
    self.logger.info("Evaluation configuration:")
    self.logger.info(pprint.pformat(self.config.model_dump()))
    self.config.save_config(output_dir=Path(self.config.output_dir))

    # Start timing for WandB metrics
    start_time = time.time()

    # Launch worker processes
    if self.config.num_workers > 1:
        processes = []
        for worker_id in range(self.config.num_workers):
            # All shared state (events, locks, counters) is handed to each
            # worker; the workers coordinate house assignment themselves.
            p = mp_context.Process(
                target=house_processing_worker,
                args=(
                    worker_id,
                    self.config,
                    self.house_indices,
                    self.samples_per_house,
                    self.shutdown_event,
                    self.counter_lock,
                    self.house_counter,
                    self.success_count,
                    self.total_count,
                    self.completed_houses,
                    self.skipped_houses,
                    self.max_allowed_sequential_task_sampler_failures,
                    self.max_allowed_sequential_rollout_failures,
                    self.max_allowed_sequential_irrecoverable_failures,
                    preloaded_policy,
                    self.config.filter_for_successful_trajectories,
                    type(self),  # Pass the runner class to enable customization via subclassing
                ),
            )
            p.start()
            processes.append(p)

        # Periodic logging loop that monitors progress while workers run
        last_log_time = start_time
        log_interval = 60  # Log every 60 seconds

        while any(p.is_alive() for p in processes):
            # Check if it's time to log
            current_time = time.time()
            if self.wandb_enabled and (current_time - last_log_time) >= log_interval:
                try:
                    # Read current progress from shared counters.
                    # NOTE(review): reads are not taken under counter_lock, so
                    # the snapshot may be slightly inconsistent — acceptable
                    # for progress reporting.
                    elapsed_time = current_time - start_time
                    completed = self.completed_houses.value
                    skipped = self.skipped_houses.value
                    success = self.success_count.value
                    total = self.total_count.value
                    active = sum(1 for p in processes if p.is_alive())

                    # Calculate metrics
                    success_rate = success / total if total > 0 else 0.0
                    episodes_per_second = total / elapsed_time if elapsed_time > 0 else 0.0
                    completion_percentage = (completed + skipped) / self.total_houses * 100

                    # Log to WandB
                    wandb.log(
                        {
                            "elapsed_time_seconds": elapsed_time,
                            "elapsed_time_hours": elapsed_time / 3600,
                            "completed_houses": completed,
                            "skipped_houses": skipped,
                            "success_count": success,
                            "total_count": total,
                            "success_rate": success_rate,
                            "episodes_per_second": episodes_per_second,
                            "active_workers": active,
                            "completion_percentage": completion_percentage,
                        }
                    )
                    self.logger.info(
                        f"Progress: {completed}/{self.total_houses} houses completed "
                        f"({completion_percentage:.1f}%), {success}/{total} successful episodes "
                        f"({success_rate * 100:.1f}%), {active} workers active"
                    )
                    last_log_time = current_time
                except Exception as e:
                    # Monitoring must never take down the run.
                    self.logger.warning(f"WandB periodic logging failed: {e}")

            # Sleep briefly before checking again

            time.sleep(5)

        # Wait for all processes to complete
        for p in processes:
            p.join()
            p.close()

    else:
        # Single-worker mode runs in the main process
        house_processing_worker(
            worker_id=0,
            exp_config=self.config,
            house_indices=self.house_indices,
            samples_per_house=self.samples_per_house,
            shutdown_event=self.shutdown_event,
            counter_lock=self.counter_lock,
            house_counter=self.house_counter,
            success_count=self.success_count,
            total_count=self.total_count,
            completed_houses=self.completed_houses,
            skipped_houses=self.skipped_houses,
            max_allowed_sequential_task_sampler_failures=self.max_allowed_sequential_task_sampler_failures,
            max_allowed_sequential_rollout_failures=self.max_allowed_sequential_rollout_failures,
            max_allowed_sequential_irrecoverable_failures=self.max_allowed_sequential_irrecoverable_failures,
            preloaded_policy=preloaded_policy,
            filter_for_successful_trajectories=self.config.filter_for_successful_trajectories,
            runner_class=type(
                self
            ),  # Pass the runner class to enable customization via subclassing
        )

    # Extract final values from shared multiprocessing state
    success_count_val = self.success_count.value
    total_count_val = self.total_count.value
    completed_houses_val = self.completed_houses.value
    skipped_houses_val = self.skipped_houses.value

    success_rate = success_count_val / total_count_val if total_count_val > 0 else 0.0
    self.logger.info(
        f"Completed {completed_houses_val} houses, skipped {skipped_houses_val} houses"
    )
    self.logger.info(f"Success count: {success_count_val}, Total count: {total_count_val}")
    self.logger.info(f"Success rate: {success_rate * 100:.2f}%")

    # Log final metrics to WandB
    if self.wandb_enabled:
        try:
            final_elapsed_time = time.time() - start_time
            wandb.log(
                {
                    "final_success_count": success_count_val,
                    "final_total_count": total_count_val,
                    "final_success_rate": success_rate,
                    "final_completed_houses": completed_houses_val,
                    "final_skipped_houses": skipped_houses_val,
                    "final_elapsed_time_seconds": final_elapsed_time,
                    "final_elapsed_time_hours": final_elapsed_time / 3600,
                }
            )
            wandb.finish()
            self.logger.info("WandB logging finished")
        except Exception as e:
            self.logger.warning(f"WandB final logging failed: {e}")

    return success_count_val, total_count_val
run_single_rollout staticmethod
run_single_rollout(episode_seed: int, task: BaseMujocoTask, policy: Any, profiler: Profiler | None = None, viewer=None, shutdown_event=None, datagen_profiler: DatagenProfiler | None = None, end_on_success: bool = False) -> bool

Execute a single rollout with the given task and policy.

Parameters:

Name Type Description Default
episode_seed int

Seed for this episode

required
task BaseMujocoTask

The task to run

required
policy Any

Policy to use for action selection

required
profiler Profiler | None

Legacy Profiler instance (optional)

None
viewer

MuJoCo viewer for visualization (optional)

None
shutdown_event

Event to signal shutdown (optional)

None
datagen_profiler DatagenProfiler | None

DatagenProfiler for per-worker timing (optional)

None
end_on_success bool

Whether to end the episode immediately once the task's step infos report success (optional)

False

Returns:

Name Type Description
bool bool

Whether the episode was successful

Source code in molmo_spaces/data_generation/pipeline.py
@staticmethod
def run_single_rollout(
    episode_seed: int,
    task: BaseMujocoTask,
    policy: Any,
    profiler: Profiler | None = None,
    viewer=None,
    shutdown_event=None,
    datagen_profiler: DatagenProfiler | None = None,
    end_on_success: bool = False,
) -> bool:
    """Execute a single rollout with the given task and policy.

    Args:
        episode_seed: Seed for this episode
        task: The task to run
        policy: Policy to use for action selection
        profiler: Legacy Profiler instance (optional)
        viewer: MuJoCo viewer for visualization (optional)
        shutdown_event: Event to signal shutdown (optional)
        datagen_profiler: DatagenProfiler for per-worker timing (optional)
        end_on_success: If True, end the episode as soon as the task's step
            infos report success (optional, defaults to False)

    Returns:
        bool: Whether the episode was successful
    """
    if profiler is not None:
        profiler.start("rollout")
    if datagen_profiler is not None:
        datagen_profiler.start("rollout_total")
        datagen_profiler.start("rollout_reset")

    observation, _info = task.reset()

    if datagen_profiler is not None:
        datagen_profiler.end("rollout_reset")

    if viewer is not None:
        viewer.sync()

    # Enable MuJoCo body sleeping for performance (requires mujoco >= 3.8).
    try:
        task.env.current_model.opt.enableflags |= int(mujoco.mjtEnableBit.mjENBL_SLEEP)
    except AttributeError:
        print("Not setting mujoco sleep. Needs version >=mujoco-3.8")

    # Track success observed mid-rollout (via end_on_success) so it is not
    # lost when the episode ends; previously this flag was unconditionally
    # overwritten by the final judge_success() call below.
    success = False
    step_count = 0
    while not task.is_done():
        # Check for shutdown signal
        if shutdown_event is not None and shutdown_event.is_set():
            if datagen_profiler is not None:
                datagen_profiler.end("rollout_total")
            return False

        # Step with policy
        if profiler is not None:
            profiler.start("policy_get_action")
        if datagen_profiler is not None:
            datagen_profiler.start("policy_get_action")
        action_cmd = policy.get_action(observation)
        if profiler is not None:
            profiler.end("policy_get_action")
        if datagen_profiler is not None:
            datagen_profiler.end("policy_get_action")

        # Step the task
        if profiler is not None:
            profiler.start("task_step")
        if datagen_profiler is not None:
            datagen_profiler.start("task_step")
        if action_cmd is None:
            print("Policy returned None action, ending episode")
            break
        observation, reward, terminal, truncated, infos = task.step(action_cmd)
        if profiler is not None:
            profiler.end("task_step")
        if datagen_profiler is not None:
            datagen_profiler.end("task_step")

        step_count += 1
        # Terminate early once success is reported, if requested.
        if end_on_success and "success" in infos[0] and infos[0]["success"]:
            success = True
            break

        if viewer is not None:
            viewer.sync()

    # Restore the sleep flag to its prior state (same version caveat as above).
    try:
        task.env.current_model.opt.enableflags &= ~int(mujoco.mjtEnableBit.mjENBL_SLEEP)
    except AttributeError:
        print("Not setting mujoco sleep. Needs version >=mujoco-3.8")

    # Save profiler summary
    if profiler is not None:
        profiler.end("rollout")
    if datagen_profiler is not None:
        datagen_profiler.end("rollout_total")
        # Record step count for reference
        datagen_profiler.record(
            "step_count_indicator", step_count / 1000.0
        )  # Scale down to avoid confusion

    # Keep an early success observed via end_on_success; otherwise ask the
    # task to judge the final state (False when the task cannot judge).
    if not success:
        success = task.judge_success() if hasattr(task, "judge_success") else False

    return success
sample_task_from_spec staticmethod
sample_task_from_spec(task_sampler: JsonEvalTaskSampler, house_id: int, episode_spec: EpisodeSpec, episode_idx: int) -> BaseMujocoTask | None

Sample task - episode spec is already in the JsonEvalTaskSampler.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def sample_task_from_spec(
    task_sampler: JsonEvalTaskSampler,
    house_id: int,
    episode_spec: EpisodeSpec,
    episode_idx: int,
) -> BaseMujocoTask | None:
    """Delegate sampling to the sampler for the given house.

    The ``episode_spec`` and ``episode_idx`` arguments are intentionally
    unused: a ``JsonEvalTaskSampler`` already carries the episode
    specification internally.
    """
    sampled = task_sampler.sample_task(house_index=house_id)
    return sampled
should_close_episode_task_sampler staticmethod
should_close_episode_task_sampler() -> bool

Close task sampler after each episode - we create per-episode.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def should_close_episode_task_sampler() -> bool:
    """Close task sampler after each episode - we create per-episode."""
    return True
should_stop_early staticmethod
should_stop_early(num_collected: int, samples_per_house: int, exp_config: MlSpacesExpConfig | None = None) -> bool

Stop early if evaluating a single episode (--idx provided) and it's been collected.

Source code in molmo_spaces/evaluation/json_eval_runner.py
@staticmethod
def should_stop_early(
    num_collected: int, samples_per_house: int, exp_config: MlSpacesExpConfig | None = None
) -> bool:
    """Return True once the single requested episode (--idx) has been collected."""
    if exp_config is None:
        return False
    # A concrete episode index means exactly one episode was requested.
    if exp_config.eval_runtime_params.episode_idx is None:
        return False
    return num_collected >= 1

policy_server

Modified from: https://github.com/Physical-Intelligence/openpi/blob/main/src/openpi/serving/websocket_policy_server.py

Classes:

Name Description
MutableFloat
WebsocketPolicyServer

Serves a policy using the websocket protocol.

Functions:

Name Description
measure_elapsed

Attributes:

Name Type Description
logger

logger module-attribute

logger = getLogger(__name__)

MutableFloat dataclass

MutableFloat(value: float | None = None)

Attributes:

Name Type Description
value float | None
value class-attribute instance-attribute
value: float | None = None

WebsocketPolicyServer

WebsocketPolicyServer(policies: InferencePolicy | list[InferencePolicy], model_name: str, host: str = '0.0.0.0', port: int | None = None, metadata: dict | None = None, max_concurrency: int = 100, force_concurrent: bool = False)

Serves a policy using the websocket protocol.

Concurrent inference is supported for stateful policies via state saving. Non-stateful policies default to non-concurrent inference unless force_concurrent is True.

In order to provide for concurrent inference, we track policy state internally in the server.

Parameters:

Name Type Description Default
policies InferencePolicy | list[InferencePolicy]

Multiple copies of the same policies to serve, requests will be balanced across the policies for concurrent inference. If a policy is passed instead of a list, it will be used as the only policy.

required
model_name str

The name of the model to serve. Will be included in the metadata.

required
host str

The host to serve the policy on.

'0.0.0.0'
port int | None

The port to serve the policy on.

None
metadata dict | None

Additional metadata to serve with the policy.

None
max_concurrency int

The maximum number of concurrent clients to serve. Ignored for non-stateful policies unless force_concurrent is True.

100
force_concurrent bool

Whether to force concurrent inference for non-stateful policies. This may cause bugs if the policy is not safe for concurrency.

False

Methods:

Name Description
serve_forever

Prepares the policy and starts the server.

Source code in molmo_spaces/evaluation/policy_server.py
def __init__(
    self,
    policies: InferencePolicy | list[InferencePolicy],
    model_name: str,
    host: str = "0.0.0.0",
    port: int | None = None,
    metadata: dict | None = None,
    max_concurrency: int = 100,
    force_concurrent: bool = False,
) -> None:
    """
    Args:
        policies: One policy, or several copies of the same policy type;
            requests are balanced across the copies for concurrent inference.
        model_name: Name of the served model, published in the metadata.
        host: Interface to bind the server to.
        port: Port to bind the server to.
        metadata: Extra metadata to expose alongside the policy.
        max_concurrency: Upper bound on simultaneous clients. Ignored for
            non-stateful policies unless force_concurrent is True.
        force_concurrent: Force concurrent inference even for non-stateful
            policies (unsafe unless the policy tolerates concurrency).
    """
    if not isinstance(policies, list):
        policies = [policies]
    assert len(policies) > 0, "Must provide at least one policy"
    first_type = type(policies[0])
    assert all(type(p) is first_type for p in policies), (
        "All policies must be of the same type"
    )

    self._policies = policies
    self._host = host
    self._port = port
    self._metadata = metadata or {}
    self._prepared = False
    # Per-client policy state, keyed by client ID.
    self._client_states = {}
    # Round-robin pool of policy indices available for inference.
    self._policy_idx_queue: asyncio.Queue[int] = asyncio.Queue(maxsize=len(policies))
    for idx in range(len(policies)):
        self._policy_idx_queue.put_nowait(idx)

    self._executor = ThreadPoolExecutor(max_workers=len(policies))

    self._metadata["model_name"] = model_name

    # Stateful policies may serve many clients at once; otherwise serialize
    # requests unless concurrency is explicitly forced.
    allow_concurrency = force_concurrent or isinstance(self._policies[0], StatefulPolicy)
    if allow_concurrency:
        self._server_semaphore = asyncio.Semaphore(max_concurrency)
    else:
        logger.info("Policy does not support state saving, disabling concurrency")
        self._server_semaphore = asyncio.Semaphore(1)

    logging.getLogger("websockets.server").setLevel(logging.INFO)
serve_forever
serve_forever() -> None

Prepares the policy and starts the server.

Source code in molmo_spaces/evaluation/policy_server.py
def serve_forever(self) -> None:
    """Prepare every policy once (in parallel), then run the server loop."""
    if not self._prepared:
        # Warm all policies concurrently; surface the first failure eagerly.
        warmups = [self._executor.submit(p.prepare_model) for p in self._policies]
        for done in as_completed(warmups):
            done.result()
        self._prepared = True
    asyncio.run(self._run())

measure_elapsed

measure_elapsed()
Source code in molmo_spaces/evaluation/policy_server.py
@contextmanager
def measure_elapsed():
    """Yield a MutableFloat whose ``value`` is set to the elapsed wall-clock
    seconds when the with-block exits, even if it exits via an exception."""
    holder = MutableFloat()
    begin = time.perf_counter()
    try:
        yield holder
    finally:
        holder.value = time.perf_counter() - begin

robot_eval_overrides

Functions:

Name Description
cap_robot_eval_override
get_robot_override

Attributes:

Name Type Description
OverrideFn
ROBOT_OVERRIDE_REGISTRY dict[str, OverrideFn]
log

OverrideFn module-attribute

OverrideFn = Callable[[EpisodeSpec, CameraSystemConfig], None]

ROBOT_OVERRIDE_REGISTRY module-attribute

ROBOT_OVERRIDE_REGISTRY: dict[str, OverrideFn] = {'FrankaCAPRobotConfig': cap_robot_eval_override}

log module-attribute

log = getLogger(__name__)

cap_robot_eval_override

cap_robot_eval_override(episode_spec: EpisodeSpec, camera_config: CameraSystemConfig) -> None
Source code in molmo_spaces/evaluation/robot_eval_overrides.py
def cap_robot_eval_override(
    episode_spec: EpisodeSpec,
    camera_config: CameraSystemConfig,
) -> None:
    """Mutate ``episode_spec`` and ``camera_config`` in place to match the
    CAP (Franka) robot's evaluation setup.

    Overrides applied: camera 0 becomes a noise-free wrist camera with depth,
    camera 1 gains depth and a 71-degree FOV, the robot base pose is nudged
    and lowered, the image resolution is raised to 960x720, and the initial
    joint configuration is pinned.
    """
    log.info("Applying CAP robot evaluation overrides")

    # Replace camera 0 with a deterministic wrist camera (all noise zeroed).
    camera_config.cameras[0] = MjcfCameraConfig(
        name="wrist_camera",
        mjcf_name="wrist_camera",
        robot_namespace="robot_0/gripper/",
        fov=53.0,
        fov_noise_degrees=(0.0, 0.0),
        pos_noise_range=(0.0, 0.0),
        orientation_noise_degrees=0.0,
        record_depth=True,
    )

    # Second camera: enable depth recording and widen the field of view.
    camera_config.cameras[1].record_depth = True
    camera_config.cameras[1].fov = 71

    # Shift the base 5 cm along its local +x axis and lower it by 20 cm.
    # The pose layout is [x, y, z, qw, qx, qy, qz] (scalar_first=True below).
    # NOTE(review): the in-place slice `+=` assumes robot_base_pose is a
    # NumPy array (a plain list would fail here) — confirm against EpisodeSpec.
    rot_base = R.from_quat(episode_spec.task["robot_base_pose"][3:7], scalar_first=True).as_matrix()
    episode_spec.task["robot_base_pose"][:3] += 0.05 * rot_base[0:3, 0]
    episode_spec.task["robot_base_pose"][2] -= 0.2

    camera_config.img_resolution = (960, 720)

    # Pin the initial joint configuration (7-DoF arm + two gripper fingers).
    episode_spec.robot.init_qpos = {
        "base": [],
        "arm": [[0, -1.5, 0.116, -2.45, 0, 0.842, 0.965]],
        "gripper": [0.00296, 0.00296],
    }

get_robot_override

get_robot_override(robot_config) -> OverrideFn | None
Source code in molmo_spaces/evaluation/robot_eval_overrides.py
def get_robot_override(robot_config) -> OverrideFn | None:
    """Return the registered override function for the given robot config's
    class name, or None if no override is registered."""
    robot_class_name = type(robot_config).__name__
    fn = ROBOT_OVERRIDE_REGISTRY.get(robot_class_name)
    if fn is None:
        return None
    log.info(f"Found robot override for {robot_class_name}")
    return fn
    return None