"""
Preprocessing func for the generated random walk dataset.
"""
# Created by Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause
import math
from typing import Any, Optional, Sequence, Tuple, Union
import numpy as np
from pygrinder import mcar
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state
from ..utils.logging import logger, print_final_dataset_info
from ..utils.missingness import create_missingness
from ..utils.task_type import convert_processed_dataset_by_task_type
def gene_complete_random_walk(
n_samples: int = 1000,
n_steps: int = 24,
n_features: int = 10,
mu: float = 0.0,
std: float = 1.0,
random_state: Optional[int] = None,
) -> np.ndarray:
"""Generate complete random walk time-series data, i.e. having no missing values.
Parameters
----------
n_samples : int, default=1000
The number of training time-series samples to generate.
n_steps: int, default=24
The number of time steps (length) of generated time-series samples.
n_features : int, default=10
The number of features (dimensions) of generated time-series samples.
mu : float, default=0.0
Mean of the normal distribution, which random walk steps are sampled from.
std : float, default=1.0
Standard deviation of the normal distribution, which random walk steps are sampled from.
random_state : int, default=None
Random seed for data generation.
Returns
-------
ts_samples: array, shape of [n_samples, n_steps, n_features]
Generated random walk time series.
"""
seed = check_random_state(random_state)
ts_samples = np.zeros([n_samples, n_steps, n_features])
random_values = seed.randn(n_samples, n_steps, n_features) * std + mu
ts_samples[:, 0, :] = random_values[:, 0, :]
for t in range(1, n_steps):
ts_samples[:, t, :] = ts_samples[:, t - 1, :] + random_values[:, t, :]
ts_samples = np.asarray(ts_samples)
return ts_samples
def gene_complete_random_walk_with_anomalies(
n_samples: int = 1000,
n_steps: int = 24,
n_features: int = 10,
mu: float = 0.0,
std: float = 1.0,
anomaly_rate: float = 0.1,
anomaly_scale_factor: float = 2.0,
random_state: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""Generate random walk time-series data for the anomaly-detection task.
Parameters
----------
n_samples : int, default=1000
The number of training time-series samples to generate.
n_features : int, default=10
The number of features (dimensions) of generated time-series samples.
n_steps: int, default=24
The number of time steps (length) of generated time-series samples.
mu : float, default=0.0
Mean of the normal distribution, which random walk steps are sampled from.
std : float, default=1.0
Standard deviation of the normal distribution, which random walk steps are sampled from.
anomaly_rate : float, default=0.1
Proportion of anomaly samples in all samples.
anomaly_scale_factor : float, default=2.0
Scale factor for value scaling to create anomaly points in time series samples.
random_state : int, default=None
Random seed for data generation.
Returns
-------
X : array, shape of [n_samples, n_steps, n_features]
Generated time-series data.
y : array, shape of [n_samples]
Labels indicating if time-series samples are anomalies.
"""
assert 0 < anomaly_rate < 1, f"anomaly_proportion should be >0 and <1, but got {anomaly_rate}"
seed = check_random_state(random_state)
n_total_steps = n_samples * n_steps
X = seed.randn(n_total_steps, n_features) * std + mu
n_anomaly = math.floor(n_total_steps * anomaly_rate)
anomaly_indices = seed.choice(n_total_steps, size=n_anomaly, replace=False)
flatten_X = X.flatten()
min_val = flatten_X.min()
max_val = flatten_X.max()
max_difference = min_val - max_val
for a_i in anomaly_indices:
anomaly_sample = X[a_i]
# which feature to be anomaly
feat_idx = seed.choice(a=n_features, size=1, replace=False)
anomaly_sample[feat_idx] = mu + seed.uniform(
low=min_val - anomaly_scale_factor * max_difference,
high=max_val + anomaly_scale_factor * max_difference,
)
X[a_i] = anomaly_sample
# create labels
y = np.zeros(n_total_steps)
y[anomaly_indices] = 1
X = X.reshape(n_samples, n_steps, n_features)
y = y.reshape(n_samples, n_steps, 1)
# shuffling
indices = np.arange(n_samples)
seed.shuffle(indices)
X = X[indices]
y = y[indices]
return X, y
def gene_complete_random_walk_for_classification(
n_classes: int = 2,
n_samples_each_class: int = 500,
n_steps: int = 24,
n_features: int = 10,
anomaly_rate: float = 0,
shuffle: bool = True,
random_state: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Generate complete random walk time-series data for the classification task.
Parameters
----------
n_classes : int, must >=1, default=2
Number of classes (types) of the generated data.
n_samples_each_class : int, default=500
Number of samples for each class to generate.
n_steps : int, default=24
Number of time steps in each sample.
n_features : int, default=10
Number of features.
anomaly_rate : float, default=0
Proportion of anomaly samples in all samples.
Default as 0 means no anomaly samples are generated.
shuffle : bool, default=True
Whether to shuffle generated samples.
If not, you can separate samples of each class according to `n_samples_each_class`.
For example,
X_class0=X[:n_samples_each_class],
X_class1=X[n_samples_each_class:n_samples_each_class*2]
random_state : int, default=None
Random seed for data generation.
Returns
-------
X : array, shape of [n_samples, n_steps, n_features]
Generated time-series data.
y : array, shape of [n_samples]
Labels indicating classes of time-series samples.
"""
assert n_classes > 1, f"n_classes should be >1, but got {n_classes}"
assert 0 <= anomaly_rate < 1, f"anomaly_rate should be in [0,1), but got {anomaly_rate}"
ts_collector = []
label_collector = []
anomaly_label_collector = []
mu = 0
std = 1
for c_ in range(n_classes):
if anomaly_rate > 0:
ts_samples, anomaly_labels = gene_complete_random_walk_with_anomalies(
n_samples=n_samples_each_class,
n_steps=n_steps,
n_features=n_features,
mu=mu,
std=std,
anomaly_rate=anomaly_rate,
random_state=random_state,
)
anomaly_label_collector.extend(anomaly_labels)
else:
ts_samples = gene_complete_random_walk(
n_samples=n_samples_each_class,
n_steps=n_steps,
n_features=n_features,
mu=mu,
std=std,
random_state=random_state,
)
label_samples = np.asarray([1 for _ in range(n_samples_each_class)]) * c_
ts_collector.extend(ts_samples)
label_collector.extend(label_samples)
mu += 1
X = np.asarray(ts_collector)
y = np.asarray(label_collector)
anomaly_y = np.asarray(anomaly_label_collector)
# if shuffling, then shuffle the order of samples
if shuffle:
rng = check_random_state(random_state)
indices = np.arange(len(X))
rng.shuffle(indices)
X = X[indices]
y = y[indices]
anomaly_y = anomaly_y[indices] if len(anomaly_y) > 0 else anomaly_y
return X, y, anomaly_y
[docs]
def preprocess_random_walk(
n_steps: int = 24,
n_features: int = 10,
n_classes: int = 2,
n_samples_each_class: int = 1000,
anomaly_rate: float = 0,
missing_rate: float = 0.1,
pattern: str = "point",
random_state: Optional[int] = None,
task_type: str = "imputation",
n_pred_steps: int = 1,
forecast_feature_indices: Optional[Union[int, Sequence[int]]] = None,
**kwargs: Any,
) -> dict:
"""Generate a random-walk data.
Parameters
----------
n_steps : int, default=24
Number of time steps in each sample.
n_features : int, default=10
Number of features.
n_classes : int, default=2
Number of classes (types) of the generated data.
n_samples_each_class : int, default=1000
Number of samples for each class to generate.
anomaly_rate : float, default=0
Proportion of anomaly samples in all samples.
Default as 0 means no anomaly samples are generated.
missing_rate : float, default=0.1
The rate of randomly missing values to generate, should be in [0,1).
pattern :
The missing pattern to apply to the dataset.
Must be one of ['point', 'subseq', 'block'].
random_state:
Controls the randomness for generated samples and train/validation/test splits.
Pass an int for reproducible outputs across runs.
task_type:
Task type for postprocessing. Supported values are
['imputation', 'forecasting', 'classification', 'clustering', 'anomaly_detection'].
n_pred_steps:
Forecasting horizon. Effective only when task_type is 'forecasting'.
forecast_feature_indices:
Target feature indices for forecasting labels. If None, all features are used.
Returns
-------
data: dict,
A dictionary containing the generated data.
"""
assert 0 <= anomaly_rate < 1, f"anomaly_rate should be in [0,1), but got {anomaly_rate}"
assert 0 <= missing_rate < 1, f"missing_rate must be in [0,1), but got {missing_rate}"
# generate samples
X, y, anomaly_y = gene_complete_random_walk_for_classification(
n_classes=n_classes,
n_samples_each_class=n_samples_each_class,
n_steps=n_steps,
n_features=n_features,
anomaly_rate=anomaly_rate,
random_state=random_state,
)
# split into train/val/test sets
if anomaly_rate > 0:
train_X, test_X, train_y, test_y, train_anomaly_y, test_anomaly_y = train_test_split(
X, y, anomaly_y, test_size=0.2, random_state=random_state
)
train_X, val_X, train_y, val_y, train_anomaly_y, val_anomaly_y = train_test_split(
train_X, train_y, train_anomaly_y, test_size=0.2, random_state=random_state
)
else:
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.2, random_state=random_state)
train_X, val_X, train_y, val_y = train_test_split(train_X, train_y, test_size=0.2, random_state=random_state)
if missing_rate > 0:
# create random missing values
train_X_ori = train_X
train_X = mcar(train_X, missing_rate)
# test set is left to mask after normalization
train_X = train_X.reshape(-1, n_features)
val_X = val_X.reshape(-1, n_features)
test_X = test_X.reshape(-1, n_features)
# normalization
scaler = StandardScaler()
train_X = scaler.fit_transform(train_X)
val_X = scaler.transform(val_X)
test_X = scaler.transform(test_X)
# reshape into time series samples
train_X = train_X.reshape(-1, n_steps, n_features)
val_X = val_X.reshape(-1, n_steps, n_features)
test_X = test_X.reshape(-1, n_steps, n_features)
if missing_rate > 0:
train_X_ori = scaler.transform(train_X_ori.reshape(-1, n_features)).reshape(-1, n_steps, n_features)
processed_dataset = {
# general info
"n_classes": n_classes,
"n_steps": n_steps,
"n_features": n_features,
"scaler": scaler,
# train set
"train_X": train_X,
"train_y": train_y,
# val set
"val_X": val_X,
"val_y": val_y,
# test set
"test_X": test_X,
"test_y": test_y,
}
if anomaly_rate > 0:
processed_dataset["train_anomaly_y"] = train_anomaly_y
processed_dataset["val_anomaly_y"] = val_anomaly_y
processed_dataset["test_anomaly_y"] = test_anomaly_y
if missing_rate > 0:
# hold out ground truth in the original data for evaluation
processed_dataset["train_X_ori"] = train_X_ori
processed_dataset["val_X_ori"] = val_X
processed_dataset["test_X_ori"] = test_X
processed_dataset = convert_processed_dataset_by_task_type(
processed_dataset,
task_type=task_type,
n_pred_steps=n_pred_steps,
forecast_feature_indices=forecast_feature_indices,
)
if missing_rate > 0:
# mask values in the train set to keep the same with below validation and test sets
train_X = create_missingness(processed_dataset["train_X"], missing_rate, pattern, **kwargs)
# mask values in the validation set as ground truth
val_X = create_missingness(processed_dataset["val_X"], missing_rate, pattern, **kwargs)
# mask values in the test set as ground truth
test_X = create_missingness(processed_dataset["test_X"], missing_rate, pattern, **kwargs)
processed_dataset["train_X"] = train_X
processed_dataset["val_X"] = val_X
processed_dataset["test_X"] = test_X
else:
logger.warning("rate is 0, no missing values are artificially added.")
print_final_dataset_info(processed_dataset)
return processed_dataset