Skip to content

Obs relative delta controller


Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.

This is a US Government Work not subject to copyright protection in the US.

The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.


AvailablePlatforms

RelativeObsDeltaAction (BaseMultiWrapperGlue) ¤

RelativeObsDeltaAction is a glue class that wraps another glue class. It treats the actions passed to it as a delta from a linked observation, e.g. if the wrapped action space has roll as one of the controls, then a delta action of 0.2 would move the absolute roll position 0.2 higher than it is as measured by the linked roll sensor.

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
class RelativeObsDeltaAction(BaseMultiWrapperGlue):
    """
    RelativeObsDeltaAction is a glue class that wraps two other glues: a ControllerGlue (first)
    and an ObserveSensor (second).
    It treats the actions passed to it as a delta from the linked observation,
    e.g. if the wrapped action space has roll as one of the controls, then a delta action of
    0.2 would move the absolute roll position 0.2 higher than it is as measured by the linked roll sensor.
    """

    def __init__(self, **kwargs) -> None:
        """
        Validate the wrapped glues and set up the delta bookkeeping.

        Raises:
            RuntimeError: if there are not exactly two wrapped glues, if the first is not a
                ControllerGlue, if the second is not an ObserveSensor, or if obs_index selects a
                single observation element while the controller action has more than one element.
        """
        self.config: RelativeObsDeltaActionValidator
        super().__init__(**kwargs)

        self._logger = logging.getLogger(RelativeObsDeltaAction.__name__)

        if len(self.glues()) != 2:
            raise RuntimeError(f"Error: RelativeObsDeltaAction expected 2 wrapped glues, got {len(self.glues())}")
        self.controller: ControllerGlue = typing.cast(ControllerGlue, self.glues()[0])
        if not isinstance(self.controller, ControllerGlue):
            raise RuntimeError(
                f"Error: RelativeObsDeltaAction expects the first wrapped glue to be a ControllerGlue, got {self.controller}"
            )
        self.relative_obs_glue: ObserveSensor = typing.cast(ObserveSensor, self.glues()[1])
        if not isinstance(self.relative_obs_glue, ObserveSensor):
            raise RuntimeError(
                f"Error: RelativeObsDeltaAction expects the second wrapped glue to be a ObserveSensor, got {self.relative_obs_glue}"
            )

        # verify that the config setup is not going to get the user into a situation where they are
        # only accessing one part of the obs but applying that obs as the base position for multiple actions
        # NOTE(review): this is a truthiness test, so obs_index == 0 (the validator default) skips the
        # check exactly like obs_index == None does - confirm whether `is not None` was intended.
        if self.config.obs_index and len(list(self.controller.action_space().values())[0].low) != 1:
            raise RuntimeError(
                f"ERROR: your glue {self.get_unique_name()} has an action space length of more than 1, "
                "but you specified though obs_index to access only 1 component of the obs "
                "from the wrapped observe Sensor, to fix this error in your config for this glue define 'obs_index': null"
            )

        # expand the scalar step size from the config into one entry per sub-space of the action space
        self.step_size = EnvSpaceUtil.convert_config_param_to_space(
            action_space=self.controller.action_space(), parameter=self.config.step_size
        )

        self._is_wrap = self.config.is_wrap

        # the "delta" part of the observation reported before the first call to apply_action
        self.saved_action_deltas = OrderedDict()
        for space_name, space in self.action_space().items():
            if self.config.initial_value is not None:
                # match the shape of the delta space so the initial observation is a member of
                # observation_space()["delta"] even when the space has more than one element
                self.saved_action_deltas[space_name] = np.full(space.low.shape, self.config.initial_value, dtype=np.float32)
            else:
                # copy so that consumers mutating the observation cannot corrupt the cached space bounds
                self.saved_action_deltas[space_name] = space.low.copy()

    @property
    def get_validator(self) -> typing.Type[RelativeObsDeltaActionValidator]:
        """Return the validator class used to validate the kwargs of this glue."""
        return RelativeObsDeltaActionValidator

    # NOTE: lru_cache on a method keys on `self`, so the result is computed once per instance
    @lru_cache()
    def get_unique_name(self) -> typing.Optional[str]:
        """Retrieve the unique name for the glue instance.

        Returns:
            The unique name of the wrapped controller glue with "RelativeDelta" appended, or None
            when the wrapped controller glue has no unique name.
        """
        wrapped_glue_name = self.controller.get_unique_name()
        if wrapped_glue_name is None:
            return None
        return wrapped_glue_name + "RelativeDelta"

    def get_observation(self) -> typing.Union[np.ndarray, typing.Tuple, typing.Dict]:
        """
        Return the observation of the wrapped controller ("absolute") together with the most
        recently applied delta action ("delta").
        """
        return {
            "absolute": self.controller.get_observation(),
            "delta": self.saved_action_deltas,
        }

    @lru_cache()
    def observation_space(self) -> gym.spaces.Space:
        """
        Build the observation space: the controller observation space under "absolute" and the
        delta action space under "delta".
        """
        return gym.spaces.Dict({"absolute": self.controller.observation_space(), "delta": self.action_space()})

    @lru_cache()
    def action_space(self) -> gym.spaces.Space:
        """
        Build the delta action space: the controller action space, zero-meaned and then scaled
        per sub-space by the configured step size.
        """

        # get the action space from the parent
        original_action_space = self.controller.action_space()

        # log the original action space
        self._logger.debug("action_space: %s", original_action_space)

        # zero mean the space so we can scale it easier
        zero_mean_space = EnvSpaceUtil.zero_mean_space(original_action_space)

        # scale the size of the unbiased space
        for space_name, space in zero_mean_space.items():
            zero_mean_space[space_name] = EnvSpaceUtil.scale_space(space, scale=self.step_size[space_name])

        return zero_mean_space

    # TODO: assumes self.controller._control_properties has unit attribute
    def apply_action(self, action, observation) -> None:
        """
        Add the delta action to the current linked observation, clip the result to the controller
        action space and apply it to the wrapped controller.

        Args:
            action: the delta action, keyed by control name
            observation: forwarded unchanged to the wrapped controller

        Raises:
            ValueError: wraps any exception raised by the wrapped controller, adding debug context
        """

        # lazy %-style args: the message is only formatted when debug logging is enabled
        self._logger.debug("apply_action: %s", action)

        current_observation = self.relative_obs_glue.get_observation()["direct_observation"]
        # all units in an array must be the same, so this assumption is ok
        obs_units = self.relative_obs_glue.observation_units()["direct_observation"][0]
        # NOTE(review): truthiness test - obs_index == 0 leaves the observation un-indexed, the
        # same as obs_index == None; confirm whether `is not None` was intended.
        if self.config.obs_index:
            current_observation = current_observation[self.config.obs_index]
        assert isinstance(self.controller._control_properties, BoxProp), "Unexpected control_properties type"  # pylint: disable=W0212
        out_unit = self.controller._control_properties.unit[0]  # pylint: disable=W0212
        assert isinstance(out_unit, str)
        # express the sensor reading in the unit the controller expects
        unit_converted_obs = Convert(current_observation, obs_units, out_unit)

        # every control uses the same converted observation as its base value
        new_base_obs = OrderedDict.fromkeys(action.keys(), unit_converted_obs)

        # remembered so get_observation can report the last delta that was commanded
        self.saved_action_deltas = action

        absolute_action = EnvSpaceUtil.add_space_samples(
            space_template=self.action_space(),
            space_sample1=action,
            space_sample2=new_base_obs,
        )
        absolute_action = EnvSpaceUtil.clip_space_sample_to_space(absolute_action, self.controller.action_space(), self._is_wrap)

        try:
            self.controller.apply_action(absolute_action, observation)
        except Exception as exc:
            # Purpose - add additional debugging information and re-raise the exception
            raise ValueError(
                f'\n'
                f'action={action}\n'
                f'current_observation={current_observation}\n'
                f'obs_unit={obs_units}\n'
                f'out_unit={out_unit}\n'
                f'action_space={self.action_space()}\n'
                f'controller_action_space={self.controller.action_space()}\n'
                f'is_wrap={self._is_wrap}\n'
            ) from exc

    def get_info_dict(self):
        """
        Get the user specified metadata/metrics/etc.
        """
        return {}

get_validator: Type[corl.glues.controller_wrappers.obs_relative_delta_controller.RelativeObsDeltaActionValidator] property readonly ¤

returns the validator for this class

Returns:

Type Description
Type[corl.glues.controller_wrappers.obs_relative_delta_controller.RelativeObsDeltaActionValidator]

BaseAgentGlueValidator -- A pydantic validator to be used to validate kwargs

action_space(self) ¤

Build the action space for the controller, etc.

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
@lru_cache()
def action_space(self) -> gym.spaces.Space:
    """
    Build the delta action space exposed by this glue.

    The action space of the wrapped controller is zero-meaned and every
    sub-space is then scaled by its entry in ``self.step_size``.
    """
    # start from the action space of the wrapped controller
    controller_space = self.controller.action_space()
    self._logger.debug(f"action_space: {controller_space}")

    # center the space around zero so that it can be scaled easily
    delta_space = EnvSpaceUtil.zero_mean_space(controller_space)

    # shrink each centered sub-space by its configured step size
    for name, sub_space in delta_space.items():
        step = self.step_size[name]
        delta_space[name] = EnvSpaceUtil.scale_space(sub_space, scale=step)

    return delta_space

apply_action(self, action, observation) ¤

Apply the action for the controller, etc.

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
def apply_action(self, action, observation) -> None:
    """
    Turn a delta action into an absolute action and hand it to the wrapped controller.

    The current reading of the linked sensor is converted to the controller's unit,
    used as the base value for every control in ``action``, summed with the delta,
    clipped to the controller action space and applied.

    Raises:
        ValueError: wraps any exception raised by the wrapped controller, with debug context
    """
    self._logger.debug(f"apply_action: {action}")

    sensor = self.relative_obs_glue
    current_observation = sensor.get_observation()["direct_observation"]
    # every entry of the array shares one unit, so reading the first one is enough
    obs_units = sensor.observation_units()["direct_observation"][0]
    # NOTE(review): truthiness test - an obs_index of 0 leaves the observation un-indexed,
    # just like None does; confirm this is intended
    if self.config.obs_index:
        current_observation = current_observation[self.config.obs_index]

    control_props = self.controller._control_properties  # pylint: disable=W0212
    assert isinstance(control_props, BoxProp), "Unexpected control_properties type"
    out_unit = control_props.unit[0]
    assert isinstance(out_unit, str)
    base_value = Convert(current_observation, obs_units, out_unit)

    # each control starts from the same converted sensor reading
    base_sample = OrderedDict.fromkeys(action.keys(), base_value)

    # kept so that the last commanded delta can be reported as an observation
    self.saved_action_deltas = action

    summed_action = EnvSpaceUtil.add_space_samples(
        space_template=self.action_space(),
        space_sample1=action,
        space_sample2=base_sample,
    )
    controller_space = self.controller.action_space()
    clipped_action = EnvSpaceUtil.clip_space_sample_to_space(summed_action, controller_space, self._is_wrap)

    try:
        self.controller.apply_action(clipped_action, observation)
    except Exception as exc:
        # attach the values involved to make the failure easier to debug, then re-raise
        raise ValueError(
            f'\n'
            f'action={action}\n'
            f'current_observation={current_observation}\n'
            f'obs_unit={obs_units}\n'
            f'out_unit={out_unit}\n'
            f'action_space={self.action_space()}\n'
            f'controller_action_space={self.controller.action_space()}\n'
            f'is_wrap={self._is_wrap}\n'
        ) from exc

get_info_dict(self) ¤

Get the user specified metadata/metrics/etc.

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
def get_info_dict(self):
    """
    Return the user specified metadata/metrics for this glue.

    This glue reports nothing, so the result is always an empty dict.
    """
    info = {}
    return info

get_observation(self) ¤

Get the actual observation for the platform using the state of the platform, controller, sensors, etc.

Returns¤

EnvSpaceUtil.sample_type The actual observation for this platform from this glue class

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
def get_observation(self) -> typing.Union[np.ndarray, typing.Tuple, typing.Dict]:
    """
    Return the observation for this glue.

    The result is a dict holding the observation of the wrapped controller under
    "absolute" and the saved delta action under "delta".
    """
    observation = {}
    observation["absolute"] = self.controller.get_observation()
    observation["delta"] = self.saved_action_deltas
    return observation

get_unique_name(self) ¤

Retrieves the unique name for the glue instance

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
@lru_cache()
def get_unique_name(self) -> typing.Optional[str]:
    """Retrieve the unique name for the glue instance.

    The result is cached per instance by ``lru_cache``.

    Returns:
        The unique name of the wrapped controller glue with "RelativeDelta" appended,
        or None when the wrapped controller glue has no unique name.
    """
    wrapped_glue_name = self.controller.get_unique_name()
    if wrapped_glue_name is None:
        return None
    return wrapped_glue_name + "RelativeDelta"

RelativeObsDeltaActionValidator (BaseMultiWrapperGlueValidator) pydantic-model ¤

step_size: A floating point scalar, applied to each action in the action space,

    by which the corresponding delta action is scaled prior to converting the action
    to the wrapped space.
    e.g. A throttle DeltaAction.apply_action([0.2]) with step_size=[.05] would move the
    absolute throttle position to 0.01 higher than it was at the end of the last step.

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
class RelativeObsDeltaActionValidator(BaseMultiWrapperGlueValidator):
    """
    Configuration for RelativeObsDeltaAction.

    step_size:      Floating point scalar in the range [0, 1]. RelativeObsDeltaAction uses it to scale
                    the zero-meaned action space of the wrapped controller, which bounds the delta
                    that can be commanded in a single step.
    obs_index:      Index of the element of the linked sensor observation that is used as the base
                    value for the delta, or None to use the observation as it is.
                    NOTE(review): RelativeObsDeltaAction tests this value for truthiness, so the
                    default of 0 currently behaves like None - confirm intent.
    is_wrap:        Forwarded to EnvSpaceUtil.clip_space_sample_to_space when the absolute action is
                    clipped to the controller action space (presumably selects wrap-around rather
                    than saturation - confirm against EnvSpaceUtil).
    initial_value:  Value reported as the "delta" observation before any action has been applied.
                    When None, the lower bound of the delta action space is reported instead.
    """
    step_size: float = 1.0
    obs_index: typing.Optional[int] = 0
    is_wrap: bool = False
    initial_value: typing.Optional[float] = None

    @validator("step_size")
    @classmethod
    def check_step_scale(cls, v):
        """
        verifies range of step scale values

        Raises:
            ValueError: if the step size is greater than 1.0 or less than 0
        """
        # 1.0 is the declared default and the message only rejects values above it,
        # so the upper bound is inclusive
        if v > 1.0 or v < 0:
            raise ValueError("RelativeObsDeltaActionValidator got step size of more that 1.0 or less than 0")
        return v

check_step_scale(v) classmethod ¤

verifies range of step scale values

Source code in corl/glues/controller_wrappers/obs_relative_delta_controller.py
@validator("step_size")
@classmethod
def check_step_scale(cls, v):
    """
    verifies range of step scale values

    Returns:
        the step size unchanged when it lies in the range [0, 1.0]

    Raises:
        ValueError: if the step size is greater than 1.0 or less than 0
    """
    # the upper bound is inclusive: the error message only rejects values above 1.0
    if v > 1.0 or v < 0:
        raise ValueError("RelativeObsDeltaActionValidator got step size of more that 1.0 or less than 0")
    return v