Docking reward

This module implements the Reward Functions and Reward Validators specific to the 1D Docking task.

DockingReward (RewardFuncBase)

This Reward Function is responsible for calculating the reward (or penalty) associated with a given docking attempt.

Source code in corl/rewards/docking_1d/docking_reward.py
class DockingReward(RewardFuncBase):
    """
    This Reward Function is responsible for calculating the reward (or penalty) associated with a given docking attempt.
    """

    def __init__(self, **kwargs) -> None:
        self.config: DockingRewardValidator
        super().__init__(**kwargs)

    @property
    def get_validator(self):
        """
        Method to return class's Validator.
        """
        return DockingRewardValidator

    def __call__(
        self,
        observation: OrderedDict,
        action,
        next_observation: OrderedDict,
        state: StateDict,
        next_state: StateDict,
        observation_space: StateDict,
        observation_units: StateDict,
    ) -> RewardDict:
        """
        This method determines if the agent has succeeded or failed and returns an appropriate reward.

        Parameters
        ----------
        observation : OrderedDict
            The observations available to the agent from the previous state.
        action
            The last action performed by the agent.
        next_observation : OrderedDict
            The observations available to the agent from the current state.
        state : StateDict
            The previous state of the simulation.
        next_state : StateDict
            The current state of the simulation.
        observation_space : StateDict
            The agent's observation space.
        observation_units : StateDict
            The units corresponding to values in the observation_space

        Returns
        -------
        reward : RewardDict
            The agent's reward for their docking attempt.
        """

        reward = RewardDict()
        value = 0.0

        deputy = get_platform_by_name(next_state, self.config.agent_name)

        position_sensor = get_sensor_by_name(deputy, self.config.position_sensor_name)  # type: ignore
        velocity_sensor = get_sensor_by_name(deputy, self.config.velocity_sensor_name)  # type: ignore

        position = position_sensor.get_measurement()
        velocity = velocity_sensor.get_measurement()
        sim_time = deputy.sim_time  # type: ignore

        chief_position = np.array([0])
        docking_region_radius = self.config.docking_region_radius

        distance = abs(position - chief_position)
        in_docking = distance <= docking_region_radius

        max_velocity_exceeded = self.config.velocity_threshold < velocity  # type: ignore

        if sim_time > self.config.timeout:
            # episode reached max time
            value = self.config.timeout_reward
        elif distance >= self.config.max_goal_distance:
            # agent exceeded max distance from goal
            value = self.config.distance_reward
        elif in_docking and max_velocity_exceeded:
            # agent exceeded velocity constraint within docking region
            value = self.config.crash_reward
        elif in_docking and not max_velocity_exceeded:
            # agent safely made it to the docking region
            value = self.config.success_reward
            if self.config.timeout:
                # Add time reward component, if timeout specified
                value += 1 - (sim_time / self.config.timeout)

        reward[self.config.agent_name] = value
        return reward
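
To make the branch structure above easier to follow outside the CoRL machinery, the sketch below restates the same piecewise reward as a standalone function. The function name and the default configuration values are illustrative assumptions, not part of the library.

def docking_reward_sketch(
    position: float,
    velocity: float,
    sim_time: float,
    *,
    success_reward: float = 1.0,     # example values only; real values come from config
    timeout_reward: float = -1.0,
    distance_reward: float = -1.0,
    crash_reward: float = -1.0,
    timeout: float = 1000.0,
    docking_region_radius: float = 0.5,
    max_goal_distance: float = 40.0,
    velocity_threshold: float = 0.2,
) -> float:
    """Mirror of DockingReward.__call__'s piecewise logic for a single step."""
    distance = abs(position - 0.0)  # the chief sits at the origin in the 1D task
    in_docking = distance <= docking_region_radius
    too_fast = velocity > velocity_threshold

    if sim_time > timeout:
        return timeout_reward                 # episode ran out of time
    if distance >= max_goal_distance:
        return distance_reward                # drifted too far from the chief
    if in_docking and too_fast:
        return crash_reward                   # entered the docking region too fast
    if in_docking:
        # success, plus a bonus that shrinks the longer the episode took
        return success_reward + (1 - sim_time / timeout)
    return 0.0                                # no terminal condition reached this step

# A safe dock at sim_time=200 with these example values yields 1.0 + 0.8 = 1.8
print(docking_reward_sketch(position=0.1, velocity=0.05, sim_time=200.0))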

get_validator property readonly

Method to return class's Validator.

__call__(self, observation, action, next_observation, state, next_state, observation_space, observation_units) special

This method determines if the agent has succeeded or failed and returns an appropriate reward.

Parameters

observation : OrderedDict
    The observations available to the agent from the previous state.
action
    The last action performed by the agent.
next_observation : OrderedDict
    The observations available to the agent from the current state.
state : StateDict
    The previous state of the simulation.
next_state : StateDict
    The current state of the simulation.
observation_space : StateDict
    The agent's observation space.
observation_units : StateDict
    The units corresponding to the values in observation_space.

Returns

reward : RewardDict
    The agent's reward for their docking attempt.
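The returned RewardDict maps the configured agent name to the scalar reward for the step. A minimal sketch of one step's output, assuming an instantiated reward named docking_reward and an agent named "blue0" (both names are illustrative):

# Hypothetical call; the argument values come from the environment step.
reward = docking_reward(obs, action, next_obs, state, next_state, obs_space, obs_units)
print(reward)  # e.g. {"blue0": 1.8} after a successful dock, {"blue0": 0.0} otherwise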


DockingRewardValidator (RewardFuncBaseValidator) pydantic-model

This Validator ensures the DockingReward's config defines values relevant to successful and unsuccessful docking attempts.

Source code in corl/rewards/docking_1d/docking_reward.py
class DockingRewardValidator(RewardFuncBaseValidator):
    """
    This Validator ensures the DockingReward's config defines values relevant to successful and unsuccessful docking
    attempts.
    """
    success_reward: float
    timeout_reward: float
    distance_reward: float
    crash_reward: float
    timeout: float
    docking_region_radius: float
    max_goal_distance: float
    velocity_threshold: float
    position_sensor_name: str
    velocity_sensor_name: str
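
For reference, a hedged example of the values this validator might receive is sketched below. Every name and number is an illustrative assumption; in a real CoRL experiment these values normally come from the agent's reward configuration and are parsed by this pydantic model, and RewardFuncBaseValidator may require additional inherited fields beyond agent_name.

example_docking_reward_config = {
    # Inherited from RewardFuncBaseValidator: the reward uses agent_name to look up
    # the deputy platform in the simulation state.
    "agent_name": "blue0",
    # DockingRewardValidator fields (values are illustrative)
    "success_reward": 1.0,
    "timeout_reward": -1.0,
    "distance_reward": -1.0,
    "crash_reward": -1.0,
    "timeout": 1000.0,              # sim-time limit before the timeout penalty applies
    "docking_region_radius": 0.5,   # distance from the chief that counts as docked
    "max_goal_distance": 40.0,      # distance beyond which the distance penalty applies
    "velocity_threshold": 0.2,      # speed limit inside the docking region
    "position_sensor_name": "Sensor_Position",   # assumed sensor names
    "velocity_sensor_name": "Sensor_Velocity",
}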