Observe sensor
Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.
This is a US Government Work not subject to copyright protection in the US.
The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.
ObserveSensor (BaseAgentPlatformGlue)
¤
Glue to observe a sensor
Configuration
sensor: string — String of sensor class, sensor base name, or the PluginLibrary group name
For example consider a platform which has "SimOrientationRateSensor" attached.
The following will all be equivalent:
- SimOrientationRateSensor
- BaseOrientationRateSensor
- Sensor_OrientationRate
Important: If multiple sensors are found an array will be returned (ex. Gload Learjet)
This string is also used to name the glue: the glue's unique name is "ObserveSensor_" followed by this value.
Source code in corl/glues/common/observe_sensor.py
class ObserveSensor(BaseAgentPlatformGlue):
    """Glue to observe a sensor

    Configuration:
        sensor: string      String of sensor class, sensor base name, or the PluginLibrary group name
                            For example consider a platform which has "SimOrientationRateSensor" attached.
                            The following will all be equivalent:
                            - SimOrientationRateSensor
                            - BaseOrientationRateSensor
                            - Sensor_OrientationRate

                            Important: If multiple sensors are found an array will be returned (ex. Gload Learjet)

                            This string is also used to build the glue's unique name
                            ("ObserveSensor_" + sensor), see ``get_unique_name``.
    """

    # pylint: disable=too-few-public-methods
    class Fields:
        """
        Fields in this glue
        """
        # Key under which the sensor reading is stored in every dictionary this glue returns.
        DIRECT_OBSERVATION = "direct_observation"

    @property
    def get_validator(self) -> typing.Type[ObserveSensorValidator]:
        """Return the validator class for this glue's configuration."""
        return ObserveSensorValidator

    def __init__(self, **kwargs) -> None:
        self.config: ObserveSensorValidator
        super().__init__(**kwargs)
        # Resolve the configured sensor string against the platform once, up front.
        self._sensor = get_sensor_by_name(self._platform, self.config.sensor)
        self._sensor_name: str = self.config.sensor
        self.out_units = self.config.output_units

    @lru_cache(maxsize=1)
    def get_unique_name(self) -> str:
        """Retrieve the unique name for the glue instance.

        Returns:
            "ObserveSensor_" followed by the configured sensor string.
        """
        # TODO: This may result in ObserveSensor_Sensor_X, which is kinda ugly
        return "ObserveSensor_" + self._sensor_name

    # NOTE(review): this method was flagged "TODO: broken" by the original author without
    # details. numpy-array bounds and nested (2D) bounds are handled now; measurement
    # properties that expose no ``low`` attribute still raise AttributeError here.
    def invalid_value(self) -> OrderedDict:
        """Return zeros when invalid

        The zeros take the shape of the sensor's ``measurement_properties.low`` bound;
        a scalar bound produces a one-element array.

        Returns:
            OrderedDict mapping ``Fields.DIRECT_OBSERVATION`` to a float64 array of zeros.

        Raises:
            RuntimeError: if ``low`` is neither a sequence, a numpy array nor a scalar.
        """
        low = self._sensor.measurement_properties.low  # type: ignore
        # np.ndarray is not registered as an abc.Sequence, so it has to be checked explicitly.
        if isinstance(low, (abc.Sequence, np.ndarray)):
            zeros = np.zeros(np.shape(low), dtype=np.float64)
        elif np.isscalar(low):
            zeros = np.zeros(1, dtype=np.float64)
        else:
            raise RuntimeError("Expecting either array or scalar")

        d = OrderedDict()
        d[self.Fields.DIRECT_OBSERVATION] = zeros
        return d

    @lru_cache(maxsize=1)
    def observation_units(self):
        """Units of the sensors in this glue

        Returns:
            gym Dict whose ``Fields.DIRECT_OBSERVATION`` entry is the list (or list of lists)
            of unit strings obtained by passing each configured output unit to GetStrFromUnit.
        """
        out_units = []
        for value in self.out_units:
            if isinstance(value, abc.Sequence):
                # list of lists 2D case
                out_units.append([GetStrFromUnit(val) for val in value])
            else:
                # 1D case
                out_units.append(GetStrFromUnit(value))

        d = gym.spaces.dict.Dict()
        d.spaces[self.Fields.DIRECT_OBSERVATION] = out_units
        return d

    @lru_cache(maxsize=1)
    def observation_space(self) -> gym.spaces.Space:
        """Observation Space

        Returns:
            gym Dict whose ``Fields.DIRECT_OBSERVATION`` entry is the space created by the
            sensor's measurement properties (converted to the output units for BoxProp).

        Raises:
            TypeError: if the measurement properties are not BoxProp, DiscreteProp or MultiBinary.
        """
        props = self._sensor.measurement_properties
        if isinstance(props, BoxProp):
            space = props.create_converted_space(self.out_units)
        elif isinstance(props, (DiscreteProp, MultiBinary)):
            space = props.create_space()
        else:
            # The message must be an f-string, otherwise the placeholders are printed literally.
            raise TypeError(f"Only supports {BoxProp.__name__}, {MultiBinary.__name__} and {DiscreteProp.__name__}")

        d = gym.spaces.dict.Dict()
        d.spaces[self.Fields.DIRECT_OBSERVATION] = space
        return d

    def get_observation(self) -> OrderedDict:
        """Observation Values

        Returns:
            OrderedDict mapping ``Fields.DIRECT_OBSERVATION`` to:
            - BoxProp: float32 array of the measurement, each element passed through
              ``Convert`` from the sensor unit to the configured output unit;
            - DiscreteProp: int32 array built from the first element of the measurement;
            - MultiBinary: the measurement exactly as returned by the sensor.

        Raises:
            TypeError: if the measurement properties are not BoxProp, DiscreteProp or MultiBinary.
        """
        props = self._sensor.measurement_properties
        d = OrderedDict()
        if isinstance(props, BoxProp):
            sensed_value = self._sensor.get_measurement()
            # Work on a plain (possibly nested) list so converted values can be written back in place.
            if isinstance(sensed_value, np.ndarray):
                sensed_value = sensed_value.tolist()
            else:
                sensed_value = list(sensed_value)
            for indx, value in enumerate(sensed_value):
                if isinstance(value, abc.Sequence):
                    # list of lists 2D case
                    for indx1, value1 in enumerate(value):
                        in_unit2d = props.unit[indx][indx1]
                        assert isinstance(in_unit2d, str)
                        assert isinstance(self.out_units, abc.Sequence)
                        out_unit1 = self.out_units[indx]
                        assert isinstance(out_unit1, abc.Sequence)
                        out_unit2d = out_unit1[indx1]
                        assert isinstance(out_unit2d, enum.Enum)
                        assert isinstance(sensed_value, abc.Sequence)
                        sensed_value1 = sensed_value[indx]
                        assert isinstance(sensed_value1, list)
                        sensed_value1[indx1] = Convert(value1, in_unit2d, out_unit2d)
                else:
                    # 1D case
                    in_unit = props.unit[indx]
                    assert isinstance(in_unit, str)
                    out_unit = self.out_units[indx]
                    assert isinstance(out_unit, enum.Enum)
                    assert isinstance(sensed_value, list)
                    sensed_value[indx] = Convert(value, in_unit, out_unit)
            d[self.Fields.DIRECT_OBSERVATION] = np.array(sensed_value, dtype=np.float32)
        elif isinstance(props, (DiscreteProp)):
            sensed_value_discrete: float = self._sensor.get_measurement()[0]
            d[self.Fields.DIRECT_OBSERVATION] = np.array(sensed_value_discrete, dtype=np.int32)
        elif isinstance(props, (MultiBinary)):
            sensed_value_multi_binary = self._sensor.get_measurement()
            d[self.Fields.DIRECT_OBSERVATION] = sensed_value_multi_binary  # type: ignore
        else:
            # The message must be an f-string, otherwise the placeholders are printed literally.
            raise TypeError(f"Only supports {BoxProp.__name__}, {MultiBinary.__name__} and {DiscreteProp.__name__}")
        return d

    @lru_cache(maxsize=1)
    def action_space(self) -> gym.spaces.Space:
        """No Actions
        """
        return None

    def apply_action(self, action, observation):
        """No Actions
        """
        return None
get_validator: Type[corl.glues.common.observe_sensor.ObserveSensorValidator]
property
readonly
¤
returns the validator for this class
Returns:
| Type | Description |
|---|---|
| Type[corl.glues.common.observe_sensor.ObserveSensorValidator] | BaseAgentGlueValidator -- A pydantic validator to be used to validate kwargs |
Fields
¤
Fields in this glue
Source code in corl/glues/common/observe_sensor.py
class Fields:
    """Names of the fields produced by this glue."""

    # Key under which the glue stores the sensor reading.
    DIRECT_OBSERVATION = "direct_observation"
action_space(self)
¤
No Actions
Source code in corl/glues/common/observe_sensor.py
@lru_cache(maxsize=1)
def action_space(self) -> gym.spaces.Space:
    """Report the action space of this glue, which has no actions.

    Returns:
        None -- this glue defines no action space.
    """
    return None
apply_action(self, action, observation)
¤
No Actions
Source code in corl/glues/common/observe_sensor.py
def apply_action(self, action, observation):
    """Accept an action without acting on it; this glue has no actions.

    Arguments:
        action: ignored
        observation: ignored

    Returns:
        None
    """
    return None
get_observation(self)
¤
Observation Values
Source code in corl/glues/common/observe_sensor.py
def get_observation(self) -> OrderedDict:
    """Observation Values

    Returns:
        OrderedDict mapping ``Fields.DIRECT_OBSERVATION`` to:
        - BoxProp: float32 array of the measurement, each element passed through
          ``Convert`` from the sensor unit to the configured output unit;
        - DiscreteProp: int32 array built from the first element of the measurement;
        - MultiBinary: the measurement exactly as returned by the sensor.

    Raises:
        TypeError: if the measurement properties are not BoxProp, DiscreteProp or MultiBinary.
    """
    props = self._sensor.measurement_properties
    d = OrderedDict()
    if isinstance(props, BoxProp):
        sensed_value = self._sensor.get_measurement()
        # Work on a plain (possibly nested) list so converted values can be written back in place.
        if isinstance(sensed_value, np.ndarray):
            sensed_value = sensed_value.tolist()
        else:
            sensed_value = list(sensed_value)
        for indx, value in enumerate(sensed_value):
            if isinstance(value, abc.Sequence):
                # list of lists 2D case
                for indx1, value1 in enumerate(value):
                    in_unit2d = props.unit[indx][indx1]
                    assert isinstance(in_unit2d, str)
                    assert isinstance(self.out_units, abc.Sequence)
                    out_unit1 = self.out_units[indx]
                    assert isinstance(out_unit1, abc.Sequence)
                    out_unit2d = out_unit1[indx1]
                    assert isinstance(out_unit2d, enum.Enum)
                    assert isinstance(sensed_value, abc.Sequence)
                    sensed_value1 = sensed_value[indx]
                    assert isinstance(sensed_value1, list)
                    sensed_value1[indx1] = Convert(value1, in_unit2d, out_unit2d)
            else:
                # 1D case
                in_unit = props.unit[indx]
                assert isinstance(in_unit, str)
                out_unit = self.out_units[indx]
                assert isinstance(out_unit, enum.Enum)
                assert isinstance(sensed_value, list)
                sensed_value[indx] = Convert(value, in_unit, out_unit)
        d[self.Fields.DIRECT_OBSERVATION] = np.array(sensed_value, dtype=np.float32)
    elif isinstance(props, (DiscreteProp)):
        sensed_value_discrete: float = self._sensor.get_measurement()[0]
        d[self.Fields.DIRECT_OBSERVATION] = np.array(sensed_value_discrete, dtype=np.int32)
    elif isinstance(props, (MultiBinary)):
        sensed_value_multi_binary = self._sensor.get_measurement()
        d[self.Fields.DIRECT_OBSERVATION] = sensed_value_multi_binary  # type: ignore
    else:
        # The message must be an f-string, otherwise the placeholders are printed literally.
        raise TypeError(f"Only supports {BoxProp.__name__}, {MultiBinary.__name__} and {DiscreteProp.__name__}")
    return d
get_unique_name(self)
¤
Retrieves the unique name for the glue instance
Source code in corl/glues/common/observe_sensor.py
@lru_cache(maxsize=1)
def get_unique_name(self) -> str:
    """Build the unique name of this glue instance.

    Returns:
        The fixed prefix "ObserveSensor_" followed by the stored sensor name.
    """
    # TODO: This may result in ObserveSensor_Sensor_X, which is kinda ugly
    prefix = "ObserveSensor_"
    return prefix + self._sensor_name
invalid_value(self)
¤
Return zeros when invalid
Source code in corl/glues/common/observe_sensor.py
def invalid_value(self) -> OrderedDict:
    """Return zeros when invalid

    The zeros take the shape of the sensor's ``measurement_properties.low`` bound;
    a scalar bound produces a one-element array.

    Returns:
        OrderedDict mapping ``Fields.DIRECT_OBSERVATION`` to a float64 array of zeros.

    Raises:
        RuntimeError: if ``low`` is neither a sequence, a numpy array nor a scalar.
    """
    low = self._sensor.measurement_properties.low  # type: ignore
    # np.ndarray is not registered as an abc.Sequence, so it has to be checked explicitly;
    # using the bound's shape also keeps nested (2D) bounds two-dimensional.
    if isinstance(low, (abc.Sequence, np.ndarray)):
        zeros = np.zeros(np.shape(low), dtype=np.float64)
    elif np.isscalar(low):
        zeros = np.zeros(1, dtype=np.float64)
    else:
        raise RuntimeError("Expecting either array or scalar")

    d = OrderedDict()
    d[self.Fields.DIRECT_OBSERVATION] = zeros
    return d
observation_space(self)
¤
Observation Space
Source code in corl/glues/common/observe_sensor.py
@lru_cache(maxsize=1)
def observation_space(self) -> gym.spaces.Space:
    """Observation Space

    Returns:
        gym Dict whose ``Fields.DIRECT_OBSERVATION`` entry is the space created by the
        sensor's measurement properties (converted to the output units for BoxProp).

    Raises:
        TypeError: if the measurement properties are not BoxProp, DiscreteProp or MultiBinary.
    """
    props = self._sensor.measurement_properties
    if isinstance(props, BoxProp):
        space = props.create_converted_space(self.out_units)
    elif isinstance(props, (DiscreteProp, MultiBinary)):
        space = props.create_space()
    else:
        # The message must be an f-string, otherwise the placeholders are printed literally.
        raise TypeError(f"Only supports {BoxProp.__name__}, {MultiBinary.__name__} and {DiscreteProp.__name__}")

    d = gym.spaces.dict.Dict()
    d.spaces[self.Fields.DIRECT_OBSERVATION] = space
    return d
observation_units(self)
¤
Units of the sensors in this glue
Source code in corl/glues/common/observe_sensor.py
@lru_cache(maxsize=1)
def observation_units(self):
    """Describe the units of the values observed by this glue.

    Returns:
        gym Dict whose ``Fields.DIRECT_OBSERVATION`` entry holds the result of
        GetStrFromUnit for every configured output unit, nested the same way as
        ``self.out_units`` (flat list, or list of lists in the 2D case).
    """
    unit_names = [
        # A sequence entry is one row of the 2D case; anything else is a single 1D unit.
        [GetStrFromUnit(inner) for inner in entry] if isinstance(entry, abc.Sequence) else GetStrFromUnit(entry)
        for entry in self.out_units
    ]
    units_dict = gym.spaces.dict.Dict()
    units_dict.spaces[self.Fields.DIRECT_OBSERVATION] = unit_names
    return units_dict
ObserveSensorValidator (BaseAgentPlatformGlueValidator)
pydantic-model
¤
output_units: unit to convert the output data to; sensor: which sensor to find on the platform
Source code in corl/glues/common/observe_sensor.py
class ObserveSensorValidator(BaseAgentPlatformGlueValidator):
    """
    output_units: unit to convert the output data to
    sensor: which sensor to find on the platform
    """
    sensor: str
    output_units: typing.Union[typing.Sequence[enum.Enum], typing.Sequence[typing.Sequence[enum.Enum]]] = []

    @validator('output_units', always=True, pre=True)
    def validate_output_units(cls, v, values):  # pylint: disable=no-self-argument, no-self-use
        """output_units validator

        Turns the configured ``output_units`` (nothing, one string, a list of strings or a
        list of lists of strings) into unit objects via GetUnitFromStr. When nothing is
        configured, the ``DEFAULT`` of each sensor unit is used, or ``NoneUnitType.DEFAULT``
        when the sensor's measurement properties have no ``unit`` attribute.

        Arguments:
            v: raw configured value for output_units
            values: previously validated fields; 'platform' and 'sensor' are read

        Returns:
            list of units (or list of lists of units in the 2D case)

        Raises:
            ValueError: if 'platform' or 'sensor' is not available in ``values``
            TypeError: if ``v`` is neither a string nor a sequence
            AssertionError: if the configured units do not match the sensor units in
                length, type or unit dimension
        """
        # pydantic leaves fields that failed their own validation out of ``values``; report
        # that as a validation problem instead of letting a bare KeyError escape.
        missing = [name for name in ('platform', 'sensor') if name not in values]
        if missing:
            raise ValueError(f"Cannot validate output_units without valid field(s): {', '.join(missing)}")

        units = []
        sensor_obj = get_sensor_by_name(values['platform'], values['sensor'])
        properties = sensor_obj.measurement_properties

        if not v:
            # Configuration does not provide output units
            if hasattr(properties, 'unit'):
                # Sensor measurement properties has unit attribute, get the default unit
                for value in properties.unit:
                    if not isinstance(value, str) and isinstance(value, abc.Sequence):
                        # list of lists 2D case
                        units.append([GetUnitFromStr(val).DEFAULT for val in value])
                    else:
                        # 1D case
                        units.append(GetUnitFromStr(value).DEFAULT)
            else:
                # Sensor measurement properties does not have unit attribute, use NoneUnitType.DEFAULT
                units.append(NoneUnitType.DEFAULT)
            return units

        # Configuration provides output units, convert string units to enum.Enum units
        # Assert that sensor has units
        assert hasattr(properties, 'unit'), "Unexpected output_units when sensor does not have unit attribute"
        sensor_units = properties.unit

        if isinstance(v, str):
            # Configuration units are a single string
            assert len(sensor_units) == 1, "Unexpected length of output_units"
            units.append(GetUnitFromStr(v))
        elif isinstance(v, abc.Sequence):
            # Configuration units are a list
            assert len(v) == len(sensor_units), "Unexpected length of output_units"
            for i, value in enumerate(v):
                if not isinstance(value, str) and isinstance(value, abc.Sequence):
                    # list of lists 2D case
                    assert len(value) == len(sensor_units[i]), "Unexpected length of output_units"
                    sub_units = []
                    for j, val in enumerate(value):
                        assert isinstance(val, str), "Unexpected input output_units type"
                        out_unit = GetUnitFromStr(val)
                        # The sensor unit must be an instance of the configured unit's type.
                        assert isinstance(GetUnitFromStr(sensor_units[i][j]), type(out_unit)), "Unexpected unit dimension"
                        sub_units.append(out_unit)
                    units.append(sub_units)
                else:
                    # 1D case
                    assert isinstance(value, str), "Unexpected input output_units type"
                    out_unit = GetUnitFromStr(value)
                    # The sensor unit must be an instance of the configured unit's type.
                    assert isinstance(GetUnitFromStr(sensor_units[i]), type(out_unit)), "Unexpected unit dimension"
                    units.append(out_unit)
        else:
            raise TypeError(f'Unexpected_input output_units type: {type(v).__name__}')
        return units
validate_output_units(v, values)
classmethod
¤
output_units validator
Source code in corl/glues/common/observe_sensor.py
@validator('output_units', always=True, pre=True)
def validate_output_units(cls, v, values):  # pylint: disable=no-self-argument, no-self-use
    """output_units validator

    Turns the configured ``output_units`` (nothing, one string, a list of strings or a
    list of lists of strings) into unit objects via GetUnitFromStr. When nothing is
    configured, the ``DEFAULT`` of each sensor unit is used, or ``NoneUnitType.DEFAULT``
    when the sensor's measurement properties have no ``unit`` attribute.

    Arguments:
        v: raw configured value for output_units
        values: previously validated fields; 'platform' and 'sensor' are read

    Returns:
        list of units (or list of lists of units in the 2D case)

    Raises:
        ValueError: if 'platform' or 'sensor' is not available in ``values``
        TypeError: if ``v`` is neither a string nor a sequence
        AssertionError: if the configured units do not match the sensor units in
            length, type or unit dimension
    """
    # pydantic leaves fields that failed their own validation out of ``values``; report
    # that as a validation problem instead of letting a bare KeyError escape.
    missing = [name for name in ('platform', 'sensor') if name not in values]
    if missing:
        raise ValueError(f"Cannot validate output_units without valid field(s): {', '.join(missing)}")

    units = []
    sensor_obj = get_sensor_by_name(values['platform'], values['sensor'])
    properties = sensor_obj.measurement_properties

    if not v:
        # Configuration does not provide output units
        if hasattr(properties, 'unit'):
            # Sensor measurement properties has unit attribute, get the default unit
            for value in properties.unit:
                if not isinstance(value, str) and isinstance(value, abc.Sequence):
                    # list of lists 2D case
                    units.append([GetUnitFromStr(val).DEFAULT for val in value])
                else:
                    # 1D case
                    units.append(GetUnitFromStr(value).DEFAULT)
        else:
            # Sensor measurement properties does not have unit attribute, use NoneUnitType.DEFAULT
            units.append(NoneUnitType.DEFAULT)
        return units

    # Configuration provides output units, convert string units to enum.Enum units
    # Assert that sensor has units
    assert hasattr(properties, 'unit'), "Unexpected output_units when sensor does not have unit attribute"
    sensor_units = properties.unit

    if isinstance(v, str):
        # Configuration units are a single string
        assert len(sensor_units) == 1, "Unexpected length of output_units"
        units.append(GetUnitFromStr(v))
    elif isinstance(v, abc.Sequence):
        # Configuration units are a list
        assert len(v) == len(sensor_units), "Unexpected length of output_units"
        for i, value in enumerate(v):
            if not isinstance(value, str) and isinstance(value, abc.Sequence):
                # list of lists 2D case
                assert len(value) == len(sensor_units[i]), "Unexpected length of output_units"
                sub_units = []
                for j, val in enumerate(value):
                    assert isinstance(val, str), "Unexpected input output_units type"
                    out_unit = GetUnitFromStr(val)
                    # The sensor unit must be an instance of the configured unit's type.
                    assert isinstance(GetUnitFromStr(sensor_units[i][j]), type(out_unit)), "Unexpected unit dimension"
                    sub_units.append(out_unit)
                units.append(sub_units)
            else:
                # 1D case
                assert isinstance(value, str), "Unexpected input output_units type"
                out_unit = GetUnitFromStr(value)
                # The sensor unit must be an instance of the configured unit's type.
                assert isinstance(GetUnitFromStr(sensor_units[i]), type(out_unit)), "Unexpected unit dimension"
                units.append(out_unit)
    else:
        raise TypeError(f'Unexpected_input output_units type: {type(v).__name__}')
    return units