Skip to content

renderer

renderer

Modules:

Name Description
abstract_renderer
filament_rendering
offline_renderers
opengl_context
opengl_rendering

abstract_renderer

Classes:

Name Description
MjAbstractRenderer
MultithreadRenderer

Attributes:

Name Type Description
RENDERING_COMPLETE

RENDERING_COMPLETE module-attribute

RENDERING_COMPLETE = 'RENDERING_COMPLETE'

MjAbstractRenderer

MjAbstractRenderer(model_bindings: MjModelBindings = None, device_id: int | None = None, model: MjModel = None, **kwargs)

Bases: ABC

Methods:

Name Description
close
render
reset_single

Attributes:

Name Type Description
device_id
model
model_bindings
render_outputs list[Any]
Source code in molmo_spaces/renderer/abstract_renderer.py
def __init__(
    self,
    model_bindings: MjModelBindings | None = None,
    device_id: int | None = None,
    model: MjModel | None = None,
    **kwargs,
) -> None:
    """Base renderer: store the model bindings, raw model, and device id.

    Args:
        model_bindings: Bindings wrapping the MuJoCo model (may be None).
        device_id: Device (e.g. GPU) identifier, or None for the default.
        model: Raw MjModel (may be None; subclasses decide which is required).
        **kwargs: Ignored here; absorbed so subclasses can forward extra options.
    """
    self._model_bindings = model_bindings
    self._model = model
    self.device_id = device_id
device_id instance-attribute
device_id = device_id
model property
model
model_bindings property
model_bindings
render_outputs instance-attribute
render_outputs: list[Any]
close abstractmethod
close() -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
@abc.abstractmethod
def close(self) -> None:
    """Release renderer resources; must be implemented by subclasses."""
    raise NotImplementedError
render abstractmethod
render(*args, **kwargs) -> Any
Source code in molmo_spaces/renderer/abstract_renderer.py
@abc.abstractmethod
def render(self, *args, **kwargs) -> Any:
    """Produce a rendered output; signature is defined by the subclass."""
    raise NotImplementedError
reset_single
reset_single(idx: int) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def reset_single(self, idx: int) -> None:
    """Per-environment reset hook; the base implementation is a no-op.

    Subclasses override this to clear any renderer state tied to index `idx`.
    """
    return None

MultithreadRenderer

MultithreadRenderer(env: MuJoCoVectorEnv, renderer_cls: type[MjAbstractRenderer], max_render_contexts: int | None = 1000000, process_request_kwargs: dict | None = None, **additional_rendering_thread_runner_kwargs: Any)

Bases: ABC

Methods:

Name Description
__del__
process_request
render
rendering_thread_runner

Attributes:

Name Type Description
env
max_render_contexts
model_id_to_render_input_queue dict[int, Queue] | None
model_id_to_render_threads dict[int, list[Thread]] | None
render_output_queue Queue | None
Source code in molmo_spaces/renderer/abstract_renderer.py
def __init__(
    self,
    env: "MuJoCoVectorEnv",
    renderer_cls: type[MjAbstractRenderer],
    max_render_contexts: int | None = 1_000_000,
    process_request_kwargs: dict | None = None,
    **additional_rendering_thread_runner_kwargs: Any,
) -> None:
    """Start one rendering thread per MuJoCo data instance, capped globally.

    Threads are distributed round-robin over the unique models in
    `env.mj_models`: each outer pass gives every model with remaining
    instances one more thread, so no model gets a second thread before all
    models have one (cap permitting).

    Args:
        env: Vector environment supplying `mj_models`,
            `model_id_to_model_container` and `device`.
        renderer_cls: Concrete renderer class instantiated on each thread.
        max_render_contexts: Cap on total rendering threads; a falsy value is
            coerced to 1.
        process_request_kwargs: Extra kwargs passed to `process_request` for
            every request.
        **additional_rendering_thread_runner_kwargs: Forwarded verbatim to
            `rendering_thread_runner` (e.g. renderer constructor kwargs).
    """
    self._closed = False

    self.env = env
    self.max_render_contexts = max_render_contexts or 1

    self.model_id_to_render_threads: dict[int, list[threading.Thread]] | None = None
    self.model_id_to_render_input_queue: dict[int, Queue] | None = None
    self.render_output_queue: Queue | None = None

    # How many data instances share each unique model object.
    model_id_to_count = Counter(id(model) for model in env.mj_models)

    if self.max_render_contexts > 0:
        assert self.max_render_contexts >= len(model_id_to_count), (
            f"max_render_contexts ({self.max_render_contexts}) must be greater than or equal to "
            f"the number of unique models ({len(model_id_to_count)})"
        )

    self.model_id_to_render_threads = defaultdict(list)
    self.model_id_to_render_input_queue = defaultdict(Queue)
    self.render_output_queue = Queue()

    # Never start more threads than there are data instances in total.
    num_render_contexts = min(self.max_render_contexts, sum(model_id_to_count.values()))
    num_render_threads_started = 0

    # Round-robin over models until the thread budget is exhausted.
    while num_render_threads_started < num_render_contexts:
        for model_id, count in list(model_id_to_count.items()):
            if count > 0:
                model_id_to_count[model_id] -= 1

                self.model_id_to_render_threads[model_id].append(
                    threading.Thread(
                        target=self.rendering_thread_runner,
                        kwargs=dict(
                            renderer_cls=renderer_cls,
                            model_bindings=env.model_id_to_model_container[model_id],
                            device=env.device,
                            input_queue=self.model_id_to_render_input_queue[model_id],
                            output_queue=self.render_output_queue,
                            process_request_callback=self.process_request,
                            process_request_kwargs=process_request_kwargs,
                            **additional_rendering_thread_runner_kwargs,
                        ),
                    )
                )
                self.model_id_to_render_threads[model_id][-1].start()
                num_render_threads_started += 1

                # Early return once the cap is hit (exits both loops).
                if num_render_threads_started == num_render_contexts:
                    return
env instance-attribute
env = env
max_render_contexts instance-attribute
max_render_contexts = max_render_contexts or 1
model_id_to_render_input_queue instance-attribute
model_id_to_render_input_queue: dict[int, Queue] | None = defaultdict(Queue)
model_id_to_render_threads instance-attribute
model_id_to_render_threads: dict[int, list[Thread]] | None = defaultdict(list)
render_output_queue instance-attribute
render_output_queue: Queue | None = Queue()
__del__
__del__() -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def __del__(self) -> None:
    """Best-effort shutdown: signal all rendering threads to stop, then join them."""
    # Guard against running twice (e.g. explicit cleanup followed by GC).
    if self._closed:
        return

    self._closed = True

    try:
        # One sentinel per data instance: each rendering thread consumes one
        # RENDERING_COMPLETE from its model's input queue and exits its loop.
        for _idx, model in enumerate(self.env.mj_models):
            self.model_id_to_render_input_queue[id(model)].put(RENDERING_COMPLETE)

        # Short join timeout: __del__ may run during interpreter teardown and
        # must not block; threads that miss the deadline are abandoned.
        for threads in self.model_id_to_render_threads.values():
            for thread in threads:
                thread.join(0.1)

        self.model_id_to_render_threads.clear()
        self.model_id_to_render_input_queue.clear()
    except (KeyboardInterrupt, SystemExit):
        raise
    except ValueError:
        # NOTE(review): presumably guards against objects already torn down at
        # interpreter exit — confirm which call actually raises ValueError here.
        print("While closing MultithreadRenderer")
process_request abstractmethod staticmethod
process_request(renderer: MjAbstractRenderer, request: Any, output_queue: Queue, **kwargs) -> NoReturn
Source code in molmo_spaces/renderer/abstract_renderer.py
@staticmethod
@abc.abstractmethod
def process_request(
    renderer: MjAbstractRenderer, request: Any, output_queue: Queue, **kwargs
) -> NoReturn:
    """Handle one render request on a rendering thread.

    Implementations render `request` with `renderer` and put results on
    `output_queue`. (Annotated NoReturn because this abstract version always
    raises; `rendering_thread_runner` ignores the callback's return value.)
    """
    raise NotImplementedError
render abstractmethod
render(*args, **kwargs) -> Any
Source code in molmo_spaces/renderer/abstract_renderer.py
@abc.abstractmethod
def render(self, *args, **kwargs) -> Any:
    """Submit work to the rendering threads and return results; subclass-defined."""
    raise NotImplementedError
rendering_thread_runner staticmethod
rendering_thread_runner(renderer_cls: type[MjAbstractRenderer], process_request_callback: Callable[[MjAbstractRenderer, Any, Queue, dict | None], None], model_bindings: MjModelBindings, device: int | None, input_queue: Queue, output_queue: Queue, timeout: int | None = None, process_request_kwargs: dict | None = None, **kwargs: Any) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
@staticmethod
def rendering_thread_runner(
    renderer_cls: type[MjAbstractRenderer],
    process_request_callback: Callable[[MjAbstractRenderer, Any, Queue, dict | None], None],
    model_bindings: MjModelBindings,
    device: int | None,
    input_queue: Queue,
    output_queue: Queue,
    timeout: int | None = None,
    process_request_kwargs: dict | None = None,
    **kwargs: Any,
) -> None:
    """Thread main loop: build a renderer, then serve requests until shutdown.

    Blocks on `input_queue` (a `timeout` lets `Queue.get` raise `queue.Empty`
    and kill the thread if no work arrives), forwards each request to
    `process_request_callback`, and stops on the RENDERING_COMPLETE sentinel.
    The renderer is closed on every exit path.

    Args:
        renderer_cls: Renderer class instantiated once, on this thread.
        process_request_callback: Called as (renderer, request, output_queue,
            **process_request_kwargs) for every non-sentinel request.
        model_bindings: Forwarded to the renderer constructor.
        device: Device id forwarded to the renderer constructor.
        input_queue: Incoming render requests for this model.
        output_queue: Queue the callback writes results to.
        timeout: Optional blocking-get timeout in seconds.
        process_request_kwargs: Extra kwargs for the callback (None -> {}).
        **kwargs: Extra renderer constructor kwargs.
    """
    renderer = renderer_cls(model_bindings=model_bindings, device_id=device, **kwargs)

    print(
        f"Rendering thread started with renderer {renderer_cls.__name__} for model {id(model_bindings.model)}"
    )
    process_request_kwargs = process_request_kwargs or {}
    try:
        while True:
            request = input_queue.get(block=True, timeout=timeout)

            if request == RENDERING_COMPLETE:
                print(
                    f"Rendering thread for model {id(model_bindings.model)} received RENDERING_COMPLETE"
                )
                break

            process_request_callback(renderer, request, output_queue, **process_request_kwargs)
    finally:
        # Release the renderer's resources even if a request raises.
        renderer.close()

filament_rendering

Classes:

Name Description
MjFilamentRenderer

Functions:

Name Description
prepare_locals_for_super

Attributes:

Name Type Description
args
data
image
model
model_path
parser
pil_image
renderer

args module-attribute

args = parse_args()

data module-attribute

data = MjData(model)

image module-attribute

image = render()

model module-attribute

model = from_xml_path(as_posix())

model_path module-attribute

model_path = Path(model)

parser module-attribute

parser = ArgumentParser()

pil_image module-attribute

pil_image = fromarray(image)

renderer module-attribute

renderer = MjFilamentRenderer(model=model)

MjFilamentRenderer

MjFilamentRenderer(model_bindings: MjModelBindings = None, device_id: int | None = None, height: int = 720, width: int = 1280, max_geom: int = 10000, model: MjModel | None = None, **kwargs: Any)

Bases: MjAbstractRenderer

Methods:

Name Description
close
disable_depth_rendering
disable_segmentation_rendering
enable_depth_rendering
enable_segmentation_rendering
geomid_to_bodyid
mark_textures_dirty
render
render_rgb
reset_single
update
upload_textures

Attributes:

Name Type Description
device_id
height
model
model_bindings
render_outputs list[Any]
scene MjvScene
width
Source code in molmo_spaces/renderer/filament_rendering.py
def __init__(
    self,
    model_bindings: MjModelBindings = None,
    device_id: int | None = None,
    height: int = 720,
    width: int = 1280,
    max_geom: int = 10000,
    model: mj.MjModel | None = None,
    **kwargs: Any,
) -> None:
    """Create an offscreen MuJoCo renderer for the given model.

    At least one of `model_bindings` / `model` must be provided; when both
    are given they must wrap the same model.

    Args:
        model_bindings: Bindings wrapping the MuJoCo model.
        device_id: Device identifier forwarded to the base class.
        height: Default render height in pixels.
        width: Default render width in pixels.
        max_geom: Capacity of the internal MjvScene.
        model: Raw MjModel, as an alternative to `model_bindings`.
        **kwargs: Forwarded to MjAbstractRenderer.__init__.
    """
    assert model_bindings is not None or model is not None, (
        "model_bindings or model must be provided"
    )
    super().__init__(**prepare_locals_for_super(locals()))

    self._width = width
    self._height = height

    if model_bindings is not None and model is not None:
        assert model_bindings.model == model, "model_bindings and model must be the same"
    # Prefer the model carried by the bindings when both sources are present.
    model = model_bindings.model if model_bindings is not None else model
    self._model = model

    self._scene = mj.MjvScene(model=model, maxgeom=max_geom)
    self._scene_option = mj.MjvOption()

    # Turn off site rendering
    self._scene_option.sitegroup *= 0

    # Enable shadow rendering by default (shadows are controlled by lights with castshadow enabled)
    self._scene.flags[mj.mjtRndFlag.mjRND_SHADOW] = True

    self._mjr_context = mj.MjrContext(model, mj.mjtFontScale.mjFONTSCALE_150.value)
    # mj.mjr_resizeOffscreen(width, height, self._mjr_context)
    mj.mjr_setBuffer(mj.mjtFramebuffer.mjFB_OFFSCREEN.value, self._mjr_context)
    self._mjr_context.readDepthMap = mj.mjtDepthMap.mjDEPTH_ZEROFAR

    # Default render flags.
    self._depth_rendering = False
    self._segmentation_rendering = False

    # Track if textures need to be uploaded (set to True when textures are modified)
    # NOTE: We start with False because textures are loaded from model at MjrContext creation
    # We only need to upload if textures are modified AFTER renderer initialization
    self._textures_need_upload = False
device_id instance-attribute
device_id = device_id
height property
height
model property
model
model_bindings property
model_bindings
render_outputs instance-attribute
render_outputs: list[Any]
scene property
scene: MjvScene
width property
width
close
close() -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def close(self) -> None:
    """Free the MuJoCo rendering context.

    Idempotent: safe to call multiple times, and safe if the context was
    never created (or was already cleared).
    """
    context = getattr(self, "_mjr_context", None)
    if context:
        context.free()
    self._mjr_context = None
disable_depth_rendering
disable_depth_rendering() -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def disable_depth_rendering(self) -> None:
    """Turn off depth output; subsequent renders produce RGB or segmentation."""
    # Only clears the depth flag; the segmentation flag is left untouched.
    self._depth_rendering = False
disable_segmentation_rendering
disable_segmentation_rendering() -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def disable_segmentation_rendering(self) -> None:
    """Turn off segmentation output; subsequent renders produce RGB or depth."""
    # Only clears the segmentation flag; the depth flag is left untouched.
    self._segmentation_rendering = False
enable_depth_rendering
enable_depth_rendering() -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def enable_depth_rendering(self) -> None:
    """Switch output to depth maps; segmentation mode is mutually exclusive and is cleared."""
    self._depth_rendering = True
    self._segmentation_rendering = False
enable_segmentation_rendering
enable_segmentation_rendering() -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def enable_segmentation_rendering(self) -> None:
    """Switch output to segmentation images; depth mode is mutually exclusive and is cleared."""
    self._segmentation_rendering = True
    self._depth_rendering = False
geomid_to_bodyid
geomid_to_bodyid(geomid)
Source code in molmo_spaces/renderer/filament_rendering.py
def geomid_to_bodyid(self, geomid):
    """Return the id of the body owning geom `geomid`, via the model's geom_bodyid table."""
    # NOTE(review): numpy indexing presumably also accepts arrays of geom ids —
    # confirm whether callers ever pass non-scalars.
    return self.model.geom_bodyid[geomid]
mark_textures_dirty
mark_textures_dirty() -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def mark_textures_dirty(self) -> None:
    """Request a GPU texture re-upload before the next render.

    Call this after mutating texture data on the model; the upload itself
    happens lazily inside the render path.
    """
    self._textures_need_upload = True
render
render(*, out: ndarray | None = None, width: int | None = None, height: int | None = None) -> ndarray
Source code in molmo_spaces/renderer/filament_rendering.py
def render(
    self,
    *,
    out: np.ndarray | None = None,
    width: int | None = None,
    height: int | None = None,
) -> np.ndarray:
    """Render the current scene into an RGB, depth, or segmentation image.

    The output mode is selected via enable_depth_rendering() /
    enable_segmentation_rendering(); by default an RGB image is produced.

    Args:
        out: Optional preallocated output buffer. Must be float32 of shape
            (height, width) in depth mode, else uint8 (height, width, 3).
        width: Optional per-call width override (defaults to self._width).
        height: Optional per-call height override (defaults to self._height).

    Returns:
        uint8 RGB (H, W, 3) by default; float32 metric depth (H, W) in depth
        mode; int32 (H, W, 3) of (object id, object type, body id) in
        segmentation mode — in that mode a new array is returned and `out`
        is only used as a scratch buffer.

    Raises:
        ValueError: If `out` does not match the expected shape.
    """
    height = height or self._height
    width = width or self._width
    rect = mj.MjrRect(0, 0, width, height)

    # Save flags so every path (including plain RGB) can restore them at the end.
    original_flags = self._scene.flags.copy()

    # Enable shadow rendering (required for shadows to appear in rendered images)
    # Shadows are controlled by lights with castshadow enabled
    self._scene.flags[mj.mjtRndFlag.mjRND_SHADOW] = True

    # Using segmented rendering for depth makes the calculated depth more
    # accurate at far distances.
    if self._depth_rendering or self._segmentation_rendering:
        self._scene.flags[mj.mjtRndFlag.mjRND_SEGMENT] = True
        self._scene.flags[mj.mjtRndFlag.mjRND_IDCOLOR] = True

    # Upload textures to GPU before rendering if they were modified
    # (e.g. via model.tex_data); done lazily to avoid per-frame overhead.
    if self._textures_need_upload:
        self.upload_textures()
        self._textures_need_upload = False

    if self._depth_rendering:
        out_shape = (rect.height, rect.width)
        out_dtype = np.float32
    else:
        out_shape = (rect.height, rect.width, 3)
        out_dtype = np.uint8

    if out is None:
        out = np.empty(out_shape, dtype=out_dtype)
    elif out.shape != out_shape:
        # BUGFIX: the message used to claim `(width, height)` ordering; numpy
        # images are row-major, i.e. `(height, width)`.
        raise ValueError(
            f"Expected `out.shape == {out_shape}`. Got `out.shape={out.shape}`"
            " instead. When using depth rendering, the out array should be of"
            " shape `(height, width)` and otherwise (height, width, 3)."
            f" Got `(self.height, self.width)={(self.height, self.width)}` and"
            f" `self._depth_rendering={self._depth_rendering}`."
        )

    # Render scene and read contents of RGB and depth buffers.
    mj.mjr_render(rect, self._scene, self._mjr_context)

    if self._depth_rendering:
        mj.mjr_readPixels(rgb=None, depth=out, viewport=rect, con=self._mjr_context)

        # Get the distances to the near and far clipping planes.
        extent = self.model.stat.extent
        near = self.model.vis.map.znear * extent
        far = self.model.vis.map.zfar * extent

        # Calculate OpenGL perspective matrix values in float32 precision
        # so they are close to what glFrustum returns
        # https://registry.khronos.org/OpenGL-Refpages/gl2.1/xhtml/glFrustum.xml
        zfar = np.float32(far)
        znear = np.float32(near)
        c_coef = -(zfar + znear) / (zfar - znear)
        d_coef = -(np.float32(2) * zfar * znear) / (zfar - znear)

        # In reverse Z mode the perspective matrix is transformed by the following
        c_coef = np.float32(-0.5) * c_coef - np.float32(0.5)
        d_coef = np.float32(-0.5) * d_coef

        # We need 64 bits to convert Z from ndc to metric depth without noticeable
        # losses in precision
        out_64 = out.astype(np.float64)

        # Undo OpenGL projection
        # Note: We do not need to take action to convert from window coordinates
        # to normalized device coordinates because in reversed Z mode the mapping
        # is identity
        out_64 = d_coef / (out_64 + c_coef)

        # Cast result back to float32 for backwards compatibility
        # This has a small accuracy cost
        out[:] = out_64.astype(np.float32)
    elif self._segmentation_rendering:
        mj.mjr_readPixels(rgb=out, depth=None, viewport=rect, con=self._mjr_context)

        # Convert 3-channel uint8 to 1-channel uint32.
        image3 = out.astype(np.uint32)
        segimage = image3[:, :, 0] + image3[:, :, 1] * (2**8) + image3[:, :, 2] * (2**16)
        # Remap segid to 3-channel (object ID, object type, body ID) triplet
        # Seg ID 0 is background -- will be remapped to (-1, -1, -1).

        # Find the maximum segment ID in the image to size the output array correctly
        max_segid = np.max(segimage) if segimage.size > 0 else 0

        # Create output array with size to accommodate all possible segment IDs
        # Add 1 to account for 0-based indexing and ensure we have enough space
        segid2output = np.full((max_segid + 1, 3), fill_value=-1, dtype=np.int32)

        visible_geoms = [g for g in self._scene.geoms[: self._scene.ngeom] if g.segid != -1]
        visible_segids = np.array([g.segid + 1 for g in visible_geoms], np.int32)
        visible_objid = np.array([g.objid for g in visible_geoms], np.int32)
        visible_objtype = np.array([g.objtype for g in visible_geoms], np.int32)
        visible_bodyid = np.array(
            [self.geomid_to_bodyid(g.objid) for g in visible_geoms], np.int32
        )

        # Only set values for valid segment IDs that are within bounds
        valid_mask = (visible_segids >= 0) & (visible_segids < segid2output.shape[0])
        if np.any(valid_mask):
            segid2output[visible_segids[valid_mask], 0] = visible_objid[valid_mask]
            segid2output[visible_segids[valid_mask], 1] = visible_objtype[valid_mask]
            segid2output[visible_segids[valid_mask], 2] = visible_bodyid[valid_mask]

        out = segid2output[segimage]
    else:
        # BUGFIX: the RGB path used to call mjr_readPixels twice back-to-back;
        # a single read fills `out`.
        mj.mjr_readPixels(rgb=out, depth=None, viewport=rect, con=self._mjr_context)

    # Reset scene flags on every path. BUGFIX: previously the plain-RGB branch
    # skipped this, leaking the forced SHADOW flag into subsequent calls.
    np.copyto(self._scene.flags, original_flags)

    return out
render_rgb
render_rgb(*, out: ndarray | None = None, width: int | None = None, height: int | None = None) -> ndarray
Source code in molmo_spaces/renderer/filament_rendering.py
def render_rgb(
    self,
    *,
    out: np.ndarray | None = None,
    width: int | None = None,
    height: int | None = None,
) -> np.ndarray:
    """Render and read back raw pixels without post-processing.

    Unlike render(), the depth branch returns the raw (non-metric) depth
    buffer and the segmentation branch returns the raw ID-color RGB image.

    Args:
        out: Optional preallocated buffer; float32 (height, width) in depth
            mode, else uint8 (height, width, 3).
        width: Optional per-call width override (defaults to self._width).
        height: Optional per-call height override (defaults to self._height).

    Returns:
        The filled `out` array (allocated if not supplied).

    Raises:
        ValueError: If `out` does not match the expected shape.
    """
    height = height or self._height
    width = width or self._width
    rect = mj.MjrRect(0, 0, width, height)

    # Enable shadow rendering (required for shadows to appear in rendered images)
    # Shadows are controlled by lights with castshadow enabled
    self._scene.flags[mj.mjtRndFlag.mjRND_SHADOW] = True

    # Using segmented rendering for depth makes the calculated depth more
    # accurate at far distances.
    if self._depth_rendering or self._segmentation_rendering:
        self._scene.flags[mj.mjtRndFlag.mjRND_SEGMENT] = True
        self._scene.flags[mj.mjtRndFlag.mjRND_IDCOLOR] = True

    # Upload textures to GPU before rendering if they were modified
    # (e.g. via model.tex_data); done lazily to avoid per-frame overhead.
    if self._textures_need_upload:
        self.upload_textures()
        self._textures_need_upload = False

    if self._depth_rendering:
        out_shape = (rect.height, rect.width)
        out_dtype = np.float32
    else:
        out_shape = (rect.height, rect.width, 3)
        out_dtype = np.uint8

    if out is None:
        out = np.empty(out_shape, dtype=out_dtype)
    elif out.shape != out_shape:
        # BUGFIX: the message used to claim `(width, height)` ordering; numpy
        # images are row-major, i.e. `(height, width)`.
        raise ValueError(
            f"Expected `out.shape == {out_shape}`. Got `out.shape={out.shape}`"
            " instead. When using depth rendering, the out array should be of"
            " shape `(height, width)` and otherwise (height, width, 3)."
            f" Got `(self.height, self.width)={(self.height, self.width)}` and"
            f" `self._depth_rendering={self._depth_rendering}`."
        )

    # Render scene and read contents of RGB and depth buffers.
    mj.mjr_render(rect, self._scene, self._mjr_context)

    if self._depth_rendering:
        mj.mjr_readPixels(rgb=None, depth=out, viewport=rect, con=self._mjr_context)
    elif self._segmentation_rendering:
        mj.mjr_readPixels(rgb=out, depth=None, viewport=rect, con=self._mjr_context)
    else:
        # BUGFIX: this branch used to call mjr_readPixels twice back-to-back;
        # a single read fills `out`.
        mj.mjr_readPixels(rgb=out, depth=None, viewport=rect, con=self._mjr_context)

    return out
reset_single
reset_single(idx: int) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def reset_single(self, idx: int) -> None:
    """No-op reset hook for environment `idx`; override in subclasses as needed."""
    return None
update
update(data: MjData, camera: int | str | MjvCamera = -1, scene_option: MjvOption | None = None) -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def update(
    self,
    data: mj.MjData,
    camera: int | str | mj.MjvCamera = -1,
    scene_option: mj.MjvOption | None = None,
) -> None:
    """Refresh the internal MjvScene from `data`, viewed from `camera`.

    Args:
        data: Simulation state to visualize.
        camera: An MjvCamera instance, a fixed-camera name, or a camera id;
            -1 selects a default free camera.
        scene_option: Visualization options; defaults to this renderer's own.

    Raises:
        ValueError: If a camera name is unknown or a camera id is out of
            the range [-1, ncam).
    """
    if not isinstance(camera, mj.MjvCamera):
        camera_id = camera
        if isinstance(camera_id, str):
            # Resolve a camera name to its id (-1 means "not found").
            camera_id = mj.mj_name2id(self.model, mj.mjtObj.mjOBJ_CAMERA.value, camera_id)
            if camera_id == -1:
                raise ValueError(f'The camera "{camera}" does not exist.')
        if camera_id < -1 or camera_id >= self.model.ncam:
            raise ValueError(
                f"The camera id {camera_id} is out of range [-1, {self.model.ncam})."
            )

        # Render camera.
        camera = mj.MjvCamera()
        camera.fixedcamid = camera_id

        # Defaults to mjCAMERA_FREE, otherwise mjCAMERA_FIXED refers to a
        # camera explicitly defined in the model_bindings.
        if camera_id == -1:
            camera.type = mj.mjtCamera.mjCAMERA_FREE
            mj.mjv_defaultFreeCamera(self.model, camera)
        else:
            camera.type = mj.mjtCamera.mjCAMERA_FIXED

    scene_option = scene_option or self._scene_option
    mj.mjv_updateScene(
        self.model,
        data,
        scene_option,
        None,
        camera,
        mj.mjtCatBit.mjCAT_ALL.value,
        self._scene,
    )
upload_textures
upload_textures(data: MjData | None = None) -> None
Source code in molmo_spaces/renderer/filament_rendering.py
def upload_textures(self, data: mj.MjData | None = None) -> None:
    """Re-upload every model texture to the GPU rendering context.

    Needed after textures are modified in place (e.g. via model.tex_data),
    since the MjrContext reads texture data only at creation time.

    Args:
        data: Unused by this implementation; kept for interface compatibility.
    """
    if self.model.ntex == 0:
        log.debug("upload_textures(): Skipping - no textures in model (ntex == 0)")
        return

    for tex_id in range(self.model.ntex):
        mj.mjr_uploadTexture(self.model, self._mjr_context, tex_id)

prepare_locals_for_super

prepare_locals_for_super(local_vars, args_name='args', kwargs_name='kwargs', ignore_kwargs=False)
Source code in molmo_spaces/renderer/filament_rendering.py
def prepare_locals_for_super(
    local_vars, args_name="args", kwargs_name="kwargs", ignore_kwargs=False
):
    """Turn a locals() snapshot into a kwargs dict for super().__init__.

    Drops `self` and any dunder entries, then folds a captured `**kwargs`
    dict back in, with explicitly named locals taking precedence over
    entries of the same name inside `kwargs`.

    Args:
        local_vars: Snapshot of locals() taken inside an __init__.
        args_name: Name of a positional-varargs entry; its presence is an error.
        kwargs_name: Name under which captured **kwargs appear in local_vars.
        ignore_kwargs: If True, discard the captured **kwargs entirely.

    Returns:
        dict: Keyword arguments suitable for the parent constructor.
    """
    assert args_name not in local_vars, f"`prepare_locals_for_super` does not support {args_name}."
    filtered = {
        key: value
        for key, value in local_vars.items()
        if key != "self" and "__" not in key
    }
    if kwargs_name not in filtered:
        return filtered
    extra = filtered.pop(kwargs_name)
    if ignore_kwargs:
        return filtered
    # Explicit locals win over same-named keys carried inside **kwargs.
    extra.update(filtered)
    return extra

offline_renderers

Modules:

Name Description
domain_randomization
omniverse_renderer

Offline renderer for rendering scenes from saved MujocoState dataset

opengl_renderer

domain_randomization

Classes:

Name Description
BaseDomainRandomizationOfflineRenderer

Abstract base class for offline renderers that support domain randomization.

BaseDomainRandomizationOfflineRenderer
BaseDomainRandomizationOfflineRenderer(episode_path: str, renderer: Any, device_id: int | None = None, **kwargs: Any)

Bases: MjAbstractRenderer, ABC

Abstract base class for offline renderers that support domain randomization.

This class performs the following
  1. Loads an episode's metadata (e.g., the scene XML path) to create MuJoCo model bindings.
  2. Initializes the MuJoCo data for simulation.
  3. Accepts an externally provided renderer (e.g., OpenGL, Omniverse, Madrona).
Subclasses must implement
  • randomize(): to modify the model/renderer properties (lights, shadows, textures, etc.)
This implementation applies random modifications to the scene
  • Lights: Randomizes intensity and color.
  • Shadows: Randomizes (if supported by the renderer) a shadow softness parameter.
  • Textures: Chooses a texture from a provided pool.

Parameters:

Name Type Description Default
episode_path str

Path to the episode folder (should contain task_metadata.json and state_data.npz).

required
renderer Any

Instance of the rendering backend (OpenGL, Omniverse, Madrona, etc.).

required
device_id Optional[int]

Device identifier (e.g., GPU id) if applicable.

None
kwargs Any

Any additional kwargs for the MjAbstractRenderer initialization.

{}

Methods:

Name Description
close
randomize

Apply domain randomizations to the scene.

render
reset_single

Attributes:

Name Type Description
data
device_id
episode_path
model
model_bindings
render_outputs list[Any]
renderer
Source code in molmo_spaces/renderer/offline_renderers/domain_randomization.py
def __init__(
    self, episode_path: str, renderer: Any, device_id: int | None = None, **kwargs: Any
) -> None:
    """Load an episode's scene and wire up an externally provided renderer.

    Args:
        episode_path (str): Path to the episode folder (should contain task_metadata.json and state_data.npz).
        renderer (Any): Instance of the rendering backend (OpenGL, Omniverse, Madrona, etc.).
        device_id (Optional[int]): Device identifier (e.g., GPU id) if applicable.
        kwargs: Any additional kwargs for the MjAbstractRenderer initialization.

    Raises:
        FileNotFoundError: If task_metadata.json is missing from episode_path.
        KeyError: If the metadata lacks a "scene_path" entry.
    """
    self.episode_path = episode_path

    # Load metadata (for example, the scene XML location)
    metadata_file = os.path.join(episode_path, "task_metadata.json")
    with open(metadata_file, "r") as f:
        metadata = json.load(f)
    scene_path = metadata["scene_path"]

    # Create model bindings from the scene XML.
    model_bindings = MjModelBindings.from_xml_path(scene_path)
    super().__init__(model_bindings, device_id=device_id, **kwargs)

    # Create the MuJoCo simulation data.
    self.data = MjData(self.model)

    # Use the externally provided renderer.
    self.renderer = renderer
data instance-attribute
data = MjData(model)
device_id instance-attribute
device_id = device_id
episode_path instance-attribute
episode_path = episode_path
model property
model
model_bindings property
model_bindings
render_outputs instance-attribute
render_outputs: list[Any]
renderer instance-attribute
renderer = renderer
close abstractmethod
close() -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
@abc.abstractmethod
def close(self) -> None:
    """Release renderer resources; must be implemented by subclasses."""
    raise NotImplementedError
randomize
randomize() -> None

Apply domain randomizations to the scene.

This method should perform modifications such as altering light intensities/colors, shadow properties, textures, etc.

Source code in molmo_spaces/renderer/offline_renderers/domain_randomization.py
def randomize(self) -> None:
    """
    Apply domain randomizations to the scene.

    This method should perform modifications such as altering
    light intensities/colors, shadow properties, textures, etc.
    """
    # Each flag/helper pair (randomize_lights/_randomize_lights, ...) is
    # expected from the concrete subclass; none are defined in this base
    # class's __init__.
    if self.randomize_lights:
        self._randomize_lights()
    if self.randomize_shadows:
        self._randomize_shadows()
    if self.randomize_textures:
        self._randomize_textures()
render abstractmethod
render(*args, **kwargs) -> Any
Source code in molmo_spaces/renderer/abstract_renderer.py
@abc.abstractmethod
def render(self, *args, **kwargs) -> Any:
    """Produce a rendered output; signature is defined by the subclass."""
    raise NotImplementedError
reset_single
reset_single(idx: int) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def reset_single(self, idx: int) -> None:
    """No-op reset hook for environment `idx`; subclasses may override."""
    return None

omniverse_renderer

Offline renderer for rendering scenes from saved MujocoState dataset

Omniverse RTX Renderer https://docs.omniverse.nvidia.com/materials-and-rendering/latest/rtx-renderer.html

Kit/Isaac Sim Minimum Requirements: - 535.129.03 Linux (RTX 3070/Quadro) - Intel i7 Gen5 or AMD Ryzen - 16GB/32GB RAM - Ubuntu 20.04 or 22.04

Classes:

Name Description
ImageWriter
OfflineOmniverseRenderer

For photorealistic RGB rendering

Functions:

Name Description
check_if_camera_resolution_valid
check_if_camera_valid
load_state

Attributes:

Name Type Description
app_launcher
offline_omniverse_renderer
simulation_app
app_launcher module-attribute
app_launcher = AppLauncher(enagle_cameras=True, headless=True)
offline_omniverse_renderer module-attribute
offline_omniverse_renderer = OfflineOmniverseRenderer()
simulation_app module-attribute
simulation_app = app
ImageWriter
ImageWriter(output_dir: str, image_format: str = 'png', rgb: bool = True, normals: bool = False, semantic_segmentation: bool = True, frame_padding: int = 4)

Bases: Writer

Methods:

Name Description
write

Write function called from the OgnWriter node on every frame to process annotator output.

Attributes:

Name Type Description
annotators
normals
output_dir
rgb
semantic_segmentation
write_ready
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def __init__(
    self,
    output_dir: str,
    image_format: str = "png",
    rgb: bool = True,
    normals: bool = False,
    semantic_segmentation: bool = True,
    frame_padding: int = 4,
) -> None:
    """Configure which annotators to capture and where frames get written.

    Args:
        output_dir: Root directory for written images. NOTE(review): when
            falsy, no backend is created, yet `write` uses `self._backend`
            unconditionally — confirm callers always pass a directory.
        image_format: File extension for saved frames (e.g. "png").
        rgb: Capture the RGB annotator.
        normals: Capture the surface-normals annotator.
        semantic_segmentation: Capture the semantic-segmentation annotator.
        frame_padding: Zero-padding width for frame indices in filenames.
    """
    self.output_dir = output_dir
    if output_dir:
        self._backend = rep.BackendDispatch(output_dir=output_dir)
    self.write_ready = False
    self._frame_id = 0
    self._frame_padding = frame_padding
    self._sequence_id = 0
    self._image_format = image_format
    self.rgb = rgb
    self.normals = normals
    self.semantic_segmentation = semantic_segmentation

    self.annotators = []
    if rgb:
        self.annotators.append(rep.Annotator(rep.AnnotatorType.RGB))
    if normals:
        self.annotators.append(rep.Annotator(rep.AnnotatorType.NORMALS))
    if semantic_segmentation:
        self.annotators.append(rep.Annotator(rep.AnnotatorType.SEMANTIC_SEGMENTATION))
annotators instance-attribute
annotators = []
normals instance-attribute
normals = normals
output_dir instance-attribute
output_dir = output_dir
rgb instance-attribute
rgb = rgb
semantic_segmentation instance-attribute
semantic_segmentation = semantic_segmentation
write_ready instance-attribute
write_ready = False
write
write(data: dict) -> None

Write function called from the OgnWriter node on every frame to process annotator output.

Parameters:

Name Type Description Default
data dict

A dictionary containing the annotator data for the current frame.

required

https://github.com/ARISE-Initiative/robosuite/blob/master/robosuite/scripts/render_dataset_with_omniverse.py#L440

Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def write(self, data: dict) -> None:
    """
    Write function called from the OgnWriter node on every frame to process annotator output.

    Args:
        data: A dictionary containing the annotator data for the current frame,
            shaped as {"annotators": {ann_name: {camera_name: {"data": image}}}}.
    https://github.com/ARISE-Initiative/robosuite/blob/master/robosuite/scripts/render_dataset_with_omniverse.py#L440
    """
    if not self.write_ready:
        return
    for ann_name, ann_data in data["annotators"].items():
        # FIX: the inner loop variable used to be named `data`, shadowing the
        # method argument; renamed to `camera_data`. The unused enumerate index
        # was dropped as well.
        for camera_name, camera_data in ann_data.items():
            # FIX: `Path(camera_name).stem` returns a plain str, which does not
            # support the `/` join operator — wrap it in Path so the per-camera
            # subdirectory joins with the padded frame file name.
            file_name = Path(Path(camera_name).stem) / (
                f"{self._frame_id:0{self._frame_padding}d}.{self._image_format}"
            )
            if ann_name == "rgb":
                filepath = Path(self.output_dir) / "rgb" / file_name
                self._backend.write_image(filepath, camera_data["data"])
            elif ann_name == "normals":
                raise NotImplementedError("Normals are not supported")
            elif ann_name == "semantic_segmentation":
                filepath = Path(self.output_dir) / "semantic_segmentation" / file_name
                self._backend.write_image(filepath, camera_data["data"])
    self._frame_id += 1
OfflineOmniverseRenderer
OfflineOmniverseRenderer(output_dir: str, camera_names: list[str], camera_resolution: list[tuple[int, int]], num_frames: int = 1000)

For photorealistic RGB rendering

reference: https://github.com/ARISE-Initiative/robosuite/blob/master/robosuite/scripts/render_dataset_with_omniverse.py

Methods:

Name Description
add_camera
add_light
from_path
from_usd_state_data_path
reset
save_rendering
start_rendering

Attributes:

Name Type Description
camera_names
camera_resolution
initial_skip
num_frames
rgb_writer
rt_subframes
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def __init__(
    self,
    output_dir: str,
    camera_names: list[str],
    camera_resolution: list[tuple[int, int]],
    num_frames: int = 1000,
) -> None:
    """Configure offline photorealistic RGB rendering.

    Args:
        output_dir: Directory where the RGB image writer saves frames.
        camera_names: Names of the cameras to render from.
        camera_resolution: One (width, height) tuple per camera.
        num_frames: Number of frames to render (the trajectory length).
    """
    self.rgb_writer = ImageWriter(output_dir=output_dir)
    self.num_frames = num_frames  # same as number of traj_len
    # One ray-tracing subframe per step; no warm-up frames skipped.
    self.rt_subframes = 1
    self.initial_skip = 0
    self.camera_names = camera_names
    self.camera_resolution = camera_resolution
camera_names property writable
camera_names
camera_resolution property writable
camera_resolution
initial_skip instance-attribute
initial_skip = 0
num_frames instance-attribute
num_frames = num_frames
rgb_writer instance-attribute
rgb_writer = ImageWriter(output_dir=output_dir)
rt_subframes instance-attribute
rt_subframes = 1
add_camera
add_camera(pos, rotation_xyz, obj_name) -> NoReturn
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def add_camera(self, pos, rotation_xyz, obj_name) -> NoReturn:
    """Forward a camera-add request to the exporter, then raise.

    NOTE(review): the exporter call executes *before* the NotImplementedError,
    so this method has a side effect despite being declared unimplemented —
    confirm whether the call or the raise is the intended behavior.
    """
    self.exporter.add_camera(pos, rotation_xyz, obj_name)
    raise NotImplementedError("add_camera is not implemented for OfflineOmniverseRenderer")
add_light
add_light(pos, intensity, radius, color, obj_name, light_type) -> NoReturn
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def add_light(self, pos, intensity, radius, color, obj_name, light_type) -> NoReturn:
    """Forward a light-add request to the exporter, then raise.

    NOTE(review): as in `add_camera`, the exporter call runs before the
    NotImplementedError is raised — confirm this side effect is intended.
    """
    self.exporter.add_light(pos, intensity, radius, color, obj_name, light_type)
    raise NotImplementedError("add_light is not implemented for OfflineOmniverseRenderer")
from_path classmethod
from_path(usd_path: str, output_dir: str = None)
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
@classmethod
def from_path(cls, usd_path: str, output_dir: str = None):
    """Load a USD stage and build a renderer rooted next to it.

    NOTE(review): `cls(...)` is called without the required `camera_names` and
    `camera_resolution` arguments, so this raises TypeError as written —
    compare with `from_usd_state_data_path`, which supplies them from the
    render metadata file. Also, `load_state` is declared with a `self`
    parameter but invoked here as a free function.
    """
    load_state(usd_path)
    if output_dir is None:
        output_dir = os.path.dirname(usd_path)
    return cls(output_dir=output_dir)
from_usd_state_data_path classmethod
from_usd_state_data_path(usd_path: str, state_path: str, output_dir: str = None)
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
@classmethod
def from_usd_state_data_path(cls, usd_path: str, state_path: str, output_dir: str = None):
    """Build a renderer from a USD stage plus its recorded render metadata.

    Args:
        usd_path: Path to the USD stage file; a `render_metadata.txt` file is
            expected to live in the same directory.
        state_path: Path to the recorded state data.
            NOTE(review): currently unused in this method — confirm whether the
            state is meant to be loaded here or later by the caller.
        output_dir: Where renderings are written; defaults to the directory
            containing `usd_path`.

    Returns:
        A renderer configured with the camera names, resolutions and frame
        count recorded in `render_metadata.txt`.
    """
    import ast

    load_state(usd_path)
    # read camera names and num_frames from render_metadata.txt
    # (a simple "key: value" per-line text file written at export time)
    render_metadata_path = Path(usd_path).parent / "render_metadata.txt"
    assert render_metadata_path.exists(), "render_metadata.txt does not exist"
    metadata = {}
    with open(render_metadata_path, "r") as f:
        for line in f:
            if ":" not in line:
                continue
            key, value = line.strip().split(":", 1)
            metadata[key.strip()] = value.strip()

    # FIX: use ast.literal_eval instead of eval — it only accepts Python
    # literals, so malformed or malicious metadata cannot execute code.
    camera_names = ast.literal_eval(
        metadata.get("camera_names", "[]")
    )  # Convert string representation to list
    camera_resolution = ast.literal_eval(
        metadata.get("camera_resolution", "[]")
    )  # Convert string representation to list of tuples
    num_frames = int(metadata.get("num_frames", "0"))
    if output_dir is None:
        output_dir = os.path.dirname(usd_path)
    return cls(
        output_dir=output_dir,
        camera_names=camera_names,
        camera_resolution=camera_resolution,
        num_frames=num_frames,
    )
reset
reset() -> None
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def reset(self) -> None:
    """Reset per-episode rendering state.

    Clears the camera resolution, drops all render products, and zeroes the
    frame count so a new episode can be configured from scratch.
    """
    self.camera_resolution = None
    self.render_products = []
    self.num_frames = 0
save_rendering
save_rendering(output_dir: str) -> None
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def save_rendering(self, output_dir: str) -> None:
    """Persist renderings to ``output_dir``.

    Currently a no-op placeholder.
    """
    return None
start_rendering
start_rendering() -> None
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def start_rendering(self) -> None:
    """Run the full offline render loop: init, warm-up, per-frame stepping, teardown.

    NOTE(review): teardown uses `self.writer`, while `__init__` only sets
    `self.rgb_writer` — confirm where `writer` is assigned before this runs.
    """
    # start rendering
    ## start_recorder()
    self._init()

    # run_recording_loop()
    # Warm-up orchestrator steps before any frame is captured (timeline paused off).
    for _ in range(self.initial_skip):
        rep.orchestrator.step(rt_subframes=1, delta_time=None, pause_timeline=False)

    timeline = omni.timeline.get_timeline_interface()
    timeline.set_end_time(self.num_frames)

    # Advance the timeline one frame at a time and render each frame.
    with tqdm(total=self.num_frames) as pbar:
        for _ in range(self.num_frames):  # are we rendering one image at a time?
            timeline.forward_one_frame()
            rep.orchestrator.step(
                rt_subframes=self.rt_subframes, delta_time=None, pause_timeline=True
            )
            pbar.update(1)

    # finish_recording()
    timeline.stop()
    rep.orchestrator.wait_until_complete()

    # clear_recorder()
    # Detach the writer and destroy render products so the stage can be reused.
    self.writer.detach()
    self.writer = None
    for rp in self.render_products:
        rp.destroy()
    self.render_products = []
    stage_utils.clear_state()
    stage_utils.update_stage()
check_if_camera_resolution_valid
check_if_camera_resolution_valid(camera_resolution: tuple[int, int]) -> None
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def check_if_camera_resolution_valid(camera_resolution: tuple[int, int]) -> None:
    """Validate a (width, height) camera resolution.

    Currently a no-op placeholder: every resolution is accepted.
    """
    return None
check_if_camera_valid
check_if_camera_valid(camera_path) -> bool
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def check_if_camera_valid(camera_path) -> bool:
    """Return True if the prim at ``camera_path`` exists and is of type Camera."""
    stage = omni.usd.get_context().get_stage()
    prim = stage.GetPrimAtPath(camera_path)

    # Guard clauses: missing prim, then wrong prim type.
    if not prim.IsValid():
        print(f"Camera at path {camera_path} is not valid")
        return False
    if prim.GetTypeName() == "Camera":
        return True
    print(f"{prim} is not a Camera type")
    return False
load_state
load_state(self, usd_path: str = None) -> None
Source code in molmo_spaces/renderer/offline_renderers/omniverse_renderer.py
def load_state(self, usd_path: str = None) -> None:
    """Open the USD stage at ``usd_path`` as the current stage.

    NOTE(review): declared with a ``self`` parameter but invoked as a free
    function (``load_state(usd_path)``) by the classmethod constructors, which
    would bind the path to ``self`` and pass ``usd_path=None`` — confirm the
    intended call signature.
    """
    # open USD stage
    stage_utils.open_stage(usd_path)

opengl_rendrerer

Classes:

Name Description
MultithreadedDomainRandomizationOfflineOpenGLRenderer

For Depth and Segmentation rendering using OpenGL renderer

MultithreadedDomainRandomizationOfflineOpenGLRenderer
MultithreadedDomainRandomizationOfflineOpenGLRenderer(env: MuJoCoVectorEnv, device_id: int = 0, renderer_cls: type[MjAbstractRenderer] = MjOpenGLRenderer, max_render_contexts: int | None = None, namespace: str = 'robot_0/', width: int = 1280, height: int = 720, randomize_lights: bool = True, randomize_shadows: bool = True, randomize_textures: bool = True, light_config_range: dict | None = None, shadow_config_range: dict | None = None, textures_pool: list | None = None, **kwargs: Any)

Bases: MultithreadRenderer, BaseDomainRandomizationOfflineRenderer

For Depth and Segmentation rendering using OpenGL renderer For RGB rendering using Default OpenGL renderer

Methods:

Name Description
__del__
close
from_dataset
process_request
randomize

Apply domain randomizations to the scene.

render
rendering_thread_runner
reset_single

Attributes:

Name Type Description
data
device_id
env
episode_path
light_config_range
max_render_contexts
model
model_bindings
model_id_to_render_input_queue dict[int, Queue] | None
model_id_to_render_threads dict[int, list[Thread]] | None
multithreaded_render
randomize_lights
randomize_shadows
randomize_textures
render_output_queue Queue | None
render_outputs list[Any]
renderer
shadow_config_range
textures_pool
Source code in molmo_spaces/renderer/offline_renderers/opengl_rendrerer.py
def __init__(
    self,
    env: "MuJoCoVectorEnv",
    device_id: int = 0,
    renderer_cls: type[MjAbstractRenderer] = MjOpenGLRenderer,
    max_render_contexts: int | None = None,
    namespace: str = "robot_0/",
    width: int = 1280,
    height: int = 720,
    randomize_lights: bool = True,
    randomize_shadows: bool = True,
    randomize_textures: bool = True,
    light_config_range: dict | None = None,
    shadow_config_range: dict | None = None,
    textures_pool: list | None = None,
    **kwargs: Any,
) -> None:
    """Combine multithreaded OpenGL rendering with domain randomization.

    Args:
        env: Vectorized MuJoCo environment whose models are rendered.
        device_id: GPU device index. NOTE(review): accepted but not stored or
            forwarded here — confirm whether a base class is meant to use it.
        renderer_cls: Renderer class instantiated per rendering thread.
        max_render_contexts: Upper bound on simultaneously open render contexts.
        namespace: Prefix for robot-scoped names.
        width: Render width in pixels.
        height: Render height in pixels.
        randomize_lights: Enable the light randomization pass.
        randomize_shadows: Enable the shadow randomization pass.
        randomize_textures: Enable the texture randomization pass.
        light_config_range: Sampling ranges for light intensity and color.
        shadow_config_range: Sampling range for shadow softness.
        textures_pool: Candidate texture files for randomization.
        **kwargs: Forwarded to the underlying MultithreadOpenGLRenderer.
    """
    # The heavy lifting (threads, GL contexts) is delegated to this backend.
    self.multithreaded_render = MultithreadOpenGLRenderer(
        env=env,
        renderer_cls=renderer_cls,
        max_render_contexts=max_render_contexts,
        namespace=namespace,
        width=width,
        height=height,
        **kwargs,
    )

    # Which randomization passes run on randomize().
    self.randomize_lights = randomize_lights
    self.randomize_shadows = randomize_shadows
    self.randomize_textures = randomize_textures

    # Fall back to default ranges/pool when nothing truthy was supplied
    # (mirrors `x or default` semantics).
    default_light_range = {
        "intensity": (0.5, 1.5),
        "color": [(0.5, 1.0), (0.5, 1.0), (0.5, 1.0)],
    }
    self.light_config_range = light_config_range if light_config_range else default_light_range

    default_shadow_range = {"shadow_softness": (0.0, 1.0)}
    self.shadow_config_range = shadow_config_range if shadow_config_range else default_shadow_range

    default_textures = ["texture1.png", "texture2.png", "texture3.png"]
    self.textures_pool = textures_pool if textures_pool else default_textures
data instance-attribute
data = MjData(model)
device_id instance-attribute
device_id = device_id
env instance-attribute
env = env
episode_path instance-attribute
episode_path = episode_path
light_config_range instance-attribute
light_config_range = light_config_range or {'intensity': (0.5, 1.5), 'color': [(0.5, 1.0), (0.5, 1.0), (0.5, 1.0)]}
max_render_contexts instance-attribute
max_render_contexts = max_render_contexts or 1
model property
model
model_bindings property
model_bindings
model_id_to_render_input_queue instance-attribute
model_id_to_render_input_queue: dict[int, Queue] | None = defaultdict(Queue)
model_id_to_render_threads instance-attribute
model_id_to_render_threads: dict[int, list[Thread]] | None = defaultdict(list)
multithreaded_render instance-attribute
multithreaded_render = MultithreadOpenGLRenderer(env=env, renderer_cls=renderer_cls, max_render_contexts=max_render_contexts, namespace=namespace, width=width, height=height, **kwargs)
randomize_lights instance-attribute
randomize_lights = randomize_lights
randomize_shadows instance-attribute
randomize_shadows = randomize_shadows
randomize_textures instance-attribute
randomize_textures = randomize_textures
render_output_queue instance-attribute
render_output_queue: Queue | None = Queue()
render_outputs instance-attribute
render_outputs: list[Any]
renderer instance-attribute
renderer = renderer
shadow_config_range instance-attribute
shadow_config_range = shadow_config_range or {'shadow_softness': (0.0, 1.0)}
textures_pool instance-attribute
textures_pool = textures_pool or ['texture1.png', 'texture2.png', 'texture3.png']
__del__
__del__() -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def __del__(self) -> None:
    """Best-effort teardown: signal render threads to stop and join them briefly."""
    # Guard against double teardown (e.g. explicit close followed by GC).
    if self._closed:
        return

    self._closed = True

    try:
        # Tell every per-model rendering thread to exit its request loop.
        for _idx, model in enumerate(self.env.mj_models):
            self.model_id_to_render_input_queue[id(model)].put(RENDERING_COMPLETE)

        # Join with a short timeout; threads that don't finish are abandoned.
        for threads in self.model_id_to_render_threads.values():
            for thread in threads:
                thread.join(0.1)

        self.model_id_to_render_threads.clear()
        self.model_id_to_render_input_queue.clear()
    except (KeyboardInterrupt, SystemExit):
        raise
    except ValueError:
        # The interpreter may already be shutting down; report and move on.
        print("While closing MultithreadRenderer")
close abstractmethod
close() -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
@abc.abstractmethod
def close(self) -> None:
    """Release all rendering resources; subclasses must implement this."""
    raise NotImplementedError
from_dataset classmethod
from_dataset(data_path: str) -> None
Source code in molmo_spaces/renderer/offline_renderers/opengl_rendrerer.py
@classmethod
def from_dataset(cls, data_path: str) -> None:
    """Construct a renderer from a saved dataset at ``data_path``.

    Currently a no-op placeholder.
    """
    return None
process_request
process_request(renderer: MjOpenGLRenderer, request: Any, output_queue: Queue, episode_path: str, **process_request_kwargs: Any) -> None
Source code in molmo_spaces/renderer/offline_renderers/opengl_rendrerer.py
def process_request(
    self,
    renderer: MjOpenGLRenderer,
    request: Any,
    output_queue: Queue,
    episode_path: str,
    # add_namespace: bool = True,
    **process_request_kwargs: Any,
) -> None:
    """Render every frame of a recorded episode for one (camera, mode) request.

    NOTE(review): `self._load_episode_data(episode_path)` is called twice with
    incompatible expectations (first unpacked as a 3-tuple, then indexed like a
    dict), and the `renderer` parameter is overwritten by the first call —
    confirm the intended contract of `_load_episode_data` and of the request
    tuple before relying on this method.
    """
    # load from episode path and metadata
    model, data, renderer = self._load_episode_data(episode_path)
    state_data = self._load_episode_data(episode_path)
    qpos = state_data["qpos"]
    # camera_names = state_data["camera_names"]
    # camera_resolution = state_data["camera_resolution"]

    # get from request
    # NOTE(review): this rebinds `data`, discarding the `data` loaded above.
    idx, camera, data, mode = request

    # mode handling
    # Configure the renderer's output modality before stepping through frames.
    if mode == "rgb":
        renderer.disable_depth_rendering()
        renderer.disable_segmentation_rendering()
    elif mode == "depth" or mode == "pointcloud":
        renderer.enable_depth_rendering()
    elif mode == "segmentation":
        renderer.enable_segmentation_rendering()
    else:
        raise ValueError(f"Invalid mode: {mode}")

    # render all frames
    # Replay each recorded qpos, run forward kinematics, and render the frame.
    num_frames = qpos.shape[0]
    observations = []
    for i in range(num_frames):
        data.qpos = qpos[i]
        mujoco.mj_forward(model, data)

        renderer.update(data, camera=camera)
        image = renderer.render(**process_request_kwargs)

        if mode == "pointcloud":
            # pointcloud = mujoco_depth_to_pointcloud(depth)
            raise NotImplementedError("Pointcloud rendering not implemented")
        else:
            observations.append(image)

    # Tag results with the request index for the consumer thread.
    output_queue.put((idx, observations))
randomize
randomize() -> None

Apply domain randomizations to the scene.

This method should perform modifications such as altering light intensities/colors, shadow properties, textures, etc.

Source code in molmo_spaces/renderer/offline_renderers/domain_randomization.py
def randomize(self) -> None:
    """
    Apply domain randomizations to the scene.

    Runs each randomization pass (lights, then shadows, then textures) whose
    corresponding flag is enabled on this renderer.
    """
    passes = (
        (self.randomize_lights, self._randomize_lights),
        (self.randomize_shadows, self._randomize_shadows),
        (self.randomize_textures, self._randomize_textures),
    )
    for enabled, apply_pass in passes:
        if enabled:
            apply_pass()
render
render(camera: str = 'camera_rgb', mode: Literal['rgb', 'depth', 'segmentation'] = 'rgb', add_namespace: bool = True) -> None
Source code in molmo_spaces/renderer/offline_renderers/opengl_rendrerer.py
def render(
    self,
    camera: str = "camera_rgb",
    mode: Literal["rgb", "depth", "segmentation"] = "rgb",
    add_namespace: bool = True,
) -> None:
    """Render ``camera`` in the requested ``mode``.

    Currently a no-op placeholder.
    """
    return None
rendering_thread_runner staticmethod
rendering_thread_runner(renderer_cls: type[MjAbstractRenderer], process_request_callback: Callable[[MjAbstractRenderer, Any, Queue, dict | None], None], model_bindings: MjModelBindings, device: int | None, input_queue: Queue, output_queue: Queue, timeout: int | None = None, process_request_kwargs: dict | None = None, **kwargs: Any) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
@staticmethod
def rendering_thread_runner(
    renderer_cls: type[MjAbstractRenderer],
    process_request_callback: Callable[[MjAbstractRenderer, Any, Queue, dict | None], None],
    model_bindings: MjModelBindings,
    device: int | None,
    input_queue: Queue,
    output_queue: Queue,
    timeout: int | None = None,
    process_request_kwargs: dict | None = None,
    **kwargs: Any,
) -> None:
    """Thread body: own one renderer and serve queued requests until told to stop.

    Args:
        renderer_cls: Renderer class instantiated once for this thread.
        process_request_callback: Invoked per request with
            (renderer, request, output_queue, **process_request_kwargs).
        model_bindings: Model bindings the renderer is created for.
        device: Device id passed to the renderer (None for default).
        input_queue: Source of render requests; the RENDERING_COMPLETE
            sentinel terminates the loop.
        output_queue: Destination the callback writes results to.
        timeout: Max seconds to block waiting for a request (None blocks
            forever; on expiry the queue's Empty exception propagates).
        process_request_kwargs: Extra keyword args forwarded to the callback.
        **kwargs: Extra keyword args forwarded to the renderer constructor.
    """
    renderer = renderer_cls(model_bindings=model_bindings, device_id=device, **kwargs)

    print(
        f"Rendering thread started with renderer {renderer_cls.__name__} for model {id(model_bindings.model)}"
    )
    process_request_kwargs = process_request_kwargs or {}
    try:
        while True:
            request = input_queue.get(block=True, timeout=timeout)

            # Sentinel: stop serving and fall through to renderer cleanup.
            if request == RENDERING_COMPLETE:
                print(
                    f"Rendering thread for model {id(model_bindings.model)} received RENDERING_COMPLETE"
                )
                break

            process_request_callback(renderer, request, output_queue, **process_request_kwargs)
    finally:
        # Always release the renderer's resources, even if a request raised.
        renderer.close()
reset_single
reset_single(idx: int) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def reset_single(self, idx: int) -> None:
    """Reset per-environment render state for environment ``idx``.

    The base implementation keeps no per-environment state, so this is a no-op.
    """
    return None

opengl_context

Classes:

Name Description
EGLGLContext

An EGL context for headless accelerated OpenGL rendering on GPU devices.

Functions:

Name Description
create_initialized_egl_device_display

Creates an initialized EGL display directly on a device.

Attributes:

Name Type Description
EGL_ATTRIBUTES
EGL_DISPLAY
EGL_DISPLAY_INITIALIZED
PYOPENGL_PLATFORM
xla_flags

EGL_ATTRIBUTES module-attribute

EGL_ATTRIBUTES = (EGL_RED_SIZE, 8, EGL_GREEN_SIZE, 8, EGL_BLUE_SIZE, 8, EGL_ALPHA_SIZE, 8, EGL_DEPTH_SIZE, 24, EGL_STENCIL_SIZE, 8, EGL_COLOR_BUFFER_TYPE, EGL_RGB_BUFFER, EGL_SURFACE_TYPE, EGL_PBUFFER_BIT, EGL_RENDERABLE_TYPE, EGL_OPENGL_BIT, EGL_NONE)

EGL_DISPLAY module-attribute

EGL_DISPLAY = None

EGL_DISPLAY_INITIALIZED module-attribute

EGL_DISPLAY_INITIALIZED = False

PYOPENGL_PLATFORM module-attribute

PYOPENGL_PLATFORM = get('PYOPENGL_PLATFORM')

xla_flags module-attribute

xla_flags = get('XLA_FLAGS', '')

EGLGLContext

EGLGLContext(max_width, max_height, device_id=0)

An EGL context for headless accelerated OpenGL rendering on GPU devices.

Methods:

Name Description
__del__
free

Frees resources associated with this context.

make_current

Attributes:

Name Type Description
device_id
Source code in molmo_spaces/renderer/opengl_context.py
def __init__(self, max_width, max_height, device_id=0) -> None:
    """Create a headless EGL OpenGL context on the given GPU device.

    Args:
        max_width: Unused (deleted immediately).
        max_height: Unused (deleted immediately).
        device_id: Index of the GPU device to create the EGL display on.

    Raises:
        ImportError: If no EGL device display can be initialized.
        RuntimeError: If no matching framebuffer config or context is found.
    """
    global EGL_DISPLAY, EGL_DISPLAY_INITIALIZED
    del max_width, max_height  # unused
    self.device_id = device_id
    num_configs = ctypes.c_long()
    config_size = 1
    config = EGL.EGLConfig()
    EGL.eglReleaseThread()

    if not EGL_DISPLAY_INITIALIZED:
        # only initialize for the first time
        # The display is shared process-wide and terminated via atexit.
        EGL_DISPLAY = create_initialized_egl_device_display(device_id=device_id)
        if EGL_DISPLAY == EGL.EGL_NO_DISPLAY:
            raise ImportError(
                "Cannot initialize a EGL device display. This likely means that your EGL "
                "driver does not support the PLATFORM_DEVICE extension, which is "
                "required for creating a headless rendering context."
            )
        atexit.register(EGL.eglTerminate, EGL_DISPLAY)
        EGL_DISPLAY_INITIALIZED = True
    # NOTE(review): later instances reuse the cached display even when created
    # with a different device_id — confirm multi-GPU usage is intended to share
    # the first device's display.
    EGL.eglChooseConfig(
        EGL_DISPLAY, EGL_ATTRIBUTES, ctypes.byref(config), config_size, num_configs
    )
    if num_configs.value < 1:
        raise RuntimeError(
            "EGL failed to find a framebuffer configuration that matches the "
            f"desired attributes: {EGL_ATTRIBUTES}"
        )
    EGL.eglBindAPI(EGL.EGL_OPENGL_API)
    self._context = EGL.eglCreateContext(EGL_DISPLAY, config, EGL.EGL_NO_CONTEXT, None)
    if not self._context:
        raise RuntimeError("Cannot create an EGL context.")
device_id instance-attribute
device_id = device_id
__del__
__del__() -> None
Source code in molmo_spaces/renderer/opengl_context.py
def __del__(self) -> None:
    # Release the EGL context when the object is garbage-collected;
    # free() is idempotent, so an earlier explicit free() is safe.
    self.free()
free
free() -> None

Frees resources associated with this context.

Source code in molmo_spaces/renderer/opengl_context.py
def free(self) -> None:
    """Frees resources associated with this context."""
    global EGL_DISPLAY, EGL_DISPLAY_INITIALIZED
    # Only touch EGL if we still own a context and the shared display is alive.
    if self._context and EGL_DISPLAY_INITIALIZED:
        try:
            # If this context is current on the calling thread, unbind it
            # before destroying it.
            current_context = EGL.eglGetCurrentContext()
            if current_context and self._context.address == current_context.address:
                EGL.eglMakeCurrent(
                    EGL_DISPLAY,
                    EGL.EGL_NO_SURFACE,
                    EGL.EGL_NO_SURFACE,
                    EGL.EGL_NO_CONTEXT,
                )
            EGL.eglDestroyContext(EGL_DISPLAY, self._context)
            EGL.eglReleaseThread()
        except EGLError:
            # Display may have already been terminated by atexit handler
            # during exception cleanup. Nothing we can do here.
            pass
    # Clearing the handle makes repeated free() calls no-ops.
    self._context = None
make_current
make_current() -> None
Source code in molmo_spaces/renderer/opengl_context.py
def make_current(self) -> None:
    """Bind this EGL context to the calling thread (surfaceless).

    Raises:
        RuntimeError: If eglMakeCurrent fails; the EGL error code is included.
    """
    global EGL_DISPLAY
    if not EGL.eglMakeCurrent(
        EGL_DISPLAY, EGL.EGL_NO_SURFACE, EGL.EGL_NO_SURFACE, self._context
    ):
        error = EGL.eglGetError()
        raise RuntimeError(f"Failed to make the EGL context current. EGL error: {error}")

create_initialized_egl_device_display

create_initialized_egl_device_display(device_id=0)

Creates an initialized EGL display directly on a device.

Source code in molmo_spaces/renderer/opengl_context.py
def create_initialized_egl_device_display(device_id=0):
    """Creates an initialized EGL display directly on a device.

    Device selection honors MUJOCO_EGL_DEVICE_ID first, then
    CUDA_VISIBLE_DEVICES; with neither set, `device_id` indexes the devices
    reported by eglQueryDevicesEXT (-1 means "use the first device").

    Returns:
        An initialized EGLDisplay, or EGL.EGL_NO_DISPLAY if the selected
        device could not be initialized.
    """
    all_devices = EGL.eglQueryDevicesEXT()
    # MUJOCO_EGL_DEVICE_ID takes precedence over CUDA_VISIBLE_DEVICES.
    selected_device = (
        os.environ.get("CUDA_VISIBLE_DEVICES", None)
        if os.environ.get("MUJOCO_EGL_DEVICE_ID", None) is None
        else os.environ.get("MUJOCO_EGL_DEVICE_ID", None)
    )
    if selected_device is None:
        # NOTE(review): this assignment is dead — `candidates` is
        # unconditionally overwritten by the slice below, so only a single
        # device is ever tried, even in this branch.
        candidates = all_devices
        device_idx = 0 if device_id == -1 else device_id
    else:
        if not selected_device.isdigit():
            # Comma-separated device list (CUDA_VISIBLE_DEVICES style).
            device_inds = [int(x) for x in selected_device.split(",")]
            if device_id == -1:
                device_idx = device_inds[0]
            else:
                assert device_id in device_inds, (
                    "specified device id is not made visible in environment variables."
                )
                device_idx = device_id
        else:
            device_idx = int(selected_device)
        # NOTE(review): this bounds check only runs when an env var was set;
        # an out-of-range `device_id` in the branch above silently yields an
        # empty candidate slice instead of raising.
        if not 0 <= device_idx < len(all_devices):
            raise RuntimeError(
                f"The MUJOCO_EGL_DEVICE_ID environment variable must be an integer "
                f"between 0 and {len(all_devices) - 1} (inclusive), got {device_idx}."
            )
    candidates = all_devices[device_idx : device_idx + 1]
    for device in candidates:
        display = EGL.eglGetPlatformDisplayEXT(EGL.EGL_PLATFORM_DEVICE_EXT, device, None)
        if display != EGL.EGL_NO_DISPLAY and EGL.eglGetError() == EGL.EGL_SUCCESS:
            # `eglInitialize` may or may not raise an exception on failure depending
            # on how PyOpenGL is configured. We therefore catch a `GLError` and also
            # manually check the output of `eglGetError()` here.
            try:
                initialized = EGL.eglInitialize(display, None, None)
            except error.GLError:
                pass
            else:
                if initialized == EGL.EGL_TRUE and EGL.eglGetError() == EGL.EGL_SUCCESS:
                    return display
    return EGL.EGL_NO_DISPLAY

opengl_rendering

Classes:

Name Description
MjBatchRenderer

Reference:

MjOpenGLRenderer

Renders MuJoCo scenes with OpenGL.

MultithreadOpenGLRenderer

Functions:

Name Description
prepare_locals_for_super

MjBatchRenderer

Reference: https://github.com/openai/mujoco-py/blob/master/mujoco_py/mjbatchrenderer.pyx https://github.com/openai/mujoco-py/commit/aa82c0e555d28813394f04ee8a8c2fc6b18d6b3f https://github.com/openai/mujoco-py/pull/94 https://github.com/openai/mujoco-py/pull/246/files#diff-2e59ed7fefb358a2579ea4033f71379d3659858b63dcb4a0d161fca9b3e43522

MjOpenGLRenderer

MjOpenGLRenderer(model_bindings: MjModelBindings = None, device_id: int | None = None, height: int = 720, width: int = 1280, max_geom: int = 10000, model: MjModel | None = None, **kwargs: Any)

Bases: MjAbstractRenderer

Renders MuJoCo scenes with OpenGL.

Methods:

Name Description
__del__
__enter__
__exit__
close

Frees the resources used by the renderer.

disable_depth_rendering
disable_segmentation_rendering
enable_depth_rendering
enable_segmentation_rendering
geomid_to_bodyid
mark_textures_dirty

Mark that textures have been modified and need to be uploaded.

render

Renders the scene as a numpy array of pixel values.

reset_single
update

Updates geometry used for rendering.

upload_textures

Upload all textures to the GPU render context.

Attributes:

Name Type Description
device_id
height
model
model_bindings
render_outputs list[Any]
scene MjvScene
width
Source code in molmo_spaces/renderer/opengl_rendering.py
def __init__(
    self,
    model_bindings: MjModelBindings = None,
    device_id: int | None = None,
    height: int = 720,
    width: int = 1280,
    max_geom: int = 10000,
    model: MjModel | None = None,
    **kwargs: Any,
) -> None:
    """Initializes a new `Renderer`.

    Args:
      model: an mujoco.Mjmodel instance.
      device_id: The index of the device to use for rendering.
      height: image height in pixels.
      width: image width in pixels.
      max_geom: Optional integer specifying the maximum number of geoms that can
        be rendered in the same scene. If None this will be chosen automatically
        based on the estimated maximum number of renderable geoms in the model_bindings.
    Raises:
      ValueError: If `camera_id` is outside the valid range, or if `width` or
        `height` exceed the dimensions of MuJoCo's offscreen framebuffer.
    """
    # FIX: this docstring previously appeared *after* the assert below, where
    # it was a dead string expression rather than the method docstring.
    assert model_bindings is not None or model is not None, (
        "model_bindings or model must be provided"
    )
    # Default to GPU 0 when torch reports CUDA; otherwise device_id stays None
    # and the platform GL context is used below.
    if device_id is None:
        try:
            import torch

            if torch.cuda.is_available():
                device_id = 0
        except ImportError:
            pass

    super().__init__(**prepare_locals_for_super(locals()))

    self._width = width
    self._height = height

    if model_bindings is not None and model is not None:
        assert model_bindings.model == model, "model_bindings and model must be the same"
    model = model_bindings.model if model_bindings is not None else model
    self._model = model

    self._scene = MjvScene(model=model, maxgeom=max_geom)
    self._scene_option = MjvOption()

    # Turn off site rendering
    self._scene_option.sitegroup *= 0

    # Enable shadow rendering by default (shadows are controlled by lights with castshadow enabled)
    self._scene.flags[mjtRndFlag.mjRND_SHADOW] = True

    # Create render contexts.
    # TODO(nimrod): Figure out why pytype doesn't like gl_context.GLContext
    self._context_is_cgl = False
    if device_id is None:
        from mujoco import gl_context

        self._gl_context = gl_context.GLContext(width, height)  # type: ignore
        self._context_is_cgl = True
    else:
        from molmo_spaces.renderer.opengl_context import EGLGLContext

        self._gl_context = EGLGLContext(width, height, device_id)
    self._gl_context.make_current()
    self._mjr_context = MjrContext(model, mjtFontScale.mjFONTSCALE_150.value)
    mjr_resizeOffscreen(width, height, self._mjr_context)
    mjr_setBuffer(mjtFramebuffer.mjFB_OFFSCREEN.value, self._mjr_context)
    self._mjr_context.readDepthMap = mjtDepthMap.mjDEPTH_ZEROFAR

    # TODO In MacOS, keeping the context locked seems to preclude others to progress,
    #  so it doesn't look like we can achieve true parallelism through multi threading?
    #  This also happens at the end of render()
    if self._context_is_cgl:
        from mujoco.cgl import cgl

        cgl.CGLUnlockContext(self._gl_context._context)

    # Default render flags.
    self._depth_rendering = False
    self._segmentation_rendering = False

    # Track if textures need to be uploaded (set to True when textures are modified)
    # NOTE: We start with False because textures are loaded from model at MjrContext creation
    # We only need to upload if textures are modified AFTER renderer initialization
    self._textures_need_upload = False
device_id instance-attribute
device_id = device_id
height property
height
model property
model
model_bindings property
model_bindings
render_outputs instance-attribute
render_outputs: list[Any]
scene property
scene: MjvScene
width property
width
__del__
__del__() -> None
Source code in molmo_spaces/renderer/opengl_rendering.py
def __del__(self) -> None:
    # Release GL/MuJoCo render contexts when the renderer is garbage-collected.
    self.close()
__enter__
__enter__()
Source code in molmo_spaces/renderer/opengl_rendering.py
def __enter__(self):
    """Enter the context manager; the renderer itself is the managed resource."""
    return self
__exit__
__exit__(exc_type, exc_value, traceback)
Source code in molmo_spaces/renderer/opengl_rendering.py
def __exit__(self, exc_type, exc_value, traceback):
    del exc_type, exc_value, traceback  # Unused.
    self.close()
close
close() -> None

Frees the resources used by the renderer.

This method can be used directly:

renderer = Renderer(...)
# Use renderer.
renderer.close()

or via a context manager:

with Renderer(...) as renderer:
  # Use renderer.
Source code in molmo_spaces/renderer/opengl_rendering.py
def close(self) -> None:
    """Frees the resources used by the renderer.

    Safe to call more than once: each context is freed at most once and its
    handle is cleared afterwards. Usable directly:

    ```python
    renderer = Renderer(...)
    # Use renderer.
    renderer.close()
    ```

    or via a context manager:

    ```python
    with Renderer(...) as renderer:
      # Use renderer.
    ```
    """
    gl_ctx = getattr(self, "_gl_context", None)
    if gl_ctx:
        gl_ctx.free()
    self._gl_context = None

    mjr_ctx = getattr(self, "_mjr_context", None)
    if mjr_ctx:
        mjr_ctx.free()
    self._mjr_context = None
disable_depth_rendering
disable_depth_rendering() -> None
Source code in molmo_spaces/renderer/opengl_rendering.py
def disable_depth_rendering(self) -> None:
    """Turn off depth output for subsequent renders."""
    self._depth_rendering = False
disable_segmentation_rendering
disable_segmentation_rendering() -> None
Source code in molmo_spaces/renderer/opengl_rendering.py
def disable_segmentation_rendering(self) -> None:
    """Turn off segmentation output for subsequent renders."""
    self._segmentation_rendering = False
enable_depth_rendering
enable_depth_rendering() -> None
Source code in molmo_spaces/renderer/opengl_rendering.py
def enable_depth_rendering(self) -> None:
    """Render depth maps; depth and segmentation modes are mutually exclusive."""
    self._segmentation_rendering = False
    self._depth_rendering = True
enable_segmentation_rendering
enable_segmentation_rendering() -> None
Source code in molmo_spaces/renderer/opengl_rendering.py
def enable_segmentation_rendering(self) -> None:
    """Render segmentation output; segmentation and depth modes are mutually exclusive."""
    self._segmentation_rendering = True
    self._depth_rendering = False
geomid_to_bodyid
geomid_to_bodyid(geomid)
Source code in molmo_spaces/renderer/opengl_rendering.py
def geomid_to_bodyid(self, geomid):
    """Map a MuJoCo geom id to the id of the body that owns it."""
    body_ids = self.model.geom_bodyid
    return body_ids[geomid]
mark_textures_dirty
mark_textures_dirty() -> None

Mark that textures have been modified and need to be uploaded.

Call this after modifying texture data in model.tex_data to ensure the changes will be uploaded before the next render.

Source code in molmo_spaces/renderer/opengl_rendering.py
def mark_textures_dirty(self) -> None:
    """Flag GPU textures as stale so they are re-uploaded lazily.

    Call this after writing to ``model.tex_data``; the actual upload is
    deferred until just before the next render.
    """
    self._textures_need_upload = True
render
render(*, out: ndarray | None = None, width: int | None = None, height: int | None = None) -> ndarray

Renders the scene as a numpy array of pixel values.

Parameters:

Name Type Description Default
out ndarray | None

Alternative output array in which to place the resulting pixels. It must have the same shape as the expected output but the type will be cast if necessary. The expected shape depends on the value of self._depth_rendering: when True, we expect out.shape == (height, width), and out.shape == (height, width, 3) when False.

None

Returns:

Type Description
ndarray

A new numpy array holding the pixels with shape (H, W) or (H, W, 3),

ndarray

depending on the value of self._depth_rendering unless

ndarray

out is not None, in which case a reference to out is returned.

Raises:

Type Description
RuntimeError

if this method is called after the close method.

Source code in molmo_spaces/renderer/opengl_rendering.py
def render(
    self,
    *,
    out: np.ndarray | None = None,
    width: int | None = None,
    height: int | None = None,
) -> np.ndarray:
    """Renders the scene as a numpy array of pixel values.

    Args:
      out: Alternative output array in which to place the resulting pixels. It
        must have the same shape as the expected output but the type will be
        cast if necessary. The expected shape depends on the value of
        `self._depth_rendering`: when `True`, we expect `out.shape == (height,
        width)`, and `out.shape == (height, width, 3)` when `False`.
      width: Optional viewport width override; defaults to `self._width`.
      height: Optional viewport height override; defaults to `self._height`.

    Returns:
      A new numpy array holding the pixels with shape `(H, W)` or `(H, W, 3)`,
      depending on the value of `self._depth_rendering`, unless `out` is not
      None, in which case a reference to `out` is returned.

    Raises:
      RuntimeError: if this method is called after the close method.
      ValueError: if `out` is given but its shape does not match the expected
        output shape.
    """

    height = height or self._height
    width = width or self._width
    rect = MjrRect(0, 0, width, height)

    # Snapshot the scene flags so they can be restored after rendering in
    # every mode. (Previously the plain-RGB path skipped the restore,
    # leaving the shadow flag permanently enabled on the shared scene.)
    original_flags = self._scene.flags.copy()

    # Enable shadow rendering (required for shadows to appear in rendered images)
    # Shadows are controlled by lights with castshadow enabled
    self._scene.flags[mjtRndFlag.mjRND_SHADOW] = True

    # Using segmented rendering for depth makes the calculated depth more
    # accurate at far distances.
    if self._depth_rendering or self._segmentation_rendering:
        self._scene.flags[mjtRndFlag.mjRND_SEGMENT] = True
        self._scene.flags[mjtRndFlag.mjRND_IDCOLOR] = True

    if self._gl_context is None:
        raise RuntimeError("render cannot be called after close.")

    self._gl_context.make_current()

    # Upload textures to GPU before rendering if textures have been modified
    # This is necessary when textures are modified via model.tex_data
    # Only upload when needed to avoid performance overhead
    if self._textures_need_upload:
        self.upload_textures()
        self._textures_need_upload = False

    # Image layout is row-major image convention: (height, width[, channels]).
    if self._depth_rendering:
        out_shape = (rect.height, rect.width)
        out_dtype = np.float32
    else:
        out_shape = (rect.height, rect.width, 3)
        out_dtype = np.uint8

    if out is None:
        out = np.empty(out_shape, dtype=out_dtype)
    elif out.shape != out_shape:
        # Note: message fixed to match the actual (height, width) layout.
        raise ValueError(
            f"Expected `out.shape == {out_shape}`. Got `out.shape={out.shape}`"
            " instead. When using depth rendering, the out array should be of"
            " shape `(height, width)` and otherwise (height, width, 3)."
            f" Got `(self.height, self.width)={(self.height, self.width)}` and"
            f" `self._depth_rendering={self._depth_rendering}`."
        )

    # Render scene and read contents of RGB and depth buffers.
    mjr_render(rect, self._scene, self._mjr_context)

    if self._depth_rendering:
        mjr_readPixels(rgb=None, depth=out, viewport=rect, con=self._mjr_context)

        # Get the distances to the near and far clipping planes.
        extent = self.model.stat.extent
        near = self.model.vis.map.znear * extent
        far = self.model.vis.map.zfar * extent

        # Calculate OpenGL perspective matrix values in float32 precision
        # so they are close to what glFrustum returns
        # https://registry.khronos.org/OpenGL-Refpages/gl2.1/xhtml/glFrustum.xml
        zfar = np.float32(far)
        znear = np.float32(near)
        c_coef = -(zfar + znear) / (zfar - znear)
        d_coef = -(np.float32(2) * zfar * znear) / (zfar - znear)

        # In reverse Z mode the perspective matrix is transformed by the following
        c_coef = np.float32(-0.5) * c_coef - np.float32(0.5)
        d_coef = np.float32(-0.5) * d_coef

        # We need 64 bits to convert Z from ndc to metric depth without noticeable
        # losses in precision
        out_64 = out.astype(np.float64)

        # Undo OpenGL projection
        # Note: We do not need to take action to convert from window coordinates
        # to normalized device coordinates because in reversed Z mode the mapping
        # is identity
        out_64 = d_coef / (out_64 + c_coef)

        # Cast result back to float32 for backwards compatibility
        # This has a small accuracy cost
        out[:] = out_64.astype(np.float32)
    elif self._segmentation_rendering:
        mjr_readPixels(rgb=out, depth=None, viewport=rect, con=self._mjr_context)

        # Convert 3-channel uint8 to 1-channel uint32.
        image3 = out.astype(np.uint32)
        segimage = image3[:, :, 0] + image3[:, :, 1] * (2**8) + image3[:, :, 2] * (2**16)
        # Remap segid to 3-channel (object ID, object type, body ID) triplet
        # Seg ID 0 is background -- will be remapped to (-1, -1, -1).

        # Find the maximum segment ID in the image to size the output array correctly
        max_segid = np.max(segimage) if segimage.size > 0 else 0

        # Create output array with size to accommodate all possible segment IDs
        # Add 1 to account for 0-based indexing and ensure we have enough space
        segid2output = np.full((max_segid + 1, 3), fill_value=-1, dtype=np.int32)

        visible_geoms = [g for g in self._scene.geoms[: self._scene.ngeom] if g.segid != -1]
        visible_segids = np.array([g.segid + 1 for g in visible_geoms], np.int32)
        visible_objid = np.array([g.objid for g in visible_geoms], np.int32)
        visible_objtype = np.array([g.objtype for g in visible_geoms], np.int32)
        visible_bodyid = np.array(
            [self.geomid_to_bodyid(g.objid) for g in visible_geoms], np.int32
        )

        # Only set values for valid segment IDs that are within bounds
        valid_mask = (visible_segids >= 0) & (visible_segids < segid2output.shape[0])
        if np.any(valid_mask):
            segid2output[visible_segids[valid_mask], 0] = visible_objid[valid_mask]
            segid2output[visible_segids[valid_mask], 1] = visible_objtype[valid_mask]
            segid2output[visible_segids[valid_mask], 2] = visible_bodyid[valid_mask]

        out = segid2output[segimage]
    else:
        # Plain RGB read. A redundant duplicate mjr_readPixels call was
        # removed here: a single read fully populates `out`.
        mjr_readPixels(rgb=out, depth=None, viewport=rect, con=self._mjr_context)

    # Reset scene flags for all rendering modes.
    np.copyto(self._scene.flags, original_flags)

    out[:] = np.flipud(out)

    # TODO In MacOS, keeping the context locked seems to preclude others to progress,
    #  so it doesn't look like we can achieve true parallelism through multi threading?
    #  This also happens at the end of __init__()
    if self._context_is_cgl:
        from mujoco.cgl import cgl

        cgl.CGLUnlockContext(self._gl_context._context)

    return out
reset_single
reset_single(idx: int) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def reset_single(self, idx: int) -> None:
    """Per-environment reset hook; the default implementation does nothing."""
    return None
update
update(data: MjData, camera: int | str | MjvCamera = -1, scene_option: MjvOption | None = None) -> None

Updates geometry used for rendering.

Parameters:

Name Type Description Default
data MjData

An instance of MjData.

required
camera int | str | MjvCamera

An instance of MjvCamera, a string or an integer

-1
scene_option MjvOption | None

A custom MjvOption instance to use to render the scene instead of the default.

None

Raises:

Type Description
ValueError

If camera_id is outside the valid range, or if camera does not exist.

Source code in molmo_spaces/renderer/opengl_rendering.py
def update(
    self,
    data: MjData,
    camera: int | str | MjvCamera = -1,
    scene_option: MjvOption | None = None,
) -> None:
    """Refresh the scene geometry from `data` for subsequent rendering.

    Args:
      data: The `MjData` instance to visualize.
      camera: Either a ready-made `MjvCamera`, a camera name, or a camera id
        (-1 selects a free camera).
      scene_option: Optional `MjvOption` to use instead of the renderer's
        default scene options.

    Raises:
      ValueError: If the camera name is unknown, or the camera id is out of
        the valid range.
    """
    if not isinstance(camera, MjvCamera):
        cam_id = camera
        if isinstance(cam_id, str):
            # Resolve the camera name to an id; -1 means "not found".
            cam_id = mj_name2id(self.model, mjtObj.mjOBJ_CAMERA.value, cam_id)
            if cam_id == -1:
                raise ValueError(f'The camera "{camera}" does not exist.')
        if not (-1 <= cam_id < self.model.ncam):
            raise ValueError(
                f"The camera id {cam_id} is out of range [-1, {self.model.ncam})."
            )

        # Build the camera object on the caller's behalf.
        camera = MjvCamera()
        camera.fixedcamid = cam_id

        if cam_id == -1:
            # -1 requests a free camera rather than a camera defined in
            # the model.
            camera.type = mjtCamera.mjCAMERA_FREE
            mjv_defaultFreeCamera(self.model, camera)
        else:
            camera.type = mjtCamera.mjCAMERA_FIXED

    scene_option = scene_option or self._scene_option
    mjv_updateScene(
        self.model,
        data,
        scene_option,
        None,
        camera,
        mjtCatBit.mjCAT_ALL.value,
        self._scene,
    )
upload_textures
upload_textures(data: MjData | None = None) -> None

Upload all textures to the GPU render context.

This should be called after modifying texture data in model.tex_data to ensure the changes are visible in rendered images.

NOTE: This only uploads textures to THIS renderer's context (MjOpenGLRenderer). The passive viewer has its own separate renderer context and won't see these updates.

Parameters:

Name Type Description Default
data MjData | None

Optional MjData; accepted for API compatibility but currently unused — the implementation does not call mjv_updateScene(). Call update() afterwards if the scene needs refreshing.

None
Source code in molmo_spaces/renderer/opengl_rendering.py
def upload_textures(self, data: MjData | None = None) -> None:
    """Upload all textures to the GPU render context.

    This should be called after modifying texture data in model.tex_data
    to ensure the changes are visible in rendered images.

    NOTE: This only uploads textures to THIS renderer's context (MjOpenGLRenderer).
    The passive viewer has its own separate renderer context and won't see these updates.

    Args:
        data: Accepted for API compatibility but currently unused: this method
              does NOT call mjv_updateScene(). Call update() afterwards if the
              scene needs refreshing.
    """
    import logging

    log = logging.getLogger(__name__)

    # Without both a live GL context and a MuJoCo render context there is
    # no GPU state to update.
    if self._gl_context is None or self._mjr_context is None:
        log.debug("upload_textures(): Skipping - GL context or Mjr context is None")
        return

    # Skip if no textures exist
    if self.model.ntex == 0:
        log.debug("upload_textures(): Skipping - no textures in model (ntex == 0)")
        return

    log.debug(f"upload_textures(): Uploading {self.model.ntex} textures to GPU render context")
    self._gl_context.make_current()
    # Upload all textures to the render context
    for tex_id in range(self.model.ntex):
        mjr_uploadTexture(self.model, self._mjr_context, tex_id)

    # Unlock context if needed (for macOS)
    if self._context_is_cgl:
        from mujoco.cgl import cgl

        cgl.CGLUnlockContext(self._gl_context._context)

MultithreadOpenGLRenderer

MultithreadOpenGLRenderer(env, renderer_cls: type[MjAbstractRenderer] = MjOpenGLRenderer, max_render_contexts: int | None = None, namespace: str = 'robot_0/', width: int = 1280, height: int = 720, **kwargs: Any)

Bases: MultithreadRenderer

Methods:

Name Description
__del__
process_request
render
rendering_thread_runner

Attributes:

Name Type Description
env
height
max_render_contexts
model_id_to_render_input_queue dict[int, Queue] | None
model_id_to_render_threads dict[int, list[Thread]] | None
namespace
render_output_queue Queue | None
render_outputs list[ndarray] | None
width
Source code in molmo_spaces/renderer/opengl_rendering.py
def __init__(
    self,
    env,  #: "MuJoCoVectorEnv",
    renderer_cls: type[MjAbstractRenderer] = MjOpenGLRenderer,
    max_render_contexts: int | None = None,
    namespace: str = "robot_0/",
    width: int = 1280,
    height: int = 720,
    **kwargs: Any,
) -> None:
    """Multithreaded OpenGL renderer over a vectorized MuJoCo environment.

    Args:
        env: The vectorized environment whose models/datas will be rendered.
        renderer_cls: Renderer class instantiated per rendering thread.
        max_render_contexts: Upper bound on open render contexts; forwarded
            to the superclass.
        namespace: Prefix prepended to camera names (e.g. "robot_0/").
        width: Output image width in pixels.
        height: Output image height in pixels.
        **kwargs: Extra keyword arguments forwarded to the superclass.
    """
    self.width = width
    self.height = height
    self.namespace = namespace

    # Populated by render(); holds one image per environment, in model order.
    self.render_outputs: list[np.ndarray] | None = None

    # NOTE: prepare_locals_for_super() forwards this frame's locals() to the
    # superclass constructor, so the local variable names above must match
    # the superclass's parameter names — do not rename them.
    super().__init__(**prepare_locals_for_super(locals()))
env instance-attribute
env = env
height instance-attribute
height = height
max_render_contexts instance-attribute
max_render_contexts = max_render_contexts or 1
model_id_to_render_input_queue instance-attribute
model_id_to_render_input_queue: dict[int, Queue] | None = defaultdict(Queue)
model_id_to_render_threads instance-attribute
model_id_to_render_threads: dict[int, list[Thread]] | None = defaultdict(list)
namespace instance-attribute
namespace = namespace
render_output_queue instance-attribute
render_output_queue: Queue | None = Queue()
render_outputs instance-attribute
render_outputs: list[ndarray] | None = None
width instance-attribute
width = width
__del__
__del__() -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
def __del__(self) -> None:
    """Signal all rendering threads to stop and join them on teardown.

    Idempotent: once `_closed` is set, subsequent invocations return
    immediately.
    """
    if self._closed:
        return

    self._closed = True

    try:
        # Wake every per-model rendering thread with the sentinel so its
        # runner loop exits. (The unused enumerate index was removed.)
        for model in self.env.mj_models:
            self.model_id_to_render_input_queue[id(model)].put(RENDERING_COMPLETE)

        # Best-effort join; a short timeout avoids hanging interpreter exit.
        for threads in self.model_id_to_render_threads.values():
            for thread in threads:
                thread.join(0.1)

        self.model_id_to_render_threads.clear()
        self.model_id_to_render_input_queue.clear()
    except (KeyboardInterrupt, SystemExit):
        raise
    except ValueError as e:
        # Include the error detail rather than discarding it.
        print(f"While closing MultithreadRenderer: {e}")
process_request staticmethod
process_request(renderer: MjOpenGLRenderer, request: Any, output_queue: Queue, **process_request_kwargs) -> None
Source code in molmo_spaces/renderer/opengl_rendering.py
@staticmethod
def process_request(
    renderer: MjOpenGLRenderer, request: Any, output_queue: Queue, **process_request_kwargs
) -> None:
    """Configure `renderer` per the request, render, and enqueue the result.

    The request is an `(idx, camera, data, mode)` tuple; the rendered image
    is placed on `output_queue` as `(idx, image)`.

    Raises:
        ValueError: if `mode` is not one of "rgb", "depth", "segmentation".
    """
    idx, camera, data, mode = request

    # Put the renderer into the requested output mode.
    if mode == "segmentation":
        renderer.enable_segmentation_rendering()
    elif mode == "depth":
        renderer.enable_depth_rendering()
    elif mode == "rgb":
        renderer.disable_depth_rendering()
        renderer.disable_segmentation_rendering()
    else:
        raise ValueError(f"Invalid mode: {mode}")

    renderer.update(data, camera=camera)
    output_queue.put((idx, renderer.render(**process_request_kwargs)))
render
render(camera: str = 'camera_rgb', mode: Literal['rgb', 'depth', 'segmentation'] = 'rgb', add_namespace: bool = True)
Source code in molmo_spaces/renderer/opengl_rendering.py
def render(
    self,
    camera: str = "camera_rgb",
    mode: Literal["rgb", "depth", "segmentation"] = "rgb",
    add_namespace: bool = True,
):
    """Fan out one render request per environment and collect the images.

    Args:
        camera: Camera name to render from.
        mode: Output kind: "rgb", "depth", or "segmentation".
        add_namespace: Whether to prefix `camera` with `self.namespace`.

    Returns:
        Rendered images, ordered to match `self.env.mj_models`; also stored
        on `self.render_outputs`.
    """
    camera_name = (self.namespace + camera) if add_namespace else camera

    # One request per (model, data) pair, tagged with its index so the
    # responses can be re-ordered afterwards.
    for idx, (model, data) in enumerate(zip(self.env.mj_models, self.env.mj_datas)):
        self.model_id_to_render_input_queue[id(model)].put((idx, camera_name, data, mode))

    # Block until every worker has produced a response, then restore
    # request order.
    ordered = sorted(
        (self.render_output_queue.get(block=True) for _ in range(len(self.env.mj_models))),
        key=lambda pair: pair[0],
    )
    self.render_outputs = [image for _, image in ordered]
    return self.render_outputs
rendering_thread_runner staticmethod
rendering_thread_runner(renderer_cls: type[MjAbstractRenderer], process_request_callback: Callable[[MjAbstractRenderer, Any, Queue, dict | None], None], model_bindings: MjModelBindings, device: int | None, input_queue: Queue, output_queue: Queue, timeout: int | None = None, process_request_kwargs: dict | None = None, **kwargs: Any) -> None
Source code in molmo_spaces/renderer/abstract_renderer.py
@staticmethod
def rendering_thread_runner(
    renderer_cls: type[MjAbstractRenderer],
    process_request_callback: Callable[[MjAbstractRenderer, Any, Queue, dict | None], None],
    model_bindings: MjModelBindings,
    device: int | None,
    input_queue: Queue,
    output_queue: Queue,
    timeout: int | None = None,
    process_request_kwargs: dict | None = None,
    **kwargs: Any,
) -> None:
    """Worker loop: construct a renderer, then serve requests until the sentinel.

    Blocks on `input_queue`; each incoming request is forwarded to
    `process_request_callback` together with the renderer and
    `output_queue`. The loop terminates when `RENDERING_COMPLETE` is
    received, and the renderer is always closed on the way out.
    """
    renderer = renderer_cls(model_bindings=model_bindings, device_id=device, **kwargs)

    print(
        f"Rendering thread started with renderer {renderer_cls.__name__} for model {id(model_bindings.model)}"
    )

    callback_kwargs = process_request_kwargs or {}
    try:
        while True:
            request = input_queue.get(block=True, timeout=timeout)

            if request == RENDERING_COMPLETE:
                # Shutdown sentinel: leave the loop and release resources.
                print(
                    f"Rendering thread for model {id(model_bindings.model)} received RENDERING_COMPLETE"
                )
                break

            process_request_callback(renderer, request, output_queue, **callback_kwargs)
    finally:
        renderer.close()

prepare_locals_for_super

prepare_locals_for_super(local_vars, args_name='args', kwargs_name='kwargs', ignore_kwargs=False)
Source code in molmo_spaces/renderer/opengl_rendering.py
def prepare_locals_for_super(
    local_vars, args_name="args", kwargs_name="kwargs", ignore_kwargs=False
):
    """Turn a constructor's ``locals()`` into keyword args for ``super().__init__``.

    Drops ``self`` and any dunder entries. When a ``kwargs`` entry is present
    it is either discarded (``ignore_kwargs=True``) or merged in, with the
    explicit local names taking precedence over the values inside ``kwargs``.
    Note that in the merge case the caller's kwargs dict is updated in place
    and returned.
    """
    assert args_name not in local_vars, f"`prepare_locals_for_super` does not support {args_name}."
    filtered = {
        name: value
        for name, value in local_vars.items()
        if name != "self" and "__" not in name
    }
    if kwargs_name not in filtered:
        return filtered
    extra = filtered.pop(kwargs_name)
    if ignore_kwargs:
        return filtered
    extra.update(filtered)
    return extra