Frame stacking
Air Force Research Laboratory (AFRL) Autonomous Capabilities Team (ACT3) Reinforcement Learning (RL) Core.
This is a US Government Work not subject to copyright protection in the US.
The use, dissemination or disclosure of data in this file is subject to limitation or restriction. See accompanying README and LICENSE for details.
This module contains the implementation of the stacked-observation FrameStackingModel (a TFModelV2 subclass).
The following class is a slight modification of the base fully connected model within RLlib that adds the ability to do frame stacking. Frame stacking is done within the model (rather than in an environment wrapper) to (1) enable HPARAM search over the number of stacked frames, which is not possible with environment wrappers, and (2) keep the frame-stacking representation in line with how it is presented in papers.
Note: Unlike an environment wrapper, which simply concatenates the stacked data into one large vector before sending it through the network, this implementation processes the N observation vectors through all hidden layers and only flattens the inputs to the last FC layer, ensuring that the model produces the expected number of output actions.
The resulting architecture is shown below:
- Note: The parallel paths are only generated when the value function is configured not to share the network. When the network is shared, only a single path exists.
- Note: All of the default capabilities of the default RLlib model are maintained and may not be shown in the diagram. Further testing is needed for the other paths.
FC Layers 1-N
|-----|
|---------->|Dense|-|
| |-----| |-| |-------| |-----|
| |-----| |------->|Flatten|---->| FC |---|
| |-----| |-------| |-----| |
N Obs |--------| | Model
-------------> | | |
(opt) N Reward | | |------->[FC Out ] ------> Actions
-------------> | Inputs |
(opt) N Action | | |------->[Value Out] ------> Values
-------------> |--------| |
| |-----| |
|---------->|Dense|-| |
|-----| |-| |-------| |-----| |
|-----| |------->|Flatten|---->| FC |---|
|-----| |-------| |-----|
|----------------------------------------- |
| Note: Ensure final output is in |
| 1 X ACTION not 1 X Frame X ACTION |
|------------------------------------------|
Inputs = 1 X Frames X (Obs + Rewards + Actions)
Outputs = 1 X Actions
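For context, here is a minimal usage sketch (not from the source) showing how a custom model like this is typically registered through RLlib's ModelCatalog; the custom_model_config keys mirror this class's constructor keyword arguments, and the illustrative values below are chosen to reproduce the example summary that follows:

from ray.rllib.models import ModelCatalog
from corl.models.frame_stacking import FrameStackingModel

# Register under a name that trainer configs can refer to (the name is illustrative).
ModelCatalog.register_custom_model("frame_stacking", FrameStackingModel)

config = {
    "model": {
        "custom_model": "frame_stacking",
        "custom_model_config": {
            "num_frames": 5,           # stacking depth; 5 in the summary below
            "include_actions": False,  # observations-only stacking, as in the summary
            "include_rewards": False,
        },
        "fcnet_hiddens": [256, 256],   # produces fc_0 and fc_1
        "vf_share_layers": False,      # produces the parallel fc_value_* path
    },
}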
The following is an example TensorFlow summary of the model for the basic single environment:
Model: "functional_1"
__________________________________________________________________________________________________
Layer (type) Output Shape Param # Connected to
==================================================================================================
observations (InputLayer) [(None, 5, 77)] 0
__________________________________________________________________________________________________
fc_0 (Dense) (None, 5, 256) 19968 observations[0][0]
__________________________________________________________________________________________________
fc_value_0 (Dense) (None, 5, 256) 19968 observations[0][0]
__________________________________________________________________________________________________
fc_flatten_1 (Flatten) (None, 1280) 0 fc_0[0][0]
__________________________________________________________________________________________________
fc_value_flatten_1 (Flatten) (None, 1280) 0 fc_value_0[0][0]
__________________________________________________________________________________________________
fc_1 (Dense) (None, 256) 327936 fc_flatten_1[0][0]
__________________________________________________________________________________________________
fc_value_1 (Dense) (None, 256) 327936 fc_value_flatten_1[0][0]
__________________________________________________________________________________________________
fc_out (Dense) (None, 51) 13107 fc_1[0][0]
__________________________________________________________________________________________________
value_out (Dense) (None, 1) 257 fc_value_1[0][0]
==================================================================================================
Total params: 709,172
Trainable params: 709,172
Non-trainable params: 0
__________________________________________________________________________________________________
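As a sanity check, the parameter counts above follow directly from the dense-layer formula (inputs + 1 bias) * units:

fc_0      = (77 + 1) * 256        # 19,968  per-frame dense; weights shared across the 5 frames
fc_1      = (5 * 256 + 1) * 256   # 327,936 after flattening (None, 5, 256) to (None, 1280)
fc_out    = (256 + 1) * 51        # 13,107
value_out = (256 + 1) * 1         # 257
total = 2 * fc_0 + 2 * fc_1 + fc_out + value_out  # 709,172 (fc_0/fc_1 duplicated for the value path)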
Source code in corl/models/frame_stacking.py
class FrameStackingModel(TFModelV2): # pylint: disable=abstract-method
""" The following class is a slight modification of the base Fully Connected Model within RLLIB. The model adds on
the ability to do frame stacking. This is done within the model to (1) enable the HPARAM search over the setting
for the number of frames - Not possible with environment wrappers (2) enable a more in line view of the
framestacking inline with paper representation.
Note: Unlike the environment wrapper which just concatenates the data into one large vector and sends through
network. This implementation will process N identical obs (vectors) through all hidden layers until the last
FC layer which inputs are flattened to ensure that we are only producing the expected number of output actions
The architecture produced is located below:
- Note: Parallel paths. These are only generated if the Value function is defined as not sharing network.
In the case that the network is shared only a single path would exist.
- Note: All of the default capability of the default RLLIB model is maintained and may not be shown in
diagram. Further testing needed for other paths.
FC Layers 1-N
|-----|
|---------->|Dense|-|
| |-----| |-| |-------| |-----|
| |-----| |------->|Flatten|---->| FC |---|
| |-----| |-------| |-----| |
N Obs |--------| | Model
-------------> | | |
(opt) N Reward | | |------->[FC Out ] ------> Actions
-------------> | Inputs |
(opt) N Action | | |------->[Value Out] ------> Values
-------------> |--------| |
| |-----| |
|---------->|Dense|-| |
|-----| |-| |-------| |-----| |
|-----| |------->|Flatten|---->| FC |---|
|-----| |-------| |-----|
|----------------------------------------- |
| Note: Ensure final output is in |
| 1 X ACTION not 1 X Frame X ACTION |
|------------------------------------------|
Inputs = 1 X Frames X (Obs + Rewards + Actions)
Outputs = 1 X Actions
    The following is an example TensorFlow summary of the model for the basic single environment:
Model: "functional_1"
__________________________________________________________________________________________________
Layer (type) Output Shape Param # Connected to
==================================================================================================
observations (InputLayer) [(None, 5, 77)] 0
__________________________________________________________________________________________________
fc_0 (Dense) (None, 5, 256) 19968 observations[0][0]
__________________________________________________________________________________________________
fc_value_0 (Dense) (None, 5, 256) 19968 observations[0][0]
__________________________________________________________________________________________________
fc_flatten_1 (Flatten) (None, 1280) 0 fc_0[0][0]
__________________________________________________________________________________________________
fc_value_flatten_1 (Flatten) (None, 1280) 0 fc_value_0[0][0]
__________________________________________________________________________________________________
fc_1 (Dense) (None, 256) 327936 fc_flatten_1[0][0]
__________________________________________________________________________________________________
fc_value_1 (Dense) (None, 256) 327936 fc_value_flatten_1[0][0]
__________________________________________________________________________________________________
fc_out (Dense) (None, 51) 13107 fc_1[0][0]
__________________________________________________________________________________________________
value_out (Dense) (None, 1) 257 fc_value_1[0][0]
==================================================================================================
Total params: 709,172
Trainable params: 709,172
Non-trainable params: 0
__________________________________________________________________________________________________
"""
PREV_N_OBS = "prev_n_obs"
PREV_N_REWARDS = "prev_n_rewards"
PREV_N_ACTIONS = "prev_n_actions"
def __init__(
self,
obs_space,
action_space,
num_outputs,
model_config,
name,
post_fcnet_hiddens=None,
num_frames: int = 4,
include_actions: bool = True,
include_rewards: bool = True,
):
"""Class constructor
Arguments:
obs_space (gym.spaces.Space): Observation space of the target gym
env. This may have an `original_space` attribute that
specifies how to unflatten the tensor into a ragged tensor.
action_space (gym.spaces.Space): Action space of the target gym
env.
num_outputs (int): Number of output units of the model.
model_config (ModelConfigDict): Config for the model, documented
in ModelCatalog.
name (str): Name (scope) for the model.
This method should create any variables used by the model.
Keyword Arguments:
num_frames {int} -- The number of frames to stack (default: {4})
            include_actions {bool} -- Whether or not to include actions as part of frame stacking (default: True)
            include_rewards {bool} -- Whether or not to include rewards as part of frame stacking (default: True)
"""
if post_fcnet_hiddens is None:
post_fcnet_hiddens = []
# Initializes a ModelV2 object.
TFModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
# This model specific items
self.num_frames = num_frames
# Base model items
self.num_outputs = num_outputs
# Read out the model configuration parameters passed by RLLIB. Note this is maintained to ensure
# compatibility with existing setup
free_log_std, hiddens, activation, no_final_linear, vf_share_layers = self.get_config_opts()
# Generate free-floating bias variables for the second half of
# the outputs.
if free_log_std:
assert num_outputs % 2 == 0, ("num_outputs must be divisible by two", num_outputs)
num_outputs = num_outputs // 2
self.log_std_var = tf.Variable([0.0] * num_outputs, dtype=tf.float32, name="log_std")
# Create the input layers for the observations, actions, and rewards
flattened_action_space = flatten_space(action_space)
observations, actions, rewards = self.create_input_layers(obs_space, flattened_action_space)
# Select the input layer configuration based on input arguments
self.include_rewards = include_rewards
self.include_actions = include_actions
self.input_list, self.inputs = FrameStackingModel.select_input_layer_configuration(include_rewards,
include_actions,
observations,
actions,
rewards)
# Create layers 0 to second-last.
last_layer = self.create_dense_hidden_layers(hiddens, self.inputs, activation, "fc")
# The action distribution outputs.
logits_out, last_layer = self.create_last_fc_layer_output(no_final_linear,
num_outputs,
activation,
last_layer,
hiddens,
post_fcnet_hiddens,
obs_space)
# Concat the log std vars to the end of the state-dependent means.
if free_log_std and logits_out is not None:
def tiled_log_std(x):
return tf.tile(tf.expand_dims(self.log_std_var, 0), [tf.shape(x)[0], 1])
log_std_out = tf.keras.layers.Lambda(tiled_log_std)(self.inputs)
logits_out = tf.keras.layers.Concatenate(axis=1)([logits_out, log_std_out])
last_vf_layer = self.build_vf_network(vf_share_layers, self.inputs, hiddens, post_fcnet_hiddens, activation)
value_out = tf.keras.layers.Dense(1, name="value_out", activation=None, kernel_initializer=normc_initializer(0.01)
)(last_vf_layer if last_vf_layer is not None else last_layer)
self.base_model = tf.keras.Model(self.input_list, [(logits_out if logits_out is not None else last_layer), value_out])
# print(self.base_model.summary())
self.register_view_requirements(num_frames, obs_space, flattened_action_space)
self._value_out = None
@staticmethod
def select_input_layer_configuration(include_rewards, include_actions, observations, actions, rewards):
"""Sets up the input layer based on the configuration of the arguments to the model
Arguments:
include_rewards {bool} -- Flag to indicate that rewards should be part of frame stacking
include_actions {bool} -- Flag to indicate that the actions should be part of frame stacking
            observations {Tensor} -- The observations input layer tensor
            actions {Tensor} -- The actions input layer tensor
            rewards {Tensor} -- The rewards input layer tensor
        Returns:
            tuple -- (input_list, inputs): the list of input layers and the (possibly concatenated) input tensor
"""
# Last hidden layer output (before logits outputs).
if include_actions and not include_rewards:
input_list = [observations, actions]
inputs = tf.keras.layers.Concatenate(axis=-1)(input_list)
elif not include_actions and include_rewards:
input_list = [observations, rewards]
inputs = tf.keras.layers.Concatenate(axis=-1)(input_list)
elif include_actions and include_rewards:
input_list = [observations, actions, rewards]
inputs = tf.keras.layers.Concatenate(axis=-1)(input_list)
else:
input_list = [observations]
inputs = observations
return input_list, inputs
def create_input_layers(self, obs_space, action_space):
"""Creats the input layers for starting the graph
Arguments:
obs_space {gym.Space} -- The input space - flattended
action_space {gym.Space} -- The input space - flattended
Returns:
tuple[tensor] -- The input layers for observations, rewards, actions
"""
# (?, Number of Frames, 1)
rewards = tf.keras.layers.Input(shape=(self.num_frames, 1), name="rewards")
# (?, Number of Frames, len obs flatten)
observations = tf.keras.layers.Input(shape=(self.num_frames, obs_space.shape[0]), name="observations")
# (?, Number of Frames, len actions flatten)
actions = tf.keras.layers.Input(shape=(self.num_frames, len(action_space)), name="actions")
return observations, actions, rewards
def create_last_fc_layer_output(self, no_final_linear, num_outputs, activation, last_layer, hiddens, post_fcnet_hiddens, obs_space):
"""[summary]
Arguments:
no_final_linear {[type]} -- [description]
num_outputs {[type]} -- [description]
activation {[type]} -- [description]
last_layer {[type]} -- [description]
hiddens {[type]} -- [description]
obs_space {[type]} -- [description]
Returns:
[type] -- [description]
"""
# The action distribution outputs.
logits_out = None
# The last layer is adjusted to be of size num_outputs, but it's a
# layer with activation.
if no_final_linear and num_outputs:
logits_out = tf.keras.layers.Dense(
num_outputs, name="fc_out", activation=activation, kernel_initializer=normc_initializer(1.0)
)(last_layer)
# Finish the layers with the provided sizes (`hiddens`), plus -
# iff num_outputs > 0 - a last linear layer of size num_outputs.
else:
if len(hiddens) > 0:
last_layer = FrameStackingModel.flatten_plus_dense(
hiddens, post_fcnet_hiddens, last_layer, activation, "fc", len(hiddens) - 1
)
if num_outputs:
logits_out = tf.keras.layers.Dense(num_outputs, name="fc_out", activation=None,
kernel_initializer=normc_initializer(0.01))(last_layer)
# Adjust num_outputs to be the number of nodes in the last layer.
else:
self.num_outputs = ([int(np.product(obs_space.shape))] + hiddens[-1:])[-1]
return logits_out, last_layer
def build_vf_network(self, vf_share_layers, inputs, hiddens, flatten_plus_dense, activation):
"""Creates the value function network if configured in model config
Arguments:
            vf_share_layers {bool} -- Flag indicating whether the value network shares layers with the policy network
            inputs {Tensor} -- The input tensor for the network
            hiddens {List[int]} -- List containing the size of each hidden layer
            flatten_plus_dense {List[int]} -- The post-FC hidden layer sizes forwarded to the flatten_plus_dense helper
            activation {function} -- The activation function for the hidden layers
        Returns:
            Tensor or None -- The last layer of the value network, or None when layers are shared
"""
last_vf_layer = None
if not vf_share_layers:
# Build a parallel set of hidden layers for the value net.
value_function_prefix = "fc_value"
last_vf_layer = self.create_dense_hidden_layers(hiddens, inputs, activation, value_function_prefix)
last_vf_layer = self.flatten_plus_dense(
hiddens, flatten_plus_dense, last_vf_layer, activation, value_function_prefix, len(hiddens) - 1
)
return last_vf_layer
def register_view_requirements(self, num_frames: int, obs_space, flattened_action_space):
"""Sets up the view requirements for the forward pass call
Arguments:
num_frames {int} -- The number of frames to stack
            obs_space {gym.Space} -- The observation space definition
            flattened_action_space {list} -- The flattened action space
"""
self.view_requirements[FrameStackingModel.PREV_N_OBS
] = ViewRequirement(data_col="obs", shift="-{}:0".format(num_frames - 1), space=obs_space)
if self.include_rewards:
self.view_requirements[FrameStackingModel.PREV_N_REWARDS
] = ViewRequirement(data_col="rewards", shift="-{}:-1".format(self.num_frames))
if self.include_actions:
self.view_requirements[FrameStackingModel.PREV_N_ACTIONS] = ViewRequirement(
data_col="actions",
shift="-{}:-1".format(self.num_frames),
space=gym.spaces.box.Box(low=-np.inf, high=np.inf, shape=(len(flattened_action_space), ), dtype=np.int64)
)
def forward(self, input_dict: Dict[str, TensorType], state: List[TensorType],
seq_lens: TensorType) -> (TensorType, List[TensorType]): # type: ignore
""" Call the model with the given input tensors and state.
Any complex observations (dicts, tuples, etc.) will be unpacked by __call__ before being passed to forward(). To access
the flattened observation tensor, refer to input_dict[“obs”].
This method can be called any number of times. In eager execution, each call to forward() will eagerly evaluate the model.
In symbolic execution, each call to forward creates a computation graph that operates over the variables of this model
(i.e., shares weights).
Custom models should override this instead of __call__.
Arguments:
input_dict (dict) – dictionary of input tensors, including “obs”, “obs_flat”, “prev_action”, “prev_reward”, “is_training”,
“eps_id”, “agent_id”, “infos”, and “t”.
state (list) – list of state tensors with sizes matching those returned by get_initial_state + the batch dimension
seq_lens (Tensor) – 1d tensor holding input sequence lengths
Returns:
The model output tensor of size [BATCH, num_outputs], and the new RNN state.
"""
if self.include_actions and not self.include_rewards:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS],
input_dict[FrameStackingModel.PREV_N_ACTIONS]])
elif not self.include_actions and self.include_rewards:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS],
input_dict[FrameStackingModel.PREV_N_REWARDS]])
elif self.include_actions and self.include_rewards:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS],
input_dict[FrameStackingModel.PREV_N_ACTIONS],
input_dict[FrameStackingModel.PREV_N_REWARDS]])
else:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS]])
return model_out, state
def value_function(self) -> TensorType:
"""Returns the value function output for the most recent forward pass.
        Note that a forward call has to be performed first, before this method can return anything; thus
        calling this method does not cause an extra forward pass through the network.
Returns:
value estimate tensor of shape [BATCH].
"""
return tf.reshape(self._value_out, [-1])
def get_config_opts(self):
"""Gets the configuration options utilizes by the frame stacking model
Returns:
Tuple -- configuration options for the models (Bool, list[int], function, Bool, Bool)
"""
hiddens = self.model_config.get("fcnet_hiddens", []) + self.model_config.get("post_fcnet_hiddens", [])
activation = self.model_config.get("fcnet_activation")
if not self.model_config.get("fcnet_hiddens", []):
activation = self.model_config.get("post_fcnet_activation")
activation = get_activation_fn(activation)
no_final_linear = self.model_config.get("no_final_linear")
vf_share_layers = self.model_config.get("vf_share_layers")
free_log_std = self.model_config.get("free_log_std")
return free_log_std, hiddens, activation, no_final_linear, vf_share_layers
@staticmethod
def create_dense_hidden_layers(hiddens, layer, activation, prefix: str):
"""Creates the hidden dense layers
Arguments:
            hiddens {List[int]} -- The list of hidden layer sizes for the FC components
            layer {Tensor} -- The input tensor to build the hidden layers on
            activation {Function} -- The activation function for the hidden layers
            prefix {str} -- The string to use for the naming of the layer
        Returns:
            Tensor -- The last hidden layer created (layers 0 to second-last)
"""
for index, size in enumerate(hiddens[:-1]):
dense_name = f"{prefix}_{index}"
layer = tf.keras.layers.Dense(size, name=dense_name, activation=activation, kernel_initializer=normc_initializer(1.0))(layer)
return layer
@staticmethod
def flatten_plus_dense(hiddens, post_fcnet_hiddens, layer, activation, prefix: str, index: int):
"""Creates the final/last dense layer with flatten to ensure the correct output size
Arguments:
            hiddens {List[int]} -- List containing the size of each hidden layer
            post_fcnet_hiddens {List[int]} -- List containing the size of each post-FC hidden layer
            layer {Tensor} -- The tensor to flatten and feed into the final dense layer
            activation {function} -- The activation function for the dense layers
            prefix {str} -- The string to use when creating the layers
            index {int} -- The index of the layer to add the flatten on
        Returns:
            Tensor -- The layer just created with flatten + dense
"""
flatten_name = f"{prefix}_flatten_{index}"
dense_name = f"{prefix}_{index}"
layer = tf.keras.layers.Flatten(name=flatten_name)(layer)
layer = tf.keras.layers.Dense(hiddens[index], name=dense_name, activation=activation,
kernel_initializer=normc_initializer(1.0))(layer)
for index_cat, size in enumerate(post_fcnet_hiddens):
layer = tf.keras.layers.Dense(
size, name=f"{dense_name}_cat_{index_cat}", activation=activation, kernel_initializer=normc_initializer(1.0)
)(layer)
return layer
__init__(self, obs_space, action_space, num_outputs, model_config, name, post_fcnet_hiddens=None, num_frames=4, include_actions=True, include_rewards=True)
special
Class constructor
Parameters:
Name | Type | Description | Default
---|---|---|---
obs_space | gym.spaces.Space | Observation space of the target gym env. This may have an `original_space` attribute that specifies how to unflatten the tensor into a ragged tensor. | required
action_space | gym.spaces.Space | Action space of the target gym env. | required
num_outputs | int | Number of output units of the model. | required
model_config | ModelConfigDict | Config for the model, documented in ModelCatalog. | required
name | str | Name (scope) for the model. | required
This method should create any variables used by the model.
Keyword arguments:
Name | Type | Description
---|---|---
num_frames | int | The number of frames to stack (default: 4)
include_actions | bool | Whether or not to include actions as part of frame stacking (default: True)
include_rewards | bool | Whether or not to include rewards as part of frame stacking (default: True)
Source code in corl/models/frame_stacking.py
def __init__(
self,
obs_space,
action_space,
num_outputs,
model_config,
name,
post_fcnet_hiddens=None,
num_frames: int = 4,
include_actions: bool = True,
include_rewards: bool = True,
):
"""Class constructor
Arguments:
obs_space (gym.spaces.Space): Observation space of the target gym
env. This may have an `original_space` attribute that
specifies how to unflatten the tensor into a ragged tensor.
action_space (gym.spaces.Space): Action space of the target gym
env.
num_outputs (int): Number of output units of the model.
model_config (ModelConfigDict): Config for the model, documented
in ModelCatalog.
name (str): Name (scope) for the model.
This method should create any variables used by the model.
Keyword Arguments:
num_frames {int} -- The number of frames to stack (default: {4})
            include_actions {bool} -- Whether or not to include actions as part of frame stacking (default: True)
            include_rewards {bool} -- Whether or not to include rewards as part of frame stacking (default: True)
"""
if post_fcnet_hiddens is None:
post_fcnet_hiddens = []
# Initializes a ModelV2 object.
TFModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
# This model specific items
self.num_frames = num_frames
# Base model items
self.num_outputs = num_outputs
# Read out the model configuration parameters passed by RLLIB. Note this is maintained to ensure
# compatibility with existing setup
free_log_std, hiddens, activation, no_final_linear, vf_share_layers = self.get_config_opts()
# Generate free-floating bias variables for the second half of
# the outputs.
if free_log_std:
assert num_outputs % 2 == 0, ("num_outputs must be divisible by two", num_outputs)
num_outputs = num_outputs // 2
self.log_std_var = tf.Variable([0.0] * num_outputs, dtype=tf.float32, name="log_std")
# Create the input layers for the observations, actions, and rewards
flattened_action_space = flatten_space(action_space)
observations, actions, rewards = self.create_input_layers(obs_space, flattened_action_space)
# Select the input layer configuration based on input arguments
self.include_rewards = include_rewards
self.include_actions = include_actions
self.input_list, self.inputs = FrameStackingModel.select_input_layer_configuration(include_rewards,
include_actions,
observations,
actions,
rewards)
# Create layers 0 to second-last.
last_layer = self.create_dense_hidden_layers(hiddens, self.inputs, activation, "fc")
# The action distribution outputs.
logits_out, last_layer = self.create_last_fc_layer_output(no_final_linear,
num_outputs,
activation,
last_layer,
hiddens,
post_fcnet_hiddens,
obs_space)
# Concat the log std vars to the end of the state-dependent means.
if free_log_std and logits_out is not None:
def tiled_log_std(x):
return tf.tile(tf.expand_dims(self.log_std_var, 0), [tf.shape(x)[0], 1])
log_std_out = tf.keras.layers.Lambda(tiled_log_std)(self.inputs)
logits_out = tf.keras.layers.Concatenate(axis=1)([logits_out, log_std_out])
last_vf_layer = self.build_vf_network(vf_share_layers, self.inputs, hiddens, post_fcnet_hiddens, activation)
value_out = tf.keras.layers.Dense(1, name="value_out", activation=None, kernel_initializer=normc_initializer(0.01)
)(last_vf_layer if last_vf_layer is not None else last_layer)
self.base_model = tf.keras.Model(self.input_list, [(logits_out if logits_out is not None else last_layer), value_out])
# print(self.base_model.summary())
self.register_view_requirements(num_frames, obs_space, flattened_action_space)
self._value_out = None
build_vf_network(self, vf_share_layers, inputs, hiddens, flatten_plus_dense, activation)
Creates the value function network if configured in model config
Returns:
Type | Description
---|---
Tensor or None | The last layer of the value network, or None when layers are shared
Source code in corl/models/frame_stacking.py
def build_vf_network(self, vf_share_layers, inputs, hiddens, flatten_plus_dense, activation):
"""Creates the value function network if configured in model config
Arguments:
            vf_share_layers {bool} -- Flag indicating whether the value network shares layers with the policy network
            inputs {Tensor} -- The input tensor for the network
            hiddens {List[int]} -- List containing the size of each hidden layer
            flatten_plus_dense {List[int]} -- The post-FC hidden layer sizes forwarded to the flatten_plus_dense helper
            activation {function} -- The activation function for the hidden layers
        Returns:
            Tensor or None -- The last layer of the value network, or None when layers are shared
"""
last_vf_layer = None
if not vf_share_layers:
# Build a parallel set of hidden layers for the value net.
value_function_prefix = "fc_value"
last_vf_layer = self.create_dense_hidden_layers(hiddens, inputs, activation, value_function_prefix)
last_vf_layer = self.flatten_plus_dense(
hiddens, flatten_plus_dense, last_vf_layer, activation, value_function_prefix, len(hiddens) - 1
)
return last_vf_layer
create_dense_hidden_layers(hiddens, layer, activation, prefix)
staticmethod
Creates the hidden dense layers
Returns:
Type | Description
---|---
Tensor | The last hidden layer created (layers 0 to second-last)
Source code in corl/models/frame_stacking.py
@staticmethod
def create_dense_hidden_layers(hiddens, layer, activation, prefix: str):
"""Creates the hidden dense layers
Arguments:
            hiddens {List[int]} -- The list of hidden layer sizes for the FC components
            layer {Tensor} -- The input tensor to build the hidden layers on
            activation {Function} -- The activation function for the hidden layers
            prefix {str} -- The string to use for the naming of the layer
        Returns:
            Tensor -- The last hidden layer created (layers 0 to second-last)
"""
for index, size in enumerate(hiddens[:-1]):
dense_name = f"{prefix}_{index}"
layer = tf.keras.layers.Dense(size, name=dense_name, activation=activation, kernel_initializer=normc_initializer(1.0))(layer)
return layer
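Note that the loop runs over hiddens[:-1]: the final entry of hiddens is deliberately left to flatten_plus_dense, which inserts the Flatten first. Because a Dense layer acts on the last axis of its input, the weights created here are shared across all stacked frames. A sketch of the effect, assuming hiddens=[256, 256] and a (None, 5, 77) observation input:

# fc_0 (Dense 256) on (None, 5, 77) -> (None, 5, 256); one 77x256 kernel shared across the 5 frames
# fc_1 (the last entry of hiddens) is NOT built here; flatten_plus_dense later builds
# fc_flatten_1 + fc_1 so the frame axis is collapsed before the final FC layer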
create_input_layers(self, obs_space, action_space)
Creates the input layers for starting the graph
Returns:
Type | Description
---|---
tuple[Tensor] | The input layers for observations, actions, rewards
Source code in corl/models/frame_stacking.py
def create_input_layers(self, obs_space, action_space):
"""Creats the input layers for starting the graph
Arguments:
obs_space {gym.Space} -- The input space - flattended
action_space {gym.Space} -- The input space - flattended
Returns:
tuple[tensor] -- The input layers for observations, rewards, actions
"""
# (?, Number of Frames, 1)
rewards = tf.keras.layers.Input(shape=(self.num_frames, 1), name="rewards")
# (?, Number of Frames, len obs flatten)
observations = tf.keras.layers.Input(shape=(self.num_frames, obs_space.shape[0]), name="observations")
# (?, Number of Frames, len actions flatten)
actions = tf.keras.layers.Input(shape=(self.num_frames, len(action_space)), name="actions")
return observations, actions, rewards
create_last_fc_layer_output(self, no_final_linear, num_outputs, activation, last_layer, hiddens, post_fcnet_hiddens, obs_space)
Creates the final flatten + dense layer(s) and the action-distribution (logits) output
Returns:
Type | Description
---|---
tuple | (logits_out, last_layer): the logits output tensor (or None) and the last hidden layer
Source code in corl/models/frame_stacking.py
def create_last_fc_layer_output(self, no_final_linear, num_outputs, activation, last_layer, hiddens, post_fcnet_hiddens, obs_space):
"""[summary]
Arguments:
no_final_linear {[type]} -- [description]
num_outputs {[type]} -- [description]
activation {[type]} -- [description]
last_layer {[type]} -- [description]
hiddens {[type]} -- [description]
obs_space {[type]} -- [description]
Returns:
[type] -- [description]
"""
# The action distribution outputs.
logits_out = None
# The last layer is adjusted to be of size num_outputs, but it's a
# layer with activation.
if no_final_linear and num_outputs:
logits_out = tf.keras.layers.Dense(
num_outputs, name="fc_out", activation=activation, kernel_initializer=normc_initializer(1.0)
)(last_layer)
# Finish the layers with the provided sizes (`hiddens`), plus -
# iff num_outputs > 0 - a last linear layer of size num_outputs.
else:
if len(hiddens) > 0:
last_layer = FrameStackingModel.flatten_plus_dense(
hiddens, post_fcnet_hiddens, last_layer, activation, "fc", len(hiddens) - 1
)
if num_outputs:
logits_out = tf.keras.layers.Dense(num_outputs, name="fc_out", activation=None,
kernel_initializer=normc_initializer(0.01))(last_layer)
# Adjust num_outputs to be the number of nodes in the last layer.
else:
self.num_outputs = ([int(np.product(obs_space.shape))] + hiddens[-1:])[-1]
return logits_out, last_layer
flatten_plus_dense(hiddens, post_fcnet_hiddens, layer, activation, prefix, index)
staticmethod
Creates the final/last dense layer with flatten to ensure the correct output size
Returns:
Type | Description
---|---
Tensor | The layer just created with flatten + dense
Source code in corl/models/frame_stacking.py
@staticmethod
def flatten_plus_dense(hiddens, post_fcnet_hiddens, layer, activation, prefix: str, index: int):
"""Creates the final/last dense layer with flatten to ensure the correct output size
Arguments:
            hiddens {List[int]} -- List containing the size of each hidden layer
            post_fcnet_hiddens {List[int]} -- List containing the size of each post-FC hidden layer
            layer {Tensor} -- The tensor to flatten and feed into the final dense layer
            activation {function} -- The activation function for the dense layers
            prefix {str} -- The string to use when creating the layers
            index {int} -- The index of the layer to add the flatten on
        Returns:
            Tensor -- The layer just created with flatten + dense
"""
flatten_name = f"{prefix}_flatten_{index}"
dense_name = f"{prefix}_{index}"
layer = tf.keras.layers.Flatten(name=flatten_name)(layer)
layer = tf.keras.layers.Dense(hiddens[index], name=dense_name, activation=activation,
kernel_initializer=normc_initializer(1.0))(layer)
for index_cat, size in enumerate(post_fcnet_hiddens):
layer = tf.keras.layers.Dense(
size, name=f"{dense_name}_cat_{index_cat}", activation=activation, kernel_initializer=normc_initializer(1.0)
)(layer)
return layer
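A shape trace of this helper, assuming num_frames=5, hiddens=[256, 256], prefix="fc", index=1, and empty post_fcnet_hiddens (matching the example summary):

# incoming layer:           (None, 5, 256)
# fc_flatten_1 (Flatten) -> (None, 1280)   # 5 * 256
# fc_1 (Dense 256)       -> (None, 256)
# each entry of post_fcnet_hiddens would append a further Dense layer named fc_1_cat_<i>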
forward(self, input_dict, state, seq_lens)
Call the model with the given input tensors and state.
Any complex observations (dicts, tuples, etc.) will be unpacked by __call__ before being passed to forward(). To access the flattened observation tensor, refer to input_dict[“obs”].
This method can be called any number of times. In eager execution, each call to forward() will eagerly evaluate the model. In symbolic execution, each call to forward creates a computation graph that operates over the variables of this model (i.e., shares weights).
Custom models should override this instead of __call__.
Returns:
Type | Description
---|---
(TensorType, List[TensorType]) | The model output tensor of size [BATCH, num_outputs], and the new RNN state.
Source code in corl/models/frame_stacking.py
def forward(self, input_dict: Dict[str, TensorType], state: List[TensorType],
seq_lens: TensorType) -> (TensorType, List[TensorType]): # type: ignore
""" Call the model with the given input tensors and state.
Any complex observations (dicts, tuples, etc.) will be unpacked by __call__ before being passed to forward(). To access
the flattened observation tensor, refer to input_dict[“obs”].
This method can be called any number of times. In eager execution, each call to forward() will eagerly evaluate the model.
In symbolic execution, each call to forward creates a computation graph that operates over the variables of this model
(i.e., shares weights).
Custom models should override this instead of __call__.
Arguments:
input_dict (dict) – dictionary of input tensors, including “obs”, “obs_flat”, “prev_action”, “prev_reward”, “is_training”,
“eps_id”, “agent_id”, “infos”, and “t”.
state (list) – list of state tensors with sizes matching those returned by get_initial_state + the batch dimension
seq_lens (Tensor) – 1d tensor holding input sequence lengths
Returns:
The model output tensor of size [BATCH, num_outputs], and the new RNN state.
"""
if self.include_actions and not self.include_rewards:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS],
input_dict[FrameStackingModel.PREV_N_ACTIONS]])
elif not self.include_actions and self.include_rewards:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS],
input_dict[FrameStackingModel.PREV_N_REWARDS]])
elif self.include_actions and self.include_rewards:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS],
input_dict[FrameStackingModel.PREV_N_ACTIONS],
input_dict[FrameStackingModel.PREV_N_REWARDS]])
else:
model_out, self._value_out = self.base_model([input_dict[FrameStackingModel.PREV_N_OBS]])
return model_out, state
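The keys read here are populated by RLlib from the view requirements registered in register_view_requirements. A hypothetical eager-mode sketch for an observations-only configuration (num_frames=5, 77 observation features; `model` is an assumed, already-constructed instance):

import numpy as np

# Batch of 32 samples, each carrying the last 5 observation frames.
batch = {FrameStackingModel.PREV_N_OBS: np.zeros((32, 5, 77), dtype=np.float32)}
logits, state = model.forward(batch, state=[], seq_lens=None)
# logits: (32, num_outputs); model.value_function() now returns a (32,) value estimate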
get_config_opts(self)
Gets the configuration options utilized by the frame stacking model
Returns:
Type | Description
---|---
Tuple | Configuration options for the model (bool, list[int], function, bool, bool)
Source code in corl/models/frame_stacking.py
def get_config_opts(self):
"""Gets the configuration options utilizes by the frame stacking model
Returns:
Tuple -- configuration options for the models (Bool, list[int], function, Bool, Bool)
"""
hiddens = self.model_config.get("fcnet_hiddens", []) + self.model_config.get("post_fcnet_hiddens", [])
activation = self.model_config.get("fcnet_activation")
if not self.model_config.get("fcnet_hiddens", []):
activation = self.model_config.get("post_fcnet_activation")
activation = get_activation_fn(activation)
no_final_linear = self.model_config.get("no_final_linear")
vf_share_layers = self.model_config.get("vf_share_layers")
free_log_std = self.model_config.get("free_log_std")
return free_log_std, hiddens, activation, no_final_linear, vf_share_layers
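For reference, a sketch of the model_config keys this method reads (the values are illustrative, not defaults from the source):

model_config = {
    "fcnet_hiddens": [256, 256],
    "post_fcnet_hiddens": [],
    "fcnet_activation": "tanh",
    "post_fcnet_activation": "relu",  # only consulted when fcnet_hiddens is empty
    "no_final_linear": False,
    "vf_share_layers": False,
    "free_log_std": False,
}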
register_view_requirements(self, num_frames, obs_space, flattened_action_space)
Sets up the view requirements for the forward pass call
Source code in corl/models/frame_stacking.py
def register_view_requirements(self, num_frames: int, obs_space, flattened_action_space):
"""Sets up the view requirements for the forward pass call
Arguments:
num_frames {int} -- The number of frames to stack
            obs_space {gym.Space} -- The observation space definition
            flattened_action_space {list} -- The flattened action space
"""
self.view_requirements[FrameStackingModel.PREV_N_OBS
] = ViewRequirement(data_col="obs", shift="-{}:0".format(num_frames - 1), space=obs_space)
if self.include_rewards:
self.view_requirements[FrameStackingModel.PREV_N_REWARDS
] = ViewRequirement(data_col="rewards", shift="-{}:-1".format(self.num_frames))
if self.include_actions:
self.view_requirements[FrameStackingModel.PREV_N_ACTIONS] = ViewRequirement(
data_col="actions",
shift="-{}:-1".format(self.num_frames),
space=gym.spaces.box.Box(low=-np.inf, high=np.inf, shape=(len(flattened_action_space), ), dtype=np.int64)
)
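Under RLlib's ViewRequirement shift semantics, with num_frames=4 the strings built above resolve to the following windows:

# prev_n_obs:     shift "-3:0"  -> observations at t-3, t-2, t-1, t (current step included)
# prev_n_rewards: shift "-4:-1" -> rewards at t-4 ... t-1 (current step excluded)
# prev_n_actions: shift "-4:-1" -> actions at t-4 ... t-1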
select_input_layer_configuration(include_rewards, include_actions, observations, actions, rewards)
staticmethod
Sets up the input layer based on the configuration of the arguments to the model
Source code in corl/models/frame_stacking.py
@staticmethod
def select_input_layer_configuration(include_rewards, include_actions, observations, actions, rewards):
"""Sets up the input layer based on the configuration of the arguments to the model
Arguments:
include_rewards {bool} -- Flag to indicate that rewards should be part of frame stacking
include_actions {bool} -- Flag to indicate that the actions should be part of frame stacking
            observations {Tensor} -- The observations input layer tensor
            actions {Tensor} -- The actions input layer tensor
            rewards {Tensor} -- The rewards input layer tensor
        Returns:
            tuple -- (input_list, inputs): the list of input layers and the (possibly concatenated) input tensor
"""
# Last hidden layer output (before logits outputs).
if include_actions and not include_rewards:
input_list = [observations, actions]
inputs = tf.keras.layers.Concatenate(axis=-1)(input_list)
elif not include_actions and include_rewards:
input_list = [observations, rewards]
inputs = tf.keras.layers.Concatenate(axis=-1)(input_list)
elif include_actions and include_rewards:
input_list = [observations, actions, rewards]
inputs = tf.keras.layers.Concatenate(axis=-1)(input_list)
else:
input_list = [observations]
inputs = observations
return input_list, inputs
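A shape sketch of the resulting input tensor, assuming num_frames=5, 77 observation features, and 3 flattened actions:

# observations: (None, 5, 77)
# actions:      (None, 5, 3)
# rewards:      (None, 5, 1)
# include both  -> Concatenate(axis=-1) gives inputs: (None, 5, 81)
# include none  -> inputs is just observations: (None, 5, 77)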
value_function(self)
Returns the value function output for the most recent forward pass.
Note that a forward call has to be performed first, before this method can return anything; thus calling this method does not cause an extra forward pass through the network.
Returns:
Type | Description
---|---
TensorType | Value estimate tensor of shape [BATCH].
Source code in corl/models/frame_stacking.py
def value_function(self) -> TensorType:
"""Returns the value function output for the most recent forward pass.
        Note that a forward call has to be performed first, before this method can return anything; thus
        calling this method does not cause an extra forward pass through the network.
Returns:
value estimate tensor of shape [BATCH].
"""
return tf.reshape(self._value_out, [-1])