Simulator
This module defines the Docking1dSimulator class which maintains environment objects (platforms and entities) and progresses a simulated training episode via the step() method.
Docking1dSimulator (BaseSimulator)
¤
Simulator for the 1D Docking task. Interfaces Docking1dPlatform with underlying Deputy1D entities in Docking simulation. Docking1dSimulator is responsible for initializing the platform objects for a simulation and knowing how to set up episodes based on input parameters from a parameter provider. It is also responsible for reporting the simulation state at each timestep.
Source code in corl/simulators/docking_1d/simulator.py
class Docking1dSimulator(BaseSimulator):
    """
    Simulator for the 1D Docking task.

    Interfaces Docking1dPlatform with underlying Deputy1D entities in Docking
    simulation. Docking1dSimulator is responsible for initializing the platform
    objects for a simulation and knowing how to set up episodes based on input
    parameters from a parameter provider. It is also responsible for reporting
    the simulation state at each timestep.
    """

    @property
    def get_simulator_validator(self):
        """
        Return the validator class used for this simulator's configuration.
        """
        return Docking1dSimulatorValidator

    @property
    def get_reset_validator(self):
        """
        Return the validator class used for the config passed to reset().
        """
        return Docking1dSimulatorResetValidator

    def __init__(self, **kwargs):
        """
        Forward all keyword arguments to the base simulator, then create an
        empty state container and zero the simulation clock.
        """
        super().__init__(**kwargs)
        self._state = StateDict()
        self.clock = 0.0

    def reset(self, config):
        """
        Reset the simulation and set up a new episode.

        Parameters
        ----------
        config
            Mapping of reset parameters; it is unpacked into the reset
            validator before use.

        Returns
        -------
        The simulation state, whose ``sim_platforms`` attribute holds the
        platforms created for this episode.
        """
        config = self.get_reset_validator(**config)
        self._state.clear()
        self.clock = 0.0

        # Build one backend Deputy1D entity per configured agent. Agents with no
        # entry in the reset config are constructed with only their name.
        self.sim_entities = {}  # pylint: disable=attribute-defined-outside-init
        for agent_id in self.config.agent_configs:
            agent_reset_config = config.platforms.get(agent_id, {})
            self.sim_entities[agent_id] = Deputy1D(name=agent_id, **agent_reset_config)

        # Wrap every entity in a platform carrying that agent's parts list.
        sim_platforms = []
        for agent_id, entity in self.sim_entities.items():
            parts_list = self.config.agent_configs[agent_id].parts_list
            sim_platforms.append(Docking1dPlatform(platform_name=agent_id, platform=entity, parts_list=parts_list))
        self._state.sim_platforms = tuple(sim_platforms)

        self.update_sensor_measurements()
        return self._state

    def step(self):
        """
        Advance the simulation platforms by one step and return the state.

        Each platform's applied action is converted to a float32 array and
        passed to the matching entity together with the configured step size.
        """
        step_size = self.config.step_size
        for platform in self._state.sim_platforms:
            agent_id = platform.name
            action = np.array(platform.get_applied_action(), dtype=np.float32)
            self.sim_entities[agent_id].step(action=action, step_size=step_size)
            # The clock is advanced only after this loop and the sensor refresh,
            # so the platform is stamped with the pre-increment clock value.
            platform.sim_time = self.clock

        self.update_sensor_measurements()
        self.clock += self.config.step_size
        return self._state

    @property
    def sim_time(self) -> float:
        """
        Return the current value of the simulation clock.
        """
        return self.clock

    @property
    def platforms(self) -> typing.List:
        """
        Return the platforms in the simulation as a new list.
        """
        return list(self._state.sim_platforms)

    def update_sensor_measurements(self):
        """
        Update and cache all the measurements of all the sensors on each platform
        """
        all_platforms = self._state.sim_platforms
        for plat in all_platforms:
            for sensor in plat.sensors:
                # Every sensor is handed the full tuple of platforms as its state.
                sensor.calculate_and_cache_measurement(state=all_platforms)

    def mark_episode_done(self, done_info, episode_state):
        """
        Hook for bookkeeping when an episode ends; this simulator does nothing.
        """

    def save_episode_information(self, dones, rewards, observations):
        """
        Hook for saving information about the current episode; this simulator
        does nothing.
        """
get_reset_validator
property
readonly
¤
returns the validator that can be used to validate episode parameters coming into the reset function from the environment class
Returns:
Type | Description |
---|---|
BaseSimulatorResetValidator -- The validator to use during resets |
get_simulator_validator
property
readonly
¤
Returns the validator for the simulator's configuration options. The kwargs passed to this class are validated and put into a defined struct, potentially raising an error on invalid configurations.
Returns:
Type | Description |
---|---|
BaseSimulatorValidator -- The validator to use for this simulation class |
platforms: List
property
readonly
¤
returns a list of platforms in the simulation
Returns:
Type | Description |
---|---|
List |
list of platforms |
sim_time: float
property
readonly
¤
returns the time
Returns:
Type | Description |
---|---|
float |
float - time |
mark_episode_done(self, done_info, episode_state)
¤
Takes in the done_info specifying how the episode completed and does any bookkeeping around ending an episode.
Source code in corl/simulators/docking_1d/simulator.py
def mark_episode_done(self, done_info, episode_state):
    """
    Hook for bookkeeping when an episode ends; this simulator does nothing.
    """
reset(self, config)
¤
reset resets the simulation and sets up a new episode
Returns:
Type | Description |
---|---|
StateDict -- The simulation state, has a .sim_platforms attr to access the platforms made by the simulation |
Source code in corl/simulators/docking_1d/simulator.py
def reset(self, config):
    """
    Reset the simulation and set up a new episode.

    Parameters
    ----------
    config
        Mapping of reset parameters; it is unpacked into the reset
        validator before use.

    Returns
    -------
    The simulation state, whose ``sim_platforms`` attribute holds the
    platforms created for this episode.
    """
    config = self.get_reset_validator(**config)
    self._state.clear()
    self.clock = 0.0

    # Build one backend Deputy1D entity per configured agent. Agents with no
    # entry in the reset config are constructed with only their name.
    self.sim_entities = {}  # pylint: disable=attribute-defined-outside-init
    for agent_id in self.config.agent_configs:
        agent_reset_config = config.platforms.get(agent_id, {})
        self.sim_entities[agent_id] = Deputy1D(name=agent_id, **agent_reset_config)

    # Wrap every entity in a platform carrying that agent's parts list.
    sim_platforms = []
    for agent_id, entity in self.sim_entities.items():
        parts_list = self.config.agent_configs[agent_id].parts_list
        sim_platforms.append(Docking1dPlatform(platform_name=agent_id, platform=entity, parts_list=parts_list))
    self._state.sim_platforms = tuple(sim_platforms)

    self.update_sensor_measurements()
    return self._state
save_episode_information(self, dones, rewards, observations)
¤
provides a way to save information about the current episode based on the environment
Source code in corl/simulators/docking_1d/simulator.py
def save_episode_information(self, dones, rewards, observations):
    """
    Hook for saving information about the current episode; this simulator
    does nothing.
    """
step(self)
¤
advances the simulation platforms and returns the state
Returns:
Type | Description |
---|---|
StateDict -- The state after the simulation updates, has a .sim_platforms attr to access the platforms made by the simulation |
Source code in corl/simulators/docking_1d/simulator.py
def step(self):
    """
    Advance the simulation platforms by one step and return the state.

    Each platform's applied action is converted to a float32 array and
    passed to the matching entity together with the configured step size.
    """
    step_size = self.config.step_size
    for platform in self._state.sim_platforms:
        agent_id = platform.name
        action = np.array(platform.get_applied_action(), dtype=np.float32)
        self.sim_entities[agent_id].step(action=action, step_size=step_size)
        # The clock is advanced only after this loop and the sensor refresh,
        # so the platform is stamped with the pre-increment clock value.
        platform.sim_time = self.clock

    self.update_sensor_measurements()
    self.clock += self.config.step_size
    return self._state
update_sensor_measurements(self)
¤
Update and cache all the measurements of all the sensors on each platform
Source code in corl/simulators/docking_1d/simulator.py
def update_sensor_measurements(self):
    """
    Update and cache all the measurements of all the sensors on each platform
    """
    all_platforms = self._state.sim_platforms
    for plat in all_platforms:
        for sensor in plat.sensors:
            # Every sensor is handed the full tuple of platforms as its state.
            sensor.calculate_and_cache_measurement(state=all_platforms)
Docking1dSimulatorResetValidator (BaseSimulatorResetValidator)
pydantic-model
¤
A validator for Docking1dSimulator reset configs
Parameters¤
platform_config: dict
Contains individual initialization dicts for each agent. Key is platform name, value is platform's initialization dict.
Source code in corl/simulators/docking_1d/simulator.py
class Docking1dSimulatorResetValidator(BaseSimulatorResetValidator):
    """
    A validator for Docking1dSimulator reset configs

    Parameters
    ----------
    platform_config: dict
        Contains individual initialization dicts for each agent.
        Key is platform name, value is platform's initialization dict.
    """
    # NOTE(review): Docking1dSimulator.reset reads `config.platforms`, which is
    # not declared here -- presumably inherited from BaseSimulatorResetValidator;
    # confirm whether `platform_config` is still consumed anywhere.
    platform_config: typing.Optional[typing.Dict] = {}
Docking1dSimulatorValidator (BaseSimulatorValidator)
pydantic-model
¤
A validator for the Docking1dSimulator config.
step_size: A float representing how many simulated seconds pass each time the simulator updates
Source code in corl/simulators/docking_1d/simulator.py
class Docking1dSimulatorValidator(BaseSimulatorValidator):
    """
    A validator for the Docking1dSimulator config.

    step_size: A float representing how many simulated seconds pass each time the simulator updates
    """
    step_size: float