Skip to content

Base parts


Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.

This is a US Government Work not subject to copyright protection in the US.

The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.


Integration module provides abstraction for integration to enable RL environment connect to simulation environments and/or real environments. The concept is that if we build either simulation or real environments from this common interface we can transition between them.

  • For example if there is an Sim1StickController and a Sim2StickController the interface between them is derived from BaseStickController and thus to the policy the interface will be the same.

The base classes have properties which determine what the deriving classes must adhere to in order to comply with the intention of the base class.

  • For example the BasePlatform has a position property which has a position_properties (a MultiBoxProp object) that determines the ranges and types of the position property for the BasePlatform.

To differentiate from the agent properties, base integration nomenclature uses control and measurements for the base parts, in contrast to the nomenclature of the agent which is usually action and observation. In this sense actions are made up of controls, and observations are made up of measurements.

BaseController (BasePlatformPart, ABC) ¤

BaseController base abstraction for a controller. A controller is used to move a platform with action commands. The actions are usually changing the desired rates or applied forces to the platform.

Source code in corl/simulators/base_parts.py
class BaseController(BasePlatformPart, abc.ABC):
    """
    BaseController base abstraction for a controller. A controller is used to move a platform with action commands.
    The actions are usually changing the desired rates or applied forces to the platform.
    """

    @property
    def control_properties(self) -> Prop:
        """
        The properties of the control given to the apply_control function

        Returns
        -------
        Prop
            The properties of the control given tot he apply_control function
        """
        return self._properties

    def validate_control(self, control: np.ndarray) -> None:
        """
        The generic method to validate a control for this controller.

        Parameters
        ----------
        control
            The control to be validated
        """
        if not self.control_properties.create_space().contains(control):
            raise ValueError(f"{type(self).__name__} control {control} not in space {self.control_properties.create_space()} values")

    @abc.abstractmethod
    def apply_control(self, control: np.ndarray) -> None:
        """
        The generic method to apply the control for this controller.

        Parameters
        ----------
        control
            The control to be executed by the controller
        """
        ...

    @abc.abstractmethod
    def get_applied_control(self) -> typing.Union[np.ndarray, numbers.Number]:
        """
        Get the previously applied control that was given to the apply_control function
        Returns
        -------
        previously applied control that was given to the apply_control function
        """

    def get_validated_applied_control(self) -> typing.Union[np.ndarray, numbers.Number]:
        """
        Get the previously applied control with nan check
        Returns
        -------
        previously applied control that was given to the apply_control function
        """
        return nan_check_result(self.get_applied_control())

control_properties: Prop property readonly ¤

The properties of the control given to the apply_control function

Returns¤

Prop The properties of the control given tot he apply_control function

apply_control(self, control) ¤

The generic method to apply the control for this controller.

Parameters¤

control The control to be executed by the controller

Source code in corl/simulators/base_parts.py
@abc.abstractmethod
def apply_control(self, control: np.ndarray) -> None:
    """
    The generic method to apply the control for this controller.

    Parameters
    ----------
    control
        The control to be executed by the controller
    """
    ...

get_applied_control(self) ¤

Get the previously applied control that was given to the apply_control function Returns


previously applied control that was given to the apply_control function

Source code in corl/simulators/base_parts.py
@abc.abstractmethod
def get_applied_control(self) -> typing.Union[np.ndarray, numbers.Number]:
    """
    Get the previously applied control that was given to the apply_control function
    Returns
    -------
    previously applied control that was given to the apply_control function
    """

get_validated_applied_control(self) ¤

Get the previously applied control with nan check Returns


previously applied control that was given to the apply_control function

Source code in corl/simulators/base_parts.py
def get_validated_applied_control(self) -> typing.Union[np.ndarray, numbers.Number]:
    """
    Get the previously applied control with nan check
    Returns
    -------
    previously applied control that was given to the apply_control function
    """
    return nan_check_result(self.get_applied_control())

validate_control(self, control) ¤

The generic method to validate a control for this controller.

Parameters¤

control The control to be validated

Source code in corl/simulators/base_parts.py
def validate_control(self, control: np.ndarray) -> None:
    """
    The generic method to validate a control for this controller.

    Parameters
    ----------
    control
        The control to be validated
    """
    if not self.control_properties.create_space().contains(control):
        raise ValueError(f"{type(self).__name__} control {control} not in space {self.control_properties.create_space()} values")

BasePlatformPart (ABC) ¤

BasePlatformPart abstract class for the classes that will be part of the BasePlatform. This includes controls,and sensors

Source code in corl/simulators/base_parts.py
class BasePlatformPart(abc.ABC):
    """
    BasePlatformPart abstract class for the classes that will be part of the BasePlatform.
    This includes controls,and sensors
    """

    def __init__(self, parent_platform, config, property_class) -> None:
        config["part_class"] = self.__class__
        self.config = self.get_validator(**config)
        self._properties = property_class(**self.config.properties)
        self._parent_platform = parent_platform
        self._valid = self.config.initial_validity

    @property
    def valid(self) -> bool:
        """
        Signifies if measurements from this part should be trusted as having
        data that is valid or important to training

        Returns
        -------
        bool
            The current state of the part being valid or not
        """
        return self._valid

    def set_valid(self) -> None:
        """
        Signifies that this part should transition to being valid
        """
        self._valid = True

    def set_invalid(self) -> None:
        """
        Signifies that this part should transition to being invalid
        """
        self._valid = False

    @property
    def name(self) -> typing.Optional[str]:
        """
        The name for this platform part

        Returns
        -------
        str
            The name for this platform part
        """
        return self.config.name

    @property
    def parent_platform(self) -> 'BasePlatform':  # type: ignore  # noqa: F821
        """
        The parent platform this platform part is attached to

        Returns
        -------
        BasePlatform
            The parent platform this platform part is attached to
        """
        return self._parent_platform

    @property
    def exclusiveness(self) -> typing.Set[str]:
        """Return exclusiveness"""
        return set()

    @property
    def get_validator(self) -> typing.Type[BasePlatformPartValidator]:
        """
        return the validator that will be used on the configuration
        of this part
        """
        return BasePlatformPartValidator

    @classmethod
    def embed_properties(cls, property_class: typing.Type[Prop]) -> typing.Type[BasePlatformPart]:
        """Embed the properties in the class definition."""

        class DynamicPlatformPart(cls):  # type: ignore[valid-type,misc]  # pylint: disable=missing-class-docstring

            def __init__(self, parent_platform, config) -> None:
                super().__init__(parent_platform=parent_platform, config=config, property_class=property_class)

        DynamicPlatformPart.__doc__ = cls.__doc__
        DynamicPlatformPart.__name__ += f':{cls.__name__}:{property_class.__name__}'
        DynamicPlatformPart.__qualname__ += f':{cls.__name__}:{property_class.__name__}'
        DynamicPlatformPart.embedded_properties = property_class

        return DynamicPlatformPart

exclusiveness: Set[str] property readonly ¤

Return exclusiveness

get_validator: Type[BasePlatformPartValidator] property readonly ¤

return the validator that will be used on the configuration of this part

name: Optional[str] property readonly ¤

The name for this platform part

Returns¤

str The name for this platform part

parent_platform: 'BasePlatform' property readonly ¤

The parent platform this platform part is attached to

Returns¤

BasePlatform The parent platform this platform part is attached to

valid: bool property readonly ¤

Signifies if measurements from this part should be trusted as having data that is valid or important to training

Returns¤

bool The current state of the part being valid or not

embed_properties(property_class) classmethod ¤

Embed the properties in the class definition.

Source code in corl/simulators/base_parts.py
@classmethod
def embed_properties(cls, property_class: typing.Type[Prop]) -> typing.Type[BasePlatformPart]:
    """Embed the properties in the class definition."""

    class DynamicPlatformPart(cls):  # type: ignore[valid-type,misc]  # pylint: disable=missing-class-docstring

        def __init__(self, parent_platform, config) -> None:
            super().__init__(parent_platform=parent_platform, config=config, property_class=property_class)

    DynamicPlatformPart.__doc__ = cls.__doc__
    DynamicPlatformPart.__name__ += f':{cls.__name__}:{property_class.__name__}'
    DynamicPlatformPart.__qualname__ += f':{cls.__name__}:{property_class.__name__}'
    DynamicPlatformPart.embedded_properties = property_class

    return DynamicPlatformPart

set_invalid(self) ¤

Signifies that this part should transition to being invalid

Source code in corl/simulators/base_parts.py
def set_invalid(self) -> None:
    """
    Signifies that this part should transition to being invalid
    """
    self._valid = False

set_valid(self) ¤

Signifies that this part should transition to being valid

Source code in corl/simulators/base_parts.py
def set_valid(self) -> None:
    """
    Signifies that this part should transition to being valid
    """
    self._valid = True

BasePlatformPartValidator (BaseModel) pydantic-model ¤

name: the optional name for this part, the class name will be used otherwise

Source code in corl/simulators/base_parts.py
class BasePlatformPartValidator(BaseModel):
    """
    name: the optional name for this part, the class name
    will be used otherwise
    """
    part_class: PyObject
    name: typing.Optional[str] = None
    initial_validity: bool = True
    properties: typing.Optional[typing.Dict] = dict()

    @validator('name', always=True)
    def check_name(cls, v, values):
        """Check if agent subclass AgentBase"""
        if v is None:
            assert 'part_class' in values
            v = PluginLibrary.FindGroup(values['part_class'])
        return v

check_name(v, values) classmethod ¤

Check if agent subclass AgentBase

Source code in corl/simulators/base_parts.py
@validator('name', always=True)
def check_name(cls, v, values):
    """Check if agent subclass AgentBase"""
    if v is None:
        assert 'part_class' in values
        v = PluginLibrary.FindGroup(values['part_class'])
    return v

BaseSensor (BasePlatformPart, ABC) ¤

BaseSensor base abstraction for a sensor. A sensor is a attached to a platform and provides information about the environment.

Source code in corl/simulators/base_parts.py
class BaseSensor(BasePlatformPart, abc.ABC):
    """
    BaseSensor base abstraction for a sensor. A sensor is a attached to a platform
    and provides information about the environment.
    """

    def __init__(self, parent_platform, config, property_class) -> None:
        super().__init__(parent_platform=parent_platform, config=config, property_class=property_class)
        self._last_measurement: typing.Optional[typing.Union[np.ndarray, typing.Tuple, typing.Dict]] = None

    @property
    def measurement_properties(self) -> Prop:
        """
        The properties of the object returned by the get_measurement function

        Returns
        -------
        Prop
            The properties of the measurement returned by the get_measurement function
        """
        return self._properties

    @abc.abstractmethod
    def _calculate_measurement(self, state: typing.Tuple) -> typing.Union[np.ndarray, typing.Tuple, typing.Dict]:
        """
        The generic method to get calculate measurements from this sensor. This is used to calculate
         the measurement to be returned by
        the get_measurement function. This allows caching the measurement to avoid re-calculation

        Parameters
        ----------
        state: typing.Tuple
            The current state of the environment used to obtain the measurement

        Returns
        -------
        typing.Union[np.ndarray, typing.Tuple, typing.Dict]
            The measurements from this sensor
        """
        ...

    def calculate_and_cache_measurement(self, state: typing.Tuple):
        """
        Calculates the measurement and caches the result in the _last_measurement variable

        Parameters
        ----------
        state: typing.Tuple
            The current state of the environment used to obtain the measurement
        """
        measurement = self._calculate_measurement(state)
        try:
            nan_check_result(measurement, True)
            self._last_measurement = measurement
        except ValueError:
            warnings.warn(
                "The mover has produced a state that is invalid - NaNs --- Code is going to set broken/damaged - Reuse last state"
            )
            # raise ValueError(f"Error calculating Measurement in {self.__class__}\n" f"Measurement: {self._last_measurement}\n") from err
        if self._last_measurement is None:
            raise ValueError('Measurement is None')

    def get_measurement(self) -> typing.Union[np.ndarray, typing.Tuple, typing.Dict, typing.List]:
        """
        The generic method to get measurements from this sensor.

        Returns
        -------
        typing.Union[np.ndarray, typing.Tuple, typing.Dict]
            The measurements from this sensor
        """
        if self._last_measurement is None:
            raise ValueError(f'Measurement is None - may also want to check operable states - ({type(self)})')
        return self._last_measurement

measurement_properties: Prop property readonly ¤

The properties of the object returned by the get_measurement function

Returns¤

Prop The properties of the measurement returned by the get_measurement function

calculate_and_cache_measurement(self, state) ¤

Calculates the measurement and caches the result in the _last_measurement variable

Parameters¤

typing.Tuple

The current state of the environment used to obtain the measurement

Source code in corl/simulators/base_parts.py
def calculate_and_cache_measurement(self, state: typing.Tuple):
    """
    Calculates the measurement and caches the result in the _last_measurement variable

    Parameters
    ----------
    state: typing.Tuple
        The current state of the environment used to obtain the measurement
    """
    measurement = self._calculate_measurement(state)
    try:
        nan_check_result(measurement, True)
        self._last_measurement = measurement
    except ValueError:
        warnings.warn(
            "The mover has produced a state that is invalid - NaNs --- Code is going to set broken/damaged - Reuse last state"
        )
        # raise ValueError(f"Error calculating Measurement in {self.__class__}\n" f"Measurement: {self._last_measurement}\n") from err
    if self._last_measurement is None:
        raise ValueError('Measurement is None')

get_measurement(self) ¤

The generic method to get measurements from this sensor.

Returns¤

typing.Union[np.ndarray, typing.Tuple, typing.Dict] The measurements from this sensor

Source code in corl/simulators/base_parts.py
def get_measurement(self) -> typing.Union[np.ndarray, typing.Tuple, typing.Dict, typing.List]:
    """
    The generic method to get measurements from this sensor.

    Returns
    -------
    typing.Union[np.ndarray, typing.Tuple, typing.Dict]
        The measurements from this sensor
    """
    if self._last_measurement is None:
        raise ValueError(f'Measurement is None - may also want to check operable states - ({type(self)})')
    return self._last_measurement

BaseTimeSensor (BaseSensor) ¤

Base type for a time sensor

Source code in corl/simulators/base_parts.py
class BaseTimeSensor(BaseSensor):
    """Base type for a time sensor

    Arguments:
        BaseSensor -- The base class type for the sensor
    """

    def __init__(self, parent_platform, config: typing.Dict = None):
        super().__init__(parent_platform, config, base_props.TimeProp)

CommandWithArgs (BaseModel) pydantic-model ¤

Model for a command with its argumets.

Source code in corl/simulators/base_parts.py
class CommandWithArgs(BaseModel):  # type: ignore[no-redef]
    """Model for a command with its argumets."""
    command: str
    args: typing.List[typing.Any] = []
    kwargs: typing.Dict[str, typing.Any] = {}

MutuallyExclusiveParts ¤

Class to check to see/controll mutually exclusive parts

Source code in corl/simulators/base_parts.py
class MutuallyExclusiveParts():
    """
    Class to check to see/controll mutually exclusive parts
    """

    def __init__(self, exclusive_parts, allow_other_keys=False):
        """
        exclusive_parts: The parts to check exclusivity
        allow_other_keys: allows exclusivity other than the parts defined here
        """
        self._exclusive_parts = exclusive_parts
        self.allow_other_keys = allow_other_keys

    def are_platform_parts_mutually_exclusive(self, *args: BasePlatformPart):
        """Checks to see if platform parts are mutually exclusive

        Returns:
            [type] -- [description]
        """
        parts = [platform_part.exclusiveness for platform_part in args]
        return self.are_parts_mutually_exclusive(*parts)

    def are_parts_mutually_exclusive(self, *args):
        """[summary]

        Returns:
            [type] -- [description]
        """
        total_set = set()
        for part_exclusive_set in args:
            for part_exclusiveness in part_exclusive_set:
                if part_exclusiveness in total_set:
                    return False
                if part_exclusiveness in self._exclusive_parts:
                    total_set.add(part_exclusiveness)
                elif not self.allow_other_keys:
                    raise RuntimeError(
                        f"Error: you attempted to add a part with the exclusivness {part_exclusiveness}, but "
                        f"this plaforms exclusive parts for this component type were {self._exclusive_parts}, "
                        "and the platform specified to not allow other exclusivity"
                    )
        return True

    def get_duplicate_parts(self, *args):
        """[summary]

        Returns:
            [type] -- [description]
        """
        total_list = []
        for mutually_exclusive_part in args:
            total_list += list(mutually_exclusive_part.exclusiveness)
        ret_list = []
        for exclusive_key in self._exclusive_parts:
            if total_list.count(exclusive_key) > 1:
                ret_list.append(exclusive_key)
        return ret_list

__init__(self, exclusive_parts, allow_other_keys=False) special ¤

exclusive_parts: The parts to check exclusivity allow_other_keys: allows exclusivity other than the parts defined here

Source code in corl/simulators/base_parts.py
def __init__(self, exclusive_parts, allow_other_keys=False):
    """
    exclusive_parts: The parts to check exclusivity
    allow_other_keys: allows exclusivity other than the parts defined here
    """
    self._exclusive_parts = exclusive_parts
    self.allow_other_keys = allow_other_keys

are_parts_mutually_exclusive(self, *args) ¤

[summary]

Returns:

Type Description

[type] -- [description]

Source code in corl/simulators/base_parts.py
def are_parts_mutually_exclusive(self, *args):
    """[summary]

    Returns:
        [type] -- [description]
    """
    total_set = set()
    for part_exclusive_set in args:
        for part_exclusiveness in part_exclusive_set:
            if part_exclusiveness in total_set:
                return False
            if part_exclusiveness in self._exclusive_parts:
                total_set.add(part_exclusiveness)
            elif not self.allow_other_keys:
                raise RuntimeError(
                    f"Error: you attempted to add a part with the exclusivness {part_exclusiveness}, but "
                    f"this plaforms exclusive parts for this component type were {self._exclusive_parts}, "
                    "and the platform specified to not allow other exclusivity"
                )
    return True

are_platform_parts_mutually_exclusive(self, *args) ¤

Checks to see if platform parts are mutually exclusive

Returns:

Type Description

[type] -- [description]

Source code in corl/simulators/base_parts.py
def are_platform_parts_mutually_exclusive(self, *args: BasePlatformPart):
    """Checks to see if platform parts are mutually exclusive

    Returns:
        [type] -- [description]
    """
    parts = [platform_part.exclusiveness for platform_part in args]
    return self.are_parts_mutually_exclusive(*parts)

get_duplicate_parts(self, *args) ¤

[summary]

Returns:

Type Description

[type] -- [description]

Source code in corl/simulators/base_parts.py
def get_duplicate_parts(self, *args):
    """[summary]

    Returns:
        [type] -- [description]
    """
    total_list = []
    for mutually_exclusive_part in args:
        total_list += list(mutually_exclusive_part.exclusiveness)
    ret_list = []
    for exclusive_key in self._exclusive_parts:
        if total_list.count(exclusive_key) > 1:
            ret_list.append(exclusive_key)
    return ret_list

NoOpController (BaseController) ¤

NoOpController controller define by empty prop can be use when simulator is creating actions for an agent.

Source code in corl/simulators/base_parts.py
class NoOpController(BaseController):
    """
    NoOpController controller define by empty prop can be use when simulator is
    creating actions for an agent.
    """

    def __init__(self, parent_platform, config, property_class) -> None:
        self.config: NoOpControllerValidator
        super().__init__(parent_platform=parent_platform, config=config, property_class=property_class)
        self.new_control = np.array([], dtype=np.float32)

        for command_data in self.config.platform_init_commands:
            func = attrgetter(command_data.command)
            func(self.parent_platform)(*command_data.args, **command_data.kwargs)

    @property
    def get_validator(self) -> typing.Type[NoOpControllerValidator]:
        """Validator for NoOpController"""
        return NoOpControllerValidator

    def apply_control(self, control: np.ndarray) -> None:
        """
        The generic method to apply the control for this controller.

        Parameters
        ----------
        control
            The control to be executed by the controller
        """
        self.new_control = control

    def get_applied_control(self):
        return self.new_control

get_validator: Type[NoOpControllerValidator] property readonly ¤

Validator for NoOpController

apply_control(self, control) ¤

The generic method to apply the control for this controller.

Parameters¤

control The control to be executed by the controller

Source code in corl/simulators/base_parts.py
def apply_control(self, control: np.ndarray) -> None:
    """
    The generic method to apply the control for this controller.

    Parameters
    ----------
    control
        The control to be executed by the controller
    """
    self.new_control = control

get_applied_control(self) ¤

Get the previously applied control that was given to the apply_control function Returns


previously applied control that was given to the apply_control function

Source code in corl/simulators/base_parts.py
def get_applied_control(self):
    return self.new_control

NoOpControllerValidator (BasePlatformPartValidator) pydantic-model ¤

Validator for NoOpController

platform_init_commands: Allow the user to specify platform initialization commands. The primary purpose of this is to allow verification testing between different simulators under different initial conditions without the complexity of actions being sent to those simulators.

Dot notation is supported, so foo.bar.baz will call the method self.parent_platform.foo.bar.baz().

Source code in corl/simulators/base_parts.py
class NoOpControllerValidator(BasePlatformPartValidator):
    """Validator for NoOpController

    platform_init_commands: Allow the user to specify platform initialization commands.  The primary purpose of this is to allow
    verification testing between different simulators under different initial conditions without the complexity of actions being
    sent to those simulators.

    Dot notation is supported, so `foo.bar.baz` will call the method `self.parent_platform.foo.bar.baz()`.

    """
    platform_init_commands: typing.List[CommandWithArgs] = []

    @validator('platform_init_commands', pre=True, each_item=True)
    def expand_command(cls, v):
        """Convert simple string form to full form with arguments."""
        if isinstance(v, str):
            return {'command': v}
        return v

expand_command(v) classmethod ¤

Convert simple string form to full form with arguments.

Source code in corl/simulators/base_parts.py
@validator('platform_init_commands', pre=True, each_item=True)
def expand_command(cls, v):
    """Convert simple string form to full form with arguments."""
    if isinstance(v, str):
        return {'command': v}
    return v