Skip to content

Target value difference


Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.

This is a US Government Work not subject to copyright protection in the US.

The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.


TargetValueDifference (BaseDictWrapperGlue) ¤

Glue class for an observation of some constant value

Source code in corl/glues/common/target_value_difference.py
class TargetValueDifference(BaseDictWrapperGlue):
    """
    Glue class for an observation of some constant value
    """
    SENSOR_STR = "sensor"
    TARGET_STR = "target"

    class Fields:  # pylint: disable=too-few-public-methods
        """
        field data
        """
        TARGET_VALUE_DIFF = "target_value_diff"

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        keys = self.glues().keys()
        if not isinstance(self.config.target_value, float) and len(self.glues()) != 2:  # type: ignore
            raise ValueError("self.config.target_value not defined - expecting two glues to get target from 2nd glue")

        if not isinstance(self.config.target_value, float) and len(self.glues()) == 2:  # type: ignore
            if TargetValueDifference.TARGET_STR not in keys:
                raise KeyError(f"Expecting to see {TargetValueDifference.TARGET_STR} in keys - {keys}")

        if isinstance(self.config.target_value, float) and len(self.glues()) != 1:  # type: ignore
            raise ValueError("self.config.target_value defined - expecting one glue")

        if TargetValueDifference.SENSOR_STR not in keys:
            raise KeyError(f"Expecting to see {TargetValueDifference.SENSOR_STR} in keys - {keys}")

        self._logger = logging.getLogger(TargetValueDifference.__name__)

    @property
    def get_validator(self) -> typing.Type[TargetValueDifferenceValidator]:
        """Return validator"""
        return TargetValueDifferenceValidator

    @lru_cache(maxsize=1)
    def get_unique_name(self) -> str:
        """Class method that retrieves the unique name for the glue instance
        """
        unique_name = self.config.unique_name  # type: ignore
        if unique_name:
            return unique_name

        wrapped_glue_name = self.glues()[TargetValueDifference.SENSOR_STR].get_unique_name()
        if wrapped_glue_name is None:
            return None
        return wrapped_glue_name + "Diff"

    def invalid_value(self) -> OrderedDict:
        """When invalid return a value of 0

        TODO: this may need to be self.min in the case that the minimum is larger than 0 (i.e. a harddeck)

        Returns:
            OrderedDict -- Dictionary with <FIELD> entry containing 1D array
        """
        d = OrderedDict()

        if isinstance(self.config.target_value, float):  # type: ignore
            target_value = self.config.target_value  # type: ignore
        else:
            target_value = self.glues()[TargetValueDifference.TARGET_STR].get_observation()  # type: ignore

        d[self.Fields.TARGET_VALUE] = np.asarray([target_value], dtype=np.float32)  # type: ignore
        return d

    @lru_cache(maxsize=1)
    def observation_space(self):
        """Observation space"""
        space = self.glues()[TargetValueDifference.SENSOR_STR].observation_space()

        d = gym.spaces.dict.Dict()
        if len(space.spaces) > 1:
            raise RuntimeError("Target value difference can only wrap a glue with one output")

        for field in space.spaces:
            d.spaces[field + "_diff"] = gym.spaces.Box(self.config.limit.minimum, self.config.limit.maximum, shape=(1, ), dtype=np.float32)
            d.spaces[field + "_diff_invalid"] = gym.spaces.Discrete(2)
        return d

    def get_observation(self):
        """Get observation"""

        def get_wrap_diff(A, B):
            """Returns the min diff angle

             RAD A	         Deg A   RAD A	       Deg B   abs_diff_mod_360	Diff
              1.047197551	  60     2.094395102    120	    60	             60
              2.094395102	 120 	 1.047197551     60	    60	             60
             -2.094395102	-120	-1.047197551    -60	    60	             60
              6.108652382	 350	 0.1745329252    10	   340	             20
             -2.967059728	-170	 2.967059728    170	   340	             20

            Arguments:
                A {float} -- Angle 1 - rad or deg
                B {float} -- Angle 2 - rad or deg

            Returns:
                float -- the min diff angle
            """

            # Convert to degrees if needed.
            temp_a = math.degrees(A) if self.config.is_rad else A
            temp_b = math.degrees(B) if self.config.is_rad else B

            if self.config.method == 1:
                # Compute the diff as abs min angle (always positive)
                abs_diff_mod_360 = abs(temp_a - temp_b) % 360
                result = 360 - abs_diff_mod_360 if abs_diff_mod_360 > 180 else abs_diff_mod_360
            else:
                # Compute the diff as min angle maintain sign for direction
                diff = temp_a - temp_b
                result = (diff + 180) % 360 - 180

            # Return the diff in deg if radians if needed.
            return math.radians(result) if self.config.is_rad else result

        glue = self.glues()[TargetValueDifference.SENSOR_STR]
        obs = glue.get_observation()

        observation_units = None
        if hasattr(glue, "observation_units"):
            observation_units = glue.observation_units()

        if observation_units and self.config.unit:
            observation_unit = None
            length = 0
            for item in observation_units:
                observation_unit = observation_units[item][0]
                length += 1

            if length != 1:
                raise RuntimeError(f"observation_units not of length 1: {observation_units}")

            if units.GetUnitFromStr(observation_unit) != units.GetUnitFromStr(self.config.unit):
                raise RuntimeError(f"Target value units [{self.config.unit}] and observation units [{observation_unit}] don't match")

        target_invalid = False
        target_name = " "
        if isinstance(self.config.target_value, float):
            target_value = self.config.target_value
        else:
            # TODO make this more configurable... assumes 1 value for target
            if hasattr(self.glues()[TargetValueDifference.TARGET_STR], "_sensor"):
                target_invalid = not self.glues()[TargetValueDifference.TARGET_STR]._sensor.valid  # pylint: disable=protected-access
                target_name = self.glues()[TargetValueDifference.TARGET_STR]._sensor_name  # pylint: disable=protected-access
            target_value = self.glues()[TargetValueDifference.TARGET_STR].get_observation()[self.config.target_value_field][
                self.config.target_value_index]

            # I would think that this would be needed by the target value but is already converted.
            # if units.GetUnitFromStr(self.glues()[1]._sensor._properties.unit[0]) != units.GetUnitFromStr(self.config.unit):
            #     target_value = units.Convert( target_value,
            #                                   units.GetUnitFromStr(self.glues()[1]._sensor._properties.unit[0]),
            #                                   units.GetUnitFromStr(self.config.unit))

        d = OrderedDict()

        if target_invalid:
            for field, value in obs.items():
                d[field + "_diff"] = np.array([self.config.limit.minimum], dtype=np.float32)
                d[field + "_diff_invalid"] = target_invalid
        else:
            for field, value in obs.items():
                if self.config.is_wrap:
                    # Note: this always returns the min angle diff and is positive
                    diff = get_wrap_diff(target_value, value[self.config.value_index])
                else:
                    diff = target_value - value[self.config.value_index]

                if self.config.is_abs:
                    diff = abs(diff)

                self._logger.debug(f"{TargetValueDifference.__name__}::{target_name}::obs = {value[self.config.value_index]}")
                self._logger.debug(f"{TargetValueDifference.__name__}::{target_name}::target = {target_value}")
                self._logger.debug(f"{TargetValueDifference.__name__}::{target_name}::diff = {diff}")

                d[field + "_diff"] = np.array([diff], dtype=np.float32)
                d[field + "_diff_invalid"] = glue.agent_removed()

        return d

    @lru_cache(maxsize=1)
    def action_space(self) -> gym.spaces.Space:
        """Action space"""
        return None

    def apply_action(self, action, observation):  # pylint: disable=unused-argument
        """Apply action"""
        return None

get_validator: Type[corl.glues.common.target_value_difference.TargetValueDifferenceValidator] property readonly ¤

Return validator

Fields ¤

field data

Source code in corl/glues/common/target_value_difference.py
class Fields:  # pylint: disable=too-few-public-methods
    """
    field data
    """
    TARGET_VALUE_DIFF = "target_value_diff"

action_space(self) ¤

Action space

Source code in corl/glues/common/target_value_difference.py
@lru_cache(maxsize=1)
def action_space(self) -> gym.spaces.Space:
    """Action space"""
    return None

apply_action(self, action, observation) ¤

Apply action

Source code in corl/glues/common/target_value_difference.py
def apply_action(self, action, observation):  # pylint: disable=unused-argument
    """Apply action"""
    return None

get_observation(self) ¤

Get observation

Source code in corl/glues/common/target_value_difference.py
def get_observation(self):
    """Get observation"""

    def get_wrap_diff(A, B):
        """Returns the min diff angle

         RAD A	         Deg A   RAD A	       Deg B   abs_diff_mod_360	Diff
          1.047197551	  60     2.094395102    120	    60	             60
          2.094395102	 120 	 1.047197551     60	    60	             60
         -2.094395102	-120	-1.047197551    -60	    60	             60
          6.108652382	 350	 0.1745329252    10	   340	             20
         -2.967059728	-170	 2.967059728    170	   340	             20

        Arguments:
            A {float} -- Angle 1 - rad or deg
            B {float} -- Angle 2 - rad or deg

        Returns:
            float -- the min diff angle
        """

        # Convert to degrees if needed.
        temp_a = math.degrees(A) if self.config.is_rad else A
        temp_b = math.degrees(B) if self.config.is_rad else B

        if self.config.method == 1:
            # Compute the diff as abs min angle (always positive)
            abs_diff_mod_360 = abs(temp_a - temp_b) % 360
            result = 360 - abs_diff_mod_360 if abs_diff_mod_360 > 180 else abs_diff_mod_360
        else:
            # Compute the diff as min angle maintain sign for direction
            diff = temp_a - temp_b
            result = (diff + 180) % 360 - 180

        # Return the diff in deg if radians if needed.
        return math.radians(result) if self.config.is_rad else result

    glue = self.glues()[TargetValueDifference.SENSOR_STR]
    obs = glue.get_observation()

    observation_units = None
    if hasattr(glue, "observation_units"):
        observation_units = glue.observation_units()

    if observation_units and self.config.unit:
        observation_unit = None
        length = 0
        for item in observation_units:
            observation_unit = observation_units[item][0]
            length += 1

        if length != 1:
            raise RuntimeError(f"observation_units not of length 1: {observation_units}")

        if units.GetUnitFromStr(observation_unit) != units.GetUnitFromStr(self.config.unit):
            raise RuntimeError(f"Target value units [{self.config.unit}] and observation units [{observation_unit}] don't match")

    target_invalid = False
    target_name = " "
    if isinstance(self.config.target_value, float):
        target_value = self.config.target_value
    else:
        # TODO make this more configurable... assumes 1 value for target
        if hasattr(self.glues()[TargetValueDifference.TARGET_STR], "_sensor"):
            target_invalid = not self.glues()[TargetValueDifference.TARGET_STR]._sensor.valid  # pylint: disable=protected-access
            target_name = self.glues()[TargetValueDifference.TARGET_STR]._sensor_name  # pylint: disable=protected-access
        target_value = self.glues()[TargetValueDifference.TARGET_STR].get_observation()[self.config.target_value_field][
            self.config.target_value_index]

        # I would think that this would be needed by the target value but is already converted.
        # if units.GetUnitFromStr(self.glues()[1]._sensor._properties.unit[0]) != units.GetUnitFromStr(self.config.unit):
        #     target_value = units.Convert( target_value,
        #                                   units.GetUnitFromStr(self.glues()[1]._sensor._properties.unit[0]),
        #                                   units.GetUnitFromStr(self.config.unit))

    d = OrderedDict()

    if target_invalid:
        for field, value in obs.items():
            d[field + "_diff"] = np.array([self.config.limit.minimum], dtype=np.float32)
            d[field + "_diff_invalid"] = target_invalid
    else:
        for field, value in obs.items():
            if self.config.is_wrap:
                # Note: this always returns the min angle diff and is positive
                diff = get_wrap_diff(target_value, value[self.config.value_index])
            else:
                diff = target_value - value[self.config.value_index]

            if self.config.is_abs:
                diff = abs(diff)

            self._logger.debug(f"{TargetValueDifference.__name__}::{target_name}::obs = {value[self.config.value_index]}")
            self._logger.debug(f"{TargetValueDifference.__name__}::{target_name}::target = {target_value}")
            self._logger.debug(f"{TargetValueDifference.__name__}::{target_name}::diff = {diff}")

            d[field + "_diff"] = np.array([diff], dtype=np.float32)
            d[field + "_diff_invalid"] = glue.agent_removed()

    return d

get_unique_name(self) ¤

Class method that retrieves the unique name for the glue instance

Source code in corl/glues/common/target_value_difference.py
@lru_cache(maxsize=1)
def get_unique_name(self) -> str:
    """Class method that retrieves the unique name for the glue instance
    """
    unique_name = self.config.unique_name  # type: ignore
    if unique_name:
        return unique_name

    wrapped_glue_name = self.glues()[TargetValueDifference.SENSOR_STR].get_unique_name()
    if wrapped_glue_name is None:
        return None
    return wrapped_glue_name + "Diff"

invalid_value(self) ¤

When invalid return a value of 0

TODO: this may need to be self.min in the case that the minimum is larger than 0 (i.e. a harddeck)

Returns:

Type Description
OrderedDict

OrderedDict -- Dictionary with entry containing 1D array

Source code in corl/glues/common/target_value_difference.py
def invalid_value(self) -> OrderedDict:
    """When invalid return a value of 0

    TODO: this may need to be self.min in the case that the minimum is larger than 0 (i.e. a harddeck)

    Returns:
        OrderedDict -- Dictionary with <FIELD> entry containing 1D array
    """
    d = OrderedDict()

    if isinstance(self.config.target_value, float):  # type: ignore
        target_value = self.config.target_value  # type: ignore
    else:
        target_value = self.glues()[TargetValueDifference.TARGET_STR].get_observation()  # type: ignore

    d[self.Fields.TARGET_VALUE] = np.asarray([target_value], dtype=np.float32)  # type: ignore
    return d

observation_space(self) ¤

Observation space

Source code in corl/glues/common/target_value_difference.py
@lru_cache(maxsize=1)
def observation_space(self):
    """Observation space"""
    space = self.glues()[TargetValueDifference.SENSOR_STR].observation_space()

    d = gym.spaces.dict.Dict()
    if len(space.spaces) > 1:
        raise RuntimeError("Target value difference can only wrap a glue with one output")

    for field in space.spaces:
        d.spaces[field + "_diff"] = gym.spaces.Box(self.config.limit.minimum, self.config.limit.maximum, shape=(1, ), dtype=np.float32)
        d.spaces[field + "_diff_invalid"] = gym.spaces.Discrete(2)
    return d

TargetValueDifferenceValidator (BaseDictWrapperGlueValidator) pydantic-model ¤

target_value: float - Value to report the difference around unit: str - unit for the target value value_index: - index to extract the value from an ndarray extractor is_wrap: bool - if the target value difference needs to wrap around the limits is_rad: bool - if the read observation is in radians method: int - TODO: fill this is is_abs: bool - if the difference to the target value should be reported as absolute value limit: LimitConfigValidator - limit configuration unique_name: str - optional unique name for a target value difference glue

Source code in corl/glues/common/target_value_difference.py
class TargetValueDifferenceValidator(BaseDictWrapperGlueValidator):
    """
    target_value: float - Value to report the difference around
    unit: str - unit for the target value
    value_index: - index to extract the value from an ndarray extractor
    is_wrap: bool - if the target value difference needs to wrap around the limits
    is_rad: bool - if the read observation is in radians
    method: int - TODO: fill this is
    is_abs: bool - if the difference to the target value should be reported as absolute value
    limit: LimitConfigValidator - limit configuration
    unique_name: str - optional unique name for a target value difference glue
    """
    target_value: typing.Optional[float] = None
    target_value_index: typing.Optional[int] = 0
    target_value_field: typing.Optional[str] = "direct_observation"
    unit: str
    value_index: int = 0
    is_wrap: bool = False
    is_rad: bool = False
    method: int = 0
    is_abs: bool = False
    limit: LimitConfigValidator
    unique_name: str = ''