Source code for pypots.nn.modules.ts2vec.backbone

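"""Backbone encoder of TS2Vec, plus helpers for fitting downstream scikit-learn models on its representations."""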

# Created by Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause

from typing import Optional

import numpy as np
import sklearn
import torch
import torch.nn.functional as F
from packaging import version
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from torch import nn

from .layers import DilatedConvEncoder
from .utils import (
    torch_pad_nan,
    generate_binomial_mask,
    generate_continuous_mask,
)

# Names of the masking strategies accepted by TS2VecEncoder: `mask_mode` is validated
# against this list in TS2VecEncoder.__init__, and TS2VecEncoder.forward turns each
# name into a boolean (batch, time) mask.
MASK_MODES = ["binomial", "continuous", "all_true", "all_false", "mask_last"]


class TS2VecEncoder(nn.Module):
    """Encoder backbone of TS2Vec.

    The input is projected by a linear layer, masked per time step, and then passed
    through a ``DilatedConvEncoder`` followed by dropout (p=0.1).

    Parameters
    ----------
    n_features:
        Size of the last dimension of the input (input size of the linear projection).

    n_pred_features:
        Last entry of the channel list handed to ``DilatedConvEncoder``
        (presumably the dimension of the produced representation -- confirm against
        ``DilatedConvEncoder``).

    d_hidden:
        Output size of the linear projection and width of the ``n_layers`` hidden entries of the
        channel list handed to ``DilatedConvEncoder``.

    n_layers:
        Number of ``d_hidden``-wide entries in the channel list handed to ``DilatedConvEncoder``.

    mask_mode:
        Masking strategy used by ``forward`` while the module is in training mode and no explicit
        mask is given. Must be one of ``MASK_MODES``.
    """

    def __init__(
        self,
        n_features: int,
        n_pred_features: int,
        d_hidden: int,
        n_layers: int,
        mask_mode: str = "binomial",
    ):
        super().__init__()

        assert mask_mode in MASK_MODES, f"mask_mode should be one of {MASK_MODES}"

        self.n_features = n_features
        self.n_pred_features = n_pred_features
        self.d_hidden = d_hidden
        self.mask_mode = mask_mode

        self.input_fc = nn.Linear(n_features, d_hidden)
        self.feature_extractor = DilatedConvEncoder(
            d_hidden,
            [d_hidden] * n_layers + [n_pred_features],
            kernel_size=3,
        )
        self.repr_dropout = nn.Dropout(p=0.1)

    def forward(self, x, mask=None):
        """Encode a batch of time series.

        Parameters
        ----------
        x:
            Tensor of shape B x T x n_features. Time steps containing any NaN are zeroed
            before the projection and excluded from the mask. The tensor passed in is
            left untouched.

        mask:
            ``None``, one of the names in ``MASK_MODES``, or a ready-made boolean B x T tensor
            (``True`` = keep the time step). With ``None``, ``self.mask_mode`` is used in training
            mode and ``"all_true"`` otherwise. A tensor passed in is left untouched.

        Returns
        -------
        Tensor of shape B x T x Co produced by the convolutional encoder and dropout.

        Raises
        ------
        ValueError
            If ``mask`` is a string that is not a known mask mode.
        """
        # x: B x T x n_features
        nan_mask = ~x.isnan().any(dim=-1)  # True where the whole time step is observed
        # out-of-place so the caller's tensor (possibly a view of their data) keeps its NaNs
        x = x.masked_fill(~nan_mask.unsqueeze(-1), 0)
        x = self.input_fc(x)  # B x T x Ch

        # generate & apply mask
        if mask is None:
            mask = self.mask_mode if self.training else "all_true"

        if isinstance(mask, str):
            n_samples, n_steps = x.size(0), x.size(1)
            if mask == "binomial":
                mask = generate_binomial_mask(n_samples, n_steps).to(x.device)
            elif mask == "continuous":
                mask = generate_continuous_mask(n_samples, n_steps).to(x.device)
            elif mask == "all_true":
                mask = x.new_full((n_samples, n_steps), True, dtype=torch.bool)
            elif mask == "all_false":
                mask = x.new_full((n_samples, n_steps), False, dtype=torch.bool)
            elif mask == "mask_last":
                mask = x.new_full((n_samples, n_steps), True, dtype=torch.bool)
                mask[:, -1] = False
            else:
                raise ValueError(f"mask should be one of {MASK_MODES}, but got {mask!r}")

        # out-of-place so a mask tensor supplied by the caller is not modified
        mask = mask & nan_mask
        x = x.masked_fill(~mask.unsqueeze(-1), 0)

        # conv encoder
        x = x.transpose(1, 2)  # B x Ch x T
        x = self.repr_dropout(self.feature_extractor(x))  # B x Co x T
        x = x.transpose(1, 2)  # B x T x Co

        return x

    def _eval_with_pooling(self, x, mask=None, slicing=None, encoding_window=None):
        """Run ``forward`` and max-pool the result along time according to ``encoding_window``.

        ``slicing`` (a ``slice`` over the time axis) is applied before pooling for
        ``"full_series"`` and after pooling otherwise. The result is moved to the CPU.
        """
        out = self.forward(x, mask)

        if encoding_window == "full_series":
            if slicing is not None:
                out = out[:, slicing]
            # one pooled vector per sample: B x 1 x Co
            out = F.max_pool1d(
                out.transpose(1, 2),
                kernel_size=out.size(1),
            ).transpose(1, 2)

        elif isinstance(encoding_window, int):
            out = F.max_pool1d(
                out.transpose(1, 2),
                kernel_size=encoding_window,
                stride=1,
                padding=encoding_window // 2,
            ).transpose(1, 2)
            if encoding_window % 2 == 0:
                # an even kernel with this padding yields one extra step; drop it
                out = out[:, :-1]
            if slicing is not None:
                out = out[:, slicing]

        elif encoding_window == "multiscale":
            p = 0
            reprs = []
            # kernels of size 2^(p+1) + 1, concatenated along the feature axis
            while (1 << p) + 1 < out.size(1):
                t_out = F.max_pool1d(
                    out.transpose(1, 2),
                    kernel_size=(1 << (p + 1)) + 1,
                    stride=1,
                    padding=1 << p,
                ).transpose(1, 2)
                if slicing is not None:
                    t_out = t_out[:, slicing]
                reprs.append(t_out)
                p += 1
            out = torch.cat(reprs, dim=-1)

        else:
            if slicing is not None:
                out = out[:, slicing]

        return out.cpu()

    @torch.no_grad()
    def encode(
        self,
        x: torch.Tensor,
        mask: Optional[str] = None,
        encoding_window: Optional[str] = None,
        causal: bool = False,
        sliding_length: Optional[int] = None,
        sliding_padding: int = 0,
    ) -> torch.Tensor:
        """Compute representations using the trained model.

        Gradients are disabled, but the module's training/eval mode is not changed here:
        dropout and the default mask follow ``self.training``.

        Parameters
        ----------
        x:
            This should have a shape of (n_samples, n_steps, n_features).
            All missing data should be set to NaN.

        mask:
            The mask used by encoder can be specified with this parameter.
            This can be set to 'binomial', 'continuous', 'all_true', 'all_false' or 'mask_last'.

        encoding_window:
            When this param is specified, the computed representation would be the max pooling over this window.
            This can be set to 'full_series', 'multiscale' or an integer specifying the pooling kernel size.

        causal:
            When this param is set to True,
            the future information would not be encoded into representation of each timestamp.

        sliding_length:
            The length of sliding window.
            When this param is specified, a sliding inference would be applied on the time series.

        sliding_padding:
            This param specifies the contextual data length used for inference every sliding windows.

        Returns
        -------
        repr:
            The representations for data, on the CPU. With ``encoding_window='full_series'`` the
            time axis is pooled away and removed, with or without sliding inference.

        """
        _, n_steps, _ = x.shape

        if sliding_length is not None:
            reprs = []
            for i in range(0, n_steps, sliding_length):
                left = i - sliding_padding
                right = i + sliding_length + (sliding_padding if not causal else 0)
                # pad with NaN where the window reaches beyond the series
                x_sliding = torch_pad_nan(
                    x[:, max(left, 0) : min(right, n_steps)],
                    left=-left if left < 0 else 0,
                    right=right - n_steps if right > n_steps else 0,
                    dim=1,
                )
                out = self._eval_with_pooling(
                    x_sliding,
                    mask,
                    slicing=slice(sliding_padding, sliding_padding + sliding_length),
                    encoding_window=encoding_window,
                )
                reprs.append(out)

            reprs = torch.cat(reprs, dim=1)
            if encoding_window == "full_series":
                # pool over the per-window vectors: B x Co x 1, then drop the pooled time
                # axis so the shape matches the non-sliding path
                reprs = F.max_pool1d(
                    reprs.transpose(1, 2).contiguous(),
                    kernel_size=reprs.size(1),
                ).squeeze(-1)
        else:
            reprs = self._eval_with_pooling(x, mask, encoding_window=encoding_window)
            if encoding_window == "full_series":
                reprs = reprs.squeeze(1)

        return reprs

    @staticmethod
    def fit_svm(features, y, MAX_SAMPLES=10000):
        """Fit an RBF-kernel SVC on ``features``/``y`` and return the fitted estimator.

        For small training sets (fewer than 50 samples, or fewer than 5 per class on average)
        a single ``SVC(C=np.inf)`` is fitted. Otherwise ``C`` is chosen by a 5-fold grid search,
        after a stratified subsample to ``MAX_SAMPLES`` if the training set is larger.
        """
        nb_classes = len(np.unique(y))
        train_size = features.shape[0]

        svm = SVC(C=np.inf, gamma="scale")
        if train_size // nb_classes < 5 or train_size < 50:
            return svm.fit(features, y)

        grid_search = GridSearchCV(
            svm,
            {
                "C": [0.0001, 0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000, np.inf],
                "kernel": ["rbf"],
                "degree": [3],
                "gamma": ["scale"],
                "coef0": [0],
                "shrinking": [True],
                "probability": [False],
                "tol": [0.001],
                "cache_size": [200],
                "class_weight": [None],
                "verbose": [False],
                "max_iter": [10000000],
                "decision_function_shape": ["ovr"],
                "random_state": [None],
            },
            cv=5,
            n_jobs=5,
        )
        # If the training set is too large, subsample MAX_SAMPLES examples
        if train_size > MAX_SAMPLES:
            split = train_test_split(features, y, train_size=MAX_SAMPLES, random_state=0, stratify=y)
            features = split[0]
            y = split[2]

        grid_search.fit(features, y)
        return grid_search.best_estimator_

    @staticmethod
    def fit_lr(features, y, MAX_SAMPLES=100000):
        """Fit a standard-scaled logistic regression pipeline and return it.

        The training set is subsampled (stratified) to ``MAX_SAMPLES`` if it is larger.
        """
        # If the training set is too large, subsample MAX_SAMPLES examples
        if features.shape[0] > MAX_SAMPLES:
            split = train_test_split(features, y, train_size=MAX_SAMPLES, random_state=0, stratify=y)
            features = split[0]
            y = split[2]

        if version.parse(sklearn.__version__) >= version.parse("1.7"):
            # To fix TypeError: LogisticRegression.__init__() got an unexpected keyword argument 'multi_class'
            # multi_class deprecated since sklearn 1.7
            # imported explicitly: `import sklearn` alone does not guarantee that the
            # `sklearn.multiclass` submodule is loaded
            from sklearn.multiclass import OneVsRestClassifier

            pipe = make_pipeline(
                StandardScaler(),
                OneVsRestClassifier(LogisticRegression(random_state=0, max_iter=1000000)),
            )
        else:
            pipe = make_pipeline(StandardScaler(), LogisticRegression(random_state=0, max_iter=1000000))

        pipe.fit(features, y)
        return pipe

    @staticmethod
    def fit_knn(features, y):
        """Fit a standard-scaled 1-nearest-neighbour classifier pipeline and return it."""
        pipe = make_pipeline(StandardScaler(), KNeighborsClassifier(n_neighbors=1))
        pipe.fit(features, y)
        return pipe

    @staticmethod
    def fit_ridge(train_features, train_y, valid_features, valid_y, MAX_SAMPLES=100000):
        """Fit a ridge regressor whose ``alpha`` minimises RMSE + MAE on the validation set.

        Both the training and the validation set are subsampled to ``MAX_SAMPLES`` if larger.
        The returned model is refitted on the training set with the selected ``alpha``.
        """
        # If the training set is too large, subsample MAX_SAMPLES examples
        if train_features.shape[0] > MAX_SAMPLES:
            split = train_test_split(train_features, train_y, train_size=MAX_SAMPLES, random_state=0)
            train_features = split[0]
            train_y = split[2]
        if valid_features.shape[0] > MAX_SAMPLES:
            split = train_test_split(valid_features, valid_y, train_size=MAX_SAMPLES, random_state=0)
            valid_features = split[0]
            valid_y = split[2]

        alphas = [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000]
        valid_results = []
        for alpha in alphas:
            lr = Ridge(alpha=alpha).fit(train_features, train_y)
            valid_pred = lr.predict(valid_features)
            # selection score: RMSE + MAE on the validation set
            score = np.sqrt(((valid_pred - valid_y) ** 2).mean()) + np.abs(valid_pred - valid_y).mean()
            valid_results.append(score)
        best_alpha = alphas[np.argmin(valid_results)]

        lr = Ridge(alpha=best_alpha)
        lr.fit(train_features, train_y)
        return lr