Skip to content

Simulator

This module defines the Docking1dSimulator class which maintains environment objects (platforms and entities) and progresses a simulated training episode via the step() method.

Docking1dSimulator (BaseSimulator) ¤

Simulator for the 1D Docking task. Interfaces Docking1dPlatform with underlying Deputy1D entities in Docking simulation. Docking1dSimulator is responsible for initializing the platform objects for a simulation and knowing how to set up episodes based on input parameters from a parameter provider. It is also responsible for reporting the simulation state at each timestep.

Source code in corl/simulators/docking_1d/simulator.py
class Docking1dSimulator(BaseSimulator):
    """
    Simulator for the 1D Docking task. Interfaces Docking1dPlatform with underlying Deputy1D entities in Docking
    simulation. Docking1dSimulator is responsible for initializing the platform objects for a simulation
    and knowing how to set up episodes based on input parameters from a parameter provider.
    It is also responsible for reporting the simulation state at each timestep.
    """

    @property
    def get_simulator_validator(self):
        return Docking1dSimulatorValidator

    @property
    def get_reset_validator(self):
        return Docking1dSimulatorResetValidator

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._state = StateDict()
        self.clock = 0.0

    def reset(self, config):
        config = self.get_reset_validator(**config)
        self._state.clear()
        self.clock = 0.0

        # construct entities ("Gets the platform object associated with each simulation entity.")
        self.sim_entities = {}  # pylint: disable=attribute-defined-outside-init
        for agent_id, agent_config in self.config.agent_configs.items():
            agent_reset_config = config.platforms.get(agent_id, {})
            self.sim_entities[agent_id] = Deputy1D(name=agent_id, **agent_reset_config)

        # construct platforms ("Gets the correct backend simulation entity for each agent.")
        sim_platforms = []
        for agent_id, entity in self.sim_entities.items():
            agent_config = self.config.agent_configs[agent_id]
            sim_platforms.append(Docking1dPlatform(platform_name=agent_id, platform=entity, parts_list=agent_config.parts_list))
        self._state.sim_platforms = tuple(sim_platforms)

        self.update_sensor_measurements()
        return self._state

    def step(self):
        for platform in self._state.sim_platforms:
            agent_id = platform.name
            action = np.array(platform.get_applied_action(), dtype=np.float32)
            entity = self.sim_entities[agent_id]
            entity.step(action=action, step_size=self.config.step_size)
            platform.sim_time = self.clock
        self.update_sensor_measurements()
        self.clock += self.config.step_size
        return self._state

    @property
    def sim_time(self) -> float:
        return self.clock

    @property
    def platforms(self) -> typing.List:
        return list(self._state.sim_platforms)

    def update_sensor_measurements(self):
        """
        Update and cache all the measurements of all the sensors on each platform
        """
        for plat in self._state.sim_platforms:
            for sensor in plat.sensors:
                sensor.calculate_and_cache_measurement(state=self._state.sim_platforms)

    def mark_episode_done(self, done_info, episode_state):
        pass

    def save_episode_information(self, dones, rewards, observations):
        pass

get_reset_validator property readonly ¤

returns the validator that can be used to validate episode parameters coming into the reset function from the environment class

Returns:

Type Description

BaseSimulatorResetValidator -- The validator to use during resets

get_simulator_validator property readonly ¤

returns the validator for the configuration options to the simulator the kwargs to this class are validated and put into a defined struct potentially raising based on invalid configurations

Returns:

Type Description

BaseSimulatorValidator -- The validator to use for this simulation class

platforms: List property readonly ¤

returns a list of platforms in the simulation

Returns:

Type Description
List

list of platforms

sim_time: float property readonly ¤

returns the time

Returns:

Type Description
float

float - time

mark_episode_done(self, done_info, episode_state) ¤

Takes in the done_info specifying how the episode completed and does any book keeping around ending an episode

Source code in corl/simulators/docking_1d/simulator.py
def mark_episode_done(self, done_info, episode_state):
    pass

reset(self, config) ¤

reset resets the simulation and sets up a new episode

Returns:

Type Description

StateDict -- The simulation state, has a .sim_platforms attr to access the platforms made by the simulation

Source code in corl/simulators/docking_1d/simulator.py
def reset(self, config):
    config = self.get_reset_validator(**config)
    self._state.clear()
    self.clock = 0.0

    # construct entities ("Gets the platform object associated with each simulation entity.")
    self.sim_entities = {}  # pylint: disable=attribute-defined-outside-init
    for agent_id, agent_config in self.config.agent_configs.items():
        agent_reset_config = config.platforms.get(agent_id, {})
        self.sim_entities[agent_id] = Deputy1D(name=agent_id, **agent_reset_config)

    # construct platforms ("Gets the correct backend simulation entity for each agent.")
    sim_platforms = []
    for agent_id, entity in self.sim_entities.items():
        agent_config = self.config.agent_configs[agent_id]
        sim_platforms.append(Docking1dPlatform(platform_name=agent_id, platform=entity, parts_list=agent_config.parts_list))
    self._state.sim_platforms = tuple(sim_platforms)

    self.update_sensor_measurements()
    return self._state

save_episode_information(self, dones, rewards, observations) ¤

provides a way to save information about the current episode based on the environment

Source code in corl/simulators/docking_1d/simulator.py
def save_episode_information(self, dones, rewards, observations):
    pass

step(self) ¤

advances the simulation platforms and returns the state

Returns:

Type Description

StateDict -- The state after the simulation updates, has a .sim_platforms attr to access the platforms made by the simulation

Source code in corl/simulators/docking_1d/simulator.py
def step(self):
    for platform in self._state.sim_platforms:
        agent_id = platform.name
        action = np.array(platform.get_applied_action(), dtype=np.float32)
        entity = self.sim_entities[agent_id]
        entity.step(action=action, step_size=self.config.step_size)
        platform.sim_time = self.clock
    self.update_sensor_measurements()
    self.clock += self.config.step_size
    return self._state

update_sensor_measurements(self) ¤

Update and cache all the measurements of all the sensors on each platform

Source code in corl/simulators/docking_1d/simulator.py
def update_sensor_measurements(self):
    """
    Update and cache all the measurements of all the sensors on each platform
    """
    for plat in self._state.sim_platforms:
        for sensor in plat.sensors:
            sensor.calculate_and_cache_measurement(state=self._state.sim_platforms)

Docking1dSimulatorResetValidator (BaseSimulatorResetValidator) pydantic-model ¤

A Validator for Docking1dSimulatorValidator reset configs

Parameters¤

dict

Contains individual initialization dicts for each agent. Key is platform name, value is platform's initialization dict.

Source code in corl/simulators/docking_1d/simulator.py
class Docking1dSimulatorResetValidator(BaseSimulatorResetValidator):
    """
    A Validator for Docking1dSimulatorValidator reset configs

    Parameters
    ----------
    platform_config: dict
        Contains individual initialization dicts for each agent.
        Key is platform name, value is platform's initialization dict.
    """
    platform_config: typing.Optional[typing.Dict] = {}

Docking1dSimulatorValidator (BaseSimulatorValidator) pydantic-model ¤

A validator for the Docking1dSimulatorValidator config.

step_size: A float representing how many simulated seconds pass each time the simulator updates

Source code in corl/simulators/docking_1d/simulator.py
class Docking1dSimulatorValidator(BaseSimulatorValidator):
    """
    A validator for the Docking1dSimulatorValidator config.

    step_size: A float representing how many simulated seconds pass each time the simulator updates
    """
    step_size: float