src.models.generator

  1import keras
  2import keras.ops as ops
  3import numpy as np
  4import matplotlib.pyplot as plt
  5from tqdm import tqdm
  6
  7from src.utils import *
  8
  9
 10class KeypointBasedTransform(keras.layers.Layer):
 11    def __init__(self, batch_size, target_size=(60, 60), num_keypoints = 10, model=None, upscaler=None, **kwargs):
 12        """
 13        Args:
 14            batch_size (int): Default training batch size.
 15            target_size (tuple): Target resolution for feature processing (H, W).
 16            num_keypoints (int): Number of keypoints Keypoint Detector outputs. (Default: 10)
 17            model (tf.keras.Model or tf.keras.Layer, optional): Custom motion prediction model.
 18            upscaler (tf.keras.Model or tf.keras.Layer, optional): Custom upscaling model.
 19        """
 20        super().__init__(**kwargs)
 21        self.batch_size = batch_size
 22        self.target_size = target_size
 23        self.height, self.width = target_size
 24        self.num_keypoints = num_keypoints
 25
 26        self.resize_layer = keras.layers.Resizing(self.height, self.width, interpolation="bilinear")
 27
 28        # Default motion field model
 29        self.model = model if model is not None else keras.Sequential([
 30            keras.layers.Input(shape=(self.height, self.width, 22)),
 31            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
 32            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
 33            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
 34            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
 35            keras.layers.Conv2D(11, 1, activation="linear")
 36        ])
 37
 38        self.mask = keras.layers.Conv2D(self.num_keypoints + 1, 3, padding="same", activation="relu")
 39        self.occlusion = keras.layers.Conv2D(1, 3, padding="same", activation="relu")
 40
 41        # Default upscaler
 42        self.upscaler = upscaler if upscaler is not None else keras.Sequential([
 43            keras.layers.Input(shape=(self.height, self.width, 1)),
 44            keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
 45            keras.layers.UpSampling2D(size=2),
 46            keras.layers.Conv2D(128, 3, padding="same", activation="tanh"),
 47            keras.layers.UpSampling2D(size=2),
 48            keras.layers.Conv2D(256, 3, padding="same", activation="tanh"),
 49            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
 50            keras.layers.Conv2D(1, 3, padding="same", activation="linear"),
 51            keras.layers.Resizing(256, 256, interpolation="bilinear")
 52        ])
 53
 54    def call(self, inputs, inference=False):
 55        source_image, source_kp, source_jac, driving_kp, driving_jac = inputs
 56
 57        resized_image = self.resize_layer(source_image)
 58
 59        sparse_motion_tensor = sparse_motion(
 60            keras.ops.shape(resized_image[..., 0]),
 61            source_kp,
 62            driving_kp,
 63            source_jac,
 64            driving_jac
 65        )
 66
 67        batch_size = 1 if inference else self.batch_size
 68
 69        deformed_images = apply_sparse_motion(source_image, sparse_motion_tensor, batch_size=batch_size)
 70        zeros = ops.zeros((batch_size, self.height, self.width, 1), dtype=deformed_images.dtype)
 71        deformed_images = ops.concatenate([deformed_images, zeros], axis=-1)
 72
 73        source_density = kp2gaussian(source_kp, 0.01, (2, self.height, self.width), batch_size=batch_size)
 74        driving_density = kp2gaussian(driving_kp, 0.01, (2, self.height, self.width), batch_size=batch_size)
 75
 76        motion_field = source_density - driving_density
 77        motion_field = keras.ops.concatenate([
 78            keras.ops.zeros((batch_size, 1, self.height, self.width)), 
 79            motion_field
 80        ], axis=1)
 81        motion_field = keras.ops.transpose(motion_field, (0, 2, 3, 1))
 82
 83        model_input = keras.ops.concatenate([motion_field, deformed_images], axis=-1)
 84        density_output = self.model(model_input)
 85
 86        mask = self.mask(density_output)
 87
 88        zero_tensor = ops.zeros((batch_size, 1, self.height, self.width, 2), dtype=sparse_motion_tensor.dtype)
 89        sparse_motion_tensor = ops.concatenate([sparse_motion_tensor, zero_tensor], axis=1)
 90
 91        grid_flow_map = ops.einsum("bnhwc,bhwn->bhwc", sparse_motion_tensor, mask)
 92        occlusion = self.occlusion(density_output)
 93
 94        warped = grid_transform(resized_image, grid_flow_map, batch=batch_size)
 95        warped *= occlusion
 96
 97        if inference:
 98            return (
 99                self.upscaler(warped),
100                grid_flow_map,
101                keras.ops.mean(motion_field, axis=-1),
102                occlusion,
103                warped
104            )
105        else:
106            return self.upscaler(warped)
class KeypointBasedTransform(keras.src.layers.layer.Layer):
 11class KeypointBasedTransform(keras.layers.Layer):
 12    def __init__(self, batch_size, target_size=(60, 60), num_keypoints = 10, model=None, upscaler=None, **kwargs):
 13        """
 14        Args:
 15            batch_size (int): Default training batch size.
 16            target_size (tuple): Target resolution for feature processing (H, W).
 17            num_keypoints (int): Number of keypoints Keypoint Detector outputs. (Default: 10)
 18            model (tf.keras.Model or tf.keras.Layer, optional): Custom motion prediction model.
 19            upscaler (tf.keras.Model or tf.keras.Layer, optional): Custom upscaling model.
 20        """
 21        super().__init__(**kwargs)
 22        self.batch_size = batch_size
 23        self.target_size = target_size
 24        self.height, self.width = target_size
 25        self.num_keypoints = num_keypoints
 26
 27        self.resize_layer = keras.layers.Resizing(self.height, self.width, interpolation="bilinear")
 28
 29        # Default motion field model
 30        self.model = model if model is not None else keras.Sequential([
 31            keras.layers.Input(shape=(self.height, self.width, 22)),
 32            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
 33            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
 34            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
 35            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
 36            keras.layers.Conv2D(11, 1, activation="linear")
 37        ])
 38
 39        self.mask = keras.layers.Conv2D(self.num_keypoints + 1, 3, padding="same", activation="relu")
 40        self.occlusion = keras.layers.Conv2D(1, 3, padding="same", activation="relu")
 41
 42        # Default upscaler
 43        self.upscaler = upscaler if upscaler is not None else keras.Sequential([
 44            keras.layers.Input(shape=(self.height, self.width, 1)),
 45            keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
 46            keras.layers.UpSampling2D(size=2),
 47            keras.layers.Conv2D(128, 3, padding="same", activation="tanh"),
 48            keras.layers.UpSampling2D(size=2),
 49            keras.layers.Conv2D(256, 3, padding="same", activation="tanh"),
 50            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
 51            keras.layers.Conv2D(1, 3, padding="same", activation="linear"),
 52            keras.layers.Resizing(256, 256, interpolation="bilinear")
 53        ])
 54
 55    def call(self, inputs, inference=False):
 56        source_image, source_kp, source_jac, driving_kp, driving_jac = inputs
 57
 58        resized_image = self.resize_layer(source_image)
 59
 60        sparse_motion_tensor = sparse_motion(
 61            keras.ops.shape(resized_image[..., 0]),
 62            source_kp,
 63            driving_kp,
 64            source_jac,
 65            driving_jac
 66        )
 67
 68        batch_size = 1 if inference else self.batch_size
 69
 70        deformed_images = apply_sparse_motion(source_image, sparse_motion_tensor, batch_size=batch_size)
 71        zeros = ops.zeros((batch_size, self.height, self.width, 1), dtype=deformed_images.dtype)
 72        deformed_images = ops.concatenate([deformed_images, zeros], axis=-1)
 73
 74        source_density = kp2gaussian(source_kp, 0.01, (2, self.height, self.width), batch_size=batch_size)
 75        driving_density = kp2gaussian(driving_kp, 0.01, (2, self.height, self.width), batch_size=batch_size)
 76
 77        motion_field = source_density - driving_density
 78        motion_field = keras.ops.concatenate([
 79            keras.ops.zeros((batch_size, 1, self.height, self.width)), 
 80            motion_field
 81        ], axis=1)
 82        motion_field = keras.ops.transpose(motion_field, (0, 2, 3, 1))
 83
 84        model_input = keras.ops.concatenate([motion_field, deformed_images], axis=-1)
 85        density_output = self.model(model_input)
 86
 87        mask = self.mask(density_output)
 88
 89        zero_tensor = ops.zeros((batch_size, 1, self.height, self.width, 2), dtype=sparse_motion_tensor.dtype)
 90        sparse_motion_tensor = ops.concatenate([sparse_motion_tensor, zero_tensor], axis=1)
 91
 92        grid_flow_map = ops.einsum("bnhwc,bhwn->bhwc", sparse_motion_tensor, mask)
 93        occlusion = self.occlusion(density_output)
 94
 95        warped = grid_transform(resized_image, grid_flow_map, batch=batch_size)
 96        warped *= occlusion
 97
 98        if inference:
 99            return (
100                self.upscaler(warped),
101                grid_flow_map,
102                keras.ops.mean(motion_field, axis=-1),
103                occlusion,
104                warped
105            )
106        else:
107            return self.upscaler(warped)

This is the class from which all layers inherit.

A layer is a callable object that takes as input one or more tensors and that outputs one or more tensors. It involves computation, defined in the call() method, and a state (weight variables). State can be created:

  • in __init__(), for instance via self.add_weight();
  • in the optional build() method, which is invoked by the first __call__() to the layer, and supplies the shape(s) of the input(s), which may not have been known at initialization time.

Layers are recursively composable: If you assign a Layer instance as an attribute of another Layer, the outer layer will start tracking the weights created by the inner layer. Nested layers should be instantiated in the __init__() method or build() method.

Users will just instantiate a layer and then treat it as a callable.

Args: trainable: Boolean, whether the layer's variables should be trainable. name: String name of the layer. dtype: The dtype of the layer's computations and weights. Can also be a keras.DTypePolicy, which allows the computation and weight dtype to differ. Defaults to None. None means to use keras.config.dtype_policy(), which is a float32 policy unless set to different value (via keras.config.set_dtype_policy()).

Attributes: name: The name of the layer (string). dtype: Dtype of the layer's weights. Alias of layer.variable_dtype. variable_dtype: Dtype of the layer's weights. compute_dtype: The dtype of the layer's computations. Layers automatically cast inputs to this dtype, which causes the computations and output to also be in this dtype. When mixed precision is used with a keras.DTypePolicy, this will be different than variable_dtype. trainable_weights: List of variables to be included in backprop. non_trainable_weights: List of variables that should not be included in backprop. weights: The concatenation of the lists trainable_weights and non_trainable_weights (in this order). trainable: Whether the layer should be trained (boolean), i.e. whether its potentially-trainable weights should be returned as part of layer.trainable_weights. input_spec: Optional (list of) InputSpec object(s) specifying the constraints on inputs that can be accepted by the layer.

We recommend that descendants of Layer implement the following methods:

  • __init__(): Defines custom layer attributes, and creates layer weights that do not depend on input shapes, using add_weight(), or other state.
  • build(self, input_shape): This method can be used to create weights that depend on the shape(s) of the input(s), using add_weight(), or other state. __call__() will automatically build the layer (if it has not been built yet) by calling build().
  • call(self, *args, **kwargs): Called in __call__ after making sure build() has been called. call() performs the logic of applying the layer to the input arguments. Two reserved keyword arguments you can optionally use in call() are: 1. training (boolean, whether the call is in inference mode or training mode). 2. mask (boolean tensor encoding masked timesteps in the input, used e.g. in RNN layers). A typical signature for this method is call(self, inputs), and user could optionally add training and mask if the layer need them.
  • get_config(self): Returns a dictionary containing the configuration used to initialize this layer. If the keys differ from the arguments in __init__(), then override from_config(self) as well. This method is used when saving the layer or a model that contains this layer.

Examples:

Here's a basic example: a layer with two variables, w and b, that returns y = w . x + b. It shows how to implement build() and call(). Variables set as attributes of a layer are tracked as weights of the layers (in layer.weights).

class SimpleDense(Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    # Create the state of the layer (weights)
    def build(self, input_shape):
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="glorot_uniform",
            trainable=True,
            name="kernel",
        )
        self.bias = self.add_weight(
            shape=(self.units,),
            initializer="zeros",
            trainable=True,
            name="bias",
        )

    # Defines the computation
    def call(self, inputs):
        return ops.matmul(inputs, self.kernel) + self.bias

# Instantiates the layer.
linear_layer = SimpleDense(4)

# This will also call `build(input_shape)` and create the weights.
y = linear_layer(ops.ones((2, 2)))
assert len(linear_layer.weights) == 2

# These weights are trainable, so they're listed in `trainable_weights`:
assert len(linear_layer.trainable_weights) == 2

Besides trainable weights, updated via backpropagation during training, layers can also have non-trainable weights. These weights are meant to be updated manually during call(). Here's a example layer that computes the running sum of its inputs:

class ComputeSum(Layer):

  def __init__(self, input_dim):
      super(ComputeSum, self).__init__()
      # Create a non-trainable weight.
      self.total = self.add_weight(
        shape=(),
        initializer="zeros",
        trainable=False,
        name="total",
      )

  def call(self, inputs):
      self.total.assign(self.total + ops.sum(inputs))
      return self.total

my_sum = ComputeSum(2)
x = ops.ones((2, 2))
y = my_sum(x)

assert my_sum.weights == [my_sum.total]
assert my_sum.non_trainable_weights == [my_sum.total]
assert my_sum.trainable_weights == []
KeypointBasedTransform( batch_size, target_size=(60, 60), num_keypoints=10, model=None, upscaler=None, **kwargs)
12    def __init__(self, batch_size, target_size=(60, 60), num_keypoints = 10, model=None, upscaler=None, **kwargs):
13        """
14        Args:
15            batch_size (int): Default training batch size.
16            target_size (tuple): Target resolution for feature processing (H, W).
17            num_keypoints (int): Number of keypoints Keypoint Detector outputs. (Default: 10)
18            model (tf.keras.Model or tf.keras.Layer, optional): Custom motion prediction model.
19            upscaler (tf.keras.Model or tf.keras.Layer, optional): Custom upscaling model.
20        """
21        super().__init__(**kwargs)
22        self.batch_size = batch_size
23        self.target_size = target_size
24        self.height, self.width = target_size
25        self.num_keypoints = num_keypoints
26
27        self.resize_layer = keras.layers.Resizing(self.height, self.width, interpolation="bilinear")
28
29        # Default motion field model
30        self.model = model if model is not None else keras.Sequential([
31            keras.layers.Input(shape=(self.height, self.width, 22)),
32            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
33            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
34            keras.layers.Conv2D(64, 3, padding="same", activation="tanh"),
35            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
36            keras.layers.Conv2D(11, 1, activation="linear")
37        ])
38
39        self.mask = keras.layers.Conv2D(self.num_keypoints + 1, 3, padding="same", activation="relu")
40        self.occlusion = keras.layers.Conv2D(1, 3, padding="same", activation="relu")
41
42        # Default upscaler
43        self.upscaler = upscaler if upscaler is not None else keras.Sequential([
44            keras.layers.Input(shape=(self.height, self.width, 1)),
45            keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
46            keras.layers.UpSampling2D(size=2),
47            keras.layers.Conv2D(128, 3, padding="same", activation="tanh"),
48            keras.layers.UpSampling2D(size=2),
49            keras.layers.Conv2D(256, 3, padding="same", activation="tanh"),
50            keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
51            keras.layers.Conv2D(1, 3, padding="same", activation="linear"),
52            keras.layers.Resizing(256, 256, interpolation="bilinear")
53        ])

Args: batch_size (int): Default training batch size. target_size (tuple): Target resolution for feature processing (H, W). num_keypoints (int): Number of keypoints Keypoint Detector outputs. (Default: 10) model (tf.keras.Model or tf.keras.Layer, optional): Custom motion prediction model. upscaler (tf.keras.Model or tf.keras.Layer, optional): Custom upscaling model.

batch_size
target_size
num_keypoints
resize_layer
model
mask
occlusion
upscaler
def call(self, inputs, inference=False):
 55    def call(self, inputs, inference=False):
 56        source_image, source_kp, source_jac, driving_kp, driving_jac = inputs
 57
 58        resized_image = self.resize_layer(source_image)
 59
 60        sparse_motion_tensor = sparse_motion(
 61            keras.ops.shape(resized_image[..., 0]),
 62            source_kp,
 63            driving_kp,
 64            source_jac,
 65            driving_jac
 66        )
 67
 68        batch_size = 1 if inference else self.batch_size
 69
 70        deformed_images = apply_sparse_motion(source_image, sparse_motion_tensor, batch_size=batch_size)
 71        zeros = ops.zeros((batch_size, self.height, self.width, 1), dtype=deformed_images.dtype)
 72        deformed_images = ops.concatenate([deformed_images, zeros], axis=-1)
 73
 74        source_density = kp2gaussian(source_kp, 0.01, (2, self.height, self.width), batch_size=batch_size)
 75        driving_density = kp2gaussian(driving_kp, 0.01, (2, self.height, self.width), batch_size=batch_size)
 76
 77        motion_field = source_density - driving_density
 78        motion_field = keras.ops.concatenate([
 79            keras.ops.zeros((batch_size, 1, self.height, self.width)), 
 80            motion_field
 81        ], axis=1)
 82        motion_field = keras.ops.transpose(motion_field, (0, 2, 3, 1))
 83
 84        model_input = keras.ops.concatenate([motion_field, deformed_images], axis=-1)
 85        density_output = self.model(model_input)
 86
 87        mask = self.mask(density_output)
 88
 89        zero_tensor = ops.zeros((batch_size, 1, self.height, self.width, 2), dtype=sparse_motion_tensor.dtype)
 90        sparse_motion_tensor = ops.concatenate([sparse_motion_tensor, zero_tensor], axis=1)
 91
 92        grid_flow_map = ops.einsum("bnhwc,bhwn->bhwc", sparse_motion_tensor, mask)
 93        occlusion = self.occlusion(density_output)
 94
 95        warped = grid_transform(resized_image, grid_flow_map, batch=batch_size)
 96        warped *= occlusion
 97
 98        if inference:
 99            return (
100                self.upscaler(warped),
101                grid_flow_map,
102                keras.ops.mean(motion_field, axis=-1),
103                occlusion,
104                warped
105            )
106        else:
107            return self.upscaler(warped)