Skip to content

Remote


Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.

This is a US Government Work not subject to copyright protection in the US.

The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.


RayActorProxy ¤

Creates a 'proxy' to a named ray actor

Source code in corl/episode_parameter_providers/remote.py
class RayActorProxy:
    """Creates a 'proxy' to a named ray actor"""

    def __init__(self, actor_id: str, namespace: typing.Optional[str] = None):
        self._actor_id = actor_id
        self._namespace = namespace

    @property  # type: ignore
    @lru_cache(maxsize=1)
    def actor(self) -> ray.actor.ActorHandle:
        """Retrieves the named ray actor"""
        if not ray.is_initialized():
            raise RuntimeError('Cannot get named actor without ray')
        return ray.get_actor(self._actor_id, self._namespace)

actor: ActorHandle property readonly ¤

Retrieves the named ray actor

RemoteEpisodeParameterProvider (EpisodeParameterProvider) ¤

Wrap EpisodeParameterProvider as a ray remote actor and manage data passing between ray processes.

Source code in corl/episode_parameter_providers/remote.py
class RemoteEpisodeParameterProvider(EpisodeParameterProvider):
    """Wrap EpisodeParameterProvider as a ray remote actor and manage data passing between ray processes."""

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.config = typing.cast(RemoteEpisodeParameterProviderValidator, self.config)

        ray.remote(self.config.internal_class
                   ).options(  # type: ignore
                       name=self.config.actor_name,
                       namespace=self.config.namespace,
                       lifetime='detached',
                   ).remote(**self.config.internal_config, **kwargs)  # noqa: E126 I could not get this to format

        self._rap = RayActorProxy(self.config.actor_name, namespace=self.config.namespace)

    @property
    def get_validator(self) -> typing.Type[RemoteEpisodeParameterProviderValidator]:
        return RemoteEpisodeParameterProviderValidator

    @staticmethod
    def wrap_epp_factory(epp_factory: Factory, actor_name: str, namespace: str = None) -> Factory:
        """Wraps an existing EpisodeParameterProvider Factory as a RemoteEpisodeParameterProvider"""
        if not issubclass(epp_factory.type, EpisodeParameterProvider):  # type: ignore
            raise TypeError(f"Invalid Factory.type: {epp_factory.type}, {EpisodeParameterProvider.__qualname__} required")

        remote_config = {
            'namespace': namespace, 'actor_name': actor_name, 'internal_class': epp_factory.type, 'internal_config': epp_factory.config
        }

        return Factory(type=RemoteEpisodeParameterProvider, config=remote_config)

    def kill_actor(self) -> None:
        """Kill the underlying actor used by this provider."""
        try:
            ray.kill(self._rap.actor)
        except ray.exceptions.RaySystemError:
            pass

    def _do_get_params(self, rng: Randomness) -> typing.Tuple[ParameterModel, typing.Union[int, None]]:

        if isinstance(rng, Generator):
            seed = rng.integers(low=0, high=1000000)
            new_rng = np.random.default_rng(seed=seed)
        elif isinstance(rng, RandomState):
            seed = rng.randint(low=0, high=1000000)
            new_rng, _ = seeding.np_random(seed)
        else:
            raise RuntimeError(f"rng type provided to function was {rng}, but this class only knows numpy Generator or RandomState")

        return ray.get(self._rap.actor.get_params.remote(new_rng))  # type: ignore

    def compute_metrics(self) -> typing.Dict[str, typing.Any]:
        return ray.get(self._rap.actor.compute_metrics.remote())  # type: ignore

    def update(self, results: dict, rng: Randomness) -> None:

        if isinstance(rng, Generator):
            seed = rng.integers(low=0, high=1000000)
            new_rng = np.random.default_rng(seed=seed)
        elif isinstance(rng, RandomState):
            seed = rng.randint(low=0, high=1000000)
            new_rng, _ = seeding.np_random(seed)
        else:
            raise RuntimeError(f"rng type provided to function was {rng}, but this class only knows numpy Generator or RandomState")

        ray.get(self._rap.actor.update.remote(results, new_rng))  # type: ignore

    def save_checkpoint(self, checkpoint_path) -> None:
        ray.get(self._rap.actor.save_checkpoint.remote(checkpoint_path))  # type: ignore

    def load_checkpoint(self, checkpoint_path) -> None:
        ray.get(self._rap.actor.load_checkpoint.remote(checkpoint_path))  # type: ignore

get_validator: Type[corl.episode_parameter_providers.remote.RemoteEpisodeParameterProviderValidator] property readonly ¤

Get the validator for this class.

compute_metrics(self) ¤

Get metrics on the operation of this provider.

Often used in on_episode_end training callbacks.

Source code in corl/episode_parameter_providers/remote.py
def compute_metrics(self) -> typing.Dict[str, typing.Any]:
    return ray.get(self._rap.actor.compute_metrics.remote())  # type: ignore

kill_actor(self) ¤

Kill the underlying actor used by this provider.

Source code in corl/episode_parameter_providers/remote.py
def kill_actor(self) -> None:
    """Kill the underlying actor used by this provider."""
    try:
        ray.kill(self._rap.actor)
    except ray.exceptions.RaySystemError:
        pass

load_checkpoint(self, checkpoint_path) ¤

Load the internal state from a checkpoint.

Parameters¤

checkpoint_path : PathLike Filesystem path from which to restore the checkpoint

Source code in corl/episode_parameter_providers/remote.py
def load_checkpoint(self, checkpoint_path) -> None:
    ray.get(self._rap.actor.load_checkpoint.remote(checkpoint_path))  # type: ignore

save_checkpoint(self, checkpoint_path) ¤

Save the internal state of the parameter provider.

Parameters¤

checkpoint_path : PathLike Filesystem path at which to save the checkpoint

Source code in corl/episode_parameter_providers/remote.py
def save_checkpoint(self, checkpoint_path) -> None:
    ray.get(self._rap.actor.save_checkpoint.remote(checkpoint_path))  # type: ignore

update(self, results, rng) ¤

Update the operation of this provider.

Often used in on_train_result training callbacks.

Parameters¤

results : dict As described by ray.rllib.agents.callbacks.DefaultCallbacks.on_train_result. See https://docs.ray.io/en/master/_modules/ray/rllib/agents/callbacks.html#DefaultCallbacks.on_train_result rng : Union[Generator, RandomState] Random number generator from which to draw random values.

Source code in corl/episode_parameter_providers/remote.py
def update(self, results: dict, rng: Randomness) -> None:

    if isinstance(rng, Generator):
        seed = rng.integers(low=0, high=1000000)
        new_rng = np.random.default_rng(seed=seed)
    elif isinstance(rng, RandomState):
        seed = rng.randint(low=0, high=1000000)
        new_rng, _ = seeding.np_random(seed)
    else:
        raise RuntimeError(f"rng type provided to function was {rng}, but this class only knows numpy Generator or RandomState")

    ray.get(self._rap.actor.update.remote(results, new_rng))  # type: ignore

wrap_epp_factory(epp_factory, actor_name, namespace=None) staticmethod ¤

Wraps an existing EpisodeParameterProvider Factory as a RemoteEpisodeParameterProvider

Source code in corl/episode_parameter_providers/remote.py
@staticmethod
def wrap_epp_factory(epp_factory: Factory, actor_name: str, namespace: str = None) -> Factory:
    """Wraps an existing EpisodeParameterProvider Factory as a RemoteEpisodeParameterProvider"""
    if not issubclass(epp_factory.type, EpisodeParameterProvider):  # type: ignore
        raise TypeError(f"Invalid Factory.type: {epp_factory.type}, {EpisodeParameterProvider.__qualname__} required")

    remote_config = {
        'namespace': namespace, 'actor_name': actor_name, 'internal_class': epp_factory.type, 'internal_config': epp_factory.config
    }

    return Factory(type=RemoteEpisodeParameterProvider, config=remote_config)

RemoteEpisodeParameterProviderValidator (EpisodeParameterProviderValidator) pydantic-model ¤

Validation model for the inputs of RemoteEpisodeParameterProvider

Source code in corl/episode_parameter_providers/remote.py
class RemoteEpisodeParameterProviderValidator(EpisodeParameterProviderValidator):
    """Validation model for the inputs of RemoteEpisodeParameterProvider"""
    internal_class: typing.Type[EpisodeParameterProvider]
    internal_config: typing.Dict[str, typing.Any] = {}
    actor_name: str
    namespace: typing.Optional[str] = None

    @validator('internal_class')
    def internal_not_remote(cls, v):
        """Confirm that internal class is not also remote"""
        assert v != RemoteEpisodeParameterProvider
        return v

internal_not_remote(v) classmethod ¤

Confirm that internal class is not also remote

Source code in corl/episode_parameter_providers/remote.py
@validator('internal_class')
def internal_not_remote(cls, v):
    """Confirm that internal class is not also remote"""
    assert v != RemoteEpisodeParameterProvider
    return v