Исходный код risksutils.metrics
import numpy as np
from scipy.stats import norm
import pandas as pd
[документация]def information_value(df, feature, target, num_buck=10):
"""information value признака с целевой переменной target
**Аргументы**
df : pandas.DataFrame
таблица с данными
feature : str
признак
target : str
целевая переменная
num_buck : numeric
количество бакетов
**Результат**
information value : float
**Пример использования**
>>> import pandas as pd
>>> df = pd.DataFrame({'foo': [1, 1, 1, np.nan, np.nan],
... 'bar': [0, 0, 1, 0, 1]})
>>> information_value(df, 'foo', 'bar')
0.11552453009332433
"""
df = df.loc[df[target].dropna().index, [feature, target]]
all_tr = df[target].mean()
all_cnt = df.shape[0]
return (
df.assign(bucket=lambda x: np.ceil((x[feature].rank(pct=True) *
num_buck).fillna(-1)),
cnt=1)
.rename(columns={target: 'tr'})
.groupby('bucket')
.agg({'tr': 'mean', 'cnt': 'sum'})
.assign(tr=lambda x: np.clip(x['tr'], 0.001, 0.999))
.eval(' ( (tr/{all_tr}) - ((1-tr)/(1-{all_tr})) )'
' * ( log(tr/{all_tr}) - log((1-tr)/(1-{all_tr})) )'
' * ( cnt/{all_cnt})'
''.format(all_tr=all_tr, all_cnt=all_cnt))
.sum()
)
[документация]def information_value_binormal(auc):
"""information value из бинормального приближения через AUC
**Аргументы**
AUC : float
Area Under Roc Curve
**Результат**
information value : float
**Пример использования**
>>> information_value_binormal(0.5)
0.0
"""
return norm.ppf(auc) ** 2 * 2
[документация]def stability_index(df, feature, date, num_buck=10, date_freq='MS'):
"""Stability index для всех последовательных пар дат
**Аргументы**
df : pandas.DataFrame
таблица с данными
feature : str
признак
date : str
название поля со временем
num_buck : numeric
количество бакетов
date_ferq : str
Тип агрегации времени (по умолчанию 'MS' - начало месяца)
**Результат**
pd.Series
**Пример использования**
>>> df = pd.DataFrame({
... 'dt': pd.Series(['2000-01-01', '2000-01-01', '2000-01-01',
... '2000-02-01', '2000-02-02', '2000-02-01',
... '2000-04-02', '2000-04-03'],
... dtype='datetime64[ns]'),
... 'foo': ['a', 'a', np.nan, 'a', 'b', 'b', 'a', 'b']
... })
>>> stability_index(df, 'foo', 'dt')
dt
2000-02-01 6.489979
2000-04-01 0.115525
Name: si, dtype: float64
"""
return (
df.loc[:, [feature, date]]
.assign(bucket=lambda x: np.ceil((x[feature].rank(pct=True) * num_buck)
.fillna(-1)),
object_count=1)
.groupby([pd.Grouper(key=date, freq=date_freq), 'bucket'])
.agg({'object_count': 'sum'})
.pipe(_fill_zero_missing_index_pair)
.pipe(_compute_rates, date=date)
.pipe(_compute_si_from_rates, date=date)
)
def _fill_zero_missing_index_pair(df):
"""Заполняем нулями все не появившееся значения комбинаций индексов"""
return df.reindex(
pd.MultiIndex.from_product(df.index.levels, names=df.index.names),
fill_value=0
)
def _compute_rates(df, date):
"""Считаем доли объектов в каждом бакете"""
return (
df
.reset_index()
.assign(total_objects=lambda x: x.groupby(date)['object_count']
.transform(sum))
.eval('object_rate = object_count / total_objects', inplace=False)
.set_index([date, 'bucket'])
.drop('total_objects', axis=1)
)
def _compute_si_from_rates(df, date):
"""Считаем stability index исходя из долей наблюдений"""
all_dates = df.query('object_count > 0').reset_index(date)[date].unique()
all_dates.sort()
return (
df.loc[all_dates[1:]]
.assign(object_rate_prev=pd.Series(
np.array(df.loc[all_dates[:-1]]['object_rate']),
index=df.loc[all_dates[1:]].index
))
.reset_index(date)
.assign(object_rate_prev=lambda x: np.clip(x['object_rate_prev'],
0.001, 0.999),
object_rate=lambda x: np.clip(x['object_rate'], 0.001, 0.999))
.eval('si = ( object_rate - object_rate_prev )'
' * (log(object_rate) - log(object_rate_prev))', inplace=False)
.groupby(date)
['si']
.sum()
)