"""
Preprocessing func for nonlinear benchmarks.
"""
# Created by Sikai Zhang <matthew.szhang91@gmail.com>
# License: BSD-3-Clause
import nonlinear_benchmarks
import numpy as np
from sklearn.preprocessing import StandardScaler
from typing import Any, Optional, Sequence, Union
from ..utils.logging import logger, print_final_dataset_info
from ..utils.missingness import create_missingness
from ..utils.sliding import sliding_window
from ..utils.task_type import convert_processed_dataset_by_task_type
[docs]
def preprocess_nl_benchmarks(
    dataset_name: str,
    rate: float,
    n_steps: int,
    pattern: str = "point",
    random_state: Optional[int] = None,
    task_type: str = "imputation",
    n_pred_steps: int = 1,
    forecast_feature_indices: Optional[Union[int, Sequence[int]]] = None,
    **kwargs: Any,
) -> dict:
    """Load and preprocess a dataset from nonlinear benchmarks.

    The input ``u`` and output ``y`` of every record are stacked column-wise,
    the train/validation records are split chronologically (the leading 80%
    for training, the trailing 20% for validation), all splits are
    standardized with a scaler fitted on the training part only, and finally
    cut into samples of ``n_steps`` time steps with a sliding window.

    Parameters
    ----------
    dataset_name:
        The name of the nonlinear benchmark dataset to be loaded.
        Must be one of [
        "EMPS",
        "CED",
        "WienerHammerBenchMark",
        "Silverbox",
        "F16",
        "ParWH",
        "Cascaded_Tanks",
        "BoucWen",
        "WienerHammerstein_Process_Noise",
        "Industrial_robot",
        ].

    rate:
        The missing rate. Must be in [0, 1). If 0, no missing values are
        artificially added.

    n_steps:
        The number of time steps in the generated data samples.
        Also the window size of the sliding window.

    pattern:
        The missing pattern to apply to the dataset.
        Must be one of ['point', 'subseq', 'block'].

    random_state:
        Controls the randomness of missingness generation.
        Pass an int for reproducible missingness masks across runs.
        Ignored if ``random_state`` is already given in ``kwargs``.

    task_type:
        Task type for postprocessing. Supported values are
        ['imputation', 'forecasting', 'classification', 'clustering', 'anomaly_detection'].

    n_pred_steps:
        Forecasting horizon. Effective only when task_type is 'forecasting'.

    forecast_feature_indices:
        Target feature indices for forecasting labels. If None, all features are used.

    **kwargs:
        Extra keyword arguments forwarded to ``create_missingness``.

    Returns
    -------
    processed_dataset :
        A dictionary containing the processed nonlinear benchmark datasets.
        Before task-type postprocessing it holds the keys ``n_steps``,
        ``n_features``, ``scaler``, ``dt``, ``init_state_size``, ``train_X``,
        ``val_X`` and ``test_X``. When ``rate > 0`` the ``*_X`` entries are
        the versions with artificial missingness and the untouched data are
        stored under ``train_X_ori``, ``val_X_ori`` and ``test_X_ori``.

    Raises
    ------
    AssertionError
        If ``rate`` is not in [0, 1) or ``n_steps`` is not positive.
    ValueError
        If ``dataset_name`` is not a supported dataset.

    """
    assert 0 <= rate < 1, f"rate must be in [0, 1), but got {rate}"
    assert n_steps > 0, f"sample_n_steps must be larger than 0, but got {n_steps}"

    if dataset_name == "EMPS":
        train_val, test = nonlinear_benchmarks.EMPS()
    elif dataset_name == "CED":
        train_val, test = nonlinear_benchmarks.CED()
    elif dataset_name == "WienerHammerBenchMark":
        train_val, test = nonlinear_benchmarks.WienerHammerBenchMark()
    elif dataset_name == "Silverbox":
        train_val, test = nonlinear_benchmarks.Silverbox()
    elif dataset_name == "F16":
        train_val, test = nonlinear_benchmarks.F16()
    elif dataset_name == "ParWH":
        train_val, test = nonlinear_benchmarks.ParWH()
    elif dataset_name == "Cascaded_Tanks":
        train_val, test = nonlinear_benchmarks.Cascaded_Tanks()
    elif dataset_name == "BoucWen":
        train_val, test = nonlinear_benchmarks.not_splitted_benchmarks.BoucWen()
    elif dataset_name == "WienerHammerstein_Process_Noise":
        # The first dataset is generated by multisine excitations, which is more suitable for test
        # The second dataset is generated by sinesweep excitations, which is more suitable for training
        test, train_val = nonlinear_benchmarks.not_splitted_benchmarks.WienerHammerstein_Process_Noise()
    elif dataset_name == "Industrial_robot":
        train_val, test = nonlinear_benchmarks.not_splitted_benchmarks.Industrial_robot()
    else:
        raise ValueError(
            "dataset_name must be one of ["
            "'EMPS', 'CED', 'WienerHammerBenchMark',"
            "'Silverbox', 'F16', 'ParWH', 'Cascaded_Tanks',"
            "'BoucWen', 'WienerHammerstein_Process_Noise', 'Industrial_robot'], "
            f"but got {dataset_name}."
        )

    # A loader may return a single record or a collection of records;
    # normalize both cases to a sequence so the code below can iterate.
    if not isinstance(train_val, (tuple, list)):
        train_val = (train_val,)
    if not isinstance(test, (tuple, list)):
        test = (test,)

    if dataset_name in ["BoucWen", "WienerHammerstein_Process_Noise", "Industrial_robot"]:
        # Fixed fallback values are used for the not-splitted benchmarks
        # (presumably their records do not expose this metadata -- TODO confirm).
        dt = 1.0
        init_state_size = 50
    else:
        dt = float(train_val[0].sampling_time)
        init_state_size = test[0].state_initialization_window_length

    # fraction of every train/val record that is held out for validation
    validation_size = 0.2

    # stack input u and output y column-wise, one array per record
    train_val_all = [np.c_[x.u, x.y] for x in train_val]
    test_all = [np.c_[x.u, x.y] for x in test]

    # chronological split: the leading (1 - validation_size) part of each record
    # is used for training and the trailing validation_size part for validation
    split_points = [round(len(x) * (1 - validation_size)) for x in train_val_all]
    train_all = [x[:k] for x, k in zip(train_val_all, split_points)]
    val_all = [x[k:] for x, k in zip(train_val_all, split_points)]

    # fit the scaler on the training part only to avoid leaking val/test statistics
    scaler = StandardScaler()
    scaler.fit(np.vstack(train_all))

    train_X = np.vstack([sliding_window(scaler.transform(x), n_steps) for x in train_all])
    val_X = np.vstack([sliding_window(scaler.transform(x), n_steps) for x in val_all])
    test_X = np.vstack([sliding_window(scaler.transform(x), n_steps) for x in test_all])

    # assemble the final processed data into a dictionary
    processed_dataset = {
        # general info
        "n_steps": n_steps,
        "n_features": test_X.shape[-1],
        "scaler": scaler,
        # Sampling time (delta time) in seconds
        "dt": dt,
        # The maximum size of the initial state (i.e., y[:init_state_size]) recommended for use in prediction.
        "init_state_size": init_state_size,
        # train set
        "train_X": train_X,
        # val set
        "val_X": val_X,
        # test set
        "test_X": test_X,
    }

    processed_dataset = convert_processed_dataset_by_task_type(
        processed_dataset,
        task_type=task_type,
        n_pred_steps=n_pred_steps,
        forecast_feature_indices=forecast_feature_indices,
    )

    if rate > 0:
        # an explicit random_state in kwargs takes precedence over the argument
        if random_state is not None and "random_state" not in kwargs:
            kwargs["random_state"] = random_state

        # hold out ground truth in the original data for evaluation
        train_X_ori = processed_dataset["train_X"]
        val_X_ori = processed_dataset["val_X"]
        test_X_ori = processed_dataset["test_X"]

        # mask values in the train set so that it is consistent with the validation and test sets below
        train_X = create_missingness(processed_dataset["train_X"], rate, pattern, **kwargs)
        # mask values in the validation set as ground truth
        val_X = create_missingness(processed_dataset["val_X"], rate, pattern, **kwargs)
        # mask values in the test set as ground truth
        test_X = create_missingness(processed_dataset["test_X"], rate, pattern, **kwargs)

        processed_dataset["train_X"] = train_X
        processed_dataset["train_X_ori"] = train_X_ori

        processed_dataset["val_X"] = val_X
        processed_dataset["val_X_ori"] = val_X_ori

        processed_dataset["test_X"] = test_X
        processed_dataset["test_X_ori"] = test_X_ori
    else:
        logger.warning("rate is 0, no missing values are artificially added.")

    print_final_dataset_info(processed_dataset)
    return processed_dataset