Skip to content

Env space util


Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.

This is a US Government Work not subject to copyright protection in the US.

The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.


ENV Space Util Module

EnvSpaceUtil ¤

ENV Space Util

Source code in corl/libraries/env_space_util.py
class EnvSpaceUtil:  # pylint: disable=R0904
    """ ENV Space Util
    """
    _logger = logging.getLogger("EnvSpaceUtil")
    sample_type = typing.Union[OrderedDict, dict, tuple, np.ndarray, list]

    @staticmethod
    def deep_sanity_check_space_sample(  # pylint: disable=R0912
        space: gym.spaces.Space,
        sample: sample_type,
        key_stack: str = "",
    ) -> None:
        """Ensure space sample is consistent with space. This will give a traceback of the exact space that failed

        Parameters
        ----------
        space: gym.spaces.Dict
            the space we expect the sample to conform to
        sample: sample_type
            sample that we are checking if it belongs to the given space
        key_stack: str
            string of the keys we have used when getting to this current spot in the observation_space, observation
            this is used for recursive calls do not set in the initial call to this function
        """
        if isinstance(space, gym.spaces.Dict):
            if not isinstance(sample, (StateDict, OrderedDict, dict)):
                raise ValueError(
                    f"space{key_stack}={space} was a gym.spaces.Dict type but\n"
                    f"sample{key_stack}={sample} \n"
                    f"is a {type(sample)} type and not a StateDict, OrderedDict, or dict"
                )
            for key, value in sample.items():
                EnvSpaceUtil.deep_sanity_check_space_sample(space.spaces[key], value, key_stack=f"{key_stack}[{key}]")
        elif isinstance(space, gym.spaces.Tuple):
            if not isinstance(sample, tuple):
                raise ValueError(
                    f"space{key_stack}={space} is a gym.spaces.Tuple type but\n"
                    f"sample{key_stack}={sample} \n"
                    f"is a {type(sample)} type and not a tuple type"
                )
            for idx, value in enumerate(sample):
                EnvSpaceUtil.deep_sanity_check_space_sample(space.spaces[idx], value, key_stack=f"{key_stack}[{idx}]")
        elif isinstance(space, gym.spaces.Discrete):
            if not isinstance(sample, (int, np.integer, np.ndarray)):
                raise ValueError(
                    f"space{key_stack}={space} is a gym.spaces.Discrete type but\n"
                    f"sample{key_stack}={sample} \n"
                    f"is a {type(sample)} type and not an int or np.integer or np.ndarray"
                )
            if not space.contains(sample):
                raise ValueError(
                    f"sample{key_stack} has value of {sample} however "
                    f"space{key_stack} has space definition of {space} {space.n}"
                )
        elif isinstance(space, gym.spaces.Box):
            if not isinstance(sample, (np.ndarray, list, np.floating)):
                raise ValueError(
                    f"space{key_stack}={space} is a gym.spaces.Box type but\n"
                    f"sample{key_stack}={sample} \n"
                    f"is a {type(sample)} type and not a np.ndarray, list, or np.float type"
                )
            if not space.contains(sample):
                sample_dtype = getattr(sample, 'dtype', None)
                raise ValueError(
                    f"sample{key_stack} has value of {sample} however "
                    f"space{key_stack} has space definition of {space} {space.low} {space.high} "
                    f"space dtype is {space.dtype}, sample dtype is {sample_dtype}"
                )
        elif isinstance(space, gym.spaces.MultiBinary):
            if not isinstance(sample, (np.ndarray, list, np.integer)):
                raise ValueError(
                    f"space{key_stack}={space} is a gym.spaces.MultiBinary type but\n"
                    f"sample{key_stack}={sample} \n"
                    f"is a {type(sample)} type and not a np.ndarray, list, or np.integer type"
                )
            if not space.contains(sample):
                raise ValueError(
                    f"sample{key_stack} has value of {sample} however "
                    f"space{key_stack} has space definition of {space} {space.n}"
                )
        elif isinstance(space, gym.spaces.MultiDiscrete):
            if not isinstance(sample, (np.ndarray, list, np.integer)):
                raise ValueError(
                    f"space{key_stack}={space} is a gym.spaces.MultiDiscrete type but\n"
                    f"sample{key_stack}={sample} \n"
                    f"is a {type(sample)} type and not a np.ndarray, list, or np.integer type"
                )
            if not space.contains(sample):
                raise ValueError(
                    f"sample{key_stack} has value of {sample} however "
                    f"space{key_stack} has space definition of {space} {space.nvec}"
                )
        elif isinstance(space, Repeated):
            if not isinstance(sample, list):
                raise ValueError(
                    f"space{key_stack}={space} is a ray.rllib.utils.spaces.repeated.Repeated type but\n"
                    f"sample{key_stack}={sample}\n"
                    f"is a {type(sample)} type and not a list  type"
                )
            for idx, item in enumerate(sample):
                EnvSpaceUtil.deep_sanity_check_space_sample(space.child_space, item, key_stack=f"{key_stack}[{idx}]")

    @staticmethod
    def sanity_check_space_sample(space, sample):
        """[summary]

        Parameters
        ----------
        space : [type]
            [description]
        sample : [type]
            [description]

        Raises
        ------
        RuntimeError
            [description]
        """
        if not space.contains(sample):
            raise RuntimeError(f"sample of {sample} does not meet space {space} setup")

    @staticmethod
    def deep_merge_dict(source: dict, destination: dict):
        """
        Merget two dictionaries that also can contain sub dictionaries. This function returns the second dict
        but it also modifies it in place

        run me with nosetests --with-doctest file.py

        >>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } }
        >>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } }
        >>> merge(b, a) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
        True
        """
        for key, value in source.items():
            if isinstance(value, dict):
                # get node or create one
                node = destination.setdefault(key, OrderedDict())
                EnvSpaceUtil.deep_merge_dict(value, node)
            else:
                destination[key] = value

        return destination

    @staticmethod
    def scale_space(space: gym.spaces.Space, scale: float) -> gym.spaces.Space:
        """
        Multiplies the low and high properties of all the Boxes in the given gym space by the scale input
        Parameters
        ----------
        space: gym.spaces.Space
            the gym space to scale the Boxes of
        scale: float
            what to multiply the Box low and high by
        Returns
        -------
        gym.spaces.Space
            the scaled gym space
        """
        # TODO: this copy probably doesn't actually work but I can dream
        val = copy.deepcopy(space)
        if isinstance(space, gym.spaces.Dict):
            new_dict = OrderedDict()
            for key, value in space.spaces.items():
                new_dict[key] = EnvSpaceUtil.scale_space(value, scale=scale)
            val = gym.spaces.Dict(spaces=new_dict)
        elif isinstance(space, gym.spaces.Tuple):
            new_thing = [EnvSpaceUtil.scale_space(sp, scale=scale) for sp in space.spaces]
            val = gym.spaces.Tuple(tuple(new_thing))
        elif isinstance(space, gym.spaces.Box):
            scaled_box = gym.spaces.Box(
                low=np.multiply(space.low, scale).astype(np.float32),
                high=np.multiply(space.high, scale).astype(np.float32),
                shape=space.shape,
                dtype=np.float32
            )
            val = scaled_box

        return val

    @staticmethod
    def zero_mean_space(space: gym.spaces.Space) -> gym.spaces.Space:
        """
        Returns a space object where every Box instance has its low and high shifted to be zero mean
        Parameters
        ----------
        space: gym.spaces.Space
            The gym space to zero mean

        Returns
        -------
        gym.spaces.Space
            A gym space the same as the input but with the Box instances shifted
        """
        # TODO: this copy doesn't actually work but I can dream
        val = copy.deepcopy(space)
        if isinstance(space, gym.spaces.Dict):
            new_dict = OrderedDict()
            for key, value in space.spaces.items():
                new_dict[key] = EnvSpaceUtil.zero_mean_space(value)
            val = gym.spaces.Dict(spaces=new_dict)
        elif isinstance(space, gym.spaces.Tuple):
            new_thing = [EnvSpaceUtil.zero_mean_space(sp) for sp in space.spaces]
            val = gym.spaces.Tuple(tuple(new_thing))
        elif isinstance(space, gym.spaces.Box):
            mean = (space.high + space.low) / 2
            zero_mean_box = gym.spaces.Box(low=space.low - mean, high=space.high - mean, shape=space.shape, dtype=np.float32)
            val = zero_mean_box

        return val

    @staticmethod
    def space_box_min_maxer(
        space_likes: typing.Tuple[gym.spaces.Space],
        out_min: float = -1.0,
        out_max: float = 1.0,
    ) -> gym.spaces.Space:
        """
        Makes a gym box to the out_min and out_max range

        Parameters
        ----------
        space_likes: typing.Tuple[gym.spaces.Space]
            the gym space to turn all boxes into the scaled space
        out_min: float
            the new low for the boxes
        out_max: float
            the new high for the boxes

        Returns
        -------
        gym.spaces.Space:
            the new gym spaces where all boxes have had their bounds changed
        """
        space_arg = space_likes[0]
        if isinstance(space_arg, gym.spaces.Box):
            return gym.spaces.Box(low=out_min, high=out_max, shape=space_arg.shape, dtype=np.float32)
        return copy.deepcopy(space_arg)

    @staticmethod
    def normalize_space(
        space: gym.spaces.Space,
        out_min=-1,
        out_max=1,
    ) -> gym.spaces.Space:
        """
        This is a convenience wrapper for box_scaler

        Parameters
        ----------
        space: gym.spaces.Space
            the gym space to turn all boxes into the scaled space
        out_min: float
            the new low for the boxes
        out_max: float
            the new high for the boxes

        Returns
        -------
        gym.spaces.Space:
            the new gym spaces where all boxes have had their bounds changed
        """
        return EnvSpaceUtil.iterate_over_space_likes(
            func=EnvSpaceUtil.space_box_min_maxer,
            space_likes=(space, ),
            out_min=out_min,
            out_max=out_max,
            return_space=True,
        )

    @staticmethod
    def get_zero_sample_from_space(space: gym.spaces.Space) -> sample_type:
        """
        Given a gym space returns an instance of that space but instead of sampling from the
        gym space, returns all zeros. If the space is not a Box and we cannot iterate over it
        then we will sample from it.

        Parameters
        ----------
        space: gym.spaces.Space
            The gym space to zero sample from

        Returns
        -------
        sample_type
            The instance of the gym space but all Box spaces are sampled as zero
        """
        val = space.sample()
        if isinstance(space, gym.spaces.Dict):
            new_dict = OrderedDict()
            for key, value in space.spaces.items():
                new_dict[key] = EnvSpaceUtil.get_zero_sample_from_space(value)
            val = new_dict
        elif isinstance(space, gym.spaces.Tuple):
            new_tuple = [EnvSpaceUtil.get_zero_sample_from_space(sp) for sp in space.spaces]
            val = tuple(new_tuple)
        elif isinstance(space, gym.spaces.Box):
            val = np.zeros(shape=space.shape, dtype=np.float32)
        return val

    @staticmethod
    def get_mean_sample_from_space(space: gym.spaces.Space) -> sample_type:
        """
        Given a gym space returns an instance of that space but instead of sampling from the
        gym space, returns all zeros. If the space is not a Box and we cannot iterate over it
        then we will sample from it.

        Parameters
        ----------
        space: gym.spaces.Space
            The gym space to zero sample from

        Returns
        -------
        sample_type
            The instance of the gym space but all Box spaces are sampled as zero
        """
        val = space.sample()
        if isinstance(space, gym.spaces.Dict):
            new_dict = OrderedDict()
            for key, value in space.spaces.items():
                new_dict[key] = EnvSpaceUtil.get_zero_sample_from_space(value)
            val = new_dict
        elif isinstance(space, gym.spaces.Tuple):
            new_tuple = [EnvSpaceUtil.get_zero_sample_from_space(sp) for sp in space.spaces]
            val = tuple(new_tuple)
        elif isinstance(space, gym.spaces.Box):
            val = (space.high + space.low) / 2.0
        return val

    @staticmethod
    def add_space_samples(
        space_template: gym.spaces.Space,
        space_sample1: sample_type,
        space_sample2: sample_type,
    ) -> sample_type:
        """
        Adds together two instances of gym spaces. This only adds the ndarray or list types (that were sampled from Box)
        If the object is not a ndarray or list then the value of space_sample1 is returned by default
        Parameters
        ----------
        space_template: gym.spaces.Space
            The template to use for adding these space instances.
            This is to determine the difference between a Box and Discrete or MultiDiscrete or MultiBinary
        space_sample1: sample_type
            The first instance to add
        space_sample2: sample_type
            The second instance to add
        Returns
        -------
        sample_type
            an instance of the space object but with all the Box types added

        """
        # if not type(space_sample1) == type(space_sample2):
        #     raise ValueError('space instances must be of same type')
        # TODO: I want to check they are the same type but dict and OrderedDict should match which makes this annoying
        val: EnvSpaceUtil.sample_type
        if isinstance(space_template, gym.spaces.Dict):
            new_dict = OrderedDict()
            for key, space_value in space_template.spaces.items():
                value1 = space_sample1[key]
                value2 = space_sample2[key]
                new_dict[key] = EnvSpaceUtil.add_space_samples(space_value, value1, value2)
            val = new_dict
        elif isinstance(space_template, gym.spaces.Tuple):
            new_tuple = [EnvSpaceUtil.add_space_samples(*args) for args in zip(space_template, space_sample1, space_sample2)]
            val = tuple(new_tuple)
        elif isinstance(space_template, gym.spaces.Box):
            if isinstance(space_sample1, np.ndarray):
                val = np.array(space_sample1 + space_sample2)
            elif isinstance(space_sample1, list):
                val = [value1 + value2 for value1, value2 in zip(space_sample1, space_sample2)]
        else:
            val = copy.deepcopy(space_sample1)
        return val

    @staticmethod
    def clip_space_sample_to_space(space_sample: sample_type, space: gym.spaces.Space, is_wrap: bool = False) -> sample_type:
        """
        Clips a space instance to a given space. After this the space should contain the space instance

        Parameters
        ----------
        space_sample: sample_type
            the space instance we are going to clip
        space: gym.spaces.Space
            the gym space to clip the instance to.
        Returns
        -------
        sample_type
            the clipped space instance
        """
        val = copy.deepcopy(space_sample)
        if isinstance(space, gym.spaces.Dict):
            new_dict = OrderedDict()
            for key, space_value in space.spaces.items():
                space_sample_value = space_sample[key]
                new_dict[key] = EnvSpaceUtil.clip_space_sample_to_space(space_sample_value, space_value, is_wrap)
            val = new_dict
        elif isinstance(space, gym.spaces.Tuple):
            new_tuple = [EnvSpaceUtil.clip_space_sample_to_space(siv, sv, is_wrap) for siv, sv in zip(space_sample, space)]
            val = tuple(new_tuple)
        elif isinstance(space, gym.spaces.Box):
            if is_wrap:
                # Takes care of the case where the controls wrap at the min/max value
                # Example: Range = 0-1, value = 1.1 ----> clipping puts to .1
                if space_sample > space.high:
                    val = space.low + (space_sample - space.high)
                elif space_sample < space.low:
                    val = space.high - (space.low - space_sample)
            else:
                # Takes care of the case where the controls saturate at the min/max value
                # Example: Range = 0-1, value = 1.1 ----> clipping puts to 1
                assert isinstance(space_sample, (Sequence, np.ndarray)), \
                    f'Box spaces must have sequence samples, received {type(space_sample).__name__}'
                val = np.clip(a=space_sample, a_min=space.low, a_max=space.high)
        return val

    @staticmethod
    def turn_box_into_tuple_of_discretes(
        space: gym.spaces.Space, num_actions: typing.Union[int, typing.List[int], Iterable, dict] = 10
    ) -> gym.spaces.Space:
        """
        Takes a gym space and replaces any Box types with MultiDiscrete types with the same number of
        possible of discrete as the box and each discrete has the same number of possible actions = num_actions
        Parameters
        ----------
        space
        num_actions: int, list, dict
            how many discrete actions to give to each possible discrete in the MultiDiscrete
        Returns
        -------
        gym.spaces.Space
            A gym space where all the Box types are replaced with MultiDiscrete types
        """

        # first pass through the code num_actions is an int or list, this code turns it into an
        # iterator for the rest of the recursive calls
        if isinstance(num_actions, int):
            num_actions = repeat(num_actions)
        elif isinstance(num_actions, (list, tuple)):
            num_actions = iter(num_actions)

        # TODO: this copy doesn't actually work but I can dream
        val = copy.deepcopy(space)

        if isinstance(space, gym.spaces.Dict):
            new_dict = OrderedDict()
            for key, space_value in space.spaces.items():
                if isinstance(num_actions, dict):
                    new_dict[key] = EnvSpaceUtil.turn_box_into_tuple_of_discretes(space=space_value, num_actions=num_actions[key])
                else:
                    new_dict[key] = EnvSpaceUtil.turn_box_into_tuple_of_discretes(space=space_value, num_actions=num_actions)
            val = gym.spaces.Dict(spaces=new_dict)
        elif isinstance(space, gym.spaces.Tuple):
            new_tuple = [EnvSpaceUtil.turn_box_into_tuple_of_discretes(sv, num_actions=num_actions) for sv in space]
            if len(new_tuple) == 1 and isinstance(new_tuple[0], gym.spaces.Tuple):
                val = new_tuple[0]
            else:
                val = gym.spaces.Tuple(tuple(new_tuple))
        elif isinstance(space, gym.spaces.Box):
            assert isinstance(num_actions, Iterator), f'num_actions must be iterator, received {type(num_actions).__name__}'
            val = gym.spaces.Tuple([Discrete(np.asarray(next(num_actions))) for _ in range(0, space.low.size)])

        return val

    # TODO: maybe use this function in more places? maybe not? it could be slower?
    @staticmethod
    def iterate_over_space_likes(
        func,
        space_likes: typing.Tuple[typing.Union[gym.spaces.Space, sample_type], ...],
        return_space: bool,
        *func_args,
        **func_kwargs,
    ) -> typing.Union[gym.spaces.Space, sample_type]:
        """
        Iterates over space_likes which are tuple, dicts or the gym equivalent.
        When it encounters an actual item that is not a container it calls the func method.
        put any args, or kwargs you want to give to func in the overall call and they will be forwarded

        Parameters
        ----------
        func:
            the function to apply
        space_likes: typing.Tuple[typing.Union[gym.spaces.Space, sample_type], ...]
            the spaces to iterate over. They must have the same keywords for dicts and number of items for tuples
        return_space: bool
            if true the containers will be gym space equivalents
        func_args
            the arguments to give to func
        func_kwargs
            the keyword arguments to give to func

        Returns
        -------
        The contained result by calling func and stuffing back into the tuples and dicts in the call
        if return_space=True the containers are gym spaces
        """

        first_space_like = space_likes[0]
        val = None
        if isinstance(first_space_like, (gym.spaces.Dict, dict, OrderedDict)):
            new_dict = OrderedDict()
            keys: typing.KeysView
            if isinstance(first_space_like, gym.spaces.Dict):
                keys = first_space_like.spaces.keys()
            else:
                keys = first_space_like.keys()
            for key in keys:
                new_space_likes = tuple(spacer[key] for spacer in space_likes)
                new_dict[key] = EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                    func,
                    space_likes=new_space_likes,
                    return_space=return_space,
                    *func_args,
                    **func_kwargs,
                )
            val = gym.spaces.Dict(spaces=new_dict) if return_space else new_dict
        elif isinstance(first_space_like, (gym.spaces.Tuple, tuple)):
            new_tuple = [
                EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                    func,
                    space_likes=new_space_likes,
                    return_space=return_space,
                    *func_args,
                    **func_kwargs,
                ) for new_space_likes in zip(*space_likes)
            ]
            val = (gym.spaces.Tuple(tuple(new_tuple)) if return_space else tuple(new_tuple))
        elif isinstance(first_space_like, Repeated):
            # if space_likes is longer than 1 that means that return_space = False
            # if there is only the space that means we need to generate just the space
            # itself and can use the second path, however for the case where we have a sample
            # it comes in as a list and we must iterate over the entire repeated list and process it
            if len(space_likes) > 1:
                repeated_samples = space_likes[1]
                assert isinstance(repeated_samples, MutableSequence), \
                    f'repeated_samples must be MutableSequence, received {type(repeated_samples).__name__}'
                for indx, sample in enumerate(repeated_samples):
                    repeated_samples[indx] = EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                        func,
                        space_likes=(first_space_like.child_space, sample),
                        return_space=return_space,
                        *func_args,
                        **func_kwargs,
                    )
                val = repeated_samples
            else:
                new_child_space = EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                    func,
                    space_likes=(first_space_like.child_space, ),
                    return_space=return_space,
                    *func_args,
                    **func_kwargs,
                )

                val = Repeated(child_space=new_child_space, max_len=first_space_like.max_len)
        else:
            val = func(space_likes, *func_args, **func_kwargs)
        return val

    @staticmethod
    def turn_orig_space_box_to_cont_sample(space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]) -> sample_type:
        """
        Given a continuous space and a discrete space and a sample of the discrete space,
        this function turns the discrete sample into a continuous sample

        Parameters
        ----------
        space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]
            the first is the original gym space that is continuous,
            the second is the new gym space with discrete objects
            the last is the space sample that is of discrete types

        Returns
        -------
        sample_type:
            the sample space that is now a continuous version of the discrete sample space according to the cont space
        """
        (original_space_arg, discrete_only_space_arg, space_sample_arg) = space_likes
        if isinstance(original_space_arg,
                      gym.spaces.Box) and (isinstance(discrete_only_space_arg, (gym.spaces.MultiDiscrete, gym.spaces.Discrete))):
            if isinstance(discrete_only_space_arg, gym.spaces.MultiDiscrete):
                possible_n = discrete_only_space_arg.nvec
            elif isinstance(discrete_only_space_arg, gym.spaces.Discrete):
                possible_n = discrete_only_space_arg.n
            else:
                raise RuntimeError("This should not be reachable")

            action = space_sample_arg
            low = original_space_arg.low
            high = original_space_arg.high
            new_cont_sample = (action / (possible_n - 1)) * (high - low) + low

            return new_cont_sample

        return copy.deepcopy(space_sample_arg)

    @staticmethod
    def turn_orig_space_box_to_cont_sample_powerspace(
        space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type, sample_type]
    ) -> sample_type:
        """
        Given a continuous space and a discrete space and a sample of the discrete space,
        this function turns the discrete sample into a continuous sample

        Parameters
        ----------
        space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]
            the first is the original gym space that is continuous,
            the second is the new gym space with discrete objects
            the last is the space sample that is of discrete types

        Returns
        -------
        sample_type:
            the sample space that is now a continuous version of the discrete sample space according to the cont space
        """
        # There is a test built for this function. Please keep the test up to date for any changes you make.
        (original_space_arg, discrete_only_space_arg, space_sample_arg, pow_n) = space_likes
        if isinstance(original_space_arg,
                      gym.spaces.Box) and (isinstance(discrete_only_space_arg, (gym.spaces.Tuple, gym.spaces.Discrete))):
            if isinstance(discrete_only_space_arg, gym.spaces.Tuple):
                possible_n = np.asarray([x.n for x in discrete_only_space_arg])
                pow_n = np.asarray(pow_n)
                discrete_sample = np.asarray(space_sample_arg)
            elif isinstance(discrete_only_space_arg, gym.spaces.Discrete):
                possible_n = np.asarray([discrete_only_space_arg.n])
                pow_n = np.asarray([pow_n])
                discrete_sample = np.asarray([space_sample_arg])
            else:
                raise RuntimeError("This should not be reachable")

            low = original_space_arg.low
            high = original_space_arg.high

            if any(low > high):
                raise RuntimeError("lower bounds of space somehow higher than high bounds")

            if not all(possible_n % 2):
                if any(pow_n > 1):
                    raise RuntimeError("The exponential power factor not supported when possible_n % 2")

                difference = high - low
                movement_per_space_sample = difference / possible_n

                new_cont_sample = low + (movement_per_space_sample * space_sample_arg)

                return new_cont_sample

            if any(pow_n <= 0):
                raise RuntimeError("The exponential power factor used to stretch/shrink the discrete space must be greater than zero")

            if any(-low != high) or any(low >= high):
                raise RuntimeError("Currently only support symmetric space about zero for power space setup - TODO")
            p_high = np.power(high, 1 / pow_n.astype(float))
            p_low = -p_high  # Since we have specified the space must be symetric around 0
            new_cont_sample = (discrete_sample / (possible_n - 1)) * (p_high - p_low) + p_low
            new_cont_sample = np.power(np.abs(new_cont_sample), pow_n) * np.sign(new_cont_sample)

            return new_cont_sample

        return copy.deepcopy(space_sample_arg)

    @staticmethod
    def turn_discrete_action_back_to_cont(
        original_space: gym.spaces.Space,
        discrete_only_space: gym.spaces.Space,
        space_sample: sample_type,
    ) -> sample_type:
        """
        This is a convenience wrapper for turn_orig_space_box_to_cont_sample

        Parameters
        ----------
        original_space: gym.spaces.Space
            the continuous space
        discrete_only_space: gym.spaces.Space
            the discrete space
        space_sample: sample_type
            the sample of the discrete space

        Returns
        -------
        sample_type:
            the continuous version of the discrete sample
        """
        return EnvSpaceUtil.iterate_over_space_likes(
            EnvSpaceUtil.turn_orig_space_box_to_cont_sample,
            space_likes=(original_space, discrete_only_space, space_sample),
            return_space=False,
        )

    @staticmethod
    def turn_discrete_action_back_to_cont_powerspace(
        original_space: gym.spaces.Space, discrete_only_space: gym.spaces.Space, space_sample: sample_type, pow_n: dict
    ) -> sample_type:
        """
        This is a convenience wrapper for turn_orig_space_box_to_cont_sample_powerspace

        Parameters
        ----------
        original_space: gym.spaces.Space
            the continuous space
        discrete_only_space: gym.spaces.Space
            the discrete space
        space_sample: sample_type
            the sample of the discrete space

        Returns
        -------
        sample_type:
            the continuous version of the discrete sample
        """
        return EnvSpaceUtil.iterate_over_space_likes(
            EnvSpaceUtil.turn_orig_space_box_to_cont_sample_powerspace,
            space_likes=(original_space, discrete_only_space, space_sample, pow_n),
            return_space=False,
        )

    @staticmethod
    def box_scaler(
        space_likes: typing.Tuple[gym.spaces.Space, sample_type],
        out_min: float = -1,
        out_max: float = 1,
    ) -> sample_type:
        """
        This scales a box space to be between the out_min and out_max arguments

        Parameters
        ----------
        space_likes: typing.Tuple[gym.spaces.Space, sample_type]
            the first is the gym spade to determine the input min and max
            the second is the sample of this space to scale
        out_min: float
            the minimum of the output scaling
        out_max: float
            the maximum of the output scaling

        Returns
        -------
        sample_type:
            the scaled sample with min of out_min and max of out_max
        """
        (space_arg, space_sample_arg) = space_likes
        if isinstance(space_arg, gym.spaces.Box) and space_arg.is_bounded():
            val = space_sample_arg
            in_min = space_arg.low
            in_max = space_arg.high
            norm_value = (out_max - out_min) * (val - in_min) / (in_max - in_min) + out_min
            return norm_value.astype(np.float32)
        return copy.deepcopy(space_sample_arg)

    @staticmethod
    def scale_sample_from_space(
        space: gym.spaces.Space,
        space_sample: sample_type,
        out_min: float = -1,
        out_max: float = 1,
    ) -> sample_type:
        """
        This is a convenience wrapper for box_scaler

        Parameters
        ----------
        space: gym.spaces.Space
            the space to use for the input min and max
        space_sample: sample_type
            the space sample to scale
        out_min: float
            the minimum of the output scaling
        out_max: float
            the maximum of the output scaling

        Returns
        -------
        sample_type:
            the scaled sample with min of out_min and max of
            out_max (this is in dicts and tuples the same as space_sample was)
        """
        return EnvSpaceUtil.iterate_over_space_likes(
            func=EnvSpaceUtil.box_scaler,
            space_likes=(space, space_sample),
            out_min=out_min,
            out_max=out_max,
            return_space=False,
        )

    @staticmethod
    def box_unscaler(
        space_likes: typing.Tuple[gym.spaces.Space, sample_type],
        out_min: float = -1,
        out_max: float = 1,
    ) -> sample_type:
        """
        Unscales the space_sample according to be the scale of the input space.
        In this sense out_min and out_max are the min max of the sample

        Parameters
        ----------
        space_likes: typing.Tuple[gym.spaces.Space, sample_type]
            the first is the gym spade to determine the input min and max
            the second is the sample of this space to scale
        out_min: float
            the minimum of the sample
        out_max: float
            the maximum of the sample

        Returns
        -------
        space_type:
            the unscaled sample
        """
        (space_arg, space_sample_arg) = space_likes
        if isinstance(space_arg, gym.spaces.Box):
            norm_value = space_sample_arg
            assert isinstance(norm_value, np.ndarray), f'norm_value must be np.ndarray, received {type(norm_value).__name__}'
            in_min = space_arg.low
            in_max = space_arg.high
            val = (norm_value - out_min) * (in_max - in_min) / (out_max - out_min) + in_min
            return val
        return copy.deepcopy(space_sample_arg)

    @staticmethod
    def unscale_sample_from_space(
        space: gym.spaces.Space,
        space_sample: sample_type,
        out_min: float = -1,
        out_max: float = 1,
    ) -> sample_type:
        """
        This is a convenience wrapper for box_unscaler

        Parameters
        ----------
        space: gym.spaces.Space
            the gym space we will unscale to
        space_sample: sample_type
            the sample we want to unscale. thus this is a scaled version of the input space
            with a min,max defined by the arguments out_min,out_max
        out_min: float
            the minimum of the sample
        out_max: float
            the maximum of the sample

        Returns
        -------
        sample_type:
            the unscaled version of the space_sample. Thus the space should now contain this output sample
        """

        return EnvSpaceUtil.iterate_over_space_likes(
            func=EnvSpaceUtil.box_unscaler,
            space_likes=(space, space_sample),
            out_min=out_min,
            out_max=out_max,
            return_space=False,
        )

    @staticmethod
    def convert_config_param_to_space(action_space: gym.spaces.Space, parameter: typing.Union[int, float, list, dict]) -> dict:
        """
        This is a a convert for parameters used in action space conversions

        Parameters
        ----------
        space: gym.spaces.Space
            the gym space that defines the actions
        parameter: typing.Union[int, float, list, dict]
            the parameter as defined in a a config

        Returns
        -------
        dict:
            actions are dicts, the output is a dict with the parameters set for each key
        """
        action_params = {}
        sample_action = action_space.sample().items()
        if isinstance(parameter, (int, float)):
            parameter = [parameter]  # change to list if needed
            if len(parameter) == 1:  # if length 1, broadcast to the proper length
                for key, value in sample_action:
                    action_params[key] = parameter * len(value)
        elif isinstance(parameter, list):
            if len(sample_action) != 1:
                raise ValueError(f"list configs can only be applied to action space dicts with single key: {sample_action}")
            for key, value in sample_action:
                if len(value) != len(parameter):
                    raise ValueError(f"config length does not match action length of {len(value)}. {parameter}")
                action_params[key] = parameter
        elif isinstance(parameter, dict):  # pylint: disable=too-many-nested-blocks
            for key, value in sample_action:
                if key not in parameter:
                    if isinstance(key, tuple):  # parameters may be specfied using tuple values as keys
                        action_params[key] = np.zeros(len(key))  # type: ignore[assignment]
                        for idx, sub_key in enumerate(key):
                            if sub_key not in parameter:
                                raise ValueError(f"action space key not in config key: {sub_key}, config: {parameter} ")
                            action_params[key][idx] = parameter[sub_key]
                    else:
                        raise ValueError(f"action space key not in config key: {key}, config: {parameter} ")
                elif len(value) != len(parameter[key]):
                    raise ValueError(f"config value length for key {key} does not match action length of {len(value)}. {parameter}")
                else:
                    action_params[key] = parameter[key]

        return action_params

add_space_samples(space_template, space_sample1, space_sample2) staticmethod ¤

Adds together two instances of gym spaces. This only adds the ndarray or list types (that were sampled from Box) If the object is not a ndarray or list then the value of space_sample1 is returned by default Parameters


gym.spaces.Space

The template to use for adding these space instances. This is to determine the difference between a Box and Discrete or MultiDiscrete or MultiBinary

sample_type

The first instance to add

sample_type

The second instance to add

Returns¤

sample_type an instance of the space object but with all the Box types added

Source code in corl/libraries/env_space_util.py
@staticmethod
def add_space_samples(
    space_template: gym.spaces.Space,
    space_sample1: sample_type,
    space_sample2: sample_type,
) -> sample_type:
    """
    Adds together two instances of gym spaces. This only adds the ndarray or list types (that were sampled from Box)
    If the object is not a ndarray or list then the value of space_sample1 is returned by default
    Parameters
    ----------
    space_template: gym.spaces.Space
        The template to use for adding these space instances.
        This is to determine the difference between a Box and Discrete or MultiDiscrete or MultiBinary
    space_sample1: sample_type
        The first instance to add
    space_sample2: sample_type
        The second instance to add
    Returns
    -------
    sample_type
        an instance of the space object but with all the Box types added

    """
    # if not type(space_sample1) == type(space_sample2):
    #     raise ValueError('space instances must be of same type')
    # TODO: I want to check they are the same type but dict and OrderedDict should match which makes this annoying
    val: EnvSpaceUtil.sample_type
    if isinstance(space_template, gym.spaces.Dict):
        new_dict = OrderedDict()
        for key, space_value in space_template.spaces.items():
            value1 = space_sample1[key]
            value2 = space_sample2[key]
            new_dict[key] = EnvSpaceUtil.add_space_samples(space_value, value1, value2)
        val = new_dict
    elif isinstance(space_template, gym.spaces.Tuple):
        new_tuple = [EnvSpaceUtil.add_space_samples(*args) for args in zip(space_template, space_sample1, space_sample2)]
        val = tuple(new_tuple)
    elif isinstance(space_template, gym.spaces.Box):
        if isinstance(space_sample1, np.ndarray):
            val = np.array(space_sample1 + space_sample2)
        elif isinstance(space_sample1, list):
            val = [value1 + value2 for value1, value2 in zip(space_sample1, space_sample2)]
    else:
        val = copy.deepcopy(space_sample1)
    return val

box_scaler(space_likes, out_min=-1, out_max=1) staticmethod ¤

This scales a box space to be between the out_min and out_max arguments

Parameters¤

typing.Tuple[gym.spaces.Space, sample_type]

the first is the gym spade to determine the input min and max the second is the sample of this space to scale

float

the minimum of the output scaling

float

the maximum of the output scaling

Returns¤

Sample_type

the scaled sample with min of out_min and max of out_max

Source code in corl/libraries/env_space_util.py
@staticmethod
def box_scaler(
    space_likes: typing.Tuple[gym.spaces.Space, sample_type],
    out_min: float = -1,
    out_max: float = 1,
) -> sample_type:
    """
    This scales a box space to be between the out_min and out_max arguments

    Parameters
    ----------
    space_likes: typing.Tuple[gym.spaces.Space, sample_type]
        the first is the gym spade to determine the input min and max
        the second is the sample of this space to scale
    out_min: float
        the minimum of the output scaling
    out_max: float
        the maximum of the output scaling

    Returns
    -------
    sample_type:
        the scaled sample with min of out_min and max of out_max
    """
    (space_arg, space_sample_arg) = space_likes
    if isinstance(space_arg, gym.spaces.Box) and space_arg.is_bounded():
        val = space_sample_arg
        in_min = space_arg.low
        in_max = space_arg.high
        norm_value = (out_max - out_min) * (val - in_min) / (in_max - in_min) + out_min
        return norm_value.astype(np.float32)
    return copy.deepcopy(space_sample_arg)

box_unscaler(space_likes, out_min=-1, out_max=1) staticmethod ¤

Unscales the space_sample according to be the scale of the input space. In this sense out_min and out_max are the min max of the sample

Parameters¤

typing.Tuple[gym.spaces.Space, sample_type]

the first is the gym spade to determine the input min and max the second is the sample of this space to scale

float

the minimum of the sample

float

the maximum of the sample

Returns¤

Space_type

the unscaled sample

Source code in corl/libraries/env_space_util.py
@staticmethod
def box_unscaler(
    space_likes: typing.Tuple[gym.spaces.Space, sample_type],
    out_min: float = -1,
    out_max: float = 1,
) -> sample_type:
    """
    Unscales the space_sample according to be the scale of the input space.
    In this sense out_min and out_max are the min max of the sample

    Parameters
    ----------
    space_likes: typing.Tuple[gym.spaces.Space, sample_type]
        the first is the gym spade to determine the input min and max
        the second is the sample of this space to scale
    out_min: float
        the minimum of the sample
    out_max: float
        the maximum of the sample

    Returns
    -------
    space_type:
        the unscaled sample
    """
    (space_arg, space_sample_arg) = space_likes
    if isinstance(space_arg, gym.spaces.Box):
        norm_value = space_sample_arg
        assert isinstance(norm_value, np.ndarray), f'norm_value must be np.ndarray, received {type(norm_value).__name__}'
        in_min = space_arg.low
        in_max = space_arg.high
        val = (norm_value - out_min) * (in_max - in_min) / (out_max - out_min) + in_min
        return val
    return copy.deepcopy(space_sample_arg)

clip_space_sample_to_space(space_sample, space, is_wrap=False) staticmethod ¤

Clips a space instance to a given space. After this the space should contain the space instance

Parameters¤

sample_type

the space instance we are going to clip

gym.spaces.Space

the gym space to clip the instance to.

Returns¤

sample_type the clipped space instance

Source code in corl/libraries/env_space_util.py
@staticmethod
def clip_space_sample_to_space(space_sample: sample_type, space: gym.spaces.Space, is_wrap: bool = False) -> sample_type:
    """
    Clips a space instance to a given space. After this the space should contain the space instance

    Parameters
    ----------
    space_sample: sample_type
        the space instance we are going to clip
    space: gym.spaces.Space
        the gym space to clip the instance to.
    Returns
    -------
    sample_type
        the clipped space instance
    """
    val = copy.deepcopy(space_sample)
    if isinstance(space, gym.spaces.Dict):
        new_dict = OrderedDict()
        for key, space_value in space.spaces.items():
            space_sample_value = space_sample[key]
            new_dict[key] = EnvSpaceUtil.clip_space_sample_to_space(space_sample_value, space_value, is_wrap)
        val = new_dict
    elif isinstance(space, gym.spaces.Tuple):
        new_tuple = [EnvSpaceUtil.clip_space_sample_to_space(siv, sv, is_wrap) for siv, sv in zip(space_sample, space)]
        val = tuple(new_tuple)
    elif isinstance(space, gym.spaces.Box):
        if is_wrap:
            # Takes care of the case where the controls wrap at the min/max value
            # Example: Range = 0-1, value = 1.1 ----> clipping puts to .1
            if space_sample > space.high:
                val = space.low + (space_sample - space.high)
            elif space_sample < space.low:
                val = space.high - (space.low - space_sample)
        else:
            # Takes care of the case where the controls saturate at the min/max value
            # Example: Range = 0-1, value = 1.1 ----> clipping puts to 1
            assert isinstance(space_sample, (Sequence, np.ndarray)), \
                f'Box spaces must have sequence samples, received {type(space_sample).__name__}'
            val = np.clip(a=space_sample, a_min=space.low, a_max=space.high)
    return val

convert_config_param_to_space(action_space, parameter) staticmethod ¤

This is a a convert for parameters used in action space conversions

Parameters¤

gym.spaces.Space

the gym space that defines the actions

typing.Union[int, float, list, dict]

the parameter as defined in a a config

Returns¤

Dict

actions are dicts, the output is a dict with the parameters set for each key

Source code in corl/libraries/env_space_util.py
@staticmethod
def convert_config_param_to_space(action_space: gym.spaces.Space, parameter: typing.Union[int, float, list, dict]) -> dict:
    """
    This is a a convert for parameters used in action space conversions

    Parameters
    ----------
    space: gym.spaces.Space
        the gym space that defines the actions
    parameter: typing.Union[int, float, list, dict]
        the parameter as defined in a a config

    Returns
    -------
    dict:
        actions are dicts, the output is a dict with the parameters set for each key
    """
    action_params = {}
    sample_action = action_space.sample().items()
    if isinstance(parameter, (int, float)):
        parameter = [parameter]  # change to list if needed
        if len(parameter) == 1:  # if length 1, broadcast to the proper length
            for key, value in sample_action:
                action_params[key] = parameter * len(value)
    elif isinstance(parameter, list):
        if len(sample_action) != 1:
            raise ValueError(f"list configs can only be applied to action space dicts with single key: {sample_action}")
        for key, value in sample_action:
            if len(value) != len(parameter):
                raise ValueError(f"config length does not match action length of {len(value)}. {parameter}")
            action_params[key] = parameter
    elif isinstance(parameter, dict):  # pylint: disable=too-many-nested-blocks
        for key, value in sample_action:
            if key not in parameter:
                if isinstance(key, tuple):  # parameters may be specfied using tuple values as keys
                    action_params[key] = np.zeros(len(key))  # type: ignore[assignment]
                    for idx, sub_key in enumerate(key):
                        if sub_key not in parameter:
                            raise ValueError(f"action space key not in config key: {sub_key}, config: {parameter} ")
                        action_params[key][idx] = parameter[sub_key]
                else:
                    raise ValueError(f"action space key not in config key: {key}, config: {parameter} ")
            elif len(value) != len(parameter[key]):
                raise ValueError(f"config value length for key {key} does not match action length of {len(value)}. {parameter}")
            else:
                action_params[key] = parameter[key]

    return action_params

deep_merge_dict(source, destination) staticmethod ¤

Merget two dictionaries that also can contain sub dictionaries. This function returns the second dict but it also modifies it in place

run me with nosetests --with-doctest file.py

a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } } b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } } merge(b, a) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } } True

Source code in corl/libraries/env_space_util.py
@staticmethod
def deep_merge_dict(source: dict, destination: dict):
    """
    Merget two dictionaries that also can contain sub dictionaries. This function returns the second dict
    but it also modifies it in place

    run me with nosetests --with-doctest file.py

    >>> a = { 'first' : { 'all_rows' : { 'pass' : 'dog', 'number' : '1' } } }
    >>> b = { 'first' : { 'all_rows' : { 'fail' : 'cat', 'number' : '5' } } }
    >>> merge(b, a) == { 'first' : { 'all_rows' : { 'pass' : 'dog', 'fail' : 'cat', 'number' : '5' } } }
    True
    """
    for key, value in source.items():
        if isinstance(value, dict):
            # get node or create one
            node = destination.setdefault(key, OrderedDict())
            EnvSpaceUtil.deep_merge_dict(value, node)
        else:
            destination[key] = value

    return destination

deep_sanity_check_space_sample(space, sample, key_stack='') staticmethod ¤

Ensure space sample is consistent with space. This will give a traceback of the exact space that failed

Parameters¤

gym.spaces.Dict

the space we expect the sample to conform to

sample_type

sample that we are checking if it belongs to the given space

str

string of the keys we have used when getting to this current spot in the observation_space, observation this is used for recursive calls do not set in the initial call to this function

Source code in corl/libraries/env_space_util.py
@staticmethod
def deep_sanity_check_space_sample(  # pylint: disable=R0912
    space: gym.spaces.Space,
    sample: sample_type,
    key_stack: str = "",
) -> None:
    """Ensure space sample is consistent with space. This will give a traceback of the exact space that failed

    Parameters
    ----------
    space: gym.spaces.Dict
        the space we expect the sample to conform to
    sample: sample_type
        sample that we are checking if it belongs to the given space
    key_stack: str
        string of the keys we have used when getting to this current spot in the observation_space, observation
        this is used for recursive calls do not set in the initial call to this function
    """
    if isinstance(space, gym.spaces.Dict):
        if not isinstance(sample, (StateDict, OrderedDict, dict)):
            raise ValueError(
                f"space{key_stack}={space} was a gym.spaces.Dict type but\n"
                f"sample{key_stack}={sample} \n"
                f"is a {type(sample)} type and not a StateDict, OrderedDict, or dict"
            )
        for key, value in sample.items():
            EnvSpaceUtil.deep_sanity_check_space_sample(space.spaces[key], value, key_stack=f"{key_stack}[{key}]")
    elif isinstance(space, gym.spaces.Tuple):
        if not isinstance(sample, tuple):
            raise ValueError(
                f"space{key_stack}={space} is a gym.spaces.Tuple type but\n"
                f"sample{key_stack}={sample} \n"
                f"is a {type(sample)} type and not a tuple type"
            )
        for idx, value in enumerate(sample):
            EnvSpaceUtil.deep_sanity_check_space_sample(space.spaces[idx], value, key_stack=f"{key_stack}[{idx}]")
    elif isinstance(space, gym.spaces.Discrete):
        if not isinstance(sample, (int, np.integer, np.ndarray)):
            raise ValueError(
                f"space{key_stack}={space} is a gym.spaces.Discrete type but\n"
                f"sample{key_stack}={sample} \n"
                f"is a {type(sample)} type and not an int or np.integer or np.ndarray"
            )
        if not space.contains(sample):
            raise ValueError(
                f"sample{key_stack} has value of {sample} however "
                f"space{key_stack} has space definition of {space} {space.n}"
            )
    elif isinstance(space, gym.spaces.Box):
        if not isinstance(sample, (np.ndarray, list, np.floating)):
            raise ValueError(
                f"space{key_stack}={space} is a gym.spaces.Box type but\n"
                f"sample{key_stack}={sample} \n"
                f"is a {type(sample)} type and not a np.ndarray, list, or np.float type"
            )
        if not space.contains(sample):
            sample_dtype = getattr(sample, 'dtype', None)
            raise ValueError(
                f"sample{key_stack} has value of {sample} however "
                f"space{key_stack} has space definition of {space} {space.low} {space.high} "
                f"space dtype is {space.dtype}, sample dtype is {sample_dtype}"
            )
    elif isinstance(space, gym.spaces.MultiBinary):
        if not isinstance(sample, (np.ndarray, list, np.integer)):
            raise ValueError(
                f"space{key_stack}={space} is a gym.spaces.MultiBinary type but\n"
                f"sample{key_stack}={sample} \n"
                f"is a {type(sample)} type and not a np.ndarray, list, or np.integer type"
            )
        if not space.contains(sample):
            raise ValueError(
                f"sample{key_stack} has value of {sample} however "
                f"space{key_stack} has space definition of {space} {space.n}"
            )
    elif isinstance(space, gym.spaces.MultiDiscrete):
        if not isinstance(sample, (np.ndarray, list, np.integer)):
            raise ValueError(
                f"space{key_stack}={space} is a gym.spaces.MultiDiscrete type but\n"
                f"sample{key_stack}={sample} \n"
                f"is a {type(sample)} type and not a np.ndarray, list, or np.integer type"
            )
        if not space.contains(sample):
            raise ValueError(
                f"sample{key_stack} has value of {sample} however "
                f"space{key_stack} has space definition of {space} {space.nvec}"
            )
    elif isinstance(space, Repeated):
        if not isinstance(sample, list):
            raise ValueError(
                f"space{key_stack}={space} is a ray.rllib.utils.spaces.repeated.Repeated type but\n"
                f"sample{key_stack}={sample}\n"
                f"is a {type(sample)} type and not a list  type"
            )
        for idx, item in enumerate(sample):
            EnvSpaceUtil.deep_sanity_check_space_sample(space.child_space, item, key_stack=f"{key_stack}[{idx}]")

get_mean_sample_from_space(space) staticmethod ¤

Given a gym space returns an instance of that space but instead of sampling from the gym space, returns all zeros. If the space is not a Box and we cannot iterate over it then we will sample from it.

Parameters¤

gym.spaces.Space

The gym space to zero sample from

Returns¤

sample_type The instance of the gym space but all Box spaces are sampled as zero

Source code in corl/libraries/env_space_util.py
@staticmethod
def get_mean_sample_from_space(space: gym.spaces.Space) -> sample_type:
    """
    Given a gym space returns an instance of that space but instead of sampling from the
    gym space, returns all zeros. If the space is not a Box and we cannot iterate over it
    then we will sample from it.

    Parameters
    ----------
    space: gym.spaces.Space
        The gym space to zero sample from

    Returns
    -------
    sample_type
        The instance of the gym space but all Box spaces are sampled as zero
    """
    val = space.sample()
    if isinstance(space, gym.spaces.Dict):
        new_dict = OrderedDict()
        for key, value in space.spaces.items():
            new_dict[key] = EnvSpaceUtil.get_zero_sample_from_space(value)
        val = new_dict
    elif isinstance(space, gym.spaces.Tuple):
        new_tuple = [EnvSpaceUtil.get_zero_sample_from_space(sp) for sp in space.spaces]
        val = tuple(new_tuple)
    elif isinstance(space, gym.spaces.Box):
        val = (space.high + space.low) / 2.0
    return val

get_zero_sample_from_space(space) staticmethod ¤

Given a gym space returns an instance of that space but instead of sampling from the gym space, returns all zeros. If the space is not a Box and we cannot iterate over it then we will sample from it.

Parameters¤

gym.spaces.Space

The gym space to zero sample from

Returns¤

sample_type The instance of the gym space but all Box spaces are sampled as zero

Source code in corl/libraries/env_space_util.py
@staticmethod
def get_zero_sample_from_space(space: gym.spaces.Space) -> sample_type:
    """
    Given a gym space returns an instance of that space but instead of sampling from the
    gym space, returns all zeros. If the space is not a Box and we cannot iterate over it
    then we will sample from it.

    Parameters
    ----------
    space: gym.spaces.Space
        The gym space to zero sample from

    Returns
    -------
    sample_type
        The instance of the gym space but all Box spaces are sampled as zero
    """
    val = space.sample()
    if isinstance(space, gym.spaces.Dict):
        new_dict = OrderedDict()
        for key, value in space.spaces.items():
            new_dict[key] = EnvSpaceUtil.get_zero_sample_from_space(value)
        val = new_dict
    elif isinstance(space, gym.spaces.Tuple):
        new_tuple = [EnvSpaceUtil.get_zero_sample_from_space(sp) for sp in space.spaces]
        val = tuple(new_tuple)
    elif isinstance(space, gym.spaces.Box):
        val = np.zeros(shape=space.shape, dtype=np.float32)
    return val

iterate_over_space_likes(func, space_likes, return_space, *func_args, **func_kwargs) staticmethod ¤

Iterates over space_likes which are tuple, dicts or the gym equivalent. When it encounters an actual item that is not a container it calls the func method. put any args, or kwargs you want to give to func in the overall call and they will be forwarded

Parameters¤

Func

the function to apply

typing.Tuple[typing.Union[gym.spaces.Space, sample_type], ...]

the spaces to iterate over. They must have the same keywords for dicts and number of items for tuples

bool

if true the containers will be gym space equivalents

func_args the arguments to give to func func_kwargs the keyword arguments to give to func

Returns¤

The contained result by calling func and stuffing back into the tuples and dicts in the call if return_space=True the containers are gym spaces

Source code in corl/libraries/env_space_util.py
@staticmethod
def iterate_over_space_likes(
    func,
    space_likes: typing.Tuple[typing.Union[gym.spaces.Space, sample_type], ...],
    return_space: bool,
    *func_args,
    **func_kwargs,
) -> typing.Union[gym.spaces.Space, sample_type]:
    """
    Iterates over space_likes which are tuple, dicts or the gym equivalent.
    When it encounters an actual item that is not a container it calls the func method.
    put any args, or kwargs you want to give to func in the overall call and they will be forwarded

    Parameters
    ----------
    func:
        the function to apply
    space_likes: typing.Tuple[typing.Union[gym.spaces.Space, sample_type], ...]
        the spaces to iterate over. They must have the same keywords for dicts and number of items for tuples
    return_space: bool
        if true the containers will be gym space equivalents
    func_args
        the arguments to give to func
    func_kwargs
        the keyword arguments to give to func

    Returns
    -------
    The contained result by calling func and stuffing back into the tuples and dicts in the call
    if return_space=True the containers are gym spaces
    """

    first_space_like = space_likes[0]
    val = None
    if isinstance(first_space_like, (gym.spaces.Dict, dict, OrderedDict)):
        new_dict = OrderedDict()
        keys: typing.KeysView
        if isinstance(first_space_like, gym.spaces.Dict):
            keys = first_space_like.spaces.keys()
        else:
            keys = first_space_like.keys()
        for key in keys:
            new_space_likes = tuple(spacer[key] for spacer in space_likes)
            new_dict[key] = EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                func,
                space_likes=new_space_likes,
                return_space=return_space,
                *func_args,
                **func_kwargs,
            )
        val = gym.spaces.Dict(spaces=new_dict) if return_space else new_dict
    elif isinstance(first_space_like, (gym.spaces.Tuple, tuple)):
        new_tuple = [
            EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                func,
                space_likes=new_space_likes,
                return_space=return_space,
                *func_args,
                **func_kwargs,
            ) for new_space_likes in zip(*space_likes)
        ]
        val = (gym.spaces.Tuple(tuple(new_tuple)) if return_space else tuple(new_tuple))
    elif isinstance(first_space_like, Repeated):
        # if space_likes is longer than 1 that means that return_space = False
        # if there is only the space that means we need to generate just the space
        # itself and can use the second path, however for the case where we have a sample
        # it comes in as a list and we must iterate over the entire repeated list and process it
        if len(space_likes) > 1:
            repeated_samples = space_likes[1]
            assert isinstance(repeated_samples, MutableSequence), \
                f'repeated_samples must be MutableSequence, received {type(repeated_samples).__name__}'
            for indx, sample in enumerate(repeated_samples):
                repeated_samples[indx] = EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                    func,
                    space_likes=(first_space_like.child_space, sample),
                    return_space=return_space,
                    *func_args,
                    **func_kwargs,
                )
            val = repeated_samples
        else:
            new_child_space = EnvSpaceUtil.iterate_over_space_likes(  # type: ignore[misc]
                func,
                space_likes=(first_space_like.child_space, ),
                return_space=return_space,
                *func_args,
                **func_kwargs,
            )

            val = Repeated(child_space=new_child_space, max_len=first_space_like.max_len)
    else:
        val = func(space_likes, *func_args, **func_kwargs)
    return val

normalize_space(space, out_min=-1, out_max=1) staticmethod ¤

This is a convenience wrapper for box_scaler

Parameters¤

gym.spaces.Space

the gym space to turn all boxes into the scaled space

float

the new low for the boxes

float

the new high for the boxes

Returns¤

gym.spaces.Space: the new gym spaces where all boxes have had their bounds changed

Source code in corl/libraries/env_space_util.py
@staticmethod
def normalize_space(
    space: gym.spaces.Space,
    out_min=-1,
    out_max=1,
) -> gym.spaces.Space:
    """
    This is a convenience wrapper for box_scaler

    Parameters
    ----------
    space: gym.spaces.Space
        the gym space to turn all boxes into the scaled space
    out_min: float
        the new low for the boxes
    out_max: float
        the new high for the boxes

    Returns
    -------
    gym.spaces.Space:
        the new gym spaces where all boxes have had their bounds changed
    """
    return EnvSpaceUtil.iterate_over_space_likes(
        func=EnvSpaceUtil.space_box_min_maxer,
        space_likes=(space, ),
        out_min=out_min,
        out_max=out_max,
        return_space=True,
    )

sanity_check_space_sample(space, sample) staticmethod ¤

[summary]

Parameters¤

space : [type] [description] sample : [type] [description]

Raises¤

RuntimeError [description]

Source code in corl/libraries/env_space_util.py
@staticmethod
def sanity_check_space_sample(space, sample):
    """[summary]

    Parameters
    ----------
    space : [type]
        [description]
    sample : [type]
        [description]

    Raises
    ------
    RuntimeError
        [description]
    """
    if not space.contains(sample):
        raise RuntimeError(f"sample of {sample} does not meet space {space} setup")

scale_sample_from_space(space, space_sample, out_min=-1, out_max=1) staticmethod ¤

This is a convenience wrapper for box_scaler

Parameters¤

gym.spaces.Space

the space to use for the input min and max

sample_type

the space sample to scale

float

the minimum of the output scaling

float

the maximum of the output scaling

Returns¤

Sample_type

the scaled sample with min of out_min and max of out_max (this is in dicts and tuples the same as space_sample was)

Source code in corl/libraries/env_space_util.py
@staticmethod
def scale_sample_from_space(
    space: gym.spaces.Space,
    space_sample: sample_type,
    out_min: float = -1,
    out_max: float = 1,
) -> sample_type:
    """
    This is a convenience wrapper for box_scaler

    Parameters
    ----------
    space: gym.spaces.Space
        the space to use for the input min and max
    space_sample: sample_type
        the space sample to scale
    out_min: float
        the minimum of the output scaling
    out_max: float
        the maximum of the output scaling

    Returns
    -------
    sample_type:
        the scaled sample with min of out_min and max of
        out_max (this is in dicts and tuples the same as space_sample was)
    """
    return EnvSpaceUtil.iterate_over_space_likes(
        func=EnvSpaceUtil.box_scaler,
        space_likes=(space, space_sample),
        out_min=out_min,
        out_max=out_max,
        return_space=False,
    )

scale_space(space, scale) staticmethod ¤

Multiplies the low and high properties of all the Boxes in the given gym space by the scale input Parameters


gym.spaces.Space

the gym space to scale the Boxes of

float

what to multiply the Box low and high by

Returns¤

gym.spaces.Space the scaled gym space

Source code in corl/libraries/env_space_util.py
@staticmethod
def scale_space(space: gym.spaces.Space, scale: float) -> gym.spaces.Space:
    """
    Multiplies the low and high properties of all the Boxes in the given gym space by the scale input
    Parameters
    ----------
    space: gym.spaces.Space
        the gym space to scale the Boxes of
    scale: float
        what to multiply the Box low and high by
    Returns
    -------
    gym.spaces.Space
        the scaled gym space
    """
    # TODO: this copy probably doesn't actually work but I can dream
    val = copy.deepcopy(space)
    if isinstance(space, gym.spaces.Dict):
        new_dict = OrderedDict()
        for key, value in space.spaces.items():
            new_dict[key] = EnvSpaceUtil.scale_space(value, scale=scale)
        val = gym.spaces.Dict(spaces=new_dict)
    elif isinstance(space, gym.spaces.Tuple):
        new_thing = [EnvSpaceUtil.scale_space(sp, scale=scale) for sp in space.spaces]
        val = gym.spaces.Tuple(tuple(new_thing))
    elif isinstance(space, gym.spaces.Box):
        scaled_box = gym.spaces.Box(
            low=np.multiply(space.low, scale).astype(np.float32),
            high=np.multiply(space.high, scale).astype(np.float32),
            shape=space.shape,
            dtype=np.float32
        )
        val = scaled_box

    return val

space_box_min_maxer(space_likes, out_min=-1.0, out_max=1.0) staticmethod ¤

Makes a gym box to the out_min and out_max range

Parameters¤

typing.Tuple[gym.spaces.Space]

the gym space to turn all boxes into the scaled space

float

the new low for the boxes

float

the new high for the boxes

Returns¤

gym.spaces.Space: the new gym spaces where all boxes have had their bounds changed

Source code in corl/libraries/env_space_util.py
@staticmethod
def space_box_min_maxer(
    space_likes: typing.Tuple[gym.spaces.Space],
    out_min: float = -1.0,
    out_max: float = 1.0,
) -> gym.spaces.Space:
    """
    Makes a gym box to the out_min and out_max range

    Parameters
    ----------
    space_likes: typing.Tuple[gym.spaces.Space]
        the gym space to turn all boxes into the scaled space
    out_min: float
        the new low for the boxes
    out_max: float
        the new high for the boxes

    Returns
    -------
    gym.spaces.Space:
        the new gym spaces where all boxes have had their bounds changed
    """
    space_arg = space_likes[0]
    if isinstance(space_arg, gym.spaces.Box):
        return gym.spaces.Box(low=out_min, high=out_max, shape=space_arg.shape, dtype=np.float32)
    return copy.deepcopy(space_arg)

turn_box_into_tuple_of_discretes(space, num_actions=10) staticmethod ¤

Takes a gym space and replaces any Box types with MultiDiscrete types with the same number of possible of discrete as the box and each discrete has the same number of possible actions = num_actions Parameters


space

int, list, dict

how many discrete actions to give to each possible discrete in the MultiDiscrete

Returns¤

gym.spaces.Space A gym space where all the Box types are replaced with MultiDiscrete types

Source code in corl/libraries/env_space_util.py
@staticmethod
def turn_box_into_tuple_of_discretes(
    space: gym.spaces.Space, num_actions: typing.Union[int, typing.List[int], Iterable, dict] = 10
) -> gym.spaces.Space:
    """
    Takes a gym space and replaces any Box types with MultiDiscrete types with the same number of
    possible of discrete as the box and each discrete has the same number of possible actions = num_actions
    Parameters
    ----------
    space
    num_actions: int, list, dict
        how many discrete actions to give to each possible discrete in the MultiDiscrete
    Returns
    -------
    gym.spaces.Space
        A gym space where all the Box types are replaced with MultiDiscrete types
    """

    # first pass through the code num_actions is an int or list, this code turns it into an
    # iterator for the rest of the recursive calls
    if isinstance(num_actions, int):
        num_actions = repeat(num_actions)
    elif isinstance(num_actions, (list, tuple)):
        num_actions = iter(num_actions)

    # TODO: this copy doesn't actually work but I can dream
    val = copy.deepcopy(space)

    if isinstance(space, gym.spaces.Dict):
        new_dict = OrderedDict()
        for key, space_value in space.spaces.items():
            if isinstance(num_actions, dict):
                new_dict[key] = EnvSpaceUtil.turn_box_into_tuple_of_discretes(space=space_value, num_actions=num_actions[key])
            else:
                new_dict[key] = EnvSpaceUtil.turn_box_into_tuple_of_discretes(space=space_value, num_actions=num_actions)
        val = gym.spaces.Dict(spaces=new_dict)
    elif isinstance(space, gym.spaces.Tuple):
        new_tuple = [EnvSpaceUtil.turn_box_into_tuple_of_discretes(sv, num_actions=num_actions) for sv in space]
        if len(new_tuple) == 1 and isinstance(new_tuple[0], gym.spaces.Tuple):
            val = new_tuple[0]
        else:
            val = gym.spaces.Tuple(tuple(new_tuple))
    elif isinstance(space, gym.spaces.Box):
        assert isinstance(num_actions, Iterator), f'num_actions must be iterator, received {type(num_actions).__name__}'
        val = gym.spaces.Tuple([Discrete(np.asarray(next(num_actions))) for _ in range(0, space.low.size)])

    return val

turn_discrete_action_back_to_cont(original_space, discrete_only_space, space_sample) staticmethod ¤

This is a convenience wrapper for turn_orig_space_box_to_cont_sample

Parameters¤

gym.spaces.Space

the continuous space

gym.spaces.Space

the discrete space

sample_type

the sample of the discrete space

Returns¤

Sample_type

the continuous version of the discrete sample

Source code in corl/libraries/env_space_util.py
@staticmethod
def turn_discrete_action_back_to_cont(
    original_space: gym.spaces.Space,
    discrete_only_space: gym.spaces.Space,
    space_sample: sample_type,
) -> sample_type:
    """
    This is a convenience wrapper for turn_orig_space_box_to_cont_sample

    Parameters
    ----------
    original_space: gym.spaces.Space
        the continuous space
    discrete_only_space: gym.spaces.Space
        the discrete space
    space_sample: sample_type
        the sample of the discrete space

    Returns
    -------
    sample_type:
        the continuous version of the discrete sample
    """
    return EnvSpaceUtil.iterate_over_space_likes(
        EnvSpaceUtil.turn_orig_space_box_to_cont_sample,
        space_likes=(original_space, discrete_only_space, space_sample),
        return_space=False,
    )

turn_discrete_action_back_to_cont_powerspace(original_space, discrete_only_space, space_sample, pow_n) staticmethod ¤

This is a convenience wrapper for turn_orig_space_box_to_cont_sample_powerspace

Parameters¤

gym.spaces.Space

the continuous space

gym.spaces.Space

the discrete space

sample_type

the sample of the discrete space

Returns¤

Sample_type

the continuous version of the discrete sample

Source code in corl/libraries/env_space_util.py
@staticmethod
def turn_discrete_action_back_to_cont_powerspace(
    original_space: gym.spaces.Space, discrete_only_space: gym.spaces.Space, space_sample: sample_type, pow_n: dict
) -> sample_type:
    """
    This is a convenience wrapper for turn_orig_space_box_to_cont_sample_powerspace

    Parameters
    ----------
    original_space: gym.spaces.Space
        the continuous space
    discrete_only_space: gym.spaces.Space
        the discrete space
    space_sample: sample_type
        the sample of the discrete space

    Returns
    -------
    sample_type:
        the continuous version of the discrete sample
    """
    return EnvSpaceUtil.iterate_over_space_likes(
        EnvSpaceUtil.turn_orig_space_box_to_cont_sample_powerspace,
        space_likes=(original_space, discrete_only_space, space_sample, pow_n),
        return_space=False,
    )

turn_orig_space_box_to_cont_sample(space_likes) staticmethod ¤

Given a continuous space and a discrete space and a sample of the discrete space, this function turns the discrete sample into a continuous sample

Parameters¤

typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]

the first is the original gym space that is continuous, the second is the new gym space with discrete objects the last is the space sample that is of discrete types

Returns¤

Sample_type

the sample space that is now a continuous version of the discrete sample space according to the cont space

Source code in corl/libraries/env_space_util.py
@staticmethod
def turn_orig_space_box_to_cont_sample(space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]) -> sample_type:
    """
    Given a continuous space and a discrete space and a sample of the discrete space,
    this function turns the discrete sample into a continuous sample

    Parameters
    ----------
    space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]
        the first is the original gym space that is continuous,
        the second is the new gym space with discrete objects
        the last is the space sample that is of discrete types

    Returns
    -------
    sample_type:
        the sample space that is now a continuous version of the discrete sample space according to the cont space
    """
    (original_space_arg, discrete_only_space_arg, space_sample_arg) = space_likes
    if isinstance(original_space_arg,
                  gym.spaces.Box) and (isinstance(discrete_only_space_arg, (gym.spaces.MultiDiscrete, gym.spaces.Discrete))):
        if isinstance(discrete_only_space_arg, gym.spaces.MultiDiscrete):
            possible_n = discrete_only_space_arg.nvec
        elif isinstance(discrete_only_space_arg, gym.spaces.Discrete):
            possible_n = discrete_only_space_arg.n
        else:
            raise RuntimeError("This should not be reachable")

        action = space_sample_arg
        low = original_space_arg.low
        high = original_space_arg.high
        new_cont_sample = (action / (possible_n - 1)) * (high - low) + low

        return new_cont_sample

    return copy.deepcopy(space_sample_arg)

turn_orig_space_box_to_cont_sample_powerspace(space_likes) staticmethod ¤

Given a continuous space and a discrete space and a sample of the discrete space, this function turns the discrete sample into a continuous sample

Parameters¤

typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]

the first is the original gym space that is continuous, the second is the new gym space with discrete objects the last is the space sample that is of discrete types

Returns¤

Sample_type

the sample space that is now a continuous version of the discrete sample space according to the cont space

Source code in corl/libraries/env_space_util.py
@staticmethod
def turn_orig_space_box_to_cont_sample_powerspace(
    space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type, sample_type]
) -> sample_type:
    """
    Given a continuous space and a discrete space and a sample of the discrete space,
    this function turns the discrete sample into a continuous sample

    Parameters
    ----------
    space_likes: typing.Tuple[gym.spaces.Space, gym.spaces.Space, sample_type]
        the first is the original gym space that is continuous,
        the second is the new gym space with discrete objects
        the last is the space sample that is of discrete types

    Returns
    -------
    sample_type:
        the sample space that is now a continuous version of the discrete sample space according to the cont space
    """
    # There is a test built for this function. Please keep the test up to date for any changes you make.
    (original_space_arg, discrete_only_space_arg, space_sample_arg, pow_n) = space_likes
    if isinstance(original_space_arg,
                  gym.spaces.Box) and (isinstance(discrete_only_space_arg, (gym.spaces.Tuple, gym.spaces.Discrete))):
        if isinstance(discrete_only_space_arg, gym.spaces.Tuple):
            possible_n = np.asarray([x.n for x in discrete_only_space_arg])
            pow_n = np.asarray(pow_n)
            discrete_sample = np.asarray(space_sample_arg)
        elif isinstance(discrete_only_space_arg, gym.spaces.Discrete):
            possible_n = np.asarray([discrete_only_space_arg.n])
            pow_n = np.asarray([pow_n])
            discrete_sample = np.asarray([space_sample_arg])
        else:
            raise RuntimeError("This should not be reachable")

        low = original_space_arg.low
        high = original_space_arg.high

        if any(low > high):
            raise RuntimeError("lower bounds of space somehow higher than high bounds")

        if not all(possible_n % 2):
            if any(pow_n > 1):
                raise RuntimeError("The exponential power factor not supported when possible_n % 2")

            difference = high - low
            movement_per_space_sample = difference / possible_n

            new_cont_sample = low + (movement_per_space_sample * space_sample_arg)

            return new_cont_sample

        if any(pow_n <= 0):
            raise RuntimeError("The exponential power factor used to stretch/shrink the discrete space must be greater than zero")

        if any(-low != high) or any(low >= high):
            raise RuntimeError("Currently only support symmetric space about zero for power space setup - TODO")
        p_high = np.power(high, 1 / pow_n.astype(float))
        p_low = -p_high  # Since we have specified the space must be symetric around 0
        new_cont_sample = (discrete_sample / (possible_n - 1)) * (p_high - p_low) + p_low
        new_cont_sample = np.power(np.abs(new_cont_sample), pow_n) * np.sign(new_cont_sample)

        return new_cont_sample

    return copy.deepcopy(space_sample_arg)

unscale_sample_from_space(space, space_sample, out_min=-1, out_max=1) staticmethod ¤

This is a convenience wrapper for box_unscaler

Parameters¤

gym.spaces.Space

the gym space we will unscale to

sample_type

the sample we want to unscale. thus this is a scaled version of the input space with a min,max defined by the arguments out_min,out_max

float

the minimum of the sample

float

the maximum of the sample

Returns¤

Sample_type

the unscaled version of the space_sample. Thus the space should now contain this output sample

Source code in corl/libraries/env_space_util.py
@staticmethod
def unscale_sample_from_space(
    space: gym.spaces.Space,
    space_sample: sample_type,
    out_min: float = -1,
    out_max: float = 1,
) -> sample_type:
    """
    This is a convenience wrapper for box_unscaler

    Parameters
    ----------
    space: gym.spaces.Space
        the gym space we will unscale to
    space_sample: sample_type
        the sample we want to unscale. thus this is a scaled version of the input space
        with a min,max defined by the arguments out_min,out_max
    out_min: float
        the minimum of the sample
    out_max: float
        the maximum of the sample

    Returns
    -------
    sample_type:
        the unscaled version of the space_sample. Thus the space should now contain this output sample
    """

    return EnvSpaceUtil.iterate_over_space_likes(
        func=EnvSpaceUtil.box_unscaler,
        space_likes=(space, space_sample),
        out_min=out_min,
        out_max=out_max,
        return_space=False,
    )

zero_mean_space(space) staticmethod ¤

Returns a space object where every Box instance has its low and high shifted to be zero mean Parameters


gym.spaces.Space

The gym space to zero mean

Returns¤

gym.spaces.Space A gym space the same as the input but with the Box instances shifted

Source code in corl/libraries/env_space_util.py
@staticmethod
def zero_mean_space(space: gym.spaces.Space) -> gym.spaces.Space:
    """
    Returns a space object where every Box instance has its low and high shifted to be zero mean
    Parameters
    ----------
    space: gym.spaces.Space
        The gym space to zero mean

    Returns
    -------
    gym.spaces.Space
        A gym space the same as the input but with the Box instances shifted
    """
    # TODO: this copy doesn't actually work but I can dream
    val = copy.deepcopy(space)
    if isinstance(space, gym.spaces.Dict):
        new_dict = OrderedDict()
        for key, value in space.spaces.items():
            new_dict[key] = EnvSpaceUtil.zero_mean_space(value)
        val = gym.spaces.Dict(spaces=new_dict)
    elif isinstance(space, gym.spaces.Tuple):
        new_thing = [EnvSpaceUtil.zero_mean_space(sp) for sp in space.spaces]
        val = gym.spaces.Tuple(tuple(new_thing))
    elif isinstance(space, gym.spaces.Box):
        mean = (space.high + space.low) / 2
        zero_mean_box = gym.spaces.Box(low=space.low - mean, high=space.high - mean, shape=space.shape, dtype=np.float32)
        val = zero_mean_box

    return val