Untitled

mail@pastecode.io avatar
unknown
python
22 days ago
17 kB
2
Indexable
Never
class AutoCalibrator:

    def __init__(self, method, params_grid):
        '''
        Конструктор класса, в котором происходит подключение к GP, 
        выгрузка таблицы для первичной калибровки 
        и инициализация необходимых переменных.
        '''
        self.method = method
        self.params_grid = params_grid
        self.func_res = {}
        self.best_func = None
        self.sql_a = None
        self.sql_b = None
        self.sql_stage_2 = None
        self.initialize_greenplum(1)
        self.df = pd.read_csv(global_config.path_1)

    def initialize_greenplum(self, stage):
        '''
        Метод подключения к GP для выгрузки таблицы.
        '''
        formatted_sql, path = self.prepare_sql(stage)
        GP = Greenplum(formatted_sql, path, inflow.table_name, global_config.omega_login)
        GP.start()

    def prepare_sql(self, stage):
        '''
        Метод подготовки SQL-скрипта в зависимости от стадии калибровки.
        '''
        if stage == 1:
            script = sql.stage_1.format(table_name = inflow.table_name, 
                                     table_name_ = inflow.table_name,
                                     agreement_gen = inflow.agreement_gen,
                                     report_gen = inflow.report_dt,
                                     mob = inflow.mob_col,
                                     score_col = inflow.score_col,
                                     target = inflow.target_col,
                                     debt = inflow.debt_col,
                                     debt_ = inflow.debt_col,
                                     amount = inflow.amount_col,
                                     target_ = inflow.target_col,
                                     score_col_ = inflow.score_col,
                                     fact_table = inflow.fact_table,
                                     score_table = global_config.score_table,
                                     appl_id = inflow.fact_appl_id_col,
                                     appl_id_ = inflow.score_appl_id_col
                                    )
            path = global_config.path_1
        else:
            script = sql.stage_2.format(table_name = inflow.table_name, 
                                     table_name_ = inflow.table_name,
                                     agreement_gen = inflow.agreement_gen,
                                     report_gen = inflow.report_dt,
                                     mob = inflow.mob_col,
                                     score_col = inflow.score_col,
                                     target = inflow.target_col,
                                     debt = inflow.debt_col,
                                     debt_ = inflow.debt_col,
                                     amount = inflow.amount_col,
                                     target_ = inflow.target_col,
                                     new_score = self.sql_stage_2,
                                     fact_table = inflow.fact_table,
                                     score_table = global_config.score_table,
                                     appl_id = inflow.fact_appl_id_col,
                                     appl_id_ = inflow.score_appl_id_col
                                    )
            path = global_config.path_2
        return script, path
    
    def calibrate(self):
        '''
        Метод запуска всех необходимых стадий калибровки.
        '''
        self.stage_1()
        self.stage_2()
        self.stage_final()
    
    def stage_1(self):
        '''
        Метод первой стадии калибровки, в которой зависимости от метода 
        (что мы фиксируем: квантиль или моб) производится группировка таблицы 
        и её передача в методы построения калибровочных функций и подбвора
        коэффициентов на каждый моб/квантиль.
        '''
        if self.method == 'mob':
            
            for func in self.params_grid['func']:
                func_res = pd.DataFrame()
                for quantile in self.params_grid['quantile']:
                    df, mob, model, fact = self.preprocess(self.df, method = self.method, quantile = quantile)
                    r2, a, b = eval('self.'+func)(mob, fact)
                    df_iter_row = pd.DataFrame({
                        'quantile' : quantile,
                        'mean_model' : model.mean(),
                        'r2' : r2,
                        'a' : a,
                        'b': b
                    }, index = [quantile])
                    func_res = pd.concat([func_res, df_iter_row], axis = 0)
             
                self.func_res[func] = func_res
        else:
            for func in self.params_grid['func']:
                func_res = pd.DataFrame()
                for mob in self.params_grid['mob']:
                    df, quantile, model, fact = self.preprocess(self.df, method = self.method, mob = mob)
                    r2, a, b = eval('self.'+func)(model, fact)
                    df_iter_row = pd.DataFrame({
                        'mob' : mob,
                        'r2' : r2,
                        'a' : a,
                        'b': b
                    }, index = [mob])
                    func_res = pd.concat([func_res, df_iter_row], axis = 0)

                self.func_res[func] = func_res

    def stage_2(self):
        '''
        Метод второй стадии калибровки, в которой зависимости от метода 
        (что мы фиксируем: квантиль или моб) производится аппроксимация 
        полученных коэффициентов на первой стадии.
        '''
        best_func_key = max(self.func_res, key=lambda k: self.func_res[k]['r2'].mean())
        calibration_data = self.func_res[best_func_key]

        results_a = self.evaluate_model(calibration_data, 'a')
        results_b = self.evaluate_model(calibration_data, 'b')

        best_func_a, best_a, best_b_a = self.find_best_model(results_a)
        best_func_b, best_a_b, best_b = self.find_best_model(results_b)
        
        if self.method == 'mob':
            calibration_data['a_model'], self.sql_a = self.apply_func(best_func_a, calibration_data['mean_model'].to_numpy(), best_a, best_b_a, sql=True)
            calibration_data['b_model'], self.sql_b = self.apply_func(best_func_b, calibration_data['mean_model'].to_numpy(), best_a_b, best_b, sql=True)
        else:
            calibration_data['a_model'], self.sql_a = self.apply_func(best_func_a, calibration_data['mob'].to_numpy(), best_a, best_b_a, sql=True)
            calibration_data['b_model'], self.sql_b = self.apply_func(best_func_b, calibration_data['mob'].to_numpy(), best_a_b, best_b, sql=True)

        self.plot_results(calibration_data.iloc[:, 0], calibration_data['a'], calibration_data['a_model'], f"Coef A = {self.sql_a}")
        self.plot_results(calibration_data.iloc[:, 0], calibration_data['b'], calibration_data['b_model'], f"Coef B = {self.sql_b}")

        self.sql_stage_2 = self.get_sql(best_func_key, self.sql_a, self.sql_b)
        
    def stage_final(self):
        '''
        Финальный метод, в котором после получения необходимой 
        калибровочной функции от моба и скора производится подключение 
        к GP и выгрузка итоговой таблицы, а также вывод результатов 
        в разрезе мобов, квантилей, отчётных дат и дат выдач.
        '''
        self.initialize_greenplum(2)
        final_df = pd.read_csv(global_config.path_2)
        
        for col in ['mob', 'quantile', 'report_gen', 'agreement_gen']:
            df, x, model, fact = self.preprocess(final_df, groupby_col = col)
            self.plot_results(x, fact, model, col.upper())
        
    def preprocess(self, df_calib, method = None, mob = None, quantile = None, groupby_col = None):
        '''
        Метод группировки калибровочной таблицы в зависимости от метода и стадии калибровки.
        '''
        if method == 'quantile':
            
            df_calib = df_calib[df_calib['mob'] == mob]
            df_calib = df_calib.groupby([method]) \
                .agg({'debt':'sum', 'total_debt':'sum', 'score':'sum', 'cnt':'sum' }) \
                .reset_index()
            
            quantile = np.array(df_calib['quantile'])
            model = np.array(df_calib['score']/df_calib['cnt'])
            fact = np.array(df_calib['debt']/df_calib['total_debt'])

            return df_calib, quantile, model, fact
        
        if method == 'mob':
            
            df_calib = df_calib[df_calib['quantile'] == quantile]
            df_calib = df_calib.groupby([method]) \
                .agg({'debt':'sum', 'total_debt':'sum', 'score':'sum', 'cnt':'sum' }) \
                .reset_index()
            
            df_calib = df_calib[(df_calib['cnt']/df_calib['cnt'].sum())*100 >= 0.1]
            
            mob = np.array(df_calib['mob'])
            model = np.array(df_calib['score']/df_calib['cnt'])
            fact = np.array(df_calib['debt']/df_calib['total_debt'])
            
            return df_calib, mob, model, fact
        
        else:
            
            df_calib = df_calib.groupby([groupby_col]) \
            .agg({'debt':'sum', 'total_debt':'sum', 'score':'sum', 'cnt':'sum' }) \
            .reset_index()
            
            x = np.array(df_calib[groupby_col])
            model = np.array(df_calib['score']/df_calib['total_debt'])
            fact = np.array(df_calib['debt']/df_calib['total_debt'])
            
            return df_calib, x, model, fact
        
    def evaluate_model(self, model_data, parameter):
        '''
        Метод перебора калибровочных функций для аппроксимации коэффициентов.
        '''
        results = pd.DataFrame()
        for func_name in self.params_grid['func']:
            x = model_data['mean_model'].to_numpy() if self.method == 'mob' else model_data['mob'].to_numpy()
            r2, a, b = eval(f'self.{func_name}')(x, model_data[parameter].to_numpy())
            results = pd.concat([results, pd.DataFrame({
                'func': func_name,
                'r2': r2,
                'a': a,
                'b': b
            }, index=[0])], ignore_index=True)

        return results

    def find_best_model(self, results):
        '''
        Метод нахождения наилучшей калибровочной функции.
        '''
        best_idx = results['r2'].idxmax()
        best_func_name = results.loc[best_idx, 'func']
        a, b = results.loc[best_idx, ['a', 'b']]
        return best_func_name, a, b

    def plot_results(self, group, fact, model, title):
        '''
        Метод вывода результирующих графиков калибровки.
        '''        
        plt.figure(figsize=(20, 15))
        plt.scatter(list(map(str, group)), fact, color = 'Black')
        plt.plot(list(map(str, group)), fact, label = 'Fact', color = 'Blue')
        plt.plot(list(map(str, group)), model, label = 'Model', color = 'Red')
        plt.xticks(rotation=60)
        plt.title(title)
        plt.legend()
        plt.show()
    
    '''
    Блок методов с функциями калибровки.
    '''
    def Linear(self, x, fact, plot = False):
        
        linear_model = LinearRegression()
        linear_model.fit(x.reshape(-1, 1), fact)
        predicts = linear_model.predict(x.reshape(-1, 1))
        
        rmse = mean_squared_error(fact, predicts,  squared=False)
        mape = mean_absolute_percentage_error(fact, predicts)
        r2 = r2_score(fact, predicts)
        
        if plot == True:
            self.plot_calib(x, x, fact,  linear_model.coef_, linear_model.intercept_, 'Linear')
        
        return r2, linear_model.coef_, linear_model.intercept_
    
    def Ln(self, x, fact, plot = False):
        
        if x[0] == 0:
            x = x[1:]
            fact = fact[1:]
            
        coef = np.polyfit(np.log(x), fact, 1)
        coef = list(coef)
        polyline = np.linspace(np.min(x), np.max(x), 100)
        predicts = coef[0]*np.log(x) + coef[1]
        
        rmse = mean_squared_error(fact, predicts,  squared=False)
        mape = mean_absolute_percentage_error(fact, predicts)
        r2 = r2_score(fact, predicts)
        
        if plot == True:
            self.plot_calib(polyline, x, fact,  coef[0], coef[1], 'Ln')
        
        return r2, coef[0], coef[1]
    
    def Power(self, x, fact, plot = False):
        
        if x[0] == 0:
            x = x[1:]
            fact = fact[1:]
        
        coef = np.polyfit(np.log(x), np.log(fact), 1)
        coef = list(coef)
        polyline = np.linspace(np.min(x), np.max(x), 100)
        predicts = np.exp(coef[1])*(x**coef[0])
        
        rmse = mean_squared_error(fact, predicts,  squared=False)
        mape = mean_absolute_percentage_error(fact, predicts)
        r2 = r2_score(fact, predicts)
        
        if plot == True:
            self.plot_calib(polyline, x, fact, np.exp(coef[1]), coef[0], 'Power')
        
        return r2, np.exp(coef[1]), coef[0]
    
    def Exp(self, x, fact, plot = False):
        
        coef = np.polyfit(x, np.log(fact), 1)
        coef = list(coef)
        polyline = np.linspace(np.min(x), np.max(x), 100)
        predicts = np.exp(coef[1])*np.exp(coef[0]*x)
        
        rmse = mean_squared_error(fact, predicts,  squared=False)
        mape = mean_absolute_percentage_error(fact, predicts)
        r2 = r2_score(fact, predicts)
        
        if plot == True:
            self.plot_calib(polyline, x, fact, np.exp(coef[1]), coef[0], 'Exp')
        
        return r2, np.exp(coef[1]), coef[0]
    
    def plot_calib(self, polyline, x, fact, a, b, func):
        
        plt.figure(figsize=(20, 15))
        plt.scatter(x, fact, color = 'Black')
        plt.plot(polyline, self.apply_func(func, polyline, a, b), label = 'Model', color = 'Red')
        plt.plot(x, fact, label = 'Fact', color = 'Blue')
        plt.legend()
        plt.show()
    
    def apply_func(self, func, x, a, b, sql=False):
        '''
        Метод приминения калибровочных функций к данным.
        '''
        operations = {
            'Linear': (lambda x, a, b: a*x+b, '({} * {} + {})'),
            'Ln': (lambda x, a, b: a*np.log(x)+b, '({} * ln({}) + {})'),
            'Power': (lambda x, a, b: a*(x**b), '({} * power({}, {}))'),
            'Exp': (lambda x, a, b: a*np.exp(x*b), '({} * exp({} * {}))'),
        }

        calc, sql_template = operations[func]
        result = calc(x, a, b)

        if self.method == 'mob':
            sql_str = sql_template.format(a, f'c.{inflow.score_col}', b)
        else:
            sql_str = sql_template.format(a, f'p.{inflow.mob_col}', b)

        return (result, sql_str) if sql else result

    def get_sql(self, best_func, func_a, func_b):
        '''
        Метод формирования итогового SQL-скрипта с калибровочной функцией.
        '''
        sql_patterns = {
            'Linear': '{} * {} + {}',
            'Ln': '{} * ln({}) + {}',
            'Power': '{} * power({}, {})',
            'Exp': '{} * exp({} * {})',
            }

        column = f'p.{inflow.mob_col}' if self.method == 'mob' else f'c.{inflow.score_col}'
        sql_template = sql_patterns[best_func]
        return sql_template.format(func_a, column, func_b)
Leave a Comment