import pandas as pd
import numpy as np
from scipy import stats
from statsmodels.stats.multitest import multipletests
from itertools import combinations
from collections import deque
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from tqdm.auto import tqdm
from typing import Optional, Tuple
from IPython.display import display
from ._dist import Dist
from .._utils import SEQUENCE_LIKE
from .._logr import _z_log
###################################################################
[docs]
class CatAna(Dist):
'''
Utilizing both Statsmodels and Scipy modules to perform univariate and
Multi-variate analysis and visualizations to guide handling multi-level
categorical features.
Notes
-----
- Categorical features are considered irrespective of their dtype, either string
or numbers, users of this class are advised to explicitly specify whether the
columns are categorical using ``cat_cols`` parameter as this will affect both
grouping and underlying calculations.
- If not explicitly specified then it is assumed that if features ``cols`` are
categorical then ``target`` must be numeric and vise versa. This is to dictate
the direction of categorical grouping of numeric data and method of calculating
mutual information score `MI`.
- Categories are preprocessed to highlight `rare` levels; only frequent levels are
displayed as is while others are grouped into a single level called `rare`. This
behavior can be controlled by ``top_n`` and ``rare_thresh`` parameters.
For severely imbalanced datasets, ensure that ``rare_thresh`` parameter account
for the minority class when plotting conditional distributions.
- Missing values `NaN` in categorical features are not removed by default, rather,
considered as a separate level called `missing`. If `missing` values happens to
be also rare, then they will be labeled as `rare` instead of `missing`.
- Missing values `NaN` in numeric columns are imputed with mean value of respective
column, if desired otherwise, use ``nans_d`` parameter or do imputation before
using this class.
- `Scikit-learn` algorithm for Mutual Information score treats discrete features
differently from continuous features, thus, anything that must have a `float`
dtype is not `discrete` and will be flagged as such when calculating `MI` score
when ``target`` is continuous. It is advised to specify appropriate dtypes for
numeric features before using this class.
- For Ordinal Categorical Features its better to check correlations and regression
analysis.
- Mind the ``frac`` parameter when using the internal preprocessed DataFrame.
Parameters
----------
df: pandas dataframe
data source
cols: sequence (lists, tuples, NumPy arrays or Pandas Base Index)
column names of features to analyze. Better use homogeneous subsets, example:
either all categorical or all numeric features; categorical subset can have
multiple dtypes (object or numeric) depending on the nature of the feature
target: str
column name of target variable
cat_cols: Bool or None
indicate whether ``cols`` are categorical in nature or not and in-turn the
direction of categorical grouping of numeric data, for Example: for
`binary target`, numeric ``cols`` are to be grouped by the categorical
``target`` and vise versa. If `None`, inferred automatically.
rare_thresh, top_n: float, int
max cardinality beyond which lvls are grouped and analysed as a single level;
If ``rare_thresh`` = 0 or ``top_n`` < 2 then all levels are analysed as is,
otherwise these levels are grouped as a single `rare` level.
Notes
-----
- both are independent, for example considering categorical level to be
`rare` (1% - 5%) can still result in a high cardinality categorical
feature (> N levels)
- missing values will be displayed as `rare` rather than `missing` if the
new `missing` category is below ``rare_thresh``
nans_d: dict or None
dictionary where keys are column names and values are missing `nan`
replacements. To perform multiple imputation for several numeric ``cols``.
frac: float or None
fraction of dataframe to use as a sample
for analysis:
- 0 < ``frac`` < 1 returns a random sample with size ``frac``.
- ``frac`` = 1 returns shuffled dataframe.
- ``frac`` > 1 up-sample the dataframe, sampling of the same row more
than once.
random_state: int
for reproducibility, controls the random number generator for ``frac``
parameter and when calculating mutual information scores.
figsize: tuple
dimensions of matplotlib figure (width, height)
n_rows: int
number of rows in matplotlib subplot figure
n_cols: int
number of columns in matplotlib subplot figure
silent: Bool
solicit user input for continuation during iterative plotting. If `True`,
plotting proceeds without user interaction.
hide_p_bar: Bool
triggers hiding progress bar (tqdm module); Default `False`
Attributes
----------
z_inf_out : numpy array
excluded columns having `inf` values, if any.
z_nans: numpy array
numeric column names where imputation of `nan` values took place.
z_df: pandas dataframe
preprocessed dataframe that was used internally
z_freq_lvls_map: dict
where keys are column(s) name(s) and values are frequent levels. Only
applicable when ``cols`` are categorical
xludd_feats: numpy array
categorical column names excluded from the analysis being dominated by
rare levels. Only applicable when ``cols`` are categorical
'''
def __init__(self,
df: pd.DataFrame,
cols: SEQUENCE_LIKE,
target: str,
cat_cols: Optional[bool] = None,
rare_thresh: float = 0.05,
top_n: int = 25,
nans_d: Optional[dict] = None,
frac: Optional[float] = None,
random_state: int = 45,
figsize: Optional[Tuple[int, int]] = None,
n_rows: Optional[int] = None,
n_cols: Optional[int] = None,
silent: bool = False,
hide_p_bar: bool = False):
# Additional input checks
if nans_d:
if not isinstance(nans_d, dict):
raise TypeError("Please pass 'nans_d' parameter as a dictionary. Example: {'column name': imputation value}")
if np.any(~np.isin(np.array(list(nans_d.keys())), df.columns)):
raise KeyError("Missing columns! Please ensure that all columns to impute are included in the DataFrame")
super().__init__(df, cols, target, cat_cols, rare_thresh, top_n, frac, random_state, figsize, n_cols, n_rows, silent, hide_p_bar)
# remove target from input columns, if any
self._cols = np.array(self._cols)
if self._target in self._cols:
self._cols = self._cols[~(self._cols == self._target)]
_z_log.info(f"target column '{self._target}' has been removed from input columns")
# handling infs and nanas
# numeric features vs Categorical Target
if not self._cat_cols and not np.any([self.z_df_[col].dtype.kind in 'bO' for col in self._cols]):
self._target_lvls = self.z_df_[self._target].unique() # unique categories for ANOVA and PostHoc
self._slash_n_impute(nans_d) # attributes
[docs]
def ana_owva(self, alpha: float = 0.05, disp_res: bool = True) -> pd.DataFrame:
"""
One-way ANOVA & Kruskal-Wallis H (non-parametric equivalent of the One-Way ANOVA)
for Numeric vs Categorical Features.
null: mean/median of all groups are equal
Parameters
----------
alpha: float
Significance alpha for rejecting null hypothesis.
Reject null if p-value < ``alpha``
disp_res: bool
triggers displaying ANOVA results DataFrame
Attributes
----------
zefct_df: pandas dataframe
dataframe showing effect of categorical feature on ``target``
distribution. Only applicable when ``cols`` are categorical
Returns
-------
anova_df: pandas dataframe
dataframe of ANOVA and related assumptions results
"""
# input check
if self._cat_cols and (self.z_df_[self._target].nunique() <= 20 or self.z_df_[self._target].dtype.kind in 'bO'):
raise TypeError("`cols` and `target` are both Categorical, consider using Chi2 test of independence instead")
if not self._cat_cols:
if np.any([self.z_df_[col].dtype.kind in 'bO' for col in self._cols]):
raise TypeError("Some columns are categorical, Please ensure 'cat_cols' parameter is set to 'True'")
if self.z_df_[self._target].nunique() > 20:
raise TypeError("`cols` and `target` are both Numeric, consider using regression and correlation tests instead")
if any(self._cols):
summary = {} # ANOVA results container
var_dict = {} # Mean variation container
self._p_val = {} # p-value container for equal variances assumption tests in latter post-hoc
for col in tqdm(self._cols, desc = 'Ongoing ANOVA....', disable = self._hide_p_bar):
# target always categorical
mask, feat, target = self._grouping(col, self._target)
if self._cat_cols: # categorical feats, numeric target
n_lvls = mask[target].unique()
# Mutual Info Score
# These numbers are not percentages rather an indication of relative importance
# Features of high importance have high Score
m_i = mutual_info_regression(mask[target].factorize()[0][:, np.newaxis], mask[feat],
discrete_features = True,
random_state = self._random_state)[0].round(4)
# Mean variation within groups(conditional) compared to global(prior)
# variations within different groups of a single feature can indicate significance to prediction results
# the closer to global the less significant
var_dict[col] = mask.groupby(target)[feat].mean().to_frame()
else: # numeric feats, categorical target
n_lvls = self._target_lvls
# Mutual Info Score
m_i = mutual_info_classif(mask[[feat]], mask[target],
discrete_features = False if mask[feat].dtype.kind == 'f' else True,
random_state = self._random_state)[0].round(4)
# groups/lvls
groups = [mask[feat][mask[target] == g] for g in n_lvls] # (cat cols, num target) --> feat = target, target = col
# One-way ANOVA
# Assume normal distribution of the residuals
# null: mean of all groups are equal
f, p_f = stats.f_oneway(*groups)
# Kruskal-Wallis H (non-parametric equivalent of the One-Way ANOVA)
# Does not assume a normal distribution of the residuals
# null: median of all groups are equal
h, p_h = stats.kruskal(*groups)
# accept reject ANOVA/Kruskal
eq_mean_anova = p_f > alpha
eq_median_kruskal = p_h > alpha
# Test ANOVA Assumptions
# Checking homogeneity of variance across samples
# ANOVA assumes Equal Variance
# assuming data is not normally distributed
p_v = stats.levene(*groups).pvalue
# non-parametric(distribution free) when populations are identical
p_flg = stats.fligner(*groups).pvalue
# null: all input samples have equal variances
# Reject Null if P val <= 0.05
eq_var_lev = p_v > alpha
eq_var_flg = p_flg > alpha
# feature stats
sample_size = [len(_) for _ in groups]
n_frequent_lvls, max_sample, min_sample = len(n_lvls), np.max(sample_size), np.min(sample_size)
# update 'equal variance' p-value container
self._p_val[col] = {'levene': p_v, 'fligner': p_flg}
# update ANOVA container
summary[col] = {
'f_stat_ANOVA': f, 'p_val_f': p_f, 'eq_mean': eq_mean_anova,
'h_stat_Kruskal': h, 'p_val_h': p_h, 'eq_median': eq_median_kruskal,
'p_val_lev': p_v, 'eq_var_lev': eq_var_lev,
'p_val_flg': p_flg, 'eq_var_flg': eq_var_flg,
'max_sample_size': max_sample,
'min_sample_size': min_sample,
'n_frequent_lvls': n_frequent_lvls,
'm_i_score': m_i
}
# ANOVA/Krus dataframe
anova_df = pd.DataFrame(summary)
if hasattr(self, "z_nans_"):# flag imputations
anova_df.loc['imputed'] = [col in self.z_nans_ for col in anova_df.columns]
if any(anova_df.loc['f_stat_ANOVA'].values == np.inf) or any(np.isnan(list(anova_df.loc['f_stat_ANOVA'].values))):
_z_log.warning("Check the groups in features having 'inf' and 'nan' results.")
if self._cat_cols:
# columns for post_hoc analysis
self._pos_hoc_cols = anova_df.columns[~(anova_df.loc['n_frequent_lvls'] == 2)]
# attributes
# conditional vs prior target mean
self.zefct_df_ = self._var_df(var_dict, self.z_df_, self._target).sort_values('ratio', ascending = False)
if disp_res:
display('** One-way ANOVA / Kruskal-Wallis H and their related assumptions assessment **',
'Note: `False` means Reject Null; there is at least one group/lvl with '
'mean/median differences that are statistically significant',
anova_df.style.apply(lambda x: ['background: olive' if not x.eq_mean else '' for i in x],
subset = pd.IndexSlice['eq_mean':'eq_mean'])\
.apply(lambda x: ['background: grey' if not x.eq_median else '' for i in x],
subset = pd.IndexSlice['eq_median':'eq_median']).format(precision = 3, thousands = ','))
return anova_df
else:
_z_log.info("no columns to analyze! Please check input columns")
[docs]
def ana_post(self, equal_var: str = 'levene', alternative: str = 'two-sided', alpha: float = 0.05,
multi_tst_corrc: str = 'bonf', disp_res: bool = True) -> pd.DataFrame:
"""
post-hoc analysis(T-test and Mann–Whitney U test) for categorical features
having more than two levels.
Parameters
----------
equal_var: str
method to apply when checking for equal variance assumption prior to
calculating the T-test. One of `levene` or `fligner`. `levene` tests
equal variance assumption assuming data is not normally distributed,
`Fligner-Killeen's` test is distribution free when populations are
identical
alternative: str
defines the alternative
hypothesis
* **two-sided**: distributions underlying the samples are unequal
* **less**: the distribution underlying the first sample is less
than the distribution underlying the second sample
* **greater**: the distribution underlying the first sample is
greater the distribution underlying the second sample.
alpha: float
pre-adjusted alpha(significance level) for rejecting null hypothesis,
will also be used in multiple comparison corrections. Reject null if
p-value < ``alpha``
multi_tst_corrc: str
method used for testing and adjusting pvalues from statsmodels
multipletests
- **bonferroni**: one-step correction
- **sidak**: one-step correction
- **holm-sidak**: step down method using Sidak adjustments
- **holm**: step-down method using Bonferroni adjustments
- **simes-hochberg**: step-up method (independent)
- **hommel**: closed method based on Simes tests (non-negative)
- **fdr_bh**: Benjamini/Hochberg (non-negative)
- **fdr_by**: Benjamini/Yekutieli (negative)
- **fdr_tsbh**: two stage fdr correction (non-negative)
- **fdr_tsbky**: two stage fdr correction (non-negative)
disp_res: bool
triggers displaying results DataFrame, only for features having no
significant results
Attributes
----------
xludd_phoc_feats: numpy array
column names excluded from the analysis having only two frequent levels.
Only applicable when ``cols`` are categorical
Returns
--------
post_hoc_df: pandas dataframe
dataframe of Post-Hoc analysis results
"""
# input check
try:
self._p_val
except:
raise AttributeError("Please run ANOVA first!")
if not self._cat_cols and self.z_df_[self._target].nunique() <= 2: # cat target with only two lvls
raise TypeError("Post Hoc Analysis only applicable for categorical features having more than 2 levels")
if equal_var not in ['levene', 'fligner']:
raise ValueError("'equal_var' parameter must be one of the following arguments: 'levene' or 'fligner' "
f"however, '{equal_var}' was received!")
if self._cat_cols: # cat features vs num target
# run post hoc only if more than two levels for columns already analyzed
# Note: `missing` and `rare` levels are inclusive
cols = self._pos_hoc_cols
# attributes
self.xludd_phoc_feats_ = self._cols[~np.isin(self._cols, cols)]
if self.xludd_phoc_feats_ :
_z_log.info(f"{len(self.xludd_phoc_feats_):,.0f} Feature(s) Having Only 2 Frequent Levels "
"Were Excluded From Post Hoc Analysis")
else: # num features vs cat target
cols = self._cols
comb = self._target_lvls # for calculating n-possible combinations
mask = self.z_df_
if any(cols):
post_hoc = [] # test results container
for col in tqdm(cols, desc = 'Ongoing Post Hoc Analysis....', disable = self._hide_p_bar):
temp_post_hoc = [] # column wise test container
mask, feat, target = self._grouping(col, self._target)
if self._cat_cols:
comb = mask[target].unique()
equal_var_ = True if self._p_val[col][equal_var] > 0.05 else False
# post-hoc
# null: 2 independent samples have identical distributions
for i in combinations(comb, 2):
# t-test based on equal variance analysis
# Welch's t-test is used assuming no-qual variance
t, p_t = stats.ttest_ind(mask[feat][mask[target] == i[0]],
mask[feat][mask[target] == i[1]],
equal_var = equal_var_, alternative = alternative,
random_state = self._random_state)
# Mann–Whitney U test (Nonparametric version of two-sided t-test)
mw, p_mw = stats.mannwhitneyu(mask[feat][mask[target] == i[0]],
mask[feat][mask[target] == i[1]],
alternative = alternative)
# update temp test container
if self._cat_cols:
temp_post_hoc.append([target, i[0], i[1], round(t,2), round(p_t,4), round(mw,2), round(p_mw,4)])
else: # numeric feats vs target column. Groups(combs) are that of target's
temp_post_hoc.append([feat, i[0], i[1], round(t,2), round(p_t,4), round(mw,2), round(p_mw,4)])
# there are two ways for adjusting the statistical inference of multiple comparisons.
# first, adjusting P-values directly by adjusting the observed P value for each hypothesis/group
# while keeping the significance level (alpha, i.e: 0.05) unchanged; comparing adjusted P-values to original alpha
# second, adjusting alpha while leaving observed P-values as is; comparing adjusted alpha to the observed P-values
# `multipletests` provide a range of results including: adjusted P-vals, adjusted alpha(FWER) and accept/reject text
# example of bonferroni adjustment of observed P-values: np.minimum(array of unadjusted P-values * N groups, 1)
# example of bonferroni adjustment of alpha: alpha / N groups
pv_t = np.array(temp_post_hoc)[:, 4:5].ravel().astype(float) # unadjusted T-TEST P-values
mul_ts_t = multipletests(pv_t, alpha = alpha, method = multi_tst_corrc) # includes adjusted P-values
# update temp test container
deque(map(list.append, temp_post_hoc, np.repeat(mul_ts_t[2], len(mul_ts_t[0]))), 0) # adjusted FWER(alpha) Sidak
deque(map(list.append, temp_post_hoc, np.repeat(mul_ts_t[3], len(mul_ts_t[0]))), 0) # adjusted FWER(alpha) Bonferroni
deque(map(list.append, temp_post_hoc, mul_ts_t[1]), 0) # corrected P-values of multiple tests
deque(map(list.append, temp_post_hoc, mul_ts_t[0]), 0) # accept/reject text of T-TEST's null
# update tests container
post_hoc.extend(temp_post_hoc)
# post-hoc dataframe
post_hoc_df = pd.DataFrame(post_hoc, columns = ['feature', 'group_one', 'group_two',
't_stat', 'p_val_t', 'mw_stat', 'p_val_mw',
'FWER_Sidak', 'FWER_Bonf', 'adj_p_val_t',
f'reject_t_{multi_tst_corrc}'])
# re-arrange columns
o_cols = post_hoc_df.columns
post_hoc_df = post_hoc_df[np.r_[o_cols[:5], o_cols[9:], o_cols[5:9]]]
if disp_res:
false_mask = post_hoc_df[post_hoc_df[f'reject_t_{multi_tst_corrc}'] == False]
if false_mask.shape[0]:
display(f'** Post-hoc results using `{multi_tst_corrc}` correction for groups/lvls having no '
'significant results **',
'Note: `False` means Accept Null; difference in conditional distribution given these groups/lvls '
'is not statistically significantl',
false_mask.T.style\
.apply(lambda x: ['background: olive' if not x.iloc[-1] else '' for i in x],
subset = pd.IndexSlice[f'reject_t_{multi_tst_corrc}':f'reject_t_{multi_tst_corrc}'])\
.format(precision = 3, thousands = ',').hide(axis = 1))
else:
display('** Conditional distribution of all groups/lvls has statistically significant differences **')
return post_hoc_df
else:
_z_log.info("no columns to analyze! Please check input columns")
[docs]
def ana_chi2(self, alpha: float = 0.05, disp_res: bool = True) -> pd.DataFrame:
"""
Chi2 test of independence between two categorical variables, best suited for
nominal data.
null: categorical variables are independent
Notes
-----
- An often quoted guideline for the validity of this calculation is that
the test should be used only if the observed and expected frequencies
in each cell are at least 5.
- This is a test for the independence of different categories of a
population. The test is only meaningful when the dimension of
`observed` is two or more. Applying the test to a one-dimensional
table will always result in `expected` equal to `observed` and a
chi-square statistic equal to 0.
- This function does not handle masked arrays, because the calculation
does not make sense with missing values.
Parameters
----------
alpha: float
Significance alpha for rejecting null hypothesis.
Reject null if p-value < ``alpha``
disp_res: bool
triggers displaying results DataFrame
Attributes
----------
zefct_df: pandas dataframe
dataframe showing effect of categorical feature on target distribution.
Only applicable when ``cols`` are categorical
z_crss_tabs: dict
where keys are analyzed columns and values are their corresponding
cross_tabs. Only applicable when ``cols`` are categorical
Returns
-------
chi2_df: pandas dataframe
dataframe of chi2 analysis results
"""
# input check - triggers grouping which can be bypassed by 'rare_thresh' and 'top_n'
if not self._cat_cols:
if self.z_df_[self._target].nunique() > 20:
_z_log.info("Chi2 test of independence is best suited for two categorical variables.")
else:
_z_log.info("'cat_cols' is set to 'False', rare and missing categorical levels are not preprocessed.")
# sorting discrete labels to report on posterior vs prior changes
# string class labels follows order of appearance
if self.z_df_[self._target].dtype.kind not in 'bO':
self._lbl = np.sort(self.z_df_[self._target].unique())
else:
self._lbl = self.z_df_[self._target].unique()
if any(self._cols):
var_dict = {} # Mean variation container
chi2_res = {} # chi2 results container
self.z_crss_tabs_ = {} # cross_tabs container
for col in tqdm(self._cols, desc = 'Ongoing Independence Test(Chi2)....', disable = self._hide_p_bar):
mask, feat, target = self._grouping(col, self._target)
# calculate chi2 statistic and p-value from contingency table
crss_tab = pd.crosstab(mask[feat], mask[target])
chi2, p_chi2 = stats.chi2_contingency(crss_tab)[:2]
# null: both categories are independent
acc_rej = p_chi2 <= alpha
# `min_smpl` captures the smallest sample size
# across all crosstab cells; rule of thumb is that
# chi2 test should be used only if the observed and
# expected frequencies in each cell are at least 5.
chi2_res[col] = {'chi2': chi2, 'p_val': p_chi2, 'dependent': acc_rej, 'min_sample_size': min(crss_tab.min())}
# attributes
self.z_crss_tabs_[col] = crss_tab
# Mean variation within groups(conditional) compared to global(prior)
# the closer to global the less significant
# treat multiclass as binary
# TODO: better handling of multiclass to display more results
if self._cat_cols: # why here? so it wont mess with `comb` dtype calls from `ana_post`
var_dict[col] = (crss_tab / crss_tab.sum()).iloc[-1].to_frame()
# chi2 dataframe
chi2_df = pd.DataFrame(chi2_res)
# attributes
if var_dict and not hasattr(self, 'zefct_df_'):
# P(Class|feature)
self.zefct_df_ = self._var_df(var_dict, self.z_df_, self._target).sort_values('ratio', ascending = False)
if disp_res:
# Display Results
# Note when using subset: Only label-based slicing is supported, any valid indexer to .loc will work
display('** Chi2 test of independence results **',
'Note: `True` means Reject Null; categorical variables are dependent **',
chi2_df.style.apply(lambda x: ['background: olive' if x.dependent else '' for i in x],
subset = pd.IndexSlice['dependent':'dependent']).format(precision = 3, thousands = ','))
return chi2_df
else:
_z_log.info("No columns to analyze! Please check input columns")
def _var_df(self, var_dict: dict,
df: pd.DataFrame,
target: str) -> pd.DataFrame:
"""
Construct a DataFrame highligting Conditional and Prior ``target`` mean
differences for each categorical feature.
Parameters
----------
var_dict: dictionary
where keys are categorical column names and values are
DataFrames of conditional ``target`` distribution. Example:
{col:df.groupby(col)[target].mean().to_frame()}
df: pandas dataframe,
data source
target: str
target column name
Returns
-------
var_df: pandas dataframe
dataframe showing effect of categorical feature on ``target`` distribution
"""
if df[target].nunique() > 20 and df[target].dtype.kind in 'ifc': # regression target
last_lbl = ''
prior = df[target].mean()
target_lvl = target
else:
last_lbl = self._lbl[-1]
prior = df[target].value_counts(dropna = False, normalize = True).loc[last_lbl]
target_lvl = f'{target}_{last_lbl}'
var_df = pd.concat(var_dict).reset_index().rename(columns = {'level_0': 'feature', 'level_1': 'lvl',
last_lbl: target_lvl,
})
# global mean
var_df['prior'] = prior
# Example: for binary classification:
# +ve = prior < posteriori (group wise), this means that this group is
# More likely to be a positive example(1) and vise versa. The magnitude
# (size of difference) is more important that the sign (+/-)
var_df['diff'] = var_df[target_lvl] - var_df['prior']
# Example: for binary classification:
# 1 means prior = posteriori (group wise) implying no impact on prediction.
# If > 1 (1.5 for example) then this group is 1.5 times more likely to be a positive example
# (aka this group is 50% more likely to be a positive example, its 50% higher than global mean)";
# If < 1 (0.75 for example) then this group is 25% less likely to be a positive example and so on
var_df['ratio'] = var_df[target_lvl] / var_df['prior']
return var_df