"""Torch models defining encoder, decoder, Generator and Discriminator.
Code adapted from https://github.com/samet-akcay/ganomaly.
"""
# Copyright (c) 2018-2022 Samet Akcay, Durham University, UK
# SPDX-License-Identifier: MIT
#
# Copyright (C) 2020-2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import math
import torch
from torch import Tensor, nn
from anomalib.data.utils.image import pad_nextpow2
[docs]class Encoder(nn.Module):
"""Encoder Network.
Args:
input_size (tuple[int, int]): Size of input image
latent_vec_size (int): Size of latent vector z
num_input_channels (int): Number of input channels in the image
n_features (int): Number of features per convolution layer
extra_layers (int): Number of extra layers since the network uses only a single encoder layer by default.
Defaults to 0.
"""
def __init__(
self,
input_size: tuple[int, int],
latent_vec_size: int,
num_input_channels: int,
n_features: int,
extra_layers: int = 0,
add_final_conv_layer: bool = True,
) -> None:
super().__init__()
self.input_layers = nn.Sequential()
self.input_layers.add_module(
f"initial-conv-{num_input_channels}-{n_features}",
nn.Conv2d(num_input_channels, n_features, kernel_size=4, stride=2, padding=4, bias=False),
)
self.input_layers.add_module(f"initial-relu-{n_features}", nn.LeakyReLU(0.2, inplace=True))
# Extra Layers
self.extra_layers = nn.Sequential()
for layer in range(extra_layers):
self.extra_layers.add_module(
f"extra-layers-{layer}-{n_features}-conv",
nn.Conv2d(n_features, n_features, kernel_size=3, stride=1, padding=1, bias=False),
)
self.extra_layers.add_module(f"extra-layers-{layer}-{n_features}-batchnorm", nn.BatchNorm2d(n_features))
self.extra_layers.add_module(f"extra-layers-{layer}-{n_features}-relu", nn.LeakyReLU(0.2, inplace=True))
# Create pyramid features to reach latent vector
self.pyramid_features = nn.Sequential()
pyramid_dim = min(*input_size) // 2 # Use the smaller dimension to create pyramid.
while pyramid_dim > 4:
in_features = n_features
out_features = n_features * 2
self.pyramid_features.add_module(
f"pyramid-{in_features}-{out_features}-conv",
nn.Conv2d(in_features, out_features, kernel_size=4, stride=2, padding=1, bias=False),
)
self.pyramid_features.add_module(f"pyramid-{out_features}-batchnorm", nn.BatchNorm2d(out_features))
self.pyramid_features.add_module(f"pyramid-{out_features}-relu", nn.LeakyReLU(0.2, inplace=True))
n_features = out_features
pyramid_dim = pyramid_dim // 2
# Final conv
if add_final_conv_layer:
self.final_conv_layer = nn.Conv2d(
n_features,
latent_vec_size,
kernel_size=4,
stride=1,
padding=0,
bias=False,
)
[docs] def forward(self, input_tensor: Tensor) -> Tensor:
"""Return latent vectors."""
output = self.input_layers(input_tensor)
output = self.extra_layers(output)
output = self.pyramid_features(output)
if self.final_conv_layer is not None:
output = self.final_conv_layer(output)
return output
[docs]class Decoder(nn.Module):
"""Decoder Network.
Args:
input_size (tuple[int, int]): Size of input image
latent_vec_size (int): Size of latent vector z
num_input_channels (int): Number of input channels in the image
n_features (int): Number of features per convolution layer
extra_layers (int): Number of extra layers since the network uses only a single encoder layer by default.
Defaults to 0.
"""
def __init__(
self,
input_size: tuple[int, int],
latent_vec_size: int,
num_input_channels: int,
n_features: int,
extra_layers: int = 0,
) -> None:
super().__init__()
self.latent_input = nn.Sequential()
# Calculate input channel size to recreate inverse pyramid
exp_factor = math.ceil(math.log(min(input_size) // 2, 2)) - 2
n_input_features = n_features * (2**exp_factor)
# CNN layer for latent vector input
self.latent_input.add_module(
f"initial-{latent_vec_size}-{n_input_features}-convt",
nn.ConvTranspose2d(
latent_vec_size,
n_input_features,
kernel_size=4,
stride=1,
padding=0,
bias=False,
),
)
self.latent_input.add_module(f"initial-{n_input_features}-batchnorm", nn.BatchNorm2d(n_input_features))
self.latent_input.add_module(f"initial-{n_input_features}-relu", nn.ReLU(True))
# Create inverse pyramid
self.inverse_pyramid = nn.Sequential()
pyramid_dim = min(*input_size) // 2 # Use the smaller dimension to create pyramid.
while pyramid_dim > 4:
in_features = n_input_features
out_features = n_input_features // 2
self.inverse_pyramid.add_module(
f"pyramid-{in_features}-{out_features}-convt",
nn.ConvTranspose2d(
in_features,
out_features,
kernel_size=4,
stride=2,
padding=1,
bias=False,
),
)
self.inverse_pyramid.add_module(f"pyramid-{out_features}-batchnorm", nn.BatchNorm2d(out_features))
self.inverse_pyramid.add_module(f"pyramid-{out_features}-relu", nn.ReLU(True))
n_input_features = out_features
pyramid_dim = pyramid_dim // 2
# Extra Layers
self.extra_layers = nn.Sequential()
for layer in range(extra_layers):
self.extra_layers.add_module(
f"extra-layers-{layer}-{n_input_features}-conv",
nn.Conv2d(n_input_features, n_input_features, kernel_size=3, stride=1, padding=1, bias=False),
)
self.extra_layers.add_module(
f"extra-layers-{layer}-{n_input_features}-batchnorm", nn.BatchNorm2d(n_input_features)
)
self.extra_layers.add_module(
f"extra-layers-{layer}-{n_input_features}-relu", nn.LeakyReLU(0.2, inplace=True)
)
# Final layers
self.final_layers = nn.Sequential()
self.final_layers.add_module(
f"final-{n_input_features}-{num_input_channels}-convt",
nn.ConvTranspose2d(
n_input_features,
num_input_channels,
kernel_size=4,
stride=2,
padding=1,
bias=False,
),
)
self.final_layers.add_module(f"final-{num_input_channels}-tanh", nn.Tanh())
[docs] def forward(self, input_tensor: Tensor) -> Tensor:
"""Return generated image."""
output = self.latent_input(input_tensor)
output = self.inverse_pyramid(output)
output = self.extra_layers(output)
output = self.final_layers(output)
return output
[docs]class Discriminator(nn.Module):
"""Discriminator.
Made of only one encoder layer which takes x and x_hat to produce a score.
Args:
input_size (tuple[int, int]): Input image size.
num_input_channels (int): Number of image channels.
n_features (int): Number of feature maps in each convolution layer.
extra_layers (int, optional): Add extra intermediate layers. Defaults to 0.
"""
def __init__(
self, input_size: tuple[int, int], num_input_channels: int, n_features: int, extra_layers: int = 0
) -> None:
super().__init__()
encoder = Encoder(input_size, 1, num_input_channels, n_features, extra_layers)
layers = []
for block in encoder.children():
if isinstance(block, nn.Sequential):
layers.extend(list(block.children()))
else:
layers.append(block)
self.features = nn.Sequential(*layers[:-1])
self.classifier = nn.Sequential(layers[-1])
self.classifier.add_module("Sigmoid", nn.Sigmoid())
[docs] def forward(self, input_tensor: Tensor) -> tuple[Tensor, Tensor]:
"""Return class of object and features."""
features = self.features(input_tensor)
classifier = self.classifier(features)
classifier = classifier.view(-1, 1).squeeze(1)
return classifier, features
[docs]class Generator(nn.Module):
"""Generator model.
Made of an encoder-decoder-encoder architecture.
Args:
input_size (tuple[int, int]): Size of input data.
latent_vec_size (int): Dimension of latent vector produced between the first encoder-decoder.
num_input_channels (int): Number of channels in input image.
n_features (int): Number of feature maps in each convolution layer.
extra_layers (int, optional): Extra intermediate layers in the encoder/decoder. Defaults to 0.
add_final_conv_layer (bool, optional): Add a final convolution layer in the decoder. Defaults to True.
"""
def __init__(
self,
input_size: tuple[int, int],
latent_vec_size: int,
num_input_channels: int,
n_features: int,
extra_layers: int = 0,
add_final_conv_layer: bool = True,
) -> None:
super().__init__()
self.encoder1 = Encoder(
input_size, latent_vec_size, num_input_channels, n_features, extra_layers, add_final_conv_layer
)
self.decoder = Decoder(input_size, latent_vec_size, num_input_channels, n_features, extra_layers)
self.encoder2 = Encoder(
input_size, latent_vec_size, num_input_channels, n_features, extra_layers, add_final_conv_layer
)
[docs] def forward(self, input_tensor: Tensor) -> tuple[Tensor, Tensor, Tensor]:
"""Return generated image and the latent vectors."""
latent_i = self.encoder1(input_tensor)
gen_image = self.decoder(latent_i)
latent_o = self.encoder2(gen_image)
return gen_image, latent_i, latent_o
[docs]class GanomalyModel(nn.Module):
"""Ganomaly Model.
Args:
input_size (tuple[int, int]): Input dimension.
num_input_channels (int): Number of input channels.
n_features (int): Number of features layers in the CNNs.
latent_vec_size (int): Size of autoencoder latent vector.
extra_layers (int, optional): Number of extra layers for encoder/decoder. Defaults to 0.
add_final_conv_layer (bool, optional): Add convolution layer at the end. Defaults to True.
"""
def __init__(
self,
input_size: tuple[int, int],
num_input_channels: int,
n_features: int,
latent_vec_size: int,
extra_layers: int = 0,
add_final_conv_layer: bool = True,
) -> None:
super().__init__()
self.generator: Generator = Generator(
input_size=input_size,
latent_vec_size=latent_vec_size,
num_input_channels=num_input_channels,
n_features=n_features,
extra_layers=extra_layers,
add_final_conv_layer=add_final_conv_layer,
)
self.discriminator: Discriminator = Discriminator(
input_size=input_size,
num_input_channels=num_input_channels,
n_features=n_features,
extra_layers=extra_layers,
)
self.weights_init(self.generator)
self.weights_init(self.discriminator)
[docs] @staticmethod
def weights_init(module: nn.Module) -> None:
"""Initialize DCGAN weights.
Args:
module (nn.Module): [description]
"""
classname = module.__class__.__name__
if classname.find("Conv") != -1:
nn.init.normal_(module.weight.data, 0.0, 0.02)
elif classname.find("BatchNorm") != -1:
nn.init.normal_(module.weight.data, 1.0, 0.02)
nn.init.constant_(module.bias.data, 0)
[docs] def forward(self, batch: Tensor) -> tuple[Tensor, Tensor, Tensor, Tensor] | Tensor:
"""Get scores for batch.
Args:
batch (Tensor): Images
Returns:
Tensor: Regeneration scores.
"""
padded_batch = pad_nextpow2(batch)
fake, latent_i, latent_o = self.generator(padded_batch)
if self.training:
return padded_batch, fake, latent_i, latent_o
return torch.mean(torch.pow((latent_i - latent_o), 2), dim=1).view(-1) # convert nx1x1 to n