Untitled
unknown
python
2 years ago
13 kB
16
Indexable
class AutoCalibrator:
    """Fit calibration curves for a score and build the matching SQL expression.

    Workflow (see ``calibrate``):

    1. ``stage_1`` fits every function named in ``params_grid['func']`` to the
       fact rate as a function of ``mob``, once per value in
       ``params_grid['quantile']``.
    2. ``stage_2`` picks the function with the highest mean R^2, models each of
       its two coefficients as a function of the mean model score, and stores
       the combined SQL expression in ``sql_stage_2``.
    3. ``stage_final`` runs the stage-2 query with that expression and plots
       fact vs. model for several groupings.

    NOTE(review): the class relies on names defined outside this block
    (``Greenplum``, ``sql``, ``inflow``, ``global_config``, ``pd``, ``np``,
    ``plt``, ``LinearRegression``, ``r2_score``) - presumably module-level
    imports; confirm they are in scope.
    """

    def __init__(self, method, params_grid):
        """Run the stage-1 query and load its CSV output into ``self.df``.

        Args:
            method: Calibration mode. ``stage_1`` only does work when this is
                ``'mob'``; the value also selects the SQL column used by
                ``apply_func`` and ``get_sql``.
            params_grid: Mapping with a ``'func'`` entry (names of fitting
                methods of this class, e.g. ``'Linear'``, ``'Ln'``, ``'Power'``,
                ``'Exp'``) and a ``'quantile'`` entry (quantile values to
                iterate over in ``stage_1``).
        """
        self.df = self._run_query(
            sql.stage_1,
            global_config.path_1,
            score_col_=inflow.score_col,
        )
        self.method = method
        self.params_grid = params_grid
        self.func_res = {}       # func name -> per-quantile fit results
        self.best_func = None    # set by stage_2
        self.sql_a = None        # SQL for the first coefficient, set by stage_2
        self.sql_b = None        # SQL for the second coefficient, set by stage_2
        self.sql_stage_2 = None  # final SQL expression, set by stage_2

    @staticmethod
    def _run_query(template, output_path, **extra_params):
        """Format a SQL template, run it through Greenplum and read the CSV.

        Args:
            template: SQL text with ``str.format`` placeholders.
            output_path: Path passed to ``Greenplum`` and then read back with
                ``pd.read_csv``.
            **extra_params: Placeholders specific to one template, added to
                the parameters shared by both stages.

        Returns:
            The DataFrame read from ``output_path``.
        """
        query = template.format(
            table_name=inflow.table_name,
            table_name_=inflow.table_name,
            agreement_gen=inflow.agreement_gen,
            report_gen=inflow.report_dt,
            mob=inflow.mob_col,
            score_col=inflow.score_col,
            target=inflow.target_col,
            debt=inflow.debt_col,
            debt_=inflow.debt_col,
            amount=inflow.amount_col,
            target_=inflow.target_col,
            fact_table=inflow.fact_table,
            score_table=global_config.score_table,
            appl_id=inflow.fact_appl_id_col,
            appl_id_=inflow.score_appl_id_col,
            **extra_params,
        )
        job = Greenplum(
            query,
            output_path,
            inflow.table_name,
            global_config.omega_login,
        )
        # NOTE(review): the CSV is read right after start(), so start() is
        # assumed to block until the file is written - confirm.
        job.start()
        return pd.read_csv(output_path)

    def calibrate(self):
        """Run the three calibration stages in order."""
        self.stage_1()
        self.stage_2()
        self.stage_final()

    def stage_1(self):
        """Fit every candidate function per quantile and store the results.

        For each function name in ``params_grid['func']`` a DataFrame indexed
        by quantile (columns ``quantile``, ``mean_model``, ``r2``, ``a``,
        ``b``) is stored in ``self.func_res[func]``.

        NOTE(review): nothing is computed unless ``self.method == 'mob'``, in
        which case ``stage_2`` fails on the empty ``func_res`` - confirm
        whether other modes are meant to be supported.
        """
        if self.method != 'mob':
            return
        for func in self.params_grid['func']:
            # Look the fitter up by name; getattr avoids evaluating a string.
            fit = getattr(self, func)
            rows = []
            for quantile in self.params_grid['quantile']:
                _, mob, model, fact = self.preprocess(
                    self.df, method=self.method, quantile=quantile
                )
                r2, a, b = fit(mob, fact)
                rows.append({
                    'quantile': quantile,
                    'mean_model': model.mean(),
                    'r2': r2,
                    'a': a,
                    'b': b,
                })
            # Build the frame once instead of concatenating inside the loop.
            self.func_res[func] = pd.DataFrame(
                rows, index=[row['quantile'] for row in rows]
            )

    def stage_2(self):
        """Choose the best function and model its coefficients.

        The best function is the one whose stage-1 fits have the highest mean
        R^2. Its ``a`` and ``b`` coefficients are each fitted against the mean
        model score; the fitted values are added to the stored results frame
        as ``a_model`` / ``b_model``, plotted against the stage-1 values, and
        combined into ``self.sql_stage_2``.
        """
        self.best_func = max(
            self.func_res, key=lambda k: self.func_res[k]['r2'].mean()
        )
        # Same object as the one stored in func_res; new columns are added to it.
        df_calib = self.func_res[self.best_func]
        mean_model = df_calib['mean_model'].to_numpy()

        df_calib['a_model'], self.sql_a = self._model_coefficient(
            mean_model, df_calib['a'].to_numpy()
        )
        df_calib['b_model'], self.sql_b = self._model_coefficient(
            mean_model, df_calib['b'].to_numpy()
        )

        self._plot_coefficient(df_calib, 'a', f"Coef A = {self.sql_a}")
        self._plot_coefficient(df_calib, 'b', f"Coef B = {self.sql_b}")

        self.sql_stage_2 = self.get_sql(self.best_func, self.sql_a, self.sql_b)

    def _model_coefficient(self, x, values):
        """Fit every candidate function to ``values`` over ``x``; apply the best.

        Returns:
            ``(fitted_values, sql_str)`` from ``apply_func`` for the candidate
            with the highest R^2.
        """
        fits = pd.DataFrame([
            dict(zip(('func', 'r2', 'a', 'b'),
                     (func, *getattr(self, func)(x, values, plot=False))))
            for func in self.params_grid['func']
        ])
        best = fits['r2'].idxmax()
        return self.apply_func(
            fits.loc[best, 'func'],
            x,
            fits.loc[best, 'a'],
            fits.loc[best, 'b'],
            sql=True,
        )

    @staticmethod
    def _plot_coefficient(df_calib, column, title):
        """Plot a stage-1 coefficient and its ``<column>_model`` fit by quantile."""
        plt.figure(figsize=(15, 10))
        plt.plot(df_calib['quantile'], df_calib[column],
                 label='Fact', color='Blue')
        plt.plot(df_calib['quantile'], df_calib[f'{column}_model'],
                 label='Model', color='Red')
        plt.title(title)
        plt.legend()
        plt.show()

    def stage_final(self):
        """Run the stage-2 query with the new score and plot fact vs. model."""
        final_df = self._run_query(
            sql.stage_2,
            global_config.path_2,
            new_score=self.sql_stage_2,
        )
        for col in ['mob', 'quantile', 'report_gen', 'agreement_gen']:
            _, x, model, fact = self.preprocess(final_df, groupby_col=col)
            plt.figure(figsize=(15, 10))
            plt.scatter(x, fact, color='Black')
            plt.plot(x, fact, label='Fact', color='Blue')
            plt.plot(x, model, label='Model', color='Red')
            plt.legend()
            plt.show()

    @staticmethod
    def _aggregate(df, by):
        """Sum ``debt``, ``total_debt``, ``score`` and ``cnt`` grouped by ``by``."""
        return (
            df.groupby([by])
            .agg({'debt': 'sum', 'total_debt': 'sum', 'score': 'sum', 'cnt': 'sum'})
            .reset_index()
        )

    def preprocess(self, df_calib, method=None, mob=None, quantile=None,
                   groupby_col=None):
        """Aggregate the data and return the x axis, model and fact series.

        Args:
            df_calib: DataFrame with ``debt``, ``total_debt``, ``score`` and
                ``cnt`` columns plus the columns used for filtering/grouping.
            method: ``'quantile'`` (filter by ``mob``, group by quantile),
                ``'mob'`` (filter by ``quantile``, group by mob), or anything
                else to group by ``groupby_col`` without filtering.
            mob: Value of the ``mob`` column to keep when ``method='quantile'``.
            quantile: Value of the ``quantile`` column to keep when
                ``method='mob'``.
            groupby_col: Grouping column for the generic branch.

        Returns:
            ``(aggregated_df, x, model, fact)`` where ``x``, ``model`` and
            ``fact`` are numpy arrays with one entry per group.
        """
        if method == 'quantile':
            grouped = self._aggregate(df_calib[df_calib['mob'] == mob], method)
            return (
                grouped,
                np.array(grouped['quantile']),
                np.array(grouped['score'] / grouped['cnt']),
                np.array(grouped['debt'] / grouped['total_debt']),
            )

        if method == 'mob':
            grouped = self._aggregate(
                df_calib[df_calib['quantile'] == quantile], method
            )
            # Keep only groups holding at least 0.1% of the total count.
            share_pct = (grouped['cnt'] / grouped['cnt'].sum()) * 100
            grouped = grouped[share_pct >= 0.1]
            return (
                grouped,
                np.array(grouped['mob']),
                np.array(grouped['score'] / grouped['cnt']),
                np.array(grouped['debt'] / grouped['total_debt']),
            )

        grouped = self._aggregate(df_calib, groupby_col)
        # NOTE(review): model is divided by total_debt here but by cnt in the
        # branches above - confirm this difference is intended.
        return (
            grouped,
            np.array(grouped[groupby_col]),
            np.array(grouped['score'] / grouped['total_debt']),
            np.array(grouped['debt'] / grouped['total_debt']),
        )

    @staticmethod
    def _drop_leading_zero(x, fact):
        """Drop the first point when ``x[0] == 0`` so ``log(x)`` stays finite there."""
        if x[0] == 0:
            return x[1:], fact[1:]
        return x, fact

    def Linear(self, x, fact, plot=False):
        """Fit ``fact = a * x + b`` and return ``(r2, a, b)``.

        ``a`` and ``b`` are returned as plain floats so the result has the same
        shape as the other fitters (``coef_`` itself is a 1-element array).
        """
        features = x.reshape(-1, 1)
        linear_model = LinearRegression()
        linear_model.fit(features, fact)
        r2 = r2_score(fact, linear_model.predict(features))
        a = float(linear_model.coef_[0])
        b = float(linear_model.intercept_)
        if plot:
            self.plot_calib(x, x, fact, a, b, 'Linear')
        return r2, a, b

    def Ln(self, x, fact, plot=False):
        """Fit ``fact = a * ln(x) + b`` and return ``(r2, a, b)``."""
        x, fact = self._drop_leading_zero(x, fact)
        a, b = np.polyfit(np.log(x), fact, 1)
        r2 = r2_score(fact, a * np.log(x) + b)
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, a, b, 'Ln')
        return r2, a, b

    def Power(self, x, fact, plot=False):
        """Fit ``fact = a * x ** b`` (log-log least squares); return ``(r2, a, b)``."""
        x, fact = self._drop_leading_zero(x, fact)
        # Linear fit in log-log space: ln(fact) = b * ln(x) + ln(a).
        b, log_a = np.polyfit(np.log(x), np.log(fact), 1)
        a = np.exp(log_a)
        r2 = r2_score(fact, a * (x ** b))
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, a, b, 'Power')
        return r2, a, b

    def Exp(self, x, fact, plot=False):
        """Fit ``fact = a * exp(b * x)`` (semi-log least squares); return ``(r2, a, b)``."""
        # Linear fit in semi-log space: ln(fact) = b * x + ln(a).
        b, log_a = np.polyfit(x, np.log(fact), 1)
        a = np.exp(log_a)
        r2 = r2_score(fact, a * np.exp(b * x))
        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, a, b, 'Exp')
        return r2, a, b

    def plot_calib(self, polyline, x, fact, a, b, func):
        """Plot the observed points and the fitted curve evaluated on ``polyline``."""
        plt.figure(figsize=(15, 10))
        plt.scatter(x, fact, color='Black')
        plt.plot(polyline, self.apply_func(func, polyline, a, b),
                 label='Model', color='Red')
        plt.plot(x, fact, label='Fact', color='Blue')
        plt.legend()
        plt.show()

    def apply_func(self, func, x, a, b, sql=False):
        """Evaluate a fitted function and optionally render it as SQL.

        Args:
            func: One of ``'Linear'``, ``'Ln'``, ``'Power'``, ``'Exp'``.
            x: Scalar or numpy array to evaluate at.
            a: First coefficient.
            b: Second coefficient.
            sql: When true, also return the SQL text of the expression.

        Returns:
            The evaluated result, or ``(result, sql_str)`` when ``sql`` is true.

        Raises:
            ValueError: If ``func`` is not one of the supported names.
        """
        column = 'c.inflow' if self.method == 'mob' else 'p.mobm'
        if func == 'Linear':
            result = a * x + b
            sql_str = f'({a} * {column} + {b})'
        elif func == 'Ln':
            result = a * np.log(x) + b
            sql_str = f'({a} * ln({column}) + {b})'
        elif func == 'Power':
            result = a * (x ** b)
            sql_str = f'({a} * power({column}, {b}))'
        elif func == 'Exp':
            result = a * np.exp(x * b)
            sql_str = f'({a} * exp({column} * {b}))'
        else:
            raise ValueError(f'Unknown calibration function: {func!r}')
        if sql:
            return result, sql_str
        return result

    def get_sql(self, best_func, func_a, func_b):
        """Combine the coefficient SQL snippets into the final score expression.

        Args:
            best_func: One of ``'Linear'``, ``'Ln'``, ``'Power'``, ``'Exp'``.
            func_a: SQL text for the first coefficient.
            func_b: SQL text for the second coefficient.

        Returns:
            SQL text applying ``best_func`` to ``p.mobm`` when
            ``self.method == 'mob'`` and to ``c.inflow`` otherwise.

        Raises:
            ValueError: If ``best_func`` is not one of the supported names.
        """
        column = 'p.mobm' if self.method == 'mob' else 'c.inflow'
        if best_func == 'Linear':
            return f"{func_a} * {column} + {func_b}"
        if best_func == 'Ln':
            return f"{func_a} * ln({column}) + {func_b}"
        if best_func == 'Power':
            return f"{func_a} * power({column}, {func_b})"
        if best_func == 'Exp':
            return f"{func_a} * exp({column} * {func_b})"
        raise ValueError(f'Unknown calibration function: {best_func!r}')
Leave a Comment