Skip to content

Gym simulator


Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.

This is a US Government Work not subject to copyright protection in the US.

The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.


Base simulator and platform for toy OpenAI Gym environments. This mainly serves as a "how to use" example and provides a setup to unit test with.

GymAgentConfig (AgentConfig) pydantic-model ¤

any configuration needed for the simulator to

initialize this platform and configure it in the sim class

a list of tuples where the first element is some Python class path

    of a BasePart, and then the second element is a configuration dictionary for that part

Source code in corl/simulators/openai_gym/gym_simulator.py
class GymAgentConfig(AgentConfig):
    """
    Agent configuration used by the OpenAI Gym simulator.

    platform_config: any configuration needed for the simulator to
                initialize this platform and configure it in the sim class
    parts_list: a list of tuples where the first element is some python class path
                    of a BasePart, and then the second element is a configuration dictionary for that part
                    (not declared in this class; presumably inherited from AgentConfig -- TODO confirm)
    """
    # gym-specific platform configuration (see GymPlatformConfig)
    platform_config: GymPlatformConfig

GymPlatformConfig (BaseModel) pydantic-model ¤

GymPlatformSimConfig

Source code in corl/simulators/openai_gym/gym_simulator.py
class GymPlatformConfig(BaseModel):
    """
    Platform section of a gym agent configuration.

    platform_class: the platform class built for the agent; OpenAIGymSimulator.get_platforms
                calls it with the keyword arguments platform_name, platform, parts_list
                and disable_exclusivity_check
    """
    platform_class: PyObject

GymPlatformValidator (BasePlatformValidator) pydantic-model ¤

GymPlatformValidator

Parameters¤

!!! platform "gym.Env"
    Gym env associated with GymPlatform
Source code in corl/simulators/openai_gym/gym_simulator.py
class GymPlatformValidator(BasePlatformValidator):
    """GymPlatformValidator

    Validates the keyword arguments used to construct an OpenAiGymPlatform.

    Parameters
    ----------
        platform: gym.Env
            Gym env associated with GymPlatform
    """
    # the wrapped gym environment; OpenAiGymPlatform reads its observation and action spaces
    platform: gym.Env

OpenAIGymSimulator (BaseSimulator) ¤

Simulator backend for running openai Gyms

Source code in corl/simulators/openai_gym/gym_simulator.py
class OpenAIGymSimulator(BaseSimulator):
    """
    Simulator backend for running openai Gyms

    One gym environment is created for every entry of ``config.agent_configs``
    and each environment is exposed through a platform object built by
    ``get_platforms``.
    """

    @property
    def get_simulator_validator(self) -> typing.Type[OpenAIGymSimulatorValidator]:
        """Return validator"""
        return OpenAIGymSimulatorValidator

    def __init__(self, **kwargs) -> None:
        """
        Create, wrap and seed one gym environment per configured agent.

        Arguments:
            **kwargs -- forwarded unchanged to the base simulator initializer
        """
        self.config: OpenAIGymSimulatorValidator
        super().__init__(**kwargs)
        self._state = StateDict()
        # agent name -> (wrapped) gym environment
        self.gym_env_dict = {}
        for agent_name in self.config.agent_configs:
            env = gym.make(self.config.gym_env, **self.config.gym_configs)
            # wrappers are applied in list order, so the last one ends up outermost
            for wrapper_cls in self.config.wrappers:
                env = wrapper_cls(env)
            self.gym_env_dict[agent_name] = env
            # every environment receives the same configured seed
            self.gym_env_dict[agent_name].seed(self.config.seed)
        self.sim_platforms: typing.List = []
        self._time = 0.0

    def get_platforms(self):
        """
        gets the current state of the simulation and makes the sim platforms

        A new platform object is constructed for every environment on each call.

        Returns:
            typing.List[OpenAiGymPlatform] -- the list of openai gym platforms
        """
        sim_platforms = []
        for agent_name, agent_env in self.gym_env_dict.items():
            sim_platforms.append(
                self.config.agent_configs[agent_name].platform_config.platform_class(
                    platform_name=agent_name,
                    platform=agent_env,
                    parts_list=self.config.agent_configs[agent_name].parts_list,
                    disable_exclusivity_check=self.config.disable_exclusivity_check,
                )
            )
        return sim_platforms

    def update_sensor_measurements(self):
        """
        Update and caches all the measurements of all the sensors on each platform
        """
        for plat in self.sim_platforms:
            for sensor in plat.sensors:
                sensor.calculate_and_cache_measurement(state=self._state)

    def reset(self, config):
        """
        Reset every gym environment and start a new episode.

        Arguments:
            config -- not used by this simulator

        Returns:
            StateDict -- the simulation state; ``obs`` holds the value returned by
                         each environment's ``reset`` keyed by agent name, while
                         ``rewards``, ``dones`` and ``info`` start out empty
        """
        self._time = 0.0
        self._state.clear()
        self._state.obs = {}
        self._state.rewards = {}
        self._state.dones = {}
        self._state.info = {}
        for agent_name, agent_env in self.gym_env_dict.items():
            self._state.obs[agent_name] = agent_env.reset()

        # platforms are rebuilt on every reset
        self.sim_platforms = self.get_platforms()
        self.update_sensor_measurements()
        return self._state

    def step(self):
        """
        Step the environment of every operable platform with its stored action.

        Elements 0-3 of each environment's step result are stored in the state
        as obs, rewards, dones and info. A platform whose environment reports
        done is marked inoperable and is skipped on later steps.

        Returns:
            StateDict -- the state after the simulation updates
        """
        for sim_platform in self.sim_platforms:
            agent_name = sim_platform.name
            if sim_platform.operable:
                tmp = self.gym_env_dict[agent_name].step(sim_platform.get_applied_action())
                self._state.obs[agent_name] = tmp[0]
                self._state.rewards[agent_name] = tmp[1]
                self._state.dones[agent_name] = tmp[2]
                self._state.info[agent_name] = tmp[3]
                if self._state.dones[agent_name]:
                    sim_platform.operable = False

        self.update_sensor_measurements()
        # simulation time advances by one unit per step
        self._time += 1
        return self._state

    @property
    def sim_time(self) -> float:
        """returns the time: 0.0 after a reset, plus one for every call to step"""
        return self._time

    @property
    def platforms(self) -> typing.List:
        """returns the list of platforms built by the most recent reset"""
        return self.sim_platforms

    def mark_episode_done(self, done_info, episode_state):
        """No bookkeeping is done at the end of an episode; arguments are ignored"""

    def save_episode_information(self, dones, rewards, observations):
        """No episode information is saved by this simulator; arguments are ignored"""

    def render(self, state, mode="human"):  # pylint: disable=unused-argument
        """only render first environment

        Arguments:
            state -- not used
            mode {str} -- render mode passed to the environment's render call

        Does nothing when the simulator holds no environments.
        """
        # dict views are not subscriptable in Python 3 (``keys()[0]`` raises
        # TypeError), so the first environment in insertion order is taken
        # through an iterator instead
        first_env = next(iter(self.gym_env_dict.values()), None)
        if first_env is None:
            return
        first_env.render(mode)

get_simulator_validator: Type[corl.simulators.openai_gym.gym_simulator.OpenAIGymSimulatorValidator] property readonly ¤

Return validator

platforms: List property readonly ¤

returns a list of platforms in the simulation

Returns:

Type Description
List

list of platforms

sim_time: float property readonly ¤

returns the time

Returns:

Type Description
float

float - time

get_platforms(self) ¤

gets the current state of the simulation and makes the sim platforms

Returns:

Type Description

typing.List[OpenAiGymPlatform] -- the list of openai gym platforms

Source code in corl/simulators/openai_gym/gym_simulator.py
def get_platforms(self):
    """
    Build one platform object for every gym environment held by the simulator.

    The platform class is taken from the agent's platform configuration and is
    called with the agent name, its environment, the agent's parts list and the
    simulator-wide exclusivity-check switch.

    Returns:
        typing.List[OpenAiGymPlatform] -- the list of openai gym platforms
    """
    sim_config = self.config

    def build_platform(name, env):
        # look the agent's configuration up once and reuse it for both reads
        agent_config = sim_config.agent_configs[name]
        platform_cls = agent_config.platform_config.platform_class
        return platform_cls(
            platform_name=name,
            platform=env,
            parts_list=agent_config.parts_list,
            disable_exclusivity_check=sim_config.disable_exclusivity_check,
        )

    return [build_platform(name, env) for name, env in self.gym_env_dict.items()]

mark_episode_done(self, done_info, episode_state) ¤

Takes in the done_info specifying how the episode completed and does any bookkeeping around ending an episode

Source code in corl/simulators/openai_gym/gym_simulator.py
def mark_episode_done(self, done_info, episode_state):
    """No bookkeeping is done at the end of an episode; both arguments are ignored."""
    return None

render(self, state, mode='human') ¤

only render first environment

Source code in corl/simulators/openai_gym/gym_simulator.py
def render(self, state, mode="human"):  # pylint: disable=unused-argument
    """only render first environment

    Arguments:
        state -- not used
        mode {str} -- render mode passed to the environment's render call

    Does nothing when the simulator holds no environments.
    """
    # dict views are not subscriptable in Python 3 (``keys()[0]`` raises
    # TypeError), so the first environment in insertion order is taken
    # through an iterator instead
    first_env = next(iter(self.gym_env_dict.values()), None)
    if first_env is None:
        return
    first_env.render(mode)

reset(self, config) ¤

reset resets the simulation and sets up a new episode

Returns:

Type Description

StateDict -- The simulation state, has a .sim_platforms attr to access the platforms made by the simulation

Source code in corl/simulators/openai_gym/gym_simulator.py
def reset(self, config):
    """
    Reset every gym environment and start a new episode.

    The shared state is cleared, given empty obs/rewards/dones/info tables, and
    obs is filled with the value each environment's reset returns. Platforms
    are then rebuilt and the sensor measurements refreshed.

    Arguments:
        config -- not used by this simulator

    Returns:
        StateDict -- the simulation state after the reset
    """
    self._time = 0.0

    state = self._state
    state.clear()
    for table_name in ("obs", "rewards", "dones", "info"):
        setattr(state, table_name, {})

    initial_obs = state.obs
    for name, env in self.gym_env_dict.items():
        initial_obs[name] = env.reset()

    self.sim_platforms = self.get_platforms()
    self.update_sensor_measurements()
    return self._state

save_episode_information(self, dones, rewards, observations) ¤

provides a way to save information about the current episode based on the environment

Source code in corl/simulators/openai_gym/gym_simulator.py
def save_episode_information(self, dones, rewards, observations):
    """No episode information is saved by this simulator; all arguments are ignored."""
    return None

step(self) ¤

advances the simulation platforms and returns the state

Returns:

Type Description

StateDict -- The state after the simulation updates, has a .sim_platforms attr to access the platforms made by the simulation

Source code in corl/simulators/openai_gym/gym_simulator.py
def step(self):
    """
    Step the environment of every operable platform with its stored action.

    Elements 0-3 of each environment's step result are written into the state's
    obs, rewards, dones and info tables under the platform's name. A platform
    whose environment reports done is marked inoperable and is therefore
    skipped on later calls. Sensor measurements are refreshed afterwards and
    the simulation time grows by one.

    Returns:
        StateDict -- the state after the simulation updates
    """
    state = self._state
    for platform in self.sim_platforms:
        name = platform.name
        if not platform.operable:
            continue

        action = platform.get_applied_action()
        # indexed rather than unpacked so longer result tuples are tolerated
        transition = self.gym_env_dict[name].step(action)
        state.obs[name] = transition[0]
        state.rewards[name] = transition[1]
        state.dones[name] = transition[2]
        state.info[name] = transition[3]

        if state.dones[name]:
            platform.operable = False

    self.update_sensor_measurements()
    self._time += 1
    return self._state

update_sensor_measurements(self) ¤

Update and caches all the measurements of all the sensors on each platform

Source code in corl/simulators/openai_gym/gym_simulator.py
def update_sensor_measurements(self):
    """
    Ask every sensor of every platform to recompute and cache its measurement
    from the current simulation state.
    """
    # lazily walk platforms in order, and each platform's sensors in order
    every_sensor = (sensor for platform in self.sim_platforms for sensor in platform.sensors)
    for sensor in every_sensor:
        sensor.calculate_and_cache_measurement(state=self._state)

OpenAIGymSimulatorValidator (BaseSimulatorValidator) pydantic-model ¤

Validator for OpenAIGymSimulator

the name of a gym environment registered to the gym

registry

Source code in corl/simulators/openai_gym/gym_simulator.py
class OpenAIGymSimulatorValidator(BaseSimulatorValidator):  # pylint: disable=too-few-public-methods
    """
    Validator for OpenAIGymSimulator

    gym_env: the name of a gym environment registered to the gym
            registry
    gym_configs: keyword arguments passed to gym.make together with gym_env
    seed: seed applied to every environment the simulator creates
    agent_configs: mapping of agent name to its GymAgentConfig; the simulator
            creates one environment per entry
    wrappers: wrapper classes applied, in list order, to each created environment
    """
    # todo: maybe switch this to a PyObject and do a validator that it
    # implements gym.core.Env
    gym_env: str
    gym_configs: typing.Mapping[str, typing.Optional[typing.Union[bool, float, int, str]]] = {}
    seed: int = 1
    agent_configs: typing.Mapping[str, GymAgentConfig]
    wrappers: typing.List[PyObject] = []

OpenAiGymInclusivePartsPlatform (OpenAiGymPlatform) ¤

The OpenAiGymInclusivePartsPlatform mirrors OpenAiGymPlatform but without mutually exclusive parts

Source code in corl/simulators/openai_gym/gym_simulator.py
class OpenAiGymInclusivePartsPlatform(OpenAiGymPlatform):
    """
    The OpenAiGymInclusivePartsPlatform mirrors OpenAiGymPlatform but without
    mutually exclusive parts
    """

    def __init__(self, **kwargs):
        # NOTE(review): OpenAiGymPlatform.__init__ as shown in this file always
        # installs its own exclusive_part_dict, so how this subclass drops the
        # exclusivity is not evident here -- confirm against BasePlatform.
        super().__init__(**kwargs)

        # same default-action setup as the parent initializer: 0 for a discrete
        # space, the lower bound for a box space, untouched for anything else
        space = self.action_space
        if isinstance(space, gym.spaces.Discrete):
            self._last_applied_action = 0
        elif isinstance(space, gym.spaces.Box):
            self._last_applied_action = space.low

        self._operable = True

OpenAiGymPlatform (BasePlatform) ¤

The OpenAiGymPlatform wraps some gym environment as its platform and allows for saving an action to the platform for when the platform needs to give an action to the environment during the environment step function

Source code in corl/simulators/openai_gym/gym_simulator.py
class OpenAiGymPlatform(BasePlatform):
    """
    Platform wrapper around a single gym environment.

    The platform exposes the environment's observation and action spaces and
    stores the most recently saved action so the simulator can hand it to the
    environment during its step function.
    """

    def __init__(self, **kwargs):
        # at most one "main_controller" controller and one "state_sensor" sensor
        kwargs["exclusive_part_dict"] = {
            BaseController: MutuallyExclusiveParts({"main_controller"}),
            BaseSensor: MutuallyExclusiveParts({"state_sensor"}),
        }

        # hack to get this working until platforms are fixed
        # (the environment is stored before the base initializer runs)
        validator_cls = self.get_validator
        self.config: GymPlatformValidator = validator_cls(**kwargs)
        self._platform = self.config.platform

        super().__init__(**kwargs)

        # default action: 0 for a discrete space, the lower bound for a box space
        # NOTE(review): any other space type leaves _last_applied_action unset,
        # so get_applied_action would fail until an action is saved -- confirm
        space = self.action_space
        if isinstance(space, gym.spaces.Discrete):
            self._last_applied_action = 0
        elif isinstance(space, gym.spaces.Box):
            self._last_applied_action = space.low

        self._operable = True

    @property
    def get_validator(self) -> typing.Type[GymPlatformValidator]:
        """Validator class used to build this platform's configuration"""
        return GymPlatformValidator

    @property
    def observation_space(self):
        """
        Observation space of the wrapped gym environment, for sensors to use

        Returns:
            gym.Space -- the observation space of the platform gym environment
        """
        wrapped_env = self._platform
        return wrapped_env.observation_space

    @property
    def action_space(self):
        """
        Action space of the wrapped gym environment, for controllers to use

        Returns:
            gym.Space -- the action space of the platform gym environment
        """
        wrapped_env = self._platform
        return wrapped_env.action_space

    def get_applied_action(self):
        """Return the action currently stored on this platform

        Returns:
            typing.Any -- any sort of stored action
        """
        stored_action = self._last_applied_action
        return stored_action

    def save_action_to_platform(self, action):
        """
        Store an action on the platform after checking it against the
        action space

        Arguments:
            action typing.Any -- The action to store in the platform

        Raises:
            RuntimeError: if the action attempted to be stored does not match
                        the environment's action space
        """
        if self.action_space.contains(action):
            self._last_applied_action = action
            return
        raise RuntimeError("Error: action attempting to be stored in platform does not match platforms action space")

    @property
    def operable(self):
        """Whether the platform is operable; starts True and can be changed through the setter"""
        return self._operable

    @operable.setter
    def operable(self, value):
        self._operable = value

action_space property readonly ¤

Provides the action space for a controller to use

Returns:

Type Description

gym.Space -- the action space of the platform gym environment

get_validator: Type[corl.simulators.openai_gym.gym_simulator.GymPlatformValidator] property readonly ¤

get validator for this BasePlatform

Returns:

Type Description
Type[corl.simulators.openai_gym.gym_simulator.GymPlatformValidator]

BasePlatformValidator -- validator the platform will use to generate a configuration

observation_space property readonly ¤

Provides the observation space for a sensor to use

Returns:

Type Description

gym.Space -- the observation space of the platform gym environment

operable property writable ¤

Is the platform operable?

Returns¤

bool Is the platform operable?

get_applied_action(self) ¤

returns the action stored in this platform

Returns:

Type Description

typing.Any -- any sort of stored action

Source code in corl/simulators/openai_gym/gym_simulator.py
def get_applied_action(self):
    """Return the action currently stored on this platform

    Returns:
        typing.Any -- any sort of stored action
    """
    stored_action = self._last_applied_action
    return stored_action

save_action_to_platform(self, action) ¤

saves an action to the platform if it matches the action space

Exceptions:

Type Description
RuntimeError

if the action attempted to be stored does not match the environment's action space

Source code in corl/simulators/openai_gym/gym_simulator.py
def save_action_to_platform(self, action):
    """
    Store an action on the platform after checking it against the
    action space

    Arguments:
        action typing.Any -- The action to store in the platform

    Raises:
        RuntimeError: if the action attempted to be stored does not match
                    the environment's action space
    """
    if self.action_space.contains(action):
        self._last_applied_action = action
        return
    raise RuntimeError("Error: action attempting to be stored in platform does not match platforms action space")