Untitled
unknown
python
2 years ago
17 kB
8
Indexable
class AutoCalibrator:
def __init__(self, method, params_grid):
'''
Конструктор класса, в котором происходит подключение к GP,
выгрузка таблицы для первичной калибровки
и инициализация необходимых переменных.
'''
self.method = method
self.params_grid = params_grid
self.func_res = {}
self.best_func = None
self.sql_a = None
self.sql_b = None
self.sql_stage_2 = None
self.initialize_greenplum(1)
self.df = pd.read_csv(global_config.path_1)
def initialize_greenplum(self, stage):
'''
Метод подключения к GP для выгрузки таблицы.
'''
formatted_sql, path = self.prepare_sql(stage)
GP = Greenplum(formatted_sql, path, inflow.table_name, global_config.omega_login)
GP.start()
def prepare_sql(self, stage):
'''
Метод подготовки SQL-скрипта в зависимости от стадии калибровки.
'''
if stage == 1:
script = sql.stage_1.format(table_name = inflow.table_name,
table_name_ = inflow.table_name,
agreement_gen = inflow.agreement_gen,
report_gen = inflow.report_dt,
mob = inflow.mob_col,
score_col = inflow.score_col,
target = inflow.target_col,
debt = inflow.debt_col,
debt_ = inflow.debt_col,
amount = inflow.amount_col,
target_ = inflow.target_col,
score_col_ = inflow.score_col,
fact_table = inflow.fact_table,
score_table = global_config.score_table,
appl_id = inflow.fact_appl_id_col,
appl_id_ = inflow.score_appl_id_col
)
path = global_config.path_1
else:
script = sql.stage_2.format(table_name = inflow.table_name,
table_name_ = inflow.table_name,
agreement_gen = inflow.agreement_gen,
report_gen = inflow.report_dt,
mob = inflow.mob_col,
score_col = inflow.score_col,
target = inflow.target_col,
debt = inflow.debt_col,
debt_ = inflow.debt_col,
amount = inflow.amount_col,
target_ = inflow.target_col,
new_score = self.sql_stage_2,
fact_table = inflow.fact_table,
score_table = global_config.score_table,
appl_id = inflow.fact_appl_id_col,
appl_id_ = inflow.score_appl_id_col
)
path = global_config.path_2
return script, path
def calibrate(self):
'''
Метод запуска всех необходимых стадий калибровки.
'''
self.stage_1()
self.stage_2()
self.stage_final()
def stage_1(self):
'''
Метод первой стадии калибровки, в которой зависимости от метода
(что мы фиксируем: квантиль или моб) производится группировка таблицы
и её передача в методы построения калибровочных функций и подбвора
коэффициентов на каждый моб/квантиль.
'''
if self.method == 'mob':
for func in self.params_grid['func']:
func_res = pd.DataFrame()
for quantile in self.params_grid['quantile']:
df, mob, model, fact = self.preprocess(self.df, method = self.method, quantile = quantile)
r2, a, b = eval('self.'+func)(mob, fact)
df_iter_row = pd.DataFrame({
'quantile' : quantile,
'mean_model' : model.mean(),
'r2' : r2,
'a' : a,
'b': b
}, index = [quantile])
func_res = pd.concat([func_res, df_iter_row], axis = 0)
self.func_res[func] = func_res
else:
for func in self.params_grid['func']:
func_res = pd.DataFrame()
for mob in self.params_grid['mob']:
df, quantile, model, fact = self.preprocess(self.df, method = self.method, mob = mob)
r2, a, b = eval('self.'+func)(model, fact)
df_iter_row = pd.DataFrame({
'mob' : mob,
'r2' : r2,
'a' : a,
'b': b
}, index = [mob])
func_res = pd.concat([func_res, df_iter_row], axis = 0)
self.func_res[func] = func_res
def stage_2(self):
'''
Метод второй стадии калибровки, в которой зависимости от метода
(что мы фиксируем: квантиль или моб) производится аппроксимация
полученных коэффициентов на первой стадии.
'''
best_func_key = max(self.func_res, key=lambda k: self.func_res[k]['r2'].mean())
calibration_data = self.func_res[best_func_key]
results_a = self.evaluate_model(calibration_data, 'a')
results_b = self.evaluate_model(calibration_data, 'b')
best_func_a, best_a, best_b_a = self.find_best_model(results_a)
best_func_b, best_a_b, best_b = self.find_best_model(results_b)
if self.method == 'mob':
calibration_data['a_model'], self.sql_a = self.apply_func(best_func_a, calibration_data['mean_model'].to_numpy(), best_a, best_b_a, sql=True)
calibration_data['b_model'], self.sql_b = self.apply_func(best_func_b, calibration_data['mean_model'].to_numpy(), best_a_b, best_b, sql=True)
else:
calibration_data['a_model'], self.sql_a = self.apply_func(best_func_a, calibration_data['mob'].to_numpy(), best_a, best_b_a, sql=True)
calibration_data['b_model'], self.sql_b = self.apply_func(best_func_b, calibration_data['mob'].to_numpy(), best_a_b, best_b, sql=True)
self.plot_results(calibration_data.iloc[:, 0], calibration_data['a'], calibration_data['a_model'], f"Coef A = {self.sql_a}")
self.plot_results(calibration_data.iloc[:, 0], calibration_data['b'], calibration_data['b_model'], f"Coef B = {self.sql_b}")
self.sql_stage_2 = self.get_sql(best_func_key, self.sql_a, self.sql_b)
def stage_final(self):
'''
Финальный метод, в котором после получения необходимой
калибровочной функции от моба и скора производится подключение
к GP и выгрузка итоговой таблицы, а также вывод результатов
в разрезе мобов, квантилей, отчётных дат и дат выдач.
'''
self.initialize_greenplum(2)
final_df = pd.read_csv(global_config.path_2)
for col in ['mob', 'quantile', 'report_gen', 'agreement_gen']:
df, x, model, fact = self.preprocess(final_df, groupby_col = col)
self.plot_results(x, fact, model, col.upper())
def preprocess(self, df_calib, method = None, mob = None, quantile = None, groupby_col = None):
'''
Метод группировки калибровочной таблицы в зависимости от метода и стадии калибровки.
'''
if method == 'quantile':
df_calib = df_calib[df_calib['mob'] == mob]
df_calib = df_calib.groupby([method]) \
.agg({'debt':'sum', 'total_debt':'sum', 'score':'sum', 'cnt':'sum' }) \
.reset_index()
quantile = np.array(df_calib['quantile'])
model = np.array(df_calib['score']/df_calib['cnt'])
fact = np.array(df_calib['debt']/df_calib['total_debt'])
return df_calib, quantile, model, fact
if method == 'mob':
df_calib = df_calib[df_calib['quantile'] == quantile]
df_calib = df_calib.groupby([method]) \
.agg({'debt':'sum', 'total_debt':'sum', 'score':'sum', 'cnt':'sum' }) \
.reset_index()
df_calib = df_calib[(df_calib['cnt']/df_calib['cnt'].sum())*100 >= 0.1]
mob = np.array(df_calib['mob'])
model = np.array(df_calib['score']/df_calib['cnt'])
fact = np.array(df_calib['debt']/df_calib['total_debt'])
return df_calib, mob, model, fact
else:
df_calib = df_calib.groupby([groupby_col]) \
.agg({'debt':'sum', 'total_debt':'sum', 'score':'sum', 'cnt':'sum' }) \
.reset_index()
x = np.array(df_calib[groupby_col])
model = np.array(df_calib['score']/df_calib['total_debt'])
fact = np.array(df_calib['debt']/df_calib['total_debt'])
return df_calib, x, model, fact
def evaluate_model(self, model_data, parameter):
'''
Метод перебора калибровочных функций для аппроксимации коэффициентов.
'''
results = pd.DataFrame()
for func_name in self.params_grid['func']:
x = model_data['mean_model'].to_numpy() if self.method == 'mob' else model_data['mob'].to_numpy()
r2, a, b = eval(f'self.{func_name}')(x, model_data[parameter].to_numpy())
results = pd.concat([results, pd.DataFrame({
'func': func_name,
'r2': r2,
'a': a,
'b': b
}, index=[0])], ignore_index=True)
return results
def find_best_model(self, results):
'''
Метод нахождения наилучшей калибровочной функции.
'''
best_idx = results['r2'].idxmax()
best_func_name = results.loc[best_idx, 'func']
a, b = results.loc[best_idx, ['a', 'b']]
return best_func_name, a, b
def plot_results(self, group, fact, model, title):
'''
Метод вывода результирующих графиков калибровки.
'''
plt.figure(figsize=(20, 15))
plt.scatter(list(map(str, group)), fact, color = 'Black')
plt.plot(list(map(str, group)), fact, label = 'Fact', color = 'Blue')
plt.plot(list(map(str, group)), model, label = 'Model', color = 'Red')
plt.xticks(rotation=60)
plt.title(title)
plt.legend()
plt.show()
'''
Блок методов с функциями калибровки.
'''
def Linear(self, x, fact, plot = False):
linear_model = LinearRegression()
linear_model.fit(x.reshape(-1, 1), fact)
predicts = linear_model.predict(x.reshape(-1, 1))
rmse = mean_squared_error(fact, predicts, squared=False)
mape = mean_absolute_percentage_error(fact, predicts)
r2 = r2_score(fact, predicts)
if plot == True:
self.plot_calib(x, x, fact, linear_model.coef_, linear_model.intercept_, 'Linear')
return r2, linear_model.coef_, linear_model.intercept_
def Ln(self, x, fact, plot = False):
if x[0] == 0:
x = x[1:]
fact = fact[1:]
coef = np.polyfit(np.log(x), fact, 1)
coef = list(coef)
polyline = np.linspace(np.min(x), np.max(x), 100)
predicts = coef[0]*np.log(x) + coef[1]
rmse = mean_squared_error(fact, predicts, squared=False)
mape = mean_absolute_percentage_error(fact, predicts)
r2 = r2_score(fact, predicts)
if plot == True:
self.plot_calib(polyline, x, fact, coef[0], coef[1], 'Ln')
return r2, coef[0], coef[1]
def Power(self, x, fact, plot = False):
if x[0] == 0:
x = x[1:]
fact = fact[1:]
coef = np.polyfit(np.log(x), np.log(fact), 1)
coef = list(coef)
polyline = np.linspace(np.min(x), np.max(x), 100)
predicts = np.exp(coef[1])*(x**coef[0])
rmse = mean_squared_error(fact, predicts, squared=False)
mape = mean_absolute_percentage_error(fact, predicts)
r2 = r2_score(fact, predicts)
if plot == True:
self.plot_calib(polyline, x, fact, np.exp(coef[1]), coef[0], 'Power')
return r2, np.exp(coef[1]), coef[0]
def Exp(self, x, fact, plot = False):
coef = np.polyfit(x, np.log(fact), 1)
coef = list(coef)
polyline = np.linspace(np.min(x), np.max(x), 100)
predicts = np.exp(coef[1])*np.exp(coef[0]*x)
rmse = mean_squared_error(fact, predicts, squared=False)
mape = mean_absolute_percentage_error(fact, predicts)
r2 = r2_score(fact, predicts)
if plot == True:
self.plot_calib(polyline, x, fact, np.exp(coef[1]), coef[0], 'Exp')
return r2, np.exp(coef[1]), coef[0]
def plot_calib(self, polyline, x, fact, a, b, func):
plt.figure(figsize=(20, 15))
plt.scatter(x, fact, color = 'Black')
plt.plot(polyline, self.apply_func(func, polyline, a, b), label = 'Model', color = 'Red')
plt.plot(x, fact, label = 'Fact', color = 'Blue')
plt.legend()
plt.show()
def apply_func(self, func, x, a, b, sql=False):
'''
Метод приминения калибровочных функций к данным.
'''
operations = {
'Linear': (lambda x, a, b: a*x+b, '({} * {} + {})'),
'Ln': (lambda x, a, b: a*np.log(x)+b, '({} * ln({}) + {})'),
'Power': (lambda x, a, b: a*(x**b), '({} * power({}, {}))'),
'Exp': (lambda x, a, b: a*np.exp(x*b), '({} * exp({} * {}))'),
}
calc, sql_template = operations[func]
result = calc(x, a, b)
if self.method == 'mob':
sql_str = sql_template.format(a, f'c.{inflow.score_col}', b)
else:
sql_str = sql_template.format(a, f'p.{inflow.mob_col}', b)
return (result, sql_str) if sql else result
def get_sql(self, best_func, func_a, func_b):
'''
Метод формирования итогового SQL-скрипта с калибровочной функцией.
'''
sql_patterns = {
'Linear': '{} * {} + {}',
'Ln': '{} * ln({}) + {}',
'Power': '{} * power({}, {})',
'Exp': '{} * exp({} * {})',
}
column = f'p.{inflow.mob_col}' if self.method == 'mob' else f'c.{inflow.score_col}'
sql_template = sql_patterns[best_func]
return sql_template.format(func_a, column, func_b)Editor is loading...
Leave a Comment