src.models.generator
import keras
import keras.ops as ops
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

from src.utils import *


class KeypointBasedTransform(keras.layers.Layer):
    """Keypoint-driven image warping layer followed by an upscaler.

    ``call`` resizes the source image to ``target_size``, combines the sparse
    motions produced by ``sparse_motion`` with a predicted per-pixel mask,
    warps the resized image with ``grid_transform``, scales the result by a
    predicted occlusion map and feeds it to the upscaler.
    """

    def __init__(self, batch_size, target_size=(60, 60), num_keypoints=10,
                 model=None, upscaler=None, **kwargs):
        """Create the sub-layers used by ``call``.

        Args:
            batch_size (int): Batch size assumed by ``call`` when
                ``inference`` is False.
            target_size (tuple): Working resolution ``(H, W)`` the source
                image is resized to.
            num_keypoints (int): Number of keypoints the keypoint detector
                outputs. (Default: 10)
            model (keras.Model or keras.layers.Layer, optional): Custom
                motion prediction model. When omitted, a small convolutional
                stack is built whose channel counts follow ``num_keypoints``.
            upscaler (keras.Model or keras.layers.Layer, optional): Custom
                upscaling model. When omitted, a convolutional stack with two
                2x up-samplings and a final resize to 256x256 is built.
            **kwargs: Forwarded to ``keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.batch_size = batch_size
        self.target_size = target_size
        self.height, self.width = target_size
        self.num_keypoints = num_keypoints

        self.resize_layer = keras.layers.Resizing(
            self.height, self.width, interpolation="bilinear"
        )

        # One channel per keypoint plus the single zero channel `call` pads on.
        motion_channels = self.num_keypoints + 1

        # Default motion field model. Its input is the motion field
        # concatenated with the deformed images; each is presumably
        # `motion_channels` wide (22 = 2 * 11 for the default of 10 keypoints).
        if model is None:
            model = keras.Sequential([
                keras.layers.Input(
                    shape=(self.height, self.width, 2 * motion_channels)
                ),
                keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
                keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
                keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
                keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
                keras.layers.Conv2D(motion_channels, 1, activation="linear"),
            ])
        self.model = model

        # Heads applied to the motion model output in `call`.
        self.mask = keras.layers.Conv2D(
            motion_channels, 3, padding="same", activation="relu"
        )
        self.occlusion = keras.layers.Conv2D(
            1, 3, padding="same", activation="relu"
        )

        # Default upscaler
        if upscaler is None:
            upscaler = keras.Sequential([
                keras.layers.Input(shape=(self.height, self.width, 1)),
                keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
                keras.layers.UpSampling2D(size=2),
                keras.layers.Conv2D(128, 3, padding="same", activation="tanh"),
                keras.layers.UpSampling2D(size=2),
                keras.layers.Conv2D(256, 3, padding="same", activation="tanh"),
                keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
                keras.layers.Conv2D(1, 3, padding="same", activation="linear"),
                keras.layers.Resizing(256, 256, interpolation="bilinear"),
            ])
        self.upscaler = upscaler

    def call(self, inputs, inference=False):
        """Warp the source image towards the driving keypoints.

        Args:
            inputs: Sequence of five items, unpacked as ``(source_image,
                source_kp, source_jac, driving_kp, driving_jac)``.
            inference (bool): When True a batch size of 1 is assumed and the
                intermediate tensors are returned as well.

        Returns:
            The upscaled warped image. When ``inference`` is True, a tuple
            ``(upscaled, grid_flow_map, motion_field_mean, occlusion,
            warped)`` where ``motion_field_mean`` is the motion field
            averaged over its last axis.
        """
        source_image, source_kp, source_jac, driving_kp, driving_jac = inputs

        resized_image = self.resize_layer(source_image)

        sparse_motion_tensor = sparse_motion(
            ops.shape(resized_image[..., 0]),
            source_kp,
            driving_kp,
            source_jac,
            driving_jac,
        )

        # The helpers need a static batch size: 1 at inference, otherwise the
        # training batch size given to the constructor.
        batch_size = 1 if inference else self.batch_size

        # NOTE(review): the un-resized source image is passed here although
        # the motion grid was built for the resized one -- confirm that
        # apply_sparse_motion resizes internally.
        deformed_images = apply_sparse_motion(
            source_image, sparse_motion_tensor, batch_size=batch_size
        )
        zeros = ops.zeros(
            (batch_size, self.height, self.width, 1),
            dtype=deformed_images.dtype,
        )
        deformed_images = ops.concatenate([deformed_images, zeros], axis=-1)

        # 0.01 is presumably the Gaussian variance -- TODO confirm in kp2gaussian.
        source_density = kp2gaussian(
            source_kp, 0.01, (2, self.height, self.width), batch_size=batch_size
        )
        driving_density = kp2gaussian(
            driving_kp, 0.01, (2, self.height, self.width), batch_size=batch_size
        )

        motion_field = source_density - driving_density
        # Create the pad in the density maps' dtype, like the other zero pads,
        # so it does not silently promote the tensor under a non-float32
        # dtype policy.
        motion_pad = ops.zeros(
            (batch_size, 1, self.height, self.width), dtype=motion_field.dtype
        )
        motion_field = ops.concatenate([motion_pad, motion_field], axis=1)
        # Move axis 1 to the end so it can be concatenated channel-wise.
        motion_field = ops.transpose(motion_field, (0, 2, 3, 1))

        model_input = ops.concatenate([motion_field, deformed_images], axis=-1)
        density_output = self.model(model_input)

        mask = self.mask(density_output)

        # Append an all-zero motion so axis 1 has as many entries as the mask
        # has channels.
        zero_tensor = ops.zeros(
            (batch_size, 1, self.height, self.width, 2),
            dtype=sparse_motion_tensor.dtype,
        )
        sparse_motion_tensor = ops.concatenate(
            [sparse_motion_tensor, zero_tensor], axis=1
        )

        # Mask-weighted sum of the sparse motions over the `n` axis.
        grid_flow_map = ops.einsum(
            "bnhwc,bhwn->bhwc", sparse_motion_tensor, mask
        )
        occlusion = self.occlusion(density_output)

        warped = grid_transform(resized_image, grid_flow_map, batch=batch_size)
        warped *= occlusion

        if inference:
            return (
                self.upscaler(warped),
                grid_flow_map,
                ops.mean(motion_field, axis=-1),
                occlusion,
                warped,
            )
        return self.upscaler(warped)
class KeypointBasedTransform(keras.layers.Layer):
    """Keypoint-driven image warping layer followed by an upscaler.

    ``call`` resizes the source image to ``target_size``, combines the sparse
    motions produced by ``sparse_motion`` with a predicted per-pixel mask,
    warps the resized image with ``grid_transform``, scales the result by a
    predicted occlusion map and feeds it to the upscaler.
    """

    def __init__(self, batch_size, target_size=(60, 60), num_keypoints=10,
                 model=None, upscaler=None, **kwargs):
        """Create the sub-layers used by ``call``.

        Args:
            batch_size (int): Batch size assumed by ``call`` when
                ``inference`` is False.
            target_size (tuple): Working resolution ``(H, W)`` the source
                image is resized to.
            num_keypoints (int): Number of keypoints the keypoint detector
                outputs. (Default: 10)
            model (keras.Model or keras.layers.Layer, optional): Custom
                motion prediction model. When omitted, a small convolutional
                stack is built whose channel counts follow ``num_keypoints``.
            upscaler (keras.Model or keras.layers.Layer, optional): Custom
                upscaling model. When omitted, a convolutional stack with two
                2x up-samplings and a final resize to 256x256 is built.
            **kwargs: Forwarded to ``keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.batch_size = batch_size
        self.target_size = target_size
        self.height, self.width = target_size
        self.num_keypoints = num_keypoints

        self.resize_layer = keras.layers.Resizing(
            self.height, self.width, interpolation="bilinear"
        )

        # One channel per keypoint plus the single zero channel `call` pads on.
        motion_channels = self.num_keypoints + 1

        # Default motion field model. Its input is the motion field
        # concatenated with the deformed images; each is presumably
        # `motion_channels` wide (22 = 2 * 11 for the default of 10 keypoints).
        if model is None:
            model = keras.Sequential([
                keras.layers.Input(
                    shape=(self.height, self.width, 2 * motion_channels)
                ),
                keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
                keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
                keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
                keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
                keras.layers.Conv2D(motion_channels, 1, activation="linear"),
            ])
        self.model = model

        # Heads applied to the motion model output in `call`.
        self.mask = keras.layers.Conv2D(
            motion_channels, 3, padding="same", activation="relu"
        )
        self.occlusion = keras.layers.Conv2D(
            1, 3, padding="same", activation="relu"
        )

        # Default upscaler
        if upscaler is None:
            upscaler = keras.Sequential([
                keras.layers.Input(shape=(self.height, self.width, 1)),
                keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
                keras.layers.UpSampling2D(size=2),
                keras.layers.Conv2D(128, 3, padding="same", activation="tanh"),
                keras.layers.UpSampling2D(size=2),
                keras.layers.Conv2D(256, 3, padding="same", activation="tanh"),
                keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
                keras.layers.Conv2D(1, 3, padding="same", activation="linear"),
                keras.layers.Resizing(256, 256, interpolation="bilinear"),
            ])
        self.upscaler = upscaler

    def call(self, inputs, inference=False):
        """Warp the source image towards the driving keypoints.

        Args:
            inputs: Sequence of five items, unpacked as ``(source_image,
                source_kp, source_jac, driving_kp, driving_jac)``.
            inference (bool): When True a batch size of 1 is assumed and the
                intermediate tensors are returned as well.

        Returns:
            The upscaled warped image. When ``inference`` is True, a tuple
            ``(upscaled, grid_flow_map, motion_field_mean, occlusion,
            warped)`` where ``motion_field_mean`` is the motion field
            averaged over its last axis.
        """
        source_image, source_kp, source_jac, driving_kp, driving_jac = inputs

        resized_image = self.resize_layer(source_image)

        sparse_motion_tensor = sparse_motion(
            ops.shape(resized_image[..., 0]),
            source_kp,
            driving_kp,
            source_jac,
            driving_jac,
        )

        # The helpers need a static batch size: 1 at inference, otherwise the
        # training batch size given to the constructor.
        batch_size = 1 if inference else self.batch_size

        # NOTE(review): the un-resized source image is passed here although
        # the motion grid was built for the resized one -- confirm that
        # apply_sparse_motion resizes internally.
        deformed_images = apply_sparse_motion(
            source_image, sparse_motion_tensor, batch_size=batch_size
        )
        zeros = ops.zeros(
            (batch_size, self.height, self.width, 1),
            dtype=deformed_images.dtype,
        )
        deformed_images = ops.concatenate([deformed_images, zeros], axis=-1)

        # 0.01 is presumably the Gaussian variance -- TODO confirm in kp2gaussian.
        source_density = kp2gaussian(
            source_kp, 0.01, (2, self.height, self.width), batch_size=batch_size
        )
        driving_density = kp2gaussian(
            driving_kp, 0.01, (2, self.height, self.width), batch_size=batch_size
        )

        motion_field = source_density - driving_density
        # Create the pad in the density maps' dtype, like the other zero pads,
        # so it does not silently promote the tensor under a non-float32
        # dtype policy.
        motion_pad = ops.zeros(
            (batch_size, 1, self.height, self.width), dtype=motion_field.dtype
        )
        motion_field = ops.concatenate([motion_pad, motion_field], axis=1)
        # Move axis 1 to the end so it can be concatenated channel-wise.
        motion_field = ops.transpose(motion_field, (0, 2, 3, 1))

        model_input = ops.concatenate([motion_field, deformed_images], axis=-1)
        density_output = self.model(model_input)

        mask = self.mask(density_output)

        # Append an all-zero motion so axis 1 has as many entries as the mask
        # has channels.
        zero_tensor = ops.zeros(
            (batch_size, 1, self.height, self.width, 2),
            dtype=sparse_motion_tensor.dtype,
        )
        sparse_motion_tensor = ops.concatenate(
            [sparse_motion_tensor, zero_tensor], axis=1
        )

        # Mask-weighted sum of the sparse motions over the `n` axis.
        grid_flow_map = ops.einsum(
            "bnhwc,bhwn->bhwc", sparse_motion_tensor, mask
        )
        occlusion = self.occlusion(density_output)

        warped = grid_transform(resized_image, grid_flow_map, batch=batch_size)
        warped *= occlusion

        if inference:
            return (
                self.upscaler(warped),
                grid_flow_map,
                ops.mean(motion_field, axis=-1),
                occlusion,
                warped,
            )
        return self.upscaler(warped)
This is the class from which all layers inherit.
A layer is a callable object that takes as input one or more tensors and
that outputs one or more tensors. It involves computation, defined
in the `call()` method, and a state (weight variables). State can be
created:
- in `__init__()`, for instance via `self.add_weight()`;
- in the optional `build()` method, which is invoked by the first `__call__()` to the layer, and supplies the shape(s) of the input(s), which may not have been known at initialization time.
Layers are recursively composable: If you assign a Layer instance as an
attribute of another Layer, the outer layer will start tracking the weights
created by the inner layer. Nested layers should be instantiated in the
`__init__()` method or `build()` method.
Users will just instantiate a layer and then treat it as a callable.
Args:
trainable: Boolean, whether the layer's variables should be trainable.
name: String name of the layer.
dtype: The dtype of the layer's computations and weights. Can also be a
`keras.DTypePolicy`, which allows the computation and weight dtype to
differ. Defaults to `None`. `None` means to use
`keras.config.dtype_policy()`, which is a `float32` policy unless set to
a different value (via `keras.config.set_dtype_policy()`).
Attributes:
name: The name of the layer (string).
dtype: Dtype of the layer's weights. Alias of `layer.variable_dtype`.
variable_dtype: Dtype of the layer's weights.
compute_dtype: The dtype of the layer's computations.
Layers automatically cast inputs to this dtype, which causes
the computations and output to also be in this dtype.
When mixed precision is used with a `keras.DTypePolicy`, this will be
different than `variable_dtype`.
trainable_weights: List of variables to be included in backprop.
non_trainable_weights: List of variables that should not be
included in backprop.
weights: The concatenation of the lists trainable_weights and
non_trainable_weights (in this order).
trainable: Whether the layer should be trained (boolean), i.e.
whether its potentially-trainable weights should be returned
as part of `layer.trainable_weights`.
input_spec: Optional (list of) `InputSpec` object(s) specifying the
constraints on inputs that can be accepted by the layer.
We recommend that descendants of `Layer` implement the following methods:
- `__init__()`: Defines custom layer attributes, and creates layer weights that do not depend on input shapes, using `add_weight()`, or other state.
- `build(self, input_shape)`: This method can be used to create weights that depend on the shape(s) of the input(s), using `add_weight()`, or other state. `__call__()` will automatically build the layer (if it has not been built yet) by calling `build()`.
- `call(self, *args, **kwargs)`: Called in `__call__` after making sure `build()` has been called. `call()` performs the logic of applying the layer to the input arguments. Two reserved keyword arguments you can optionally use in `call()` are: 1. `training` (boolean, whether the call is in inference mode or training mode). 2. `mask` (boolean tensor encoding masked timesteps in the input, used e.g. in RNN layers). A typical signature for this method is `call(self, inputs)`, and the user can optionally add `training` and `mask` if the layer needs them.
- `get_config(self)`: Returns a dictionary containing the configuration used to initialize this layer. If the keys differ from the arguments in `__init__()`, then override `from_config(self)` as well. This method is used when saving the layer or a model that contains this layer.
Examples:
Here's a basic example: a layer with two variables, `w` and `b`,
that returns `y = w . x + b`.
It shows how to implement `build()` and `call()`.
Variables set as attributes of a layer are tracked as weights
of the layers (in `layer.weights`).
class SimpleDense(Layer):
    """Dense layer example computing ``inputs @ kernel + bias``."""

    def __init__(self, units=32):
        """Store the output width; weights are created later in ``build``."""
        super().__init__()
        self.units = units

    def build(self, input_shape):
        """Create the layer state (weights) once the input shape is known."""
        in_features = input_shape[-1]
        self.kernel = self.add_weight(
            shape=(in_features, self.units),
            initializer="glorot_uniform",
            trainable=True,
            name="kernel",
        )
        self.bias = self.add_weight(
            shape=(self.units,),
            initializer="zeros",
            trainable=True,
            name="bias",
        )

    def call(self, inputs):
        """Apply the affine transform to ``inputs``."""
        projected = ops.matmul(inputs, self.kernel)
        return projected + self.bias


# Create a layer with four output units.
linear_layer = SimpleDense(4)

# The first call also runs `build(input_shape)`, which creates the weights.
y = linear_layer(ops.ones((2, 2)))
assert len(linear_layer.weights) == 2

# Both weights are trainable, so they also appear in `trainable_weights`.
assert len(linear_layer.trainable_weights) == 2
Besides trainable weights, updated via backpropagation during training,
layers can also have non-trainable weights. These weights are meant to
be updated manually during `call()`. Here's an example layer that computes
the running sum of its inputs:
class ComputeSum(Layer):
    """Example layer that keeps a running sum of everything it is called on."""

    def __init__(self, input_dim):
        """Create the scalar accumulator; ``input_dim`` is accepted but unused."""
        super().__init__()
        # The accumulator is updated by hand in `call`, so it is non-trainable.
        self.total = self.add_weight(
            shape=(),
            initializer="zeros",
            trainable=False,
            name="total",
        )

    def call(self, inputs):
        """Add the sum of ``inputs`` to the accumulator and return it."""
        batch_sum = ops.sum(inputs)
        self.total.assign(self.total + batch_sum)
        return self.total


my_sum = ComputeSum(2)
x = ops.ones((2, 2))
y = my_sum(x)

# The accumulator is the layer's only weight, and it is non-trainable.
assert my_sum.weights == [my_sum.total]
assert my_sum.non_trainable_weights == [my_sum.total]
assert my_sum.trainable_weights == []
def __init__(self, batch_size, target_size=(60, 60), num_keypoints=10,
             model=None, upscaler=None, **kwargs):
    """Create the sub-layers used by ``call``.

    Args:
        batch_size (int): Default training batch size.
        target_size (tuple): Target resolution for feature processing (H, W).
        num_keypoints (int): Number of keypoints the keypoint detector
            outputs. (Default: 10)
        model (keras.Model or keras.layers.Layer, optional): Custom motion
            prediction model. When omitted, a small convolutional stack is
            built whose channel counts follow ``num_keypoints``.
        upscaler (keras.Model or keras.layers.Layer, optional): Custom
            upscaling model. When omitted, a convolutional stack with two
            2x up-samplings and a final resize to 256x256 is built.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """
    super().__init__(**kwargs)
    self.batch_size = batch_size
    self.target_size = target_size
    self.height, self.width = target_size
    self.num_keypoints = num_keypoints

    self.resize_layer = keras.layers.Resizing(
        self.height, self.width, interpolation="bilinear"
    )

    # One channel per keypoint plus one extra, the same width the mask head
    # below uses.
    motion_channels = self.num_keypoints + 1

    # Default motion field model. The input is presumably two
    # `motion_channels`-wide tensors concatenated (22 = 2 * 11 for the
    # default of 10 keypoints), so both widths are derived, not hard-coded.
    if model is None:
        model = keras.Sequential([
            keras.layers.Input(
                shape=(self.height, self.width, 2 * motion_channels)
            ),
            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
            keras.layers.Conv2D(motion_channels, 1, activation="linear"),
        ])
    self.model = model

    self.mask = keras.layers.Conv2D(
        motion_channels, 3, padding="same", activation="relu"
    )
    self.occlusion = keras.layers.Conv2D(
        1, 3, padding="same", activation="relu"
    )

    # Default upscaler
    if upscaler is None:
        upscaler = keras.Sequential([
            keras.layers.Input(shape=(self.height, self.width, 1)),
            keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
            keras.layers.UpSampling2D(size=2),
            keras.layers.Conv2D(128, 3, padding="same", activation="tanh"),
            keras.layers.UpSampling2D(size=2),
            keras.layers.Conv2D(256, 3, padding="same", activation="tanh"),
            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
            keras.layers.Conv2D(1, 3, padding="same", activation="linear"),
            keras.layers.Resizing(256, 256, interpolation="bilinear"),
        ])
    self.upscaler = upscaler
Args:
- batch_size (int): Default training batch size.
- target_size (tuple): Target resolution for feature processing (H, W).
- num_keypoints (int): Number of keypoints the keypoint detector outputs. (Default: 10)
- model (keras.Model or keras.layers.Layer, optional): Custom motion prediction model.
- upscaler (keras.Model or keras.layers.Layer, optional): Custom upscaling model.
def call(self, inputs, inference=False):
    """Warp the source image towards the driving keypoints.

    Args:
        inputs: Sequence of five items, unpacked as ``(source_image,
            source_kp, source_jac, driving_kp, driving_jac)``.
        inference (bool): When True a batch size of 1 is assumed and the
            intermediate tensors are returned as well.

    Returns:
        The upscaled warped image. When ``inference`` is True, a tuple
        ``(upscaled, grid_flow_map, motion_field_mean, occlusion, warped)``
        where ``motion_field_mean`` is the motion field averaged over its
        last axis.
    """
    source_image, source_kp, source_jac, driving_kp, driving_jac = inputs

    resized_image = self.resize_layer(source_image)

    sparse_motion_tensor = sparse_motion(
        ops.shape(resized_image[..., 0]),
        source_kp,
        driving_kp,
        source_jac,
        driving_jac,
    )

    # The helpers need a static batch size: 1 at inference, otherwise the
    # training batch size stored on the layer.
    batch_size = 1 if inference else self.batch_size

    # NOTE(review): the un-resized source image is passed here although the
    # motion grid was built for the resized one -- confirm that
    # apply_sparse_motion resizes internally.
    deformed_images = apply_sparse_motion(
        source_image, sparse_motion_tensor, batch_size=batch_size
    )
    zeros = ops.zeros(
        (batch_size, self.height, self.width, 1),
        dtype=deformed_images.dtype,
    )
    deformed_images = ops.concatenate([deformed_images, zeros], axis=-1)

    # 0.01 is presumably the Gaussian variance -- TODO confirm in kp2gaussian.
    source_density = kp2gaussian(
        source_kp, 0.01, (2, self.height, self.width), batch_size=batch_size
    )
    driving_density = kp2gaussian(
        driving_kp, 0.01, (2, self.height, self.width), batch_size=batch_size
    )

    motion_field = source_density - driving_density
    # Create the pad in the density maps' dtype, like the other zero pads,
    # so it does not silently promote the tensor under a non-float32 dtype
    # policy.
    motion_pad = ops.zeros(
        (batch_size, 1, self.height, self.width), dtype=motion_field.dtype
    )
    motion_field = ops.concatenate([motion_pad, motion_field], axis=1)
    # Move axis 1 to the end so it can be concatenated channel-wise.
    motion_field = ops.transpose(motion_field, (0, 2, 3, 1))

    model_input = ops.concatenate([motion_field, deformed_images], axis=-1)
    density_output = self.model(model_input)

    mask = self.mask(density_output)

    # Append an all-zero motion so axis 1 has as many entries as the mask
    # has channels.
    zero_tensor = ops.zeros(
        (batch_size, 1, self.height, self.width, 2),
        dtype=sparse_motion_tensor.dtype,
    )
    sparse_motion_tensor = ops.concatenate(
        [sparse_motion_tensor, zero_tensor], axis=1
    )

    # Mask-weighted sum of the sparse motions over the `n` axis.
    grid_flow_map = ops.einsum("bnhwc,bhwn->bhwc", sparse_motion_tensor, mask)
    occlusion = self.occlusion(density_output)

    warped = grid_transform(resized_image, grid_flow_map, batch=batch_size)
    warped *= occlusion

    if inference:
        return (
            self.upscaler(warped),
            grid_flow_map,
            ops.mean(motion_field, axis=-1),
            occlusion,
            warped,
        )
    return self.upscaler(warped)