Untitled

mail@pastecode.io avatar
unknown
python
22 days ago
13 kB
4
Indexable
Never
class AutoCalibrator:
    """Fit calibration curves in two stages and render the result as SQL.

    Stage 1 fits every candidate function named in ``params_grid['func']``
    to the fact curve over ``mob`` for each value in
    ``params_grid['quantile']``.  Stage 2 takes the candidate with the best
    mean R2, fits its ``a`` and ``b`` coefficients as functions of the mean
    model score and assembles a SQL expression from those fits.  The final
    stage formats ``sql.stage_2`` with that expression and plots model
    versus fact.

    Candidate names must match fitting methods of this class
    (``Linear``, ``Ln``, ``Power``, ``Exp``); each returns ``(r2, a, b)``.

    The data frames are expected to contain the columns ``mob``,
    ``quantile``, ``debt``, ``total_debt``, ``score`` and ``cnt`` (these are
    the columns ``preprocess`` reads).
    """

    # Aggregation applied to every grouped frame in ``preprocess``.
    _AGG_SPEC = {'debt': 'sum', 'total_debt': 'sum', 'score': 'sum', 'cnt': 'sum'}

    def __init__(self, method, params_grid):
        """Run the stage-1 query, load its CSV output and reset fit state.

        Note that constructing the object performs I/O: it starts a
        ``Greenplum`` job and reads ``global_config.path_1``.

        Args:
            method: Calibration mode; ``stage_1`` only acts on ``'mob'``.
            params_grid: Mapping with a ``'func'`` list of fitting-method
                names and a ``'quantile'`` list of quantile values.
        """
        self.df = self._run_stage_query(
            sql.stage_1,
            global_config.path_1,
            score_col_=inflow.score_col,
        )
        self.method = method
        self.params_grid = params_grid
        self.func_res = {}       # fitting-method name -> per-quantile results
        self.best_func = None    # name of the stage-1 winner
        self.sql_a = None        # SQL fragment modelling coefficient ``a``
        self.sql_b = None        # SQL fragment modelling coefficient ``b``
        self.sql_stage_2 = None  # complete calibrated-score SQL expression

    @staticmethod
    def _run_stage_query(template, csv_path, **extra_fields):
        """Format *template*, start a Greenplum job and read *csv_path*.

        The placeholders shared by both SQL templates are filled from
        ``inflow`` / ``global_config``; template-specific placeholders are
        supplied through *extra_fields*.
        """
        query = template.format(
            table_name=inflow.table_name,
            table_name_=inflow.table_name,
            agreement_gen=inflow.agreement_gen,
            report_gen=inflow.report_dt,
            mob=inflow.mob_col,
            score_col=inflow.score_col,
            target=inflow.target_col,
            debt=inflow.debt_col,
            debt_=inflow.debt_col,
            amount=inflow.amount_col,
            target_=inflow.target_col,
            fact_table=inflow.fact_table,
            score_table=global_config.score_table,
            appl_id=inflow.fact_appl_id_col,
            appl_id_=inflow.score_appl_id_col,
            **extra_fields,
        )
        job = Greenplum(
            query,
            csv_path,
            inflow.table_name,
            global_config.omega_login,
        )
        # Presumably executes the query and dumps the result to ``csv_path``
        # (the file is read immediately afterwards) -- confirm against the
        # Greenplum helper.
        job.start()
        return pd.read_csv(csv_path)

    def calibrate(self):
        """Run stage 1, stage 2 and the final stage in order."""
        self.stage_1()
        self.stage_2()
        self.stage_final()

    def stage_1(self):
        """Fit every candidate function per quantile and store the results.

        Only acts when ``self.method == 'mob'``; any other method leaves
        ``self.func_res`` untouched.  For each function name, the stored
        frame has one row per quantile (indexed by the quantile) with the
        columns ``quantile``, ``mean_model``, ``r2``, ``a`` and ``b``.
        """
        if self.method != 'mob':
            return

        for func in self.params_grid['func']:
            # Look the method up by name instead of eval()-ing a string.
            fit = getattr(self, func)
            rows = []
            for quantile in self.params_grid['quantile']:
                _, mob, model, fact = self.preprocess(
                    self.df, method=self.method, quantile=quantile
                )
                r2, a, b = fit(mob, fact)
                rows.append(pd.DataFrame({
                    'quantile': quantile,
                    'mean_model': model.mean(),
                    'r2': r2,
                    'a': a,
                    'b': b,
                }, index=[quantile]))

            # Concatenate once rather than growing a frame inside the loop.
            self.func_res[func] = pd.concat(rows, axis=0) if rows else pd.DataFrame()

    def _fit_coefficient(self, x, values):
        """Fit every candidate function to *values* over *x*.

        Returns a frame with one row per candidate and the columns
        ``func``, ``r2``, ``a`` and ``b``, indexed ``0..n-1``.
        """
        rows = []
        for func in self.params_grid['func']:
            r2, a, b = getattr(self, func)(x, values, plot=False)
            rows.append(pd.DataFrame({
                'func': func,
                'r2': r2,
                'a': a,
                'b': b,
            }, index=[0]))
        if not rows:
            return pd.DataFrame()
        return pd.concat(rows, axis=0).reset_index(drop=True)

    @staticmethod
    def _plot_coefficient(df_calib, fact_col, model_col, title):
        """Plot a fitted coefficient against its modelled value per quantile."""
        plt.figure(figsize=(15, 10))
        plt.plot(df_calib['quantile'], df_calib[fact_col], label='Fact', color='Blue')
        plt.plot(df_calib['quantile'], df_calib[model_col], label='Model', color='Red')
        plt.title(title)
        plt.legend()
        plt.show()

    def stage_2(self):
        """Model the stage-1 coefficients and build the calibration SQL.

        Picks the stage-1 function with the highest mean R2, fits its ``a``
        and ``b`` columns as functions of ``mean_model`` with every
        candidate function, keeps the best fit for each, plots both, and
        stores the SQL fragments in ``sql_a`` / ``sql_b`` and the combined
        expression in ``sql_stage_2``.  The winning stage-1 frame gains the
        columns ``a_model`` and ``b_model``.

        Raises:
            ValueError: If ``stage_1`` stored no results.
        """
        if not self.func_res:
            raise ValueError(
                "stage_2 needs stage_1 results, but none were produced "
                f"(method={self.method!r})"
            )

        self.best_func = max(
            self.func_res, key=lambda name: self.func_res[name]['r2'].mean()
        )
        # Not a copy: the model columns are added to the stored frame.
        df_calib = self.func_res[self.best_func]
        mean_model = df_calib['mean_model'].to_numpy()

        # NOTE(review): Power/Exp take the log of the fitted values, so a
        # non-positive coefficient makes those candidates fail -- confirm
        # whether such candidates should be skipped instead.
        fits_a = self._fit_coefficient(mean_model, df_calib['a'].to_numpy())
        fits_b = self._fit_coefficient(mean_model, df_calib['b'].to_numpy())

        top_a = fits_a['r2'].idxmax()
        top_b = fits_b['r2'].idxmax()

        df_calib['a_model'], self.sql_a = self.apply_func(
            fits_a.loc[top_a, 'func'],
            mean_model,
            fits_a.loc[top_a, 'a'],
            fits_a.loc[top_a, 'b'],
            sql=True,
        )
        df_calib['b_model'], self.sql_b = self.apply_func(
            fits_b.loc[top_b, 'func'],
            mean_model,
            fits_b.loc[top_b, 'a'],
            fits_b.loc[top_b, 'b'],
            sql=True,
        )

        self._plot_coefficient(df_calib, 'a', 'a_model', f"Coef A = {self.sql_a}")
        self._plot_coefficient(df_calib, 'b', 'b_model', f"Coef B = {self.sql_b}")

        self.sql_stage_2 = self.get_sql(self.best_func, self.sql_a, self.sql_b)

    def stage_final(self):
        """Run the stage-2 query with the calibrated score and plot the fit.

        Formats ``sql.stage_2`` with ``self.sql_stage_2`` as ``new_score``,
        loads ``global_config.path_2`` and plots model versus fact grouped
        by ``mob``, ``quantile``, ``report_gen`` and ``agreement_gen``.
        """
        final_df = self._run_stage_query(
            sql.stage_2,
            global_config.path_2,
            new_score=self.sql_stage_2,
        )

        for col in ['mob', 'quantile', 'report_gen', 'agreement_gen']:
            _, x, model, fact = self.preprocess(final_df, groupby_col=col)
            plt.figure(figsize=(15, 10))
            plt.scatter(x, fact, color='Black')
            plt.plot(x, fact, label='Fact', color='Blue')
            plt.plot(x, model, label='Model', color='Red')
            plt.legend()
            plt.show()

    def preprocess(self, df_calib, method=None, mob=None, quantile=None, groupby_col=None):
        """Aggregate *df_calib* and return the model and fact curves.

        Args:
            df_calib: Frame with the columns listed in the class docstring.
            method: ``'quantile'`` keeps rows with ``mob == mob`` and groups
                by quantile; ``'mob'`` keeps rows with
                ``quantile == quantile``, groups by mob and drops groups
                holding less than 0.1% of the total ``cnt``; anything else
                groups the whole frame by *groupby_col*.
            mob: Filter value used when ``method == 'quantile'``.
            quantile: Filter value used when ``method == 'mob'``.
            groupby_col: Grouping column for the fallback branch.

        Returns:
            ``(grouped_frame, x, model, fact)`` where ``x`` is the grouping
            column, ``fact`` is ``debt / total_debt`` and ``model`` is
            ``score / cnt`` for the two named methods and
            ``score / total_debt`` otherwise; the last three are arrays.
        """
        if method == 'quantile':
            subset = df_calib[df_calib['mob'] == mob]
            key = 'quantile'
        elif method == 'mob':
            subset = df_calib[df_calib['quantile'] == quantile]
            key = 'mob'
        else:
            subset = df_calib
            key = groupby_col

        grouped = subset.groupby([key]).agg(self._AGG_SPEC).reset_index()

        if method == 'mob':
            # Drop sparsely populated groups (< 0.1% of all observations).
            share_pct = (grouped['cnt'] / grouped['cnt'].sum()) * 100
            grouped = grouped[share_pct >= 0.1]

        # NOTE(review): the fallback branch divides the score by
        # ``total_debt`` while the named methods divide by ``cnt`` -- kept
        # as in the original; confirm this is intentional.
        denominator = 'cnt' if method in ('quantile', 'mob') else 'total_debt'

        x = np.array(grouped[key])
        model = np.array(grouped['score'] / grouped[denominator])
        fact = np.array(grouped['debt'] / grouped['total_debt'])

        return grouped, x, model, fact

    @staticmethod
    def _drop_leading_zero(x, fact):
        """Drop the first point when its x is 0, since log(0) is undefined."""
        if len(x) > 0 and x[0] == 0:
            return x[1:], fact[1:]
        return x, fact

    def Linear(self, x, fact, plot=False):
        """Fit ``fact = a * x + b`` by ordinary least squares.

        Returns:
            ``(r2, a, b)`` where ``a`` is ``LinearRegression.coef_`` (an
            array, unlike the scalars the other fits return) and ``b`` is
            the intercept.
        """
        features = x.reshape(-1, 1)
        linear_model = LinearRegression()
        linear_model.fit(features, fact)
        predicts = linear_model.predict(features)

        r2 = r2_score(fact, predicts)

        if plot:
            self.plot_calib(x, x, fact, linear_model.coef_, linear_model.intercept_, 'Linear')

        return r2, linear_model.coef_, linear_model.intercept_

    def Ln(self, x, fact, plot=False):
        """Fit ``fact = a * ln(x) + b``; a leading ``x == 0`` point is dropped.

        Returns:
            ``(r2, a, b)``.
        """
        x, fact = self._drop_leading_zero(x, fact)

        slope, intercept = np.polyfit(np.log(x), fact, 1)
        predicts = slope * np.log(x) + intercept

        r2 = r2_score(fact, predicts)

        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, slope, intercept, 'Ln')

        return r2, slope, intercept

    def Power(self, x, fact, plot=False):
        """Fit ``fact = a * x ** b`` via a log-log linear fit.

        A leading ``x == 0`` point is dropped.

        Returns:
            ``(r2, a, b)``.
        """
        x, fact = self._drop_leading_zero(x, fact)

        exponent, log_scale = np.polyfit(np.log(x), np.log(fact), 1)
        predicts = np.exp(log_scale) * (x ** exponent)

        r2 = r2_score(fact, predicts)

        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, np.exp(log_scale), exponent, 'Power')

        return r2, np.exp(log_scale), exponent

    def Exp(self, x, fact, plot=False):
        """Fit ``fact = a * exp(b * x)`` via a linear fit on ``ln(fact)``.

        Returns:
            ``(r2, a, b)``.
        """
        rate, log_scale = np.polyfit(x, np.log(fact), 1)
        predicts = np.exp(log_scale) * np.exp(rate * x)

        r2 = r2_score(fact, predicts)

        if plot:
            polyline = np.linspace(np.min(x), np.max(x), 100)
            self.plot_calib(polyline, x, fact, np.exp(log_scale), rate, 'Exp')

        return r2, np.exp(log_scale), rate

    def plot_calib(self, polyline, x, fact, a, b, func):
        """Plot the observed points and the fitted curve evaluated on *polyline*."""
        plt.figure(figsize=(15, 10))
        plt.scatter(x, fact, color='Black')
        plt.plot(polyline, self.apply_func(func, polyline, a, b), label='Model', color='Red')
        plt.plot(x, fact, label='Fact', color='Blue')
        plt.legend()
        plt.show()

    def apply_func(self, func, x, a, b, sql=False):
        """Evaluate the named curve at *x* and optionally render it as SQL.

        Args:
            func: ``'Linear'``, ``'Ln'``, ``'Power'`` or ``'Exp'``.
            x: Points at which to evaluate the curve.
            a: First coefficient, as returned by the matching fit.
            b: Second coefficient, as returned by the matching fit.
            sql: When truthy, also return the SQL text of the curve.  (The
                parameter shadows the module-level ``sql`` name inside this
                method only.)

        Returns:
            The evaluated curve, or ``(curve, sql_text)`` when *sql* is
            truthy.  The SQL refers to ``c.inflow`` when
            ``self.method == 'mob'`` and to ``p.mobm`` otherwise.

        Raises:
            ValueError: If *func* is not one of the supported names.
        """
        column = 'c.inflow' if self.method == 'mob' else 'p.mobm'

        if func == 'Linear':
            result = a * x + b
            sql_str = f'({a} * {column} + {b})'
        elif func == 'Ln':
            result = a * np.log(x) + b
            sql_str = f'({a} * ln({column}) + {b})'
        elif func == 'Power':
            result = a * (x ** b)
            sql_str = f'({a} * power({column}, {b}))'
        elif func == 'Exp':
            result = a * np.exp(x * b)
            sql_str = f'({a} * exp({column} * {b}))'
        else:
            raise ValueError(f"Unsupported calibration function: {func!r}")

        if sql:
            return result, sql_str

        return result

    def get_sql(self, best_func, func_a, func_b):
        """Combine the coefficient SQL fragments into one expression.

        Args:
            best_func: ``'Linear'``, ``'Ln'``, ``'Power'`` or ``'Exp'``.
            func_a: SQL text modelling the first coefficient.
            func_b: SQL text modelling the second coefficient.

        Returns:
            The SQL expression of *best_func* over ``p.mobm`` when
            ``self.method == 'mob'`` and over ``c.inflow`` otherwise.

        Raises:
            ValueError: If *best_func* is not one of the supported names
                (previously this silently returned ``None``).
        """
        column = 'p.mobm' if self.method == 'mob' else 'c.inflow'

        if best_func == 'Linear':
            return f"{func_a} * {column} + {func_b}"
        if best_func == 'Ln':
            return f"{func_a} * ln({column}) + {func_b}"
        if best_func == 'Power':
            return f"{func_a} * power({column}, {func_b})"
        if best_func == 'Exp':
            return f"{func_a} * exp({column} * {func_b})"

        raise ValueError(f"Unsupported calibration function: {best_func!r}")
Leave a Comment