Source code for benchpots.datasets.nl_benchmarks

"""
Preprocessing function for the nonlinear benchmark datasets.

"""

# Created by Sikai Zhang <matthew.szhang91@gmail.com>
# License: BSD-3-Clause

import nonlinear_benchmarks
import numpy as np
from sklearn.preprocessing import StandardScaler
from typing import Any, Optional, Sequence, Union

from ..utils.logging import logger, print_final_dataset_info
from ..utils.missingness import create_missingness
from ..utils.sliding import sliding_window
from ..utils.task_type import convert_processed_dataset_by_task_type


def preprocess_nl_benchmarks(
    dataset_name: str,
    rate: float,
    n_steps: int,
    pattern: str = "point",
    random_state: Optional[int] = None,
    task_type: str = "imputation",
    n_pred_steps: int = 1,
    forecast_feature_indices: Optional[Union[int, Sequence[int]]] = None,
    **kwargs: Any,
) -> dict:
    """Load and preprocess a dataset from the nonlinear benchmarks collection.

    The input ``u`` and output ``y`` signals of every record are stacked
    column-wise, standardized with a scaler fitted on the training portion
    only, and cut into windows of ``n_steps`` time steps. Optionally, missing
    values are injected into the train, validation and test sets.

    Parameters
    ----------
    dataset_name:
        The name of the nonlinear benchmark dataset to be loaded.
        Must be one of [
            "EMPS",
            "CED",
            "WienerHammerBenchMark",
            "Silverbox",
            "F16",
            "ParWH",
            "Cascaded_Tanks",
            "BoucWen",
            "WienerHammerstein_Process_Noise",
            "Industrial_robot",
        ].

    rate:
        The missing rate. Must be in [0, 1). When it is 0, no missing values
        are added and a warning is logged.

    n_steps:
        The number of time steps in the generated data samples.
        Also the window size of the sliding window.

    pattern:
        The missing pattern to apply to the dataset.
        Must be one of ['point', 'subseq', 'block'].

    random_state:
        Controls the randomness of missingness generation.
        Pass an int for reproducible missingness masks across runs.
        Ignored if ``random_state`` is already given in ``kwargs``.

    task_type:
        Task type for postprocessing.
        Supported values are
        ['imputation', 'forecasting', 'classification', 'clustering', 'anomaly_detection'].

    n_pred_steps:
        Forecasting horizon. Effective only when task_type is 'forecasting'.

    forecast_feature_indices:
        Target feature indices for forecasting labels.
        If None, all features are used.

    kwargs:
        Extra keyword arguments forwarded to ``create_missingness``.

    Returns
    -------
    processed_dataset :
        A dictionary containing the processed nonlinear benchmark datasets.
        Before task-type conversion it holds "n_steps", "n_features",
        "scaler", "dt", "init_state_size", "train_X", "val_X" and "test_X".
        When ``rate > 0``, "train_X_ori", "val_X_ori" and "test_X_ori" hold
        the data as it was before missing values were injected.

    Raises
    ------
    AssertionError
        If ``rate`` is outside [0, 1) or ``n_steps`` is not positive.
    ValueError
        If ``dataset_name`` is not a supported benchmark name.

    """
    assert 0 <= rate < 1, f"rate must be in [0, 1), but got {rate}"
    assert n_steps > 0, f"n_steps must be larger than 0, but got {n_steps}"

    # Every dataset name is also the name of its loader function, so the
    # loader can be looked up by name instead of through an if/elif chain.
    split_benchmarks = (
        "EMPS",
        "CED",
        "WienerHammerBenchMark",
        "Silverbox",
        "F16",
        "ParWH",
        "Cascaded_Tanks",
    )
    not_split_benchmarks = (
        "BoucWen",
        "WienerHammerstein_Process_Noise",
        "Industrial_robot",
    )

    if dataset_name in split_benchmarks:
        train_val, test = getattr(nonlinear_benchmarks, dataset_name)()
    elif dataset_name in not_split_benchmarks:
        loader = getattr(nonlinear_benchmarks.not_splitted_benchmarks, dataset_name)
        if dataset_name == "WienerHammerstein_Process_Noise":
            # The first dataset is generated by multisine excitations, which is more suitable for test
            # The second dataset is generated by sinesweep excitations, which is more suitable for training
            test, train_val = loader()
        else:
            train_val, test = loader()
    else:
        raise ValueError(
            "dataset_name must be one of ["
            "'EMPS', 'CED', 'WienerHammerBenchMark',"
            "'Silverbox', 'F16', 'ParWH', 'Cascaded_Tanks',"
            "'BoucWen', 'WienerHammerstein_Process_Noise', 'Industrial_robot'], "
            f"but got {dataset_name}."
        )

    # A benchmark may return a single record or a sequence of records;
    # normalize both parts to sequences so they can be handled uniformly.
    if not isinstance(train_val, (tuple, list)):
        train_val = (train_val,)
    if not isinstance(test, (tuple, list)):
        test = (test,)

    if dataset_name in not_split_benchmarks:
        # NOTE(review): fixed defaults are used for these benchmarks instead of
        # record metadata - presumably the metadata is unavailable; confirm.
        dt = 1.0
        init_state_size = 50
    else:
        dt = float(train_val[0].sampling_time)
        init_state_size = test[0].state_initialization_window_length

    validation_size = 0.2

    # stack input u and output y column-wise, one array per record
    train_val_all = [np.c_[x.u, x.y] for x in train_val]
    test_all = [np.c_[x.u, x.y] for x in test]

    # Chronological split of every record: the leading (1 - validation_size)
    # share is used for training, the trailing validation_size share for
    # validation.
    train_all = []
    val_all = []
    for x in train_val_all:
        split_idx = round(len(x) * (1 - validation_size))
        train_all.append(x[:split_idx])
        val_all.append(x[split_idx:])

    # fit the scaler on the training data only to avoid leaking val/test statistics
    scaler = StandardScaler()
    scaler.fit(np.vstack(train_all))

    train_X = np.vstack([sliding_window(scaler.transform(x), n_steps) for x in train_all])
    val_X = np.vstack([sliding_window(scaler.transform(x), n_steps) for x in val_all])
    test_X = np.vstack([sliding_window(scaler.transform(x), n_steps) for x in test_all])

    # assemble the final processed data into a dictionary
    processed_dataset = {
        # general info
        "n_steps": n_steps,
        "n_features": test_X.shape[-1],
        "scaler": scaler,
        # Sampling time (delta time) in seconds
        "dt": dt,
        # The maximum size of the initial state (i.e., y[:init_state_size]) recommended for use in prediction.
        "init_state_size": init_state_size,
        # train set
        "train_X": train_X,
        # val set
        "val_X": val_X,
        # test set
        "test_X": test_X,
    }

    processed_dataset = convert_processed_dataset_by_task_type(
        processed_dataset,
        task_type=task_type,
        n_pred_steps=n_pred_steps,
        forecast_feature_indices=forecast_feature_indices,
    )

    if rate > 0:
        # an explicit random_state passed through kwargs takes precedence
        if random_state is not None and "random_state" not in kwargs:
            kwargs["random_state"] = random_state

        # hold out ground truth in the original data for evaluation
        train_X_ori = processed_dataset["train_X"]
        val_X_ori = processed_dataset["val_X"]
        test_X_ori = processed_dataset["test_X"]

        # mask values in the train set to keep the same with below validation and test sets
        train_X = create_missingness(processed_dataset["train_X"], rate, pattern, **kwargs)
        # mask values in the validation set as ground truth
        val_X = create_missingness(processed_dataset["val_X"], rate, pattern, **kwargs)
        # mask values in the test set as ground truth
        test_X = create_missingness(processed_dataset["test_X"], rate, pattern, **kwargs)

        processed_dataset["train_X"] = train_X
        processed_dataset["train_X_ori"] = train_X_ori

        processed_dataset["val_X"] = val_X
        processed_dataset["val_X_ori"] = val_X_ori

        processed_dataset["test_X"] = test_X
        processed_dataset["test_X_ori"] = test_X_ori
    else:
        logger.warning("rate is 0, no missing values are artificially added.")

    print_final_dataset_info(processed_dataset)
    return processed_dataset