import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm.auto import tqdm
from typing import Optional, Union, Tuple
from .._utils import SEQUENCE_LIKE, PipeLineMixin
from .._logr import _z_log
###################################################################
[docs]
class Olrs(PipeLineMixin, TransformerMixin, BaseEstimator): # Sklearn docs: mixins: left, BaseEstimator: right for proper MRO.
"""
Identify and handle outlier values, identification is done by calculating a
threshold beyond which an observation is considered to be an outlier,
using one of the following methods:
- Gaussian approximation:
outliers are captured based on distance from the mean.
e.g.: x data point is an outlier if: (mean - 3 * std) > x or x > (mean + 3 * std)
where 3 is distance from mean.
- Inter-quantile range proximity rule (IQR):
outliers are identified based on distance from IQR(Q3-Q1). e.g.: x data point is an
outlier if: (q1 - 1.5 * iqr) > x or x > (q3 + 1.5 * iqr)
where 1.5 is distance from IQR.
- Median Absolute Deviation from the median (MAD-median rule):
same formula as Gaussian approximation for highlighting outliers, however, replacing
mean with median and std with MAD which is suitable for skewed data. See notes below.
- Quantiles:
outliers are identified using specific quantile values of the data.
e.g.: x data point is an outlier if: x < Q(.05) or x > Q(1 - .05)
where Q(.05) and Q(1 - .05) are the 5th and 95th quantiles of the data.
Handling is done using Winsorization that is transforming the data by limiting the
extreme values `outliers`, to a certain arbitrary value. The arbitrary value are the
thresholds from one of the methods mentioned above beyond which outliers are
labeled.
e.g.: if ``distance`` = .1 and ``method`` = `q` then it's 80% winsorization as
all data below the 10th quantile is set to the 10th quantile, and data above the
90th quantile is set to the 90th quantile, thus 20% of data is reassigned.
Winsorizing is different from trimming because the extreme values are not removed,
but are instead replaced by other values.
Notes
-----
- default ``distance`` under `mad` method is not scaled. If it is desired to use
MAD as a robust replacement for the standard deviation of normal distribution,
then multiply the distance by 1.4826 before passing it to ``distance`` parameter.
e.g.: (3 * std) becomes ((3 * 1.4826) * MAD)
- If the data is normally distributed then Gaussian approximation method is best
suited for identifying outliers, otherwise, rest of methods works on normal and
non-normal data.
- data passed to `fit` and `transform` methods will be converted to a DataFrame if
not one already, default behavior is to bypass ``cols`` and ``mapping`` parameter
if any of names not found in the DataFrame and transform all numeric columns instead.
So be mindful of names used in ``cols`` and ``mapping`` parameters as column names
will be generic in that case.
Parameters
----------
cols: sequence (lists, tuples, NumPy arrays or Pandas Base Index) or None
column names of numeric features. If `None` then ``cols`` is ignored
and all numeric columns will be inferred and transformed; this also applies
if any of the ``cols`` are not found in the DataFrame or their data types are not
numeric.
mapping: dict or None
Dictionary for mapping different outlier labeling method to each column, it must
have the following structure: {'column name':(method, distance)} and follows the
same logic of ``method`` and ``distance`` parameters. Columns could be independent
of ``cols`` parameter and will be merged during fit.
method: str
method to label outliers, one of [`gaus`, `iqr`, `mad`, `q`]:
- **gaus**: Gaussian approximation
- **iqr**: Inter-quantile range proximity rule (IQR)
- **mad**: Median Absolute Deviation from the median
- **q**: data quantiles
distance: float or None
override default distance to label outliers, default is:
{`gaus`: 3, `iqr`: 1.5, `mad`: 1, `q`: .05}.
Note
-----
when ``method`` = `q`:
- distance indicates the quantiles. Example: if ``distance`` = .05, data
will be capped at 5th and 95th percentiles.
- Outliers will be capped up to a maximum of the 20th percentile at each tail. Thus,
'distance' takes values between 0 and 0.2
tail: str
specify direction to handle outliers, One of `both`, `right`, `left`.
- **both** for outliers at both ends of the distribution
- **right** for outliers at the right end of the distribution
- **left** for outliers at the left end of the distribution
hide_p_bar: Bool
triggers hiding progress bar (tqdm module); Default `False`
"""
def __init__(
    self,
    cols: Optional[SEQUENCE_LIKE] = None,
    mapping: Optional[dict] = None,
    method: str = 'iqr',
    distance: Optional[float] = None,
    tail: str = 'both',
    hide_p_bar: bool = False,
):
    # Constructor only records the hyper-parameters verbatim; following the
    # scikit-learn convention, every check on them is deferred to `fit`.
    # Which columns to treat, and optional per-column overrides.
    self.cols, self.mapping = cols, mapping
    # Global outlier labeling rule and its threshold distance.
    self.method, self.distance = method, distance
    # Which end(s) of the distribution to handle, and progress bar visibility.
    self.tail, self.hide_p_bar = tail, hide_p_bar
[docs]
def fit(self,
        X: Union[np.generic, np.ndarray, pd.DataFrame],
        y = None,
        labels: Optional[Union[list, pd.Index]] = None,
        disp_res: bool = False):
    """
    Calculate thresholds beyond which values are labeled as outliers.

    Parameters
    ----------
    X: np.ndarray or pd.DataFrame
        data source to use in outlier threshold calculation, usually the
        training data. `Ndarray` will be converted to a DataFrame with
        generic column names.
    y: None
        There is no need of a target in this transformer, yet the pipeline
        API requires this parameter.
    labels: list, Pandas Base Index or None
        labels to use as column names when converting ``X`` to a DataFrame.
        If `None`, generic names will be generated
    disp_res: bool
        triggers displaying capping thresholds per each column. Rich display
        is used when IPython is importable, otherwise the table is printed.

    Returns
    -------
    self
        the fitted instance.

    Raises
    ------
    TypeError
        if ``cols`` is not a supported sequence type, ``mapping`` is not a
        dictionary, or ``mapping`` values are not 2-item tuples.
    ValueError
        if ``method``, ``tail``, ``distance`` or any ``mapping`` entry holds
        an unsupported value.

    Attributes
    ----------
    z_inf_out_ : numpy array
        excluded columns having `inf` values. Only set when such columns
        are found; a value left over from a previous `fit` is discarded.
    z_thrsh_df_ : Pandas DataFrame
        method, distance and lower/upper capping thresholds per feature
    feature_names_in_: Pandas Index
        columns of ``z_thrsh_df_``, i.e. the final set of handled features.
    n_features_in_: int
        Number of handled features.
    """
    # Input checks
    valid_tails = ['both', 'right', 'left']
    valid_methods = ['gaus', 'iqr', 'mad', 'q']
    self._labels = labels
    X = self._array_to_df(X, labels = self._labels) # confirm input data is a DataFrame

    if self.cols is not None:
        if not isinstance(self.cols, (list, tuple, np.ndarray, pd.Index)):
            raise TypeError("'cols' parameter accepts a sequence e.g: Lists, Tuples, NumPy Arrays or Pandas Base Index. "
                            f"However, '{type(self.cols)}' was received!")
        # if `cols` passed with errors force to none, this trigger transforming all numeric columns
        # (`or` short-circuits, so dtypes are only inspected once all names are known to exist)
        if np.any(~np.isin(self.cols, X.columns)) or np.any([X[col].dtype.kind not in 'ifc' for col in self.cols]):
            _z_log.info("Missing and/or None Numeric columns. `cols` parameter will be ignored")
            self.cols = None

    if self.mapping is not None and len(self.mapping): # len to account for {} if multiple `fit` calls on single instance
        if not isinstance(self.mapping, dict):
            raise TypeError("please pass 'mapping' parameter as dictionary where "
                            "keys are column names and values are tuples ('method', distance)")
        dvs = list(self.mapping.values())
        # `isinstance` is evaluated first, so `len` is never called on a non-tuple
        if any(not isinstance(v, tuple) or len(v) != 2 for v in dvs):
            raise TypeError("'mapping' values must be tuples of ('method', distance)")
        if np.any(~np.isin(np.array(dvs)[:,0], valid_methods)):
            raise ValueError("all 'method' values in 'mapping' parameter must be "
                             f"one of the following arguments: {valid_methods}.")
        # distances of entries mapped to the quantile method
        qs = np.array([v[1] for v in dvs if 'q' in v])
        if any(qs.ravel().astype(float) > 0.2):
            raise ValueError("Invalid Mapping. Only acceptable distance values are between 0 and 0.2 "
                             "when using 'q' method.")
        maped_cols = list(self.mapping)
        if np.any(~np.isin(maped_cols, X.columns)) or np.any([X[col].dtype.kind not in 'ifc' for col in maped_cols]):
            _z_log.info("Missing and/or None Numeric columns. `mapping` parameter will be ignored")
            self.mapping = {}
    else:
        self.mapping = {} # ensure empty dict for merging latter

    if self.method not in valid_methods:
        raise ValueError("'method' parameter must be one of the following arguments: "
                         f"{valid_methods}, however, '{self.method}' was received!")
    if self.distance is not None:
        if self.distance <= 0:
            raise ValueError('distance takes only positive numbers')
        if self.method == 'q' and self.distance > 0.2:
            raise ValueError("Only acceptable distance values are between 0 and 0.2 "
                             "when using 'q' method.")
    if self.tail not in valid_tails:
        raise ValueError("'tail' parameter must be one of the following arguments: "
                         f"{valid_tails}, however, '{self.tail}' was received!")

    # NOTE(review): `_from_array` is not set in this method; presumably
    # `_array_to_df` records whether `X` arrived as an array - confirm.
    if self._from_array or self.cols is None: # select all numeric columns, covers if forced to None or original is ndarray
        self.cols = X.select_dtypes('number').columns

    all_cols = np.array(list(set(np.r_[self.cols, list(self.mapping)]))) # merge all, assuming different columns

    # Drop any result of a previous `fit` so the attribute always reflects
    # the data of the current call (it is only assigned when `inf` is found).
    self.__dict__.pop('z_inf_out_', None)
    inf_mask = np.isinf(X[all_cols]).any()
    if inf_mask.any():
        self.z_inf_out_ = all_cols[inf_mask] # attributes
        _z_log.info("Some columns contain 'inf' values and will be excluded.")
        # update both inputs
        self.cols = np.array(self.cols)[~np.isin(self.cols, self.z_inf_out_)]
        self.mapping = {col: self.mapping[col] for col in self.mapping if col not in self.z_inf_out_}
    if X[all_cols].isnull().any().any():
        _z_log.info("Some columns contain `null` values, these values will be ignored.")

    # prepare for iterations
    # default distance to label outliers
    param_dict = {'gaus': 3, 'iqr': 1.5, 'mad': 1, 'q': .05}
    # override defaults distance, if any (already validated as > 0 above)
    if self.distance is not None:
        param_dict[self.method] = self.distance

    # if different columns mapped they will be merged
    # if same they will be overwritten
    # if no mapping it will be ignored
    self._m_d_map = {col: (self.method, param_dict[self.method]) for col in self.cols} | self.mapping

    for col in tqdm(self._m_d_map, desc = 'Spotting Outliers....', disable = self.hide_p_bar):
        # method and distance for each column
        m, dist = self._m_d_map[col][0], self._m_d_map[col][1]
        series = X[col]
        if m == 'gaus':
            # ddof = 0 divides by N instead of N-1
            col_mean, col_std = series.mean(), series.std(ddof = 0)
            thresh_min = col_mean - (dist * col_std)
            thresh_max = col_mean + (dist * col_std)
        elif m == 'iqr':
            q1, q3 = series.quantile(0.25), series.quantile(0.75)
            iqr = q3 - q1
            thresh_min = q1 - dist * iqr
            thresh_max = q3 + dist * iqr
        elif m == 'mad':
            col_median = series.median()
            # unscaled median absolute deviation from the median
            mad = (series - col_median).abs().median()
            thresh_min = col_median - (dist * mad)
            thresh_max = col_median + (dist * mad)
        elif m == 'q':
            thresh_min, thresh_max = series.quantile(dist), series.quantile(1 - dist)
        # update mapping with boundaries
        self._m_d_map[col] = self._m_d_map[col] + (thresh_min, thresh_max)

    # attributes
    self.z_thrsh_df_ = pd.DataFrame(self._m_d_map, index = ('method', 'distance', 'lower', 'upper'))
    # for `transform` input validation
    # using final feature set after
    # removing invalid inputs, if any
    self.feature_names_in_ = self.z_thrsh_df_.columns
    self.n_features_in_ = self.z_thrsh_df_.shape[1]

    if disp_res:
        # `display` only exists as a builtin inside IPython sessions; import
        # it explicitly and fall back to `print` so plain Python works too.
        try:
            from IPython.display import display
        except ImportError:
            print(self.z_thrsh_df_)
        else:
            display(self.z_thrsh_df_)
    return self