Untitled
unknown
python
a year ago
13 kB
7
Indexable
class AutoCalibrator:
    """Two-stage curve calibration driven by a grid of candidate functions.

    Stage 1 fits every candidate function (``params_grid['func']``) to the
    fact curve of every quantile (``params_grid['quantile']``) and stores the
    fitted coefficients ``a`` and ``b``.  Stage 2 picks the function with the
    best mean R2, then models ``a`` and ``b`` themselves as functions of the
    mean model score and renders the result as an SQL expression.  The final
    stage runs the stage-2 query with that expression and plots fact vs model.

    Candidate function names must match method names of this class
    (``Linear``, ``Ln``, ``Power``, ``Exp``).
    """

    # Groups whose share of the total ``cnt`` is below this percentage are
    # dropped when preprocessing with method='mob'.
    MIN_GROUP_SHARE_PCT = 0.1

    def __init__(self, method, params_grid):
        """Run the stage-1 query and load its CSV output.

        Args:
            method: Calibration mode; ``stage_1`` only acts on ``'mob'``.
            params_grid: Mapping with at least the keys ``'func'`` (names of
                fit methods) and ``'quantile'`` (quantile values to iterate).
        """
        query = sql.stage_1.format(
            score_col_=inflow.score_col,
            **self._common_sql_params(),
        )
        # NOTE(review): presumably Greenplum.start() writes the query result
        # to global_config.path_1, which is read right after - confirm.
        GP = Greenplum(
            query,
            global_config.path_1,
            inflow.table_name,
            global_config.omega_login,
        )
        GP.start()
        self.df = pd.read_csv(global_config.path_1)

        self.method = method
        self.params_grid = params_grid
        self.func_res = {}  # function name -> DataFrame of per-quantile fits
        self.best_func = None
        self.sql_a = None
        self.sql_b = None
        self.sql_stage_2 = None

    @staticmethod
    def _common_sql_params():
        """Return the format parameters shared by the stage-1 and stage-2 queries."""
        return {
            'table_name': inflow.table_name,
            'table_name_': inflow.table_name,
            'agreement_gen': inflow.agreement_gen,
            'report_gen': inflow.report_dt,
            'mob': inflow.mob_col,
            'score_col': inflow.score_col,
            'target': inflow.target_col,
            'debt': inflow.debt_col,
            'debt_': inflow.debt_col,
            'amount': inflow.amount_col,
            'target_': inflow.target_col,
            'fact_table': inflow.fact_table,
            'score_table': global_config.score_table,
            'appl_id': inflow.fact_appl_id_col,
            'appl_id_': inflow.score_appl_id_col,
        }

    def calibrate(self):
        """Run all calibration stages in order."""
        self.stage_1()
        self.stage_2()
        self.stage_final()

    def stage_1(self):
        """Fit every candidate function to every quantile's curve.

        Fills ``self.func_res[func]`` with a DataFrame indexed by quantile
        holding ``quantile``, ``mean_model``, ``r2``, ``a`` and ``b``.
        Does nothing unless ``self.method == 'mob'``.
        """
        if self.method != 'mob':
            return

        for func in self.params_grid['func']:
            # getattr instead of eval: only attribute lookup, no code execution.
            fit = getattr(self, func)
            rows = []
            for quantile in self.params_grid['quantile']:
                _, mob, model, fact = self.preprocess(
                    self.df, method=self.method, quantile=quantile
                )
                r2, a, b = fit(mob, fact)
                rows.append(pd.DataFrame({
                    'quantile': quantile,
                    'mean_model': model.mean(),
                    'r2': r2,
                    'a': a,
                    'b': b
                }, index=[quantile]))
            # Single concat instead of growing the frame inside the loop.
            self.func_res[func] = pd.concat(rows, axis=0) if rows else pd.DataFrame()

    def stage_2(self):
        """Choose the best stage-1 function and model its coefficients.

        Sets ``best_func``, ``sql_a``, ``sql_b`` and ``sql_stage_2`` and
        shows one fact-vs-model plot per coefficient.

        Raises:
            ValueError: If ``stage_1`` produced no results.
        """
        if not self.func_res:
            raise ValueError(
                "stage_2 needs stage_1 results, but func_res is empty "
                "(stage_1 only produces results for method='mob')"
            )

        self.best_func = max(
            self.func_res, key=lambda k: self.func_res[k]['r2'].mean()
        )
        df_calib = self.func_res[self.best_func]

        best_func_a, a_a, b_a = self._best_coefficient_fit(df_calib, 'a')
        best_func_b, a_b, b_b = self._best_coefficient_fit(df_calib, 'b')

        mean_model = df_calib['mean_model'].to_numpy()
        df_calib['a_model'], self.sql_a = self.apply_func(
            best_func_a, mean_model, a_a, b_a, sql=True
        )
        df_calib['b_model'], self.sql_b = self.apply_func(
            best_func_b, mean_model, a_b, b_b, sql=True
        )

        self._plot_coefficient(df_calib, 'a', 'a_model', f"Coef A = {self.sql_a}")
        self._plot_coefficient(df_calib, 'b', 'b_model', f"Coef B = {self.sql_b}")

        self.sql_stage_2 = self.get_sql(self.best_func, self.sql_a, self.sql_b)

    def _best_coefficient_fit(self, df_calib, coef_col):
        """Fit each candidate function to ``coef_col`` vs ``mean_model``.

        Returns:
            Tuple ``(func_name, a, b)`` of the candidate with the highest R2.
        """
        x = df_calib['mean_model'].to_numpy()
        y = df_calib[coef_col].to_numpy()
        rows = []
        for func in self.params_grid['func']:
            r2, a, b = getattr(self, func)(x, y, plot=False)
            rows.append(pd.DataFrame({
                'func': func,
                'r2': r2,
                'a': a,
                'b': b
            }, index=[0]))
        results = pd.concat(rows, axis=0).reset_index(drop=True)
        best = results['r2'].idxmax()
        return (
            results.loc[best, 'func'],
            results.loc[best, 'a'],
            results.loc[best, 'b'],
        )

    @staticmethod
    def _plot_coefficient(df_calib, fact_col, model_col, title):
        """Plot a fitted coefficient against its modelled value per quantile."""
        plt.figure(figsize=(15, 10))
        plt.plot(df_calib['quantile'], df_calib[fact_col], label='Fact', color='Blue')
        plt.plot(df_calib['quantile'], df_calib[model_col], label='Model', color='Red')
        plt.title(title)
        plt.legend()
        plt.show()

    def stage_final(self):
        """Run the stage-2 query with the calibrated score and plot the result.

        One fact-vs-model plot is shown for each of the columns ``mob``,
        ``quantile``, ``report_gen`` and ``agreement_gen``.
        """
        query = sql.stage_2.format(
            new_score=self.sql_stage_2,
            **self._common_sql_params(),
        )
        GP = Greenplum(
            query,
            global_config.path_2,
            inflow.table_name,
            global_config.omega_login,
        )
        GP.start()
        final_df = pd.read_csv(global_config.path_2)

        for col in ['mob', 'quantile', 'report_gen', 'agreement_gen']:
            _, x, model, fact = self.preprocess(final_df, groupby_col=col)
            plt.figure(figsize=(15, 10))
            plt.scatter(x, fact, color='Black')
            plt.plot(x, fact, label='Fact', color='Blue')
            plt.plot(x, model, label='Model', color='Red')
            plt.legend()
            plt.show()

    @staticmethod
    def _aggregate(df, key):
        """Sum ``debt``, ``total_debt``, ``score`` and ``cnt`` per value of ``key``."""
        return (
            df.groupby([key])
            .agg({'debt': 'sum', 'total_debt': 'sum', 'score': 'sum', 'cnt': 'sum'})
            .reset_index()
        )

    def preprocess(self, df_calib, method=None, mob=None, quantile=None, groupby_col=None):
        """Aggregate ``df_calib`` and derive the x, model and fact arrays.

        Args:
            df_calib: Frame with columns ``debt``, ``total_debt``, ``score``,
                ``cnt`` plus the filter/grouping columns that are used.
            method: ``'quantile'`` (filter by ``mob``, group by quantile),
                ``'mob'`` (filter by ``quantile``, group by mob) or anything
                else to group by ``groupby_col`` without filtering.
            mob: Value of the ``mob`` column to keep when method='quantile'.
            quantile: Value of the ``quantile`` column to keep when method='mob'.
            groupby_col: Grouping column for the fallback branch.

        Returns:
            Tuple ``(aggregated_frame, x, model, fact)`` where the last three
            are numpy arrays; ``fact`` is always ``debt / total_debt``.
        """
        if method == 'quantile':
            grouped = self._aggregate(df_calib[df_calib['mob'] == mob], method)
            x = np.array(grouped['quantile'])
            model = np.array(grouped['score'] / grouped['cnt'])
        elif method == 'mob':
            grouped = self._aggregate(df_calib[df_calib['quantile'] == quantile], method)
            # Drop groups that hold a negligible share of all observations.
            share_pct = (grouped['cnt'] / grouped['cnt'].sum()) * 100
            grouped = grouped[share_pct >= self.MIN_GROUP_SHARE_PCT]
            x = np.array(grouped['mob'])
            model = np.array(grouped['score'] / grouped['cnt'])
        else:
            grouped = self._aggregate(df_calib, groupby_col)
            x = np.array(grouped[groupby_col])
            # NOTE(review): this branch divides score by total_debt while the
            # other two divide by cnt - kept as is, confirm it is intended.
            model = np.array(grouped['score'] / grouped['total_debt'])

        fact = np.array(grouped['debt'] / grouped['total_debt'])
        return grouped, x, model, fact

    @staticmethod
    def _drop_leading_zero(x, fact):
        """Drop the first point when its x is 0 so that log(x) stays finite."""
        if x[0] == 0:
            return x[1:], fact[1:]
        return x, fact

    def Linear(self, x, fact, plot=False):
        """Fit ``fact = a * x + b`` with ordinary least squares.

        Returns:
            Tuple ``(r2, a, b)``; ``a`` is ``LinearRegression.coef_`` (an
            array, not a scalar) and ``b`` is ``intercept_``.
        """
        features = x.reshape(-1, 1)
        linear_model = LinearRegression()
        linear_model.fit(features, fact)
        predicts = linear_model.predict(features)
        r2 = r2_score(fact, predicts)

        if plot:
            self.plot_calib(
                x, x, fact, linear_model.coef_, linear_model.intercept_, 'Linear'
            )
        return r2, linear_model.coef_, linear_model.intercept_

    def Ln(self, x, fact, plot=False):
        """Fit ``fact = a * ln(x) + b``; a leading ``x == 0`` point is dropped.

        Returns:
            Tuple ``(r2, a, b)``.
        """
        x, fact = self._drop_leading_zero(x, fact)
        slope, intercept = np.polyfit(np.log(x), fact, 1)
        predicts = slope * np.log(x) + intercept
        r2 = r2_score(fact, predicts)

        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, slope, intercept, 'Ln')
        return r2, slope, intercept

    def Power(self, x, fact, plot=False):
        """Fit ``fact = a * x ** b`` in log-log space; a leading ``x == 0`` is dropped.

        Returns:
            Tuple ``(r2, a, b)``.
        """
        x, fact = self._drop_leading_zero(x, fact)
        exponent, log_scale = np.polyfit(np.log(x), np.log(fact), 1)
        scale = np.exp(log_scale)
        predicts = scale * (x ** exponent)
        r2 = r2_score(fact, predicts)

        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, scale, exponent, 'Power')
        return r2, scale, exponent

    def Exp(self, x, fact, plot=False):
        """Fit ``fact = a * exp(b * x)`` via a linear fit of ``ln(fact)``.

        Returns:
            Tuple ``(r2, a, b)``.
        """
        rate, log_scale = np.polyfit(x, np.log(fact), 1)
        scale = np.exp(log_scale)
        predicts = scale * np.exp(rate * x)
        r2 = r2_score(fact, predicts)

        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, scale, rate, 'Exp')
        return r2, scale, rate

    def plot_calib(self, polyline, x, fact, a, b, func):
        """Plot observed points together with the fitted curve ``func(a, b)``."""
        plt.figure(figsize=(15, 10))
        plt.scatter(x, fact, color='Black')
        plt.plot(polyline, self.apply_func(func, polyline, a, b), label='Model', color='Red')
        plt.plot(x, fact, label='Fact', color='Blue')
        plt.legend()
        plt.show()

    @staticmethod
    def _sql_expression(func, variable, a, b):
        """Render ``func`` with coefficients ``a``/``b`` over ``variable`` as SQL.

        Raises:
            ValueError: If ``func`` is not one of the supported names.
        """
        if func == 'Linear':
            return f"{a} * {variable} + {b}"
        if func == 'Ln':
            return f"{a} * ln({variable}) + {b}"
        if func == 'Power':
            return f"{a} * power({variable}, {b})"
        if func == 'Exp':
            return f"{a} * exp({variable} * {b})"
        raise ValueError(f"Unknown calibration function: {func!r}")

    def apply_func(self, func, x, a, b, sql=False):
        """Evaluate the named function at ``x`` with coefficients ``a`` and ``b``.

        Args:
            func: One of ``'Linear'``, ``'Ln'``, ``'Power'``, ``'Exp'``.
            x: Input values (numpy array).
            a: First coefficient.
            b: Second coefficient.
            sql: When truthy, also return the parenthesised SQL expression.

        Returns:
            The evaluated values, or ``(values, sql_string)`` when ``sql`` is set.

        Raises:
            ValueError: If ``func`` is not a supported name.
        """
        if func == 'Linear':
            result = a * x + b
        elif func == 'Ln':
            result = a * np.log(x) + b
        elif func == 'Power':
            result = a * (x ** b)
        elif func == 'Exp':
            result = a * np.exp(x * b)
        else:
            raise ValueError(f"Unknown calibration function: {func!r}")

        if not sql:
            return result

        # Coefficient curves are expressed over the other axis than get_sql:
        # with method='mob' the coefficients depend on c.inflow.
        variable = 'c.inflow' if self.method == 'mob' else 'p.mobm'
        return result, f'({self._sql_expression(func, variable, a, b)})'

    def get_sql(self, best_func, func_a, func_b):
        """Build the final SQL score expression from the coefficient expressions.

        Args:
            best_func: Name of the function chosen in stage 2.
            func_a: SQL expression for coefficient ``a``.
            func_b: SQL expression for coefficient ``b``.

        Returns:
            The SQL expression over ``p.mobm`` (method='mob') or ``c.inflow``.

        Raises:
            ValueError: If ``best_func`` is not a supported name.
        """
        variable = 'p.mobm' if self.method == 'mob' else 'c.inflow'
        return self._sql_expression(best_func, variable, func_a, func_b)
Editor is loading...
Leave a Comment