Source code for pygrinder.block_missing.block_missing

"""

"""

# Created by Wenjie Du <wenjay.du@gmail.com>
# License: BSD-3-Clause

import math
from typing import Union

import numpy as np
import torch
from tsdb.utils.logging import logger


def random_select_start_indices(
    block_width,
    feature_idx,
    step_idx,
    hit_rate,
    n_samples,
    n_steps,
    n_features,
) -> np.ndarray:
    """Randomly draw the flat offsets at which the rows of missing blocks begin.

    Offsets are computed as ``(sample * n_features + feature) * n_steps + step``,
    which matches how the callers in this module index the data after moving it
    to (n_samples, n_features, n_steps) order and flattening it.

    Parameters
    ----------
    block_width :
        Number of adjacent features covered by one block.

    feature_idx :
        Feature indices a block may start at.

    step_idx :
        Step indices a block may start at.

    hit_rate :
        Fraction of the (sample, feature) candidates that receive a block;
        ``ceil(n_candidates * hit_rate)`` candidates are drawn. A value above 1
        is reported with a warning and candidates are then drawn with
        replacement.

    n_samples, n_steps, n_features :
        Dimensions of the data the offsets refer to.

    Returns
    -------
    start_indices :
        1-D array holding ``block_width`` consecutive entries per drawn block,
        one per covered feature, spaced ``n_steps`` apart.
    """
    # Flat offset of step 0 for every (sample, candidate feature) pair.
    candidate_offsets = []
    for sample in range(n_samples):
        for feature in feature_idx:
            candidate_offsets.append((sample * n_features + feature) * n_steps)

    oversampled = hit_rate > 1
    if oversampled:
        logger.warning(f"hit_rate={hit_rate} > 1")

    # Draw the (sample, feature) origins first, then one step shift per origin;
    # step shifts are always drawn with replacement.
    n_blocks = math.ceil(len(candidate_offsets) * hit_rate)
    chosen_offsets = np.asarray(
        np.random.choice(candidate_offsets, n_blocks, replace=oversampled)
    )
    shifts = np.asarray(np.random.choice(step_idx, len(chosen_offsets)))
    block_origins = chosen_offsets + shifts

    # Expand every origin into one offset per feature row of the block.
    row_starts = []
    for origin in block_origins:
        for row in range(block_width):
            row_starts.append(origin + row * n_steps)
    return np.asarray(row_starts)


def _block_missing_numpy(
    X: np.ndarray,
    factor: float,
    block_len: int,
    block_width: int,
    feature_idx: list = None,
    step_idx: list = None,
) -> np.ndarray:
    """Return a copy of a numpy array with randomly placed blocks set to NaN.

    Parameters
    ----------
    X :
        Array whose shape unpacks to (n_samples, n_steps, n_features).
        It is copied first, so the caller's array is left untouched.

    factor :
        Scaling factor used to derive how many blocks are drawn.

    block_len :
        Number of consecutive steps masked per block.

    block_width :
        Number of adjacent features masked per block.

    feature_idx :
        Feature indices a block may start at.

    step_idx :
        Step indices a block may start at.

    Returns
    -------
    corrupted_X :
        Array of shape (n_samples, n_steps, n_features) with the selected
        blocks replaced by NaN.
    """
    n_samples, n_steps, n_features = X.shape
    hit_rate = factor * n_steps * n_features / (block_len * block_width)
    start_indices = random_select_start_indices(
        block_width, feature_idx, step_idx, hit_rate, n_samples, n_steps, n_features
    )

    # Work on a private copy laid out as (sample, feature, step) and flattened,
    # so the steps of one feature are contiguous and a block row is one slice.
    flat = np.copy(X).transpose(0, 2, 1).reshape(-1)
    for begin in start_indices:
        flat[begin : begin + block_len] = np.nan

    # Restore the original (sample, step, feature) axis order.
    return flat.reshape(n_samples, n_features, n_steps).transpose(0, 2, 1)


def _block_missing_torch(
    X: torch.Tensor,
    factor: float,
    block_len: int,
    block_width: int,
    feature_idx: list = None,
    step_idx: list = None,
) -> torch.Tensor:
    """Return a clone of a torch tensor with randomly placed blocks set to NaN.

    Parameters
    ----------
    X :
        Tensor whose shape unpacks to (n_samples, n_steps, n_features).
        It is cloned first, so the caller's tensor is left untouched.

    factor :
        Scaling factor used to derive how many blocks are drawn.

    block_len :
        Number of consecutive steps masked per block.

    block_width :
        Number of adjacent features masked per block.

    feature_idx :
        Feature indices a block may start at.

    step_idx :
        Step indices a block may start at.

    Returns
    -------
    corrupted_X :
        Tensor of shape (n_samples, n_steps, n_features) with the selected
        blocks replaced by NaN.
    """
    n_samples, n_steps, n_features = X.shape
    hit_rate = factor * n_steps * n_features / (block_len * block_width)
    start_indices = random_select_start_indices(
        block_width, feature_idx, step_idx, hit_rate, n_samples, n_steps, n_features
    )

    # Work on a private clone laid out as (sample, feature, step) and flattened,
    # so the steps of one feature are contiguous and a block row is one slice.
    flat = torch.clone(X).transpose(1, 2).flatten()
    for begin in start_indices:
        flat[begin : begin + block_len] = np.nan

    # Restore the original (sample, step, feature) axis order.
    return flat.reshape(n_samples, n_features, n_steps).transpose(1, 2)


def block_missing(
    X: Union[np.ndarray, torch.Tensor],
    factor: float,
    block_len: int,
    block_width: int,
    feature_idx: list = None,
    step_idx: list = None,
) -> Union[np.ndarray, torch.Tensor]:
    """Create block missing data.

    Parameters
    ----------
    X :
        Data of shape (n_samples, n_steps, n_features). A list is converted to
        a numpy array. If X has any missing values, they should be numpy.nan.

    factor :
        The actual missing rate of block_missing is hard to be strictly controlled.
        Hence, we use ``factor`` to help adjust the final missing rate.

    block_len :
        The length of the mask block (number of consecutive steps).

    block_width :
        The width of the mask block (number of adjacent features).

    feature_idx :
        The indices of features for a missing block to start with. Every index
        must leave room for ``block_width`` features. Defaults to all feature
        indices from which a block still fits.

    step_idx :
        The indices of steps for a missing block to start with. Every index
        must leave room for ``block_len`` steps. Defaults to all step indices
        from which a block still fits.

    Returns
    -------
    corrupted_X :
        Original X with artificial missing values.
        Both originally-missing and artificially-missing values are left as NaN.

    Raises
    ------
    AssertionError
        If a block size or a start index is of the wrong type or out of range.

    TypeError
        If X is neither a list, a numpy.ndarray nor a torch.Tensor.
    """
    if isinstance(X, list):
        X = np.asarray(X)
    n_samples, n_steps, n_features = X.shape

    assert isinstance(
        block_len, int
    ), f"`block_len` must be type of int, but got {type(block_len)}"
    # The number of blocks is derived by dividing by block_len * block_width,
    # so neither may be zero or negative.
    assert block_len >= 1, f"`block_len` must be >= 1, but got {block_len}"
    assert (
        block_len <= n_steps
    ), f"`block_len` must be <= {n_steps}, but got {block_len}"

    assert isinstance(
        block_width, int
    ), f"`block_width` must be type of int, but got {type(block_width)}"
    assert block_width >= 1, f"`block_width` must be >= 1, but got {block_width}"
    assert (
        block_width <= n_features
    ), f"`block_width` must be <= {n_features}, but got {block_width}"

    if feature_idx is not None:
        assert isinstance(
            feature_idx, list
        ), f"`feature_idx` must be type of list, but got {type(feature_idx)}"
        assert len(feature_idx) > 0, "`feature_idx` must not be empty"
        # A negative index would wrap around to the end of the flattened data.
        assert (
            min(feature_idx) >= 0
        ), f"values in `feature_idx` must be >= 0, but got {min(feature_idx)}"
        assert (
            max(feature_idx) <= n_features
        ), f"values in `feature_idx` must be <= {n_features}, but got {max(feature_idx)}"
        # A block that does not fit would spill into the next sample's features.
        assert (
            n_features - max(feature_idx) >= block_width
        ), f"n_features - max(feature_idx) must be >= block_width, but got {n_features - max(feature_idx)}"
    else:
        feature_idx = list(range(n_features - block_width + 1))

    if step_idx is not None:
        assert isinstance(
            step_idx, list
        ), f"`step_idx` must be type of list, but got {type(step_idx)}"
        assert len(step_idx) > 0, "`step_idx` must not be empty"
        # A negative index would wrap around to the end of the flattened data.
        assert (
            min(step_idx) >= 0
        ), f"values in `step_idx` must be >= 0, but got {min(step_idx)}"
        assert (
            max(step_idx) <= n_steps
        ), f"values in `step_idx` must be <= {n_steps}, but got {max(step_idx)}"
        assert (
            n_steps - max(step_idx) >= block_len
        ), f"n_steps - max(step_idx) must be >= block_len, but got {n_steps - max(step_idx)}"
    else:
        step_idx = list(range(n_steps - block_len + 1))

    if isinstance(X, np.ndarray):
        corrupted_X = _block_missing_numpy(
            X,
            factor,
            block_len,
            block_width,
            feature_idx,
            step_idx,
        )
    elif isinstance(X, torch.Tensor):
        corrupted_X = _block_missing_torch(
            X,
            factor,
            block_len,
            block_width,
            feature_idx,
            step_idx,
        )
    else:
        raise TypeError(
            f"X must be type of list/numpy.ndarray/torch.Tensor, but got {type(X)}"
        )
    return corrupted_X