# Source code for pypots.anomaly_detection.transformer.model
"""
The implementation of Transformer for the partially-observed time-series anomaly detection task.
"""
# Created by Yiyuan Yang <yyy1997sjz@gmail.com>
# License: BSD-3-Clause
from typing import Union, Optional
import torch
from torch.utils.data import DataLoader
from ..base import BaseNNDetector
from ...data.checking import key_in_data_set
from ...imputation.transformer.core import _Transformer
from ...imputation.saits.data import DatasetForSAITS
from ...nn.modules.loss import Criterion, MAE, MSE
from ...optim.adam import Adam
from ...optim.base import Optimizer
from ...utils.logging import logger
# [docs]  (link marker apparently left over from the rendered documentation page; not code)
class Transformer(BaseNNDetector):
    """Transformer-based detector for anomalies in partially-observed time series.

    The Transformer was originally proposed by Vaswani et al. in
    :cite:`vaswani2017Transformer` and re-implemented for partially-observed
    time-series modeling by Du et al. in :cite:`du2023SAITS`. This class builds
    a ``_Transformer`` backbone and adapts it to the anomaly detection task.

    Parameters
    ----------
    n_steps : int
        The number of time steps in each input time-series sample.

    n_features : int
        The number of features (dimensions) in each input time-series sample.

    anomaly_rate : float
        The expected anomaly rate within the dataset, between (0, 1).
        Used to determine detection thresholds.

    n_layers : int
        The number of stacked Transformer encoder layers.

    d_model : int
        The dimensionality of inputs and outputs inside the model's backbone.
        It is also the input dimension to the multi-head self-attention blocks.
        If it differs from ``n_heads * d_k``, a warning is logged and the value
        is reset to ``n_heads * d_k``.

    n_heads : int
        The number of parallel heads used in multi-head self-attention mechanisms.

    d_k : int
        The dimensionality of key and query vectors in the attention mechanism.
        ``d_model`` is expected to equal ``n_heads * d_k``.

    d_v : int
        The dimensionality of value vectors in the attention mechanism.

    d_ffn : int
        The dimensionality of the hidden layer inside the position-wise
        Feed-Forward Network (FFN).

    dropout : float, optional
        Dropout probability applied across fully connected layers. Default is 0.

    attn_dropout : float, optional
        Dropout probability applied inside attention mechanisms. Default is 0.

    ORT_weight : int, optional
        Weight coefficient for the ORT (Observation Reconstruction Task) loss
        component. Default is 1.

    MIT_weight : int, optional
        Weight coefficient for the MIT (Missingness Imputation Task) loss
        component. Default is 1.

    batch_size : int, optional
        Number of samples in each training batch. Default is 32.

    epochs : int, optional
        Maximum number of epochs to train the model. Default is 100.

    patience : int, optional
        Number of epochs to wait without improvement before early stopping is
        triggered. If None, early stopping is disabled.

    training_loss : Criterion or type, optional
        Loss function used for training. Defaults to Mean Absolute Error (MAE).

    validation_metric : Criterion or type, optional
        Metric used to evaluate model performance on the validation set.
        Defaults to Mean Squared Error (MSE).

    optimizer : Optimizer or type, optional
        Optimizer used for training the model. Either a ready ``Optimizer``
        instance or a class that is instantiated without arguments.
        Defaults to a custom implementation of Adam.

    num_workers : int, optional
        Number of worker subprocesses to use for data loading.
        0 means no subprocesses (i.e., main process only).

    device : str, torch.device, or list, optional
        Device(s) on which the model runs, e.g., 'cuda:0', 'cpu', or a list of
        CUDA devices for multi-GPU training. If None, the model automatically
        selects GPU if available, otherwise CPU.

    saving_path : str, optional
        Directory path for saving trained model checkpoints and TensorBoard
        logs. No saving if None.

    model_saving_strategy : str or None, optional
        Strategy for saving model checkpoints:
            - None: Do not save any model.
            - "best": Save only the best-performing model.
            - "better": Save model when validation performance improves.
            - "all": Save model at every epoch.
        Default is "best".

    verbose : bool, optional
        Whether to print detailed training logs during model training.
        Default is True.

    """

    def __init__(
        self,
        n_steps: int,
        n_features: int,
        anomaly_rate: float,
        n_layers: int,
        d_model: int,
        n_heads: int,
        d_k: int,
        d_v: int,
        d_ffn: int,
        dropout: float = 0,
        attn_dropout: float = 0,
        ORT_weight: int = 1,
        MIT_weight: int = 1,
        batch_size: int = 32,
        epochs: int = 100,
        patience: Optional[int] = None,
        training_loss: Union[Criterion, type] = MAE,
        validation_metric: Union[Criterion, type] = MSE,
        optimizer: Union[Optimizer, type] = Adam,
        num_workers: int = 0,
        device: Optional[Union[str, torch.device, list]] = None,
        saving_path: str = None,
        model_saving_strategy: Optional[str] = "best",
        verbose: bool = True,
    ):
        """Set up the detector: base-class state, backbone network, and optimizer.

        See the class docstring for the meaning of every argument.
        """
        # Generic training/detection settings are handled by BaseNNDetector.
        super().__init__(
            anomaly_rate=anomaly_rate,
            training_loss=training_loss,
            validation_metric=validation_metric,
            batch_size=batch_size,
            epochs=epochs,
            patience=patience,
            num_workers=num_workers,
            device=device,
            saving_path=saving_path,
            model_saving_strategy=model_saving_strategy,
            verbose=verbose,
        )

        # The backbone needs d_model == n_heads * d_k. A mismatch is not fatal:
        # it is reported and d_model is overwritten with the consistent value.
        required_d_model = n_heads * d_k
        if d_model != required_d_model:
            logger.warning(
                f"‼️ d_model must equal n_heads * d_k. Received: d_model={d_model}, n_heads={n_heads}, d_k={d_k}."
            )
            d_model = required_d_model
            logger.warning(f"⚠️ d_model is reset to {d_model}")

        # Keep the (possibly corrected) hyperparameters on the instance.
        self.n_steps = n_steps
        self.n_features = n_features
        self.n_layers = n_layers
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_k
        self.d_v = d_v
        self.d_ffn = d_ffn
        self.dropout = dropout
        self.attn_dropout = attn_dropout
        self.ORT_weight = ORT_weight
        self.MIT_weight = MIT_weight

        # Build the backbone; the loss and metric are read back from the
        # attributes available after the parent initializer has run.
        self.model = _Transformer(
            n_steps=self.n_steps,
            n_features=self.n_features,
            n_layers=self.n_layers,
            d_model=self.d_model,
            n_heads=self.n_heads,
            d_k=self.d_k,
            d_v=self.d_v,
            d_ffn=self.d_ffn,
            dropout=self.dropout,
            attn_dropout=self.attn_dropout,
            ORT_weight=self.ORT_weight,
            MIT_weight=self.MIT_weight,
            training_loss=self.training_loss,
            validation_metric=self.validation_metric,
        )
        self._send_model_to_given_device()
        self._print_model_size()

        # Accept either a ready-made Optimizer or something to instantiate
        # with no arguments; either way the result must be an Optimizer.
        self.optimizer = optimizer if isinstance(optimizer, Optimizer) else optimizer()
        assert isinstance(self.optimizer, Optimizer)
        self.optimizer.init_optimizer(self.model.parameters())

    def _assemble_input_for_training(self, data: list) -> dict:
        """Turn one training batch into the keyword inputs of the model.

        Parameters
        ----------
        data : list
            A batch that, once sent to the device, unpacks into exactly five
            elements.

        Returns
        -------
        dict
            A dictionary with 'X', 'missing_mask', 'X_ori', and 'indicating_mask'.

        """
        # The first element of the batch is discarded; the rest is passed on.
        _, X, missing_mask, X_ori, indicating_mask = self._send_data_to_given_device(data)
        inputs = dict(
            X=X,
            missing_mask=missing_mask,
            X_ori=X_ori,
            indicating_mask=indicating_mask,
        )
        return inputs

    def _assemble_input_for_validating(self, data: list) -> dict:
        """Turn one validation batch into the keyword inputs of the model.

        Validation batches are handled exactly like training batches.

        Returns
        -------
        dict
            Same as training input.

        """
        return self._assemble_input_for_training(data)

    def _assemble_input_for_testing(self, data: list) -> dict:
        """Turn one test batch into the keyword inputs of the model.

        Parameters
        ----------
        data : list
            A batch that, once sent to the device, unpacks into exactly three
            elements.

        Returns
        -------
        dict
            A dictionary containing 'X' and 'missing_mask'.

        """
        # The first element of the batch is discarded; the rest is passed on.
        _, X, missing_mask = self._send_data_to_given_device(data)
        inputs = dict(X=X, missing_mask=missing_mask)
        return inputs

    def fit(
        self,
        train_set: Union[dict, str],
        val_set: Optional[Union[dict, str]] = None,
        file_type: str = "hdf5",
    ) -> None:
        """Train the Transformer model for anomaly detection.

        Parameters
        ----------
        train_set : dict or str
            Training dataset or path to it.

        val_set : dict or str, optional
            Validation dataset or path to it. Must include 'X_ori'.

        file_type : str, optional
            File type if loading from disk. Default is "hdf5".

        Raises
        ------
        ValueError
            If ``val_set`` is given but does not contain 'X_ori'.

        """
        # The raw training data is kept on the instance.
        self.train_set = train_set

        # Settings shared by the training and validation loaders.
        loader_options = {
            "batch_size": self.batch_size,
            "num_workers": self.num_workers,
        }

        training_data = DatasetForSAITS(
            train_set,
            return_X_ori=False,
            return_y=False,
            file_type=file_type,
        )
        train_dataloader = DataLoader(training_data, shuffle=True, **loader_options)

        if val_set is None:
            val_dataloader = None
        else:
            if not key_in_data_set("X_ori", val_set):
                raise ValueError("val_set must contain 'X_ori' for validation.")
            validation_data = DatasetForSAITS(
                val_set,
                return_X_ori=True,
                return_y=False,
                file_type=file_type,
            )
            val_dataloader = DataLoader(validation_data, shuffle=False, **loader_options)

        # Train, then roll the network back to the best recorded weights.
        self._train_model(train_dataloader, val_dataloader)
        self.model.load_state_dict(self.best_model_dict)

        # Saving is only confirmed here under the "best" strategy.
        confirm = self.model_saving_strategy == "best"
        self._auto_save_model_if_necessary(confirm_saving=confirm)