Skip to content

Sensor bounds check done


Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.

This is a US Government Work not subject to copyright protection in the US.

The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.


SensorBoundsCheckDone (DoneFuncBase) ¤

Checks to see if a specified state parameter is within bounds

Source code in corl/dones/sensor_bounds_check_done.py
class SensorBoundsCheckDone(DoneFuncBase):
    """
    Done condition that triggers when a single-valued sensor reading leaves the configured range.

    The reading is converted to the units of the configured minimum and the platform is marked
    done when it is strictly below ``min_value`` or strictly above ``max_value``; a reading
    exactly on either bound does not trigger the condition.
    """

    # A True entry tells clients to supply a ValueWithUnits instead of an already evaluated value
    REQUIRED_UNITS = {'min_value': True, 'max_value': True, 'sensor_name': NoneUnitType.NoneUnit}

    def __init__(self, **kwargs) -> None:
        """Build the done condition and, if it still carries the class name, name it after the sensor."""
        self.config: SensorBoundsCheckDoneValidator
        super().__init__(**kwargs)

        # A name equal to the class name is replaced with "<sensor_name>_Done"
        class_name = type(self).__name__
        if self.config.name == class_name:
            sensor_based_name = self.config.sensor_name + '_Done'
            self.config.name = sensor_based_name

    @property
    def get_validator(self) -> typing.Type[SensorBoundsCheckDoneValidator]:
        """Validator class used to parse the configuration of this done condition"""
        return SensorBoundsCheckDoneValidator

    def __call__(
        self,
        observation: OrderedDict,
        action: OrderedDict,
        next_observation: OrderedDict,
        next_state: StateDict,
        observation_space: StateDict,
        observation_units: StateDict,
    ) -> DoneDict:
        """
        Evaluate the bounds check for the configured platform.

        Only ``next_state`` is consulted; the remaining arguments are not read by this method.

        Returns
        -------
        DoneDict
            Holds an entry for ``self.platform``: False when the platform cannot be found,
            otherwise whether the sensor reading is outside the configured bounds.

        Raises
        ------
        TypeError
            If the sensor's measurement properties are not a BoxProp
        ValueError
            If the sensor measurement does not contain exactly one element
        """
        result = DoneDict()

        # Look up the platform; a missing platform is reported as "not done"
        target_platform = get_platform_by_name(next_state, self.platform, allow_invalid=True)
        if target_platform is None:
            result[self.platform] = False
            return result

        # Look up the sensor and make sure its measurement properties are a BoxProp
        sensor = get_sensor_by_name(target_platform, self.config.sensor_name)
        if not isinstance(sensor.measurement_properties, BoxProp):
            raise TypeError(f'Can only do bounds checking on BoxProp, received {type(sensor.measurement_properties).__name__}')

        # Only a single-element measurement is supported
        # NOTE: this guard also fires for an empty measurement, although the message only mentions "more than one"
        raw_measurement = sensor.get_measurement()
        if len(raw_measurement) != 1:
            raise ValueError("Sensor measurement has more than one element")

        lower_bound = self.config.min_value
        upper_bound = self.config.max_value

        # Express the reading in the units of the lower bound; the upper bound's value is compared
        # without conversion, so it is expected to be given in the same units
        reading = ValueWithUnits(value=raw_measurement[0], units=sensor.measurement_properties.unit[0])
        reading_in_bound_units = reading.as_units(lower_bound.units)

        # Out of bounds means strictly below the minimum or strictly above the maximum
        result[self.platform] = reading_in_bound_units < lower_bound.value or upper_bound.value < reading_in_bound_units

        # Leaving the bounds is recorded as a LOSE status for this done on the platform
        if result[self.platform]:
            next_state.episode_state[self.platform][self.name] = DoneStatusCodes.LOSE

        self._set_all_done(result)
        return result

get_validator: Type[corl.dones.sensor_bounds_check_done.SensorBoundsCheckDoneValidator] property readonly ¤

Returns the validator for this done condition

SensorBoundsCheckDoneValidator (DoneFuncBaseValidator) pydantic-model ¤

Initialize a SensorBoundsCheckDone

Parameters¤

min_value : ValueWithUnits — The minimum allowed value of this sensor

max_value : ValueWithUnits — The maximum allowed value of this sensor

sensor_name : str — The name of the sensor to check

Source code in corl/dones/sensor_bounds_check_done.py
class SensorBoundsCheckDoneValidator(DoneFuncBaseValidator):
    """Configuration for a SensorBoundsCheckDone

    Parameters
    ----------
    min_value : ValueWithUnits
        The minimum allowed value of this sensor
    max_value : ValueWithUnits
        The maximum allowed value of this sensor; must use the same units as
        min_value and be strictly greater than it
    sensor_name : str
        The name of the sensor to check
    """
    min_value: ValueWithUnits
    max_value: ValueWithUnits
    sensor_name: str

    @validator('max_value')
    def min_max_consistent(cls, v, values):
        """Validate that the maximum shares the minimum's units and is strictly bigger than it.

        Raises
        ------
        ValueError
            If the units of the two bounds differ, or the maximum is not
            strictly greater than the minimum
        """
        if 'min_value' not in values:
            # min_value failed, so let that error message be provided
            return v

        min_value = values['min_value']
        if v.units != min_value.units:
            raise ValueError(f'Inconsistent units for min and max: {values["min_value"].units}, {v.units}')

        # Equal bounds are rejected too, but "exceeds" would be a false statement for them,
        # so that case gets its own message
        if v.value == min_value.value:
            raise ValueError(
                f'Minimum bound {min_value.value} equals maximum bound {v.value}; maximum must be strictly greater'
            )
        if v.value < min_value.value:
            raise ValueError(f'Minimum bound {values["min_value"].value} exceeds maximum bound {v.value}')
        return v

min_max_consistent(v, values) classmethod ¤

Validate that the maximum is bigger than the minimum.

Source code in corl/dones/sensor_bounds_check_done.py
@validator('max_value')
def min_max_consistent(cls, v, values):
    """Validate that the maximum shares the minimum's units and is strictly bigger than it.

    Raises
    ------
    ValueError
        If the units of the two bounds differ, or the maximum is not
        strictly greater than the minimum
    """
    if 'min_value' not in values:
        # min_value failed, so let that error message be provided
        return v

    min_value = values['min_value']
    if v.units != min_value.units:
        raise ValueError(f'Inconsistent units for min and max: {values["min_value"].units}, {v.units}')

    # Equal bounds are rejected too, but "exceeds" would be a false statement for them,
    # so that case gets its own message
    if v.value == min_value.value:
        raise ValueError(
            f'Minimum bound {min_value.value} equals maximum bound {v.value}; maximum must be strictly greater'
        )
    if v.value < min_value.value:
        raise ValueError(f'Minimum bound {values["min_value"].value} exceeds maximum bound {v.value}')
    return v