Observe sensor repeated
Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.
This is a US Government Work not subject to copyright protection in the US.
The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.
ObserveSensorRepeated Glue
ObserveSensorRepeated (BaseAgentPlatformGlue)
¤
Glue to observe a sensor that can offer a variable length output
Configuration
sensor (string): String of sensor class, sensor base name, or the PluginLibrary group name
For example consider a platform which has "SimOrientationRateSensor" attached.
The following will all be equivalent:
- SimOrientationRateSensor
- BaseOrientationRateSensor
- Sensor_OrientationRate
Important: If multiple sensors are found an array will be returned (ex. Gload Learjet)
This will also be the name attached to the glue, which can then be used
Source code in corl/glues/common/observe_sensor_repeated.py
class ObserveSensorRepeated(BaseAgentPlatformGlue):
    """Glue to observe a sensor that can offer a variable length output

    Configuration:
        sensor: string      String of sensor class, sensor base name, or the PluginLibrary group name
                            For example consider a platform which has "SimOrientationRateSensor" attached.
                            The following will all be equivalent:
                            - SimOrientationRateSensor
                            - BaseOrientationRateSensor
                            - Sensor_OrientationRate

                            Important: If multiple sensors are found an array will be returned (ex. Gload Learjet)
                            This will also be the name attached to the glue, which can then be used
        output_units: dict  Unit to convert the output data to, keyed by field name
        max_len: int        Maximum number of rows kept in the observation
        enable_clip: bool   Clip Box fields to the bounds of the observation space
    """

    # pylint: disable=too-few-public-methods
    class Fields:
        """
        Fields in this glue
        """
        DIRECT_OBSERVATION = "direct_observation"

    @property
    def get_validator(self) -> typing.Type[ObserveSensorRepeatedValidator]:
        """Validator class used for the configuration of this glue
        """
        return ObserveSensorRepeatedValidator

    # TODO: self._sensor.measurement_properties assumes child_space attribute
    def __init__(self, **kwargs) -> None:
        self.config: ObserveSensorRepeatedValidator
        super().__init__(**kwargs)

        self._sensor = get_sensor_by_name(self._platform, self.config.sensor)
        self._sensor_name: str = self.config.sensor
        self.out_units = self.config.output_units
        self.max_len = self.config.max_len

    @lru_cache(maxsize=1)
    def get_unique_name(self) -> str:
        """Retrieves the unique name for the glue instance

        The name is the fixed prefix "ObserveSensorRepeated_" followed by the configured sensor name.
        """
        # TODO: This may result in ObserveSensor_Sensor_X, which is kinda ugly
        return "ObserveSensorRepeated_" + self._sensor_name

    def invalid_value(self) -> OrderedDict:
        """Return an empty observation list when invalid

        The result has the same key as get_observation, mapped to a list with no rows.
        """
        arr: typing.List[typing.Dict[str, typing.Any]] = []
        d = OrderedDict()
        d[self.Fields.DIRECT_OBSERVATION] = arr
        return d

    @lru_cache(maxsize=1)
    def observation_units(self):
        """Units of the sensors in this glue

        Maps every field in the configured output units to the string form of its unit.
        """
        out_dict = {}
        for field_name, field_unit in self.out_units.items():
            out_dict[field_name] = GetStrFromUnit(field_unit)

        d = gym.spaces.dict.Dict()
        d.spaces[self.Fields.DIRECT_OBSERVATION] = out_dict
        return d

    # TODO: Assumes self._sensor.measurement_properties has attribute child_space
    @lru_cache(maxsize=1)
    def observation_space(self) -> gym.spaces.Space:
        """Observation Space

        A Dict space holding one Repeated space (at most max_len rows) whose child space
        has one entry per field of the sensor measurement properties.
        """
        d = gym.spaces.dict.Dict()
        child_space = gym.spaces.dict.Dict()
        assert isinstance(self._sensor.measurement_properties, RepeatedProp), "Unexpected measurement_properties type"
        child_space_measure = self._sensor.measurement_properties.child_space

        for field_name, field_meas in child_space_measure.items():
            if not isinstance(field_meas, (DiscreteProp, MultiBinary)):
                assert isinstance(field_meas, BoxProp), "Unexpected field_meas type"
                if field_name not in self.out_units:
                    child_space.spaces[field_name] = field_meas.create_space()
                elif isinstance(self.out_units[field_name], list):
                    units = self.out_units[field_name]
                    assert isinstance(units, list), "Unexpected units type"
                    child_space.spaces[field_name] = field_meas.create_converted_space(units)
                else:
                    # a single unit is repeated for every element of the box
                    child_space.spaces[field_name] = field_meas.create_converted_space(
                        [self.out_units[field_name]] * len(field_meas.low)
                    )
            else:
                # discrete / multi-binary fields get no unit conversion
                child_space.spaces[field_name] = field_meas.create_space()

        d.spaces[self.Fields.DIRECT_OBSERVATION] = Repeated(
            child_space=child_space,
            max_len=self.config.max_len,
        )
        return d

    # TODO: Assumes self._sensor.measurement_properties has child_space attribute
    def get_observation(self) -> OrderedDict:
        """Observation Values

        Builds one dict per row of the sensor measurement, converting Box fields to the
        configured output units and, when enable_clip is set, clipping them to the bounds
        of the observation space. At most max_len rows are returned.
        """
        # NOTE: do not attempt to optimize this function by modifying data in place
        # you will mess up all other glues that call self._glue.get_observation
        # a copy is required here
        sensed_value = self._sensor.get_measurement()
        tmp_sensed: typing.List[typing.Dict[str, typing.Any]] = []

        assert isinstance(self._sensor.measurement_properties, RepeatedProp), "Unexpected measurement_properties type"
        m_props = self._sensor.measurement_properties.child_space
        out_units = self.out_units
        enable_clip = self.config.enable_clip
        obs_child_space = self.observation_space()[self.Fields.DIRECT_OBSERVATION].child_space

        for platform_data in sensed_value:
            # we are going to clip the list to the max_len, so the obs is happy
            # checked before building the row so that a max_len of 0 also truncates
            if len(tmp_sensed) >= self.max_len:
                break

            tmp_row = {}
            for field_name, obs in platform_data.items():
                value = obs
                if field_name in out_units:
                    prop = m_props[field_name]
                    if isinstance(prop, BoxProp):
                        old_unit = prop.unit[0]
                        assert isinstance(old_unit, str)
                        unit_class = GetUnitFromStr(old_unit)
                        if unit_class != NoneUnitType.NoneUnit:
                            value = Convert(obs, old_unit, out_units[field_name])
                if enable_clip:
                    field_space = obs_child_space.spaces[field_name]
                    if isinstance(field_space, gym.spaces.Box):
                        # clip the converted value: the space bounds are in output units,
                        # clipping the raw measurement would throw the conversion away
                        value = np.clip(value, field_space.low, field_space.high)
                tmp_row[field_name] = value
            tmp_sensed.append(tmp_row)

        d = OrderedDict()
        d[self.Fields.DIRECT_OBSERVATION] = tmp_sensed
        return d

    @lru_cache(maxsize=1)
    def action_space(self) -> gym.spaces.Space:
        """No Actions
        """
        return None

    def apply_action(self, action: EnvSpaceUtil.sample_type, observation: EnvSpaceUtil.sample_type):
        """No Actions
        """
        return None
get_validator: Type[corl.glues.common.observe_sensor_repeated.ObserveSensorRepeatedValidator]
property
readonly
¤
returns the validator for this class
Returns:
| Type | Description |
|---|---|
| Type[corl.glues.common.observe_sensor_repeated.ObserveSensorRepeatedValidator] | BaseAgentGlueValidator -- A pydantic validator to be used to validate kwargs |
Fields
¤
Fields in this glue
Source code in corl/glues/common/observe_sensor_repeated.py
class Fields:
    """Names of the entries this glue places in its observation dictionaries."""

    # key holding the list of observed sensor rows
    DIRECT_OBSERVATION = "direct_observation"
action_space(self)
¤
No Actions
Source code in corl/glues/common/observe_sensor_repeated.py
@lru_cache(maxsize=1)
def action_space(self) -> gym.spaces.Space:
    """Action space of this glue.

    This glue only observes, so there is no action space and None is returned.
    """
    return None
apply_action(self, action, observation)
¤
No Actions
Source code in corl/glues/common/observe_sensor_repeated.py
def apply_action(self, action: EnvSpaceUtil.sample_type, observation: EnvSpaceUtil.sample_type):
    """Apply an action to the platform.

    This glue only observes: both arguments are ignored and None is returned.
    """
    return None
get_observation(self)
¤
Observation Values
Source code in corl/glues/common/observe_sensor_repeated.py
def get_observation(self) -> OrderedDict:
    """Observation Values

    Builds one dict per row of the sensor measurement, converting Box fields to the
    configured output units and, when enable_clip is set, clipping them to the bounds
    of the observation space. At most max_len rows are returned.
    """
    # NOTE: do not attempt to optimize this function by modifying data in place
    # you will mess up all other glues that call self._glue.get_observation
    # a copy is required here
    sensed_value = self._sensor.get_measurement()
    tmp_sensed: typing.List[typing.Dict[str, typing.Any]] = []

    assert isinstance(self._sensor.measurement_properties, RepeatedProp), "Unexpected measurement_properties type"
    m_props = self._sensor.measurement_properties.child_space
    out_units = self.out_units
    enable_clip = self.config.enable_clip
    obs_child_space = self.observation_space()[self.Fields.DIRECT_OBSERVATION].child_space

    for platform_data in sensed_value:
        # we are going to clip the list to the max_len, so the obs is happy
        # checked before building the row so that a max_len of 0 also truncates
        if len(tmp_sensed) >= self.max_len:
            break

        tmp_row = {}
        for field_name, obs in platform_data.items():
            value = obs
            if field_name in out_units:
                prop = m_props[field_name]
                if isinstance(prop, BoxProp):
                    old_unit = prop.unit[0]
                    assert isinstance(old_unit, str)
                    unit_class = GetUnitFromStr(old_unit)
                    if unit_class != NoneUnitType.NoneUnit:
                        value = Convert(obs, old_unit, out_units[field_name])
            if enable_clip:
                field_space = obs_child_space.spaces[field_name]
                if isinstance(field_space, gym.spaces.Box):
                    # clip the converted value: the space bounds are in output units,
                    # clipping the raw measurement would throw the conversion away
                    value = np.clip(value, field_space.low, field_space.high)
            tmp_row[field_name] = value
        tmp_sensed.append(tmp_row)

    d = OrderedDict()
    d[self.Fields.DIRECT_OBSERVATION] = tmp_sensed
    return d
get_unique_name(self)
¤
Retrieves the unique name for the glue instance
Source code in corl/glues/common/observe_sensor_repeated.py
@lru_cache(maxsize=1)
def get_unique_name(self) -> str:
    """Retrieves the unique name for the glue instance.

    The name is the fixed prefix "ObserveSensorRepeated_" followed by the sensor name
    stored on the instance.
    """
    # TODO: This may result in ObserveSensor_Sensor_X, which is kinda ugly
    return f"ObserveSensorRepeated_{self._sensor_name}"
invalid_value(self)
¤
Return an empty observation list when invalid
Source code in corl/glues/common/observe_sensor_repeated.py
def invalid_value(self) -> OrderedDict:
    """Observation to report when the glue is invalid.

    The result carries the direct-observation key mapped to a fresh list with no rows.
    """
    empty_rows: typing.List[float] = []
    return OrderedDict([(self.Fields.DIRECT_OBSERVATION, empty_rows)])
observation_space(self)
¤
Observation Space
Source code in corl/glues/common/observe_sensor_repeated.py
@lru_cache(maxsize=1)
def observation_space(self) -> gym.spaces.Space:
    """Observation space of this glue.

    A Dict space holding one Repeated space (at most max_len rows) whose child space
    has one entry per field of the sensor measurement properties.
    """
    sensor_props = self._sensor.measurement_properties
    assert isinstance(sensor_props, RepeatedProp), "Unexpected measurement_properties type"

    row_space = gym.spaces.dict.Dict()
    for name, field_prop in sensor_props.child_space.items():
        # discrete / multi-binary fields never get a unit conversion
        if isinstance(field_prop, (DiscreteProp, MultiBinary)):
            row_space.spaces[name] = field_prop.create_space()
            continue

        assert isinstance(field_prop, BoxProp), "Unexpected field_meas type"
        if name not in self.out_units:
            row_space.spaces[name] = field_prop.create_space()
            continue

        requested = self.out_units[name]
        if isinstance(requested, list):
            per_element_units = requested
        else:
            # a single unit is repeated for every element of the box
            per_element_units = [requested] * len(field_prop.low)
        row_space.spaces[name] = field_prop.create_converted_space(per_element_units)

    space = gym.spaces.dict.Dict()
    space.spaces[self.Fields.DIRECT_OBSERVATION] = Repeated(
        child_space=row_space,
        max_len=self.config.max_len,
    )
    return space
observation_units(self)
¤
Units of the sensors in this glue
Source code in corl/glues/common/observe_sensor_repeated.py
@lru_cache(maxsize=1)
def observation_units(self):
    """Units reported for the fields observed by this glue.

    Maps every field in the configured output units to the string form of its unit and
    stores that mapping under the direct-observation key of a Dict space container.
    """
    unit_names = {name: GetStrFromUnit(unit) for name, unit in self.out_units.items()}

    container = gym.spaces.dict.Dict()
    container.spaces[self.Fields.DIRECT_OBSERVATION] = unit_names
    return container
ObserveSensorRepeatedValidator (BaseAgentPlatformGlueValidator)
pydantic-model
¤
sensor: which sensor to find on the platform; output_units: unit to convert the output data to; max_len: the maximum length to allow the observation space to reach; enable_clip: enables clipping for spaces that support clipping
Source code in corl/glues/common/observe_sensor_repeated.py
class ObserveSensorRepeatedValidator(BaseAgentPlatformGlueValidator):
    """
    Configuration accepted by the ObserveSensorRepeated glue

    sensor: which sensor to find on the platform
    output_units: unit to convert the output data to
    max_len: the maximum length to allow the observation space to reach
    enable_clip: enables clipping for spaces that support clipping
    """
    sensor: str
    output_units: typing.Dict[str, enum.Enum] = {}
    max_len: int = 10
    enable_clip: bool = False

    @validator('output_units', always=True, pre=True)
    def validate_output_units(cls, v, values):  # pylint: disable=no-self-argument, no-self-use
        """output_units validator

        Turns the configured unit strings into unit enums and fills every field that was
        not configured with the default unit of the sensor's own unit type.
        """
        child_space = get_sensor_by_name(values['platform'], values['sensor']).measurement_properties.child_space
        resolved = {}

        if v is not None:
            # units supplied by the configuration: string -> enum.Enum
            assert isinstance(v, dict), "Configuration output_units must be a dict"
            for field_name, requested in v.items():
                if field_name not in child_space:
                    raise RuntimeError(
                        f"output units were provided for repeated field {field_name}, but "
                        f"that field is not in this repeated space which has keys {child_space.keys()}"
                    )

                field_prop = child_space[field_name]
                if isinstance(field_prop, (DiscreteProp, MultiBinary)):
                    # no conversion for discrete / multi-binary fields; defaults are filled below
                    continue

                assert isinstance(field_prop, BoxProp), "Unexpected field_space type"
                single_valued = np.isscalar(field_prop.high) or len(field_prop.high) == 1
                if not single_valued:
                    raise RuntimeError("repeated field space unit conversions with lists of output types not currently implemented")
                if requested is not None and not isinstance(requested, str):
                    raise RuntimeError("output units must be provided as a scalar")

                assert isinstance(GetUnitFromStr(field_prop.unit[0]), type(GetUnitFromStr(requested))), "Unexpected unit dimension"
                resolved[field_name] = GetUnitFromStr(requested)

        # every remaining field falls back to a default taken from the sensor
        for field_name, field_prop in child_space.items():
            if field_name in resolved:
                continue
            if not hasattr(field_prop, 'unit'):
                # no unit information on the property at all
                resolved[field_name] = NoneUnitType.DEFAULT
                continue

            native = field_prop.unit
            if isinstance(native, str):
                unit_name = native
            elif isinstance(native, list):
                unit_name = native[0]
            else:
                raise RuntimeError("Unexpected property unit type")
            resolved[field_name] = GetUnitFromStr(unit_name).DEFAULT

        return resolved
validate_output_units(v, values)
classmethod
¤
output_units validator
Source code in corl/glues/common/observe_sensor_repeated.py
@validator('output_units', always=True, pre=True)
def validate_output_units(cls, v, values):  # pylint: disable=no-self-argument, no-self-use
    """output_units validator

    Turns the configured unit strings into unit enums and fills every field that was
    not configured with the default unit of the sensor's own unit type.
    """
    child_space = get_sensor_by_name(values['platform'], values['sensor']).measurement_properties.child_space
    resolved = {}

    if v is not None:
        # units supplied by the configuration: string -> enum.Enum
        assert isinstance(v, dict), "Configuration output_units must be a dict"
        for field_name, requested in v.items():
            if field_name not in child_space:
                raise RuntimeError(
                    f"output units were provided for repeated field {field_name}, but "
                    f"that field is not in this repeated space which has keys {child_space.keys()}"
                )

            field_prop = child_space[field_name]
            if isinstance(field_prop, (DiscreteProp, MultiBinary)):
                # no conversion for discrete / multi-binary fields; defaults are filled below
                continue

            assert isinstance(field_prop, BoxProp), "Unexpected field_space type"
            single_valued = np.isscalar(field_prop.high) or len(field_prop.high) == 1
            if not single_valued:
                raise RuntimeError("repeated field space unit conversions with lists of output types not currently implemented")
            if requested is not None and not isinstance(requested, str):
                raise RuntimeError("output units must be provided as a scalar")

            assert isinstance(GetUnitFromStr(field_prop.unit[0]), type(GetUnitFromStr(requested))), "Unexpected unit dimension"
            resolved[field_name] = GetUnitFromStr(requested)

    # every remaining field falls back to a default taken from the sensor
    for field_name, field_prop in child_space.items():
        if field_name in resolved:
            continue
        if not hasattr(field_prop, 'unit'):
            # no unit information on the property at all
            resolved[field_name] = NoneUnitType.DEFAULT
            continue

        native = field_prop.unit
        if isinstance(native, str):
            unit_name = native
        elif isinstance(native, list):
            unit_name = native[0]
        else:
            raise RuntimeError("Unexpected property unit type")
        resolved[field_name] = GetUnitFromStr(unit_name).DEFAULT

    return resolved