Source code for benchpots.datasets.physionet_2019

"""
Preprocessing func for the dataset PhysioNet2019.

"""

# Created by Yiyuan Yang <yyy1997sjz@gmail.com> and Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause

from typing import Any, Optional, Sequence, Union

import numpy as np
import pandas as pd
import tsdb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from ..utils.logging import logger, print_final_dataset_info
from ..utils.missingness import create_missingness
from ..utils.task_type import convert_processed_dataset_by_task_type


[docs] def preprocess_physionet2019( subset: str, rate: float, pattern: str = "point", features: Optional[list] = None, random_state: Optional[int] = None, task_type: str = "imputation", n_pred_steps: int = 1, forecast_feature_indices: Optional[Union[int, Sequence[int]]] = None, **kwargs: Any, ) -> dict: """Load and preprocess the dataset PhysionNet2019. Parameters ---------- subset: The name of the subset dataset to be loaded. Must be one of ['all', 'training_setA', 'training_setB']. rate: The missing rate. pattern: The missing pattern to apply to the dataset. Must be one of ['point', 'subseq', 'block']. features: The features to be used in the dataset. If None, all features except the static features will be used. random_state: Controls the randomness of the train/validation/test split. Pass an int for reproducible splits across runs. task_type: Task type for postprocessing. Supported values are ['imputation', 'forecasting', 'classification', 'clustering', 'anomaly_detection']. n_pred_steps: Forecasting horizon. Effective only when task_type is 'forecasting'. forecast_feature_indices: Target feature indices for forecasting labels. If None, all features are used. Returns ------- processed_dataset : A dictionary containing the processed PhysionNet2019. """ def apply_func( df_temp: pd.DataFrame, ) -> Optional[pd.DataFrame]: # pad and truncate to set the max length of samples as 48 if len(df_temp) < 48: return None else: df_temp = df_temp.set_index("ICULOS").sort_index().reset_index() df_temp = df_temp.iloc[:48] # truncate return df_temp all_subsets = ["all", "training_setA", "training_setB"] assert subset in all_subsets, f"subset should be one of {all_subsets}, but got {subset}" assert 0 <= rate < 1, f"rate must be in [0, 1), but got {rate}" # read the raw data data = tsdb.load("physionet_2019") all_features = set(data["training_setA"].columns) label_feature = "SepsisLabel" # feature SepsisLabel contains labels indicating whether patients get sepsis time_feature = "ICULOS" # ICU length-of-stay (hours since ICU admit) if subset != "all": df = data[subset] X = df.reset_index(drop=True) else: df = pd.concat([data["training_setA"], data["training_setB"]], sort=True) X = df.reset_index(drop=True) if features is None: # if features are not specified, we use all features except the static features, e.g. age X = X.drop(data["static_features"], axis=1) else: # if features are specified by users, only use the specified features # check if the given features are valid features_set = set(features) if not all_features.issuperset(features_set): intersection_feats = all_features.intersection(features_set) difference = features_set.difference(intersection_feats) raise ValueError(f"Given features contain invalid features that not in the dataset: {difference}") # check if the given features contain necessary features for preprocessing if "RecordID" not in features: features.append("RecordID") if label_feature not in features: features.append(label_feature) if time_feature not in features: features.append(time_feature) # select the specified features finally X = X[features] X = X.groupby("RecordID").apply(apply_func) # pandas versions differ on whether group keys are kept as columns after groupby-apply. X = X.drop(columns=["RecordID"], errors="ignore") X = X.reset_index() X = X.drop(["level_1"], axis=1) before_cols = X.columns.tolist() X = X.dropna(axis=1, how="all") # drop columns that are all NaN after_cols = X.columns.tolist() if before_cols != after_cols: logger.info(f"Dropped all-nan columns: {set(before_cols) - set(after_cols)}") # split the dataset into the train, val, and test sets # Cast to numpy array for sklearn compatibility when pandas returns extension arrays (e.g., pyarrow-backed). all_recordID = np.asarray(X["RecordID"].unique()) train_set_ids, test_set_ids = train_test_split(all_recordID, test_size=0.2, random_state=random_state) train_set_ids, val_set_ids = train_test_split(train_set_ids, test_size=0.2, random_state=random_state) train_set_ids.sort() val_set_ids.sort() test_set_ids.sort() train_set = X[X["RecordID"].isin(train_set_ids)].sort_values(["RecordID", time_feature]) val_set = X[X["RecordID"].isin(val_set_ids)].sort_values(["RecordID", time_feature]) test_set = X[X["RecordID"].isin(test_set_ids)].sort_values(["RecordID", time_feature]) train_y = train_set[[time_feature, label_feature]] val_y = val_set[[time_feature, label_feature]] test_y = test_set[[time_feature, label_feature]] # remove useless columns and turn into numpy arrays train_set = train_set.drop(["RecordID", time_feature, label_feature], axis=1) val_set = val_set.drop(["RecordID", time_feature, label_feature], axis=1) test_set = test_set.drop(["RecordID", time_feature, label_feature], axis=1) train_X, val_X, test_X = ( train_set.to_numpy(), val_set.to_numpy(), test_set.to_numpy(), ) # normalization scaler = StandardScaler() train_X = scaler.fit_transform(train_X) val_X = scaler.transform(val_X) test_X = scaler.transform(test_X) # reshape into time series samples train_X = train_X.reshape(len(train_set_ids), 48, -1) val_X = val_X.reshape(len(val_set_ids), 48, -1) test_X = test_X.reshape(len(test_set_ids), 48, -1) # fetch labels for train/val/test sets train_y, val_y, test_y = train_y.to_numpy(), val_y.to_numpy(), test_y.to_numpy() # assemble the final processed data into a dictionary processed_dataset = { # general info "n_classes": 2, "n_steps": 48, "n_features": train_X.shape[-1], "scaler": scaler, # train set "train_X": train_X, "train_y": train_y, # val set "val_X": val_X, "val_y": val_y, # test set "test_X": test_X, "test_y": test_y, } processed_dataset = convert_processed_dataset_by_task_type( processed_dataset, task_type=task_type, n_pred_steps=n_pred_steps, forecast_feature_indices=forecast_feature_indices, ) if rate > 0: logger.warning( "Note that physionet_2019 has sparse observations in the time series, " "hence we don't add additional missing values to the training dataset. " ) # hold out ground truth in the original data for evaluation val_X_ori = processed_dataset["val_X"] test_X_ori = processed_dataset["test_X"] # mask values in the validation set as ground truth val_X = create_missingness(processed_dataset["val_X"], rate, pattern, **kwargs) # mask values in the test set as ground truth test_X = create_missingness(processed_dataset["test_X"], rate, pattern, **kwargs) processed_dataset["val_X"] = val_X processed_dataset["val_X_ori"] = val_X_ori processed_dataset["test_X"] = test_X processed_dataset["test_X_ori"] = test_X_ori test_X_indicating_mask = np.isnan(test_X_ori) ^ np.isnan(test_X) logger.info( f"{test_X_indicating_mask.sum()} values masked out in the test set as ground truth, " f"take {test_X_indicating_mask.sum() / (~np.isnan(test_X_ori)).sum():.2%} of the original observed values" ) else: logger.warning("rate is 0, no missing values are artificially added.") print_final_dataset_info(processed_dataset) return processed_dataset