Untitled
import time from tqdm import tqdm from models import fiscal_data, fund_data, transformed class DataFetcher: """ A class to handle fetching data from the raw database. """ def __init__(self, session): self.session = session def fetch_fiscal_data(self): """ Fetch all data from the fiscal database. """ data = {} # List of models to fetch data from models = [ fiscal_data.Summary, fiscal_data.SummaryQ, fiscal_data.Balance, fiscal_data.BalanceQ, fiscal_data.Cashflow, fiscal_data.CashflowQ, fiscal_data.Income, fiscal_data.IncomeQ, fiscal_data.Ratio, fiscal_data.RatioQ, fiscal_data.TblCur, fiscal_data.TblFormat, ] for model in models: start = time.time() # Query all records from the model's table data[model.__name__] = self.session.query(model).all() print(f"Query {model.__name__} time: {time.time() - start}") return data def fetch_fund_data(self): """ Fetch all data from the fund database. """ data = {} # List of models to fetch data from models = [ fund_data.FPrimaryinfo, fund_data.FSector, ] for model in models: start = time.time() # Query all records from the model's table data[model.__name__] = self.session.query(model).all() print(f"Query {model.__name__} time: {time.time() - start}") return data class DataTransformer: """ A class to handle transforming raw data into the desired format. """ def __init__(self, fiscal_data, fund_data): self.fiscal_data = fiscal_data self.currency_dict = self.build_currency_dict(fiscal_data["TblCur"]) self.unit_dict = self.build_unit_dict(fiscal_data["TblFormat"]) self.longname_dict = self.build_longname_dict( fund_data["FPrimaryinfo"]) self.shortname_dict = self.build_shortname_dict( fund_data["FPrimaryinfo"]) self.subsec_dict = self.build_subsec_dict(fund_data["FPrimaryinfo"]) self.sector_dict, self.sector_name_dict = self.build_sector_dict( fund_data["FPrimaryinfo"], fund_data["FSector"]) self.d_balance = {} self.d_balanceQ = {} self.d_cashflow = {} self.d_cashflowQ = {} self.d_income = {} self.d_incomeQ = {} self.d_ratio = {} self.d_ratioQ = {} self.map_ids_to_indices(self.fiscal_data["Balance"], self.d_balance) self.map_ids_to_indices(self.fiscal_data["BalanceQ"], self.d_balanceQ) self.map_ids_to_indices(self.fiscal_data["Cashflow"], self.d_cashflow) self.map_ids_to_indices( self.fiscal_data["CashflowQ"], self.d_cashflowQ) self.map_ids_to_indices(self.fiscal_data["Income"], self.d_income) self.map_ids_to_indices(self.fiscal_data["IncomeQ"], self.d_incomeQ) self.map_ids_to_indices(self.fiscal_data["Ratio"], self.d_ratio) self.map_ids_to_indices(self.fiscal_data["RatioQ"], self.d_ratioQ) def map_ids_to_indices(self, source, dest_dict): for i, item in tqdm(enumerate(source)): dest_dict[item.s_id] = i def build_unit_dict(self, fiscal_data_units): """ Build a dictionary of unit ids to unit names. """ d_unit = {} for unit in fiscal_data_units: d_unit[unit.id] = unit.f_value return d_unit def build_currency_dict(self, fiscal_data_currency): """ Build a dictionary of currency ids to currency names. """ d_currency = {} for currency in fiscal_data_currency: d_currency[currency.id] = currency.cur_code return d_currency def build_longname_dict(self, fund_data_primaryinfo): """ Build a dictionary of symbols to long names. """ d_longname = {} for i, subsec in tqdm(enumerate(fund_data_primaryinfo)): d_longname[subsec.f_symbol] = subsec.f_longname return d_longname def build_shortname_dict(self, fund_data_primaryinfo): """ Build a dictionary of symbols to short names. """ d_shortname = {} for i, subsec in tqdm(enumerate(fund_data_primaryinfo)): d_shortname[subsec.f_symbol] = subsec.f_shortname return d_shortname def build_subsec_dict(self, fund_data_primaryinfo): """ Build a dictionary of symbols to subsector names. """ d_subsector = {} for i, subsec in tqdm(enumerate(fund_data_primaryinfo)): d_subsector[subsec.f_symbol] = subsec.f_subsec return d_subsector def build_sector_dict(self, fund_data_primaryinfo, fund_data_sector): """ Build a dictionary of sector to subsec. """ d_sector = {} d_sector_name = {} for i, sector in tqdm(enumerate(fund_data_sector)): d_sector[sector.s_id] = sector.s_name for fund in fund_data_primaryinfo: sector_name = d_sector.get(fund.f_sid) if sector_name: d_sector_name[fund.f_symbol] = sector_name return d_sector, d_sector_name def transform_balances(self): # Balance raw_balances = self.fiscal_data["Balance"] raw_summaries = self.fiscal_data["Summary"] balances = [] for summary in tqdm(raw_summaries): try: s_id = summary.s_id unit = self.unit_dict[summary.s_unit] currency = self.currency_dict[summary.s_cur] if currency != "THB": continue if self.d_balance.get(s_id) is None: continue balance = raw_balances[self.d_balance.get(s_id)] subsec = self.subsec_dict[summary.s_symbol] shortname = self.shortname_dict[summary.s_symbol] longname = self.longname_dict[summary.s_symbol] sector = self.sector_name_dict[summary.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found # transform balance try: transform_balance = transformed.Balance( s_id=summary.s_id, symbol=summary.s_symbol, year=summary.s_year, exch=summary.s_exch, currency=currency, subsec=subsec, sector=sector, shortname=shortname, longname=longname, cce=unit * balance.b_cacbb, shortterm_invest=unit * balance.b_casti, inventory=unit * balance.b_castock, accounts_receivables=unit * balance.b_cadebt, other_current_assets=unit * balance.b_caoca, total_current_assets=unit * balance.b_catotal, shortterm_debt=unit * balance.b_clloans, accounts_payable=unit * balance.b_clcredit, taxes_payable=unit * balance.b_cltaxation, dividend_payable=unit * balance.b_cldividend, other_current_liabilities=unit * balance.b_clocl, total_current_liabilities=unit * balance.b_cltotal, net_current_assets=unit * balance.b_netasset, ppe=unit * balance.b_lappe, longterm_invest=unit * balance.b_lainvest, intangible_assets=unit * balance.b_laintasset, other_longterm_assets=unit * balance.b_laolta, total_long_termassets=unit * balance.b_latotal, share_capital=unit * balance.b_sfshare, treasury_shares=unit * balance.b_sftreasury, reserves=unit * balance.b_sfreserve, total_shareholders_equity=unit * balance.b_sftotal, minority_interest=unit * balance.b_mininterest, longterm_liabilities=unit * balance.b_longterm, total_debt=unit * balance.b_cltotal + unit * balance.b_longterm, ) except Exception as e: continue # Skip this iteration if subsec is not found balances.append(transform_balance) return balances def transform_balances_q(self): # BalanceQ raw_balancesQ = self.fiscal_data["BalanceQ"] raw_summariesQ = self.fiscal_data["SummaryQ"] balancesQ = [] for summaryQ in tqdm(raw_summariesQ): try: s_id = summaryQ.s_id unit = self.unit_dict[summaryQ.s_unit] currency = self.currency_dict[summaryQ.s_cur] if currency != "THB": continue if self.d_balanceQ.get(s_id) is None: continue balanceQ = raw_balancesQ[self.d_balanceQ.get(s_id)] subsec = self.subsec_dict[summaryQ.s_symbol] shortname = self.shortname_dict[summaryQ.s_symbol] longname = self.longname_dict[summaryQ.s_symbol] sector = self.sector_name_dict[summaryQ.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found # transform balance quarterly try: transform_balanceQ = transformed.BalanceQ( s_id=summaryQ.s_id, symbol=summaryQ.s_symbol, shortname=shortname, longname=longname, subsec=subsec, sector=sector, year=summaryQ.s_year, exch=summaryQ.s_exch, quarter=summaryQ.s_quarter, currency=currency, cce=unit * balanceQ.b_cacbb, shortterm_invest=unit * balanceQ.b_casti, inventory=unit * balanceQ.b_castock, accounts_receivables=unit * balanceQ.b_cadebt, other_current_assets=unit * balanceQ.b_caoca, total_current_assets=unit * balanceQ.b_catotal, shortterm_debt=unit * balanceQ.b_clloans, accounts_payable=unit * balanceQ.b_clcredit, taxes_payable=unit * balanceQ.b_cltaxation, dividend_payable=unit * balanceQ.b_cldividend, other_current_liabilities=unit * balanceQ.b_clocl, total_current_liabilities=unit * balanceQ.b_cltotal, net_current_assets=unit * balanceQ.b_netasset, ppe=unit * balanceQ.b_lappe, longterm_invest=unit * balanceQ.b_lainvest, intangible_assets=unit * balanceQ.b_laintasset, other_longterm_assets=unit * balanceQ.b_laolta, total_long_termassets=unit * balanceQ.b_latotal, share_capital=unit * balanceQ.b_sfshare, treasury_shares=unit * balanceQ.b_sftreasury, reserves=unit * balanceQ.b_sfreserve, total_shareholders_equity=unit * balanceQ.b_sftotal, minority_interest=unit * balanceQ.b_mininterest, longterm_liabilities=unit * balanceQ.b_longterm, total_debt=unit * balanceQ.b_cltotal + unit * balanceQ.b_longterm, ) except Exception as e: continue balancesQ.append(transform_balanceQ) return balancesQ def transform_cashflows(self): # Cashflow raw_cashflow = self.fiscal_data["Cashflow"] raw_summaries = self.fiscal_data["Summary"] cashflows = [] for summary in tqdm(raw_summaries): try: s_id = summary.s_id unit = self.unit_dict[summary.s_unit] currency = self.currency_dict[summary.s_cur] if currency != "THB": continue if self.d_cashflow.get(s_id) is None: continue cashflow = raw_cashflow[self.d_cashflow.get(s_id)] subsec = self.subsec_dict[summary.s_symbol] shortname = self.shortname_dict[summary.s_symbol] longname = self.longname_dict[summary.s_symbol] sector = self.sector_name_dict[summary.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found # transform balance try: transform_cashflow = transformed.Cashflow( s_id=summary.s_id, symbol=summary.s_symbol, shortname=shortname, longname=longname, sector=sector, subsec=subsec, year=summary.s_year, exch=summary.s_exch, currency=currency, cfo=unit * cashflow.c_opeact, cfi=unit * cashflow.c_investact, cff=unit * cashflow.c_financeact, net_change_in_cash=unit * cashflow.c_netchange, balance_bf=unit * cashflow.c_brought, balance_cf=unit * cashflow.c_carried, ) except Exception as e: continue cashflows.append(transform_cashflow) return cashflows def transform_cashflows_q(self): # CashflowQ raw_cashflowQ = self.fiscal_data["CashflowQ"] raw_summariesQ = self.fiscal_data["SummaryQ"] cashflowsQ = [] for summaryQ in tqdm(raw_summariesQ): try: s_id = summaryQ.s_id unit = self.unit_dict[summaryQ.s_unit] currency = self.currency_dict[summaryQ.s_cur] if currency != "THB": continue if self.d_cashflowQ.get(s_id) is None: continue cashflowQ = raw_cashflowQ[self.d_cashflowQ.get(s_id)] subsec = self.subsec_dict[summaryQ.s_symbol] shortname = self.shortname_dict[summaryQ.s_symbol] longname = self.longname_dict[summaryQ.s_symbol] sector = self.sector_name_dict[summaryQ.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found # transform cashflow try: transform_cashflowQ = transformed.CashflowQ( s_id=summaryQ.s_id, symbol=summaryQ.s_symbol, year=summaryQ.s_year, shortname=shortname, longname=longname, subsec=subsec, sector=sector, exch=summaryQ.s_exch, quarter=summaryQ.s_quarter, currency=currency, cfo=unit * cashflowQ.c_opeact, cfi=unit * cashflowQ.c_investact, cff=unit * cashflowQ.c_financeact, net_change_in_cash=unit * cashflowQ.c_netchange, balance_bf=unit * cashflowQ.c_brought, balance_cf=unit * cashflowQ.c_carried, ) except Exception as e: continue cashflowsQ.append(transform_cashflowQ) return cashflowsQ def transform_incomes(self): # Income raw_income = self.fiscal_data["Income"] raw_summaries = self.fiscal_data["Summary"] incomes = [] for summary in tqdm(raw_summaries): try: s_id = summary.s_id unit = self.unit_dict[summary.s_unit] currency = self.currency_dict[summary.s_cur] if currency != "THB": continue if self.d_income.get(s_id) is None: continue income = raw_income[self.d_income.get(s_id)] subsec = self.subsec_dict[summary.s_symbol] shortname = self.shortname_dict[summary.s_symbol] longname = self.longname_dict[summary.s_symbol] sector = self.sector_name_dict[summary.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found # transform income try: transform_income = transformed.Income( s_id=summary.s_id, symbol=summary.s_symbol, shortname=shortname, longname=longname, subsec=subsec, sector=sector, year=summary.s_year, exch=summary.s_exch, currency=currency, revenue=unit * income.i_turnover, operating_profit=unit * income.i_oprofit, pbt=unit * income.i_probtax, tax=unit * income.i_taxation, pat=unit * income.i_proatax, extraord_items=unit * income.i_extraord, minority_interest=unit * income.i_mininterest, net_income=unit * income.i_netproshare, ) except Exception as e: continue incomes.append(transform_income) return incomes def transform_incomes_q(self): # IncomeQ raw_incomeQ = self.fiscal_data["IncomeQ"] raw_summariesQ = self.fiscal_data["SummaryQ"] incomesQ = [] for summaryQ in tqdm(raw_summariesQ): try: s_id = summaryQ.s_id unit = self.unit_dict[summaryQ.s_unit] currency = self.currency_dict[summaryQ.s_cur] if currency != "THB": continue if self.d_incomeQ.get(s_id) is None: continue incomeQ = raw_incomeQ[self.d_incomeQ.get(s_id)] subsec = self.subsec_dict[summaryQ.s_symbol] shortname = self.shortname_dict[summaryQ.s_symbol] longname = self.longname_dict[summaryQ.s_symbol] sector = self.sector_name_dict[summaryQ.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found # transform income quarter try: transform_incomeQ = transformed.IncomeQ( s_id=summaryQ.s_id, symbol=summaryQ.s_symbol, shortname=shortname, longname=longname, subsec=subsec, sector=sector, year=summaryQ.s_year, exch=summaryQ.s_exch, quarter=summaryQ.s_quarter, currency=currency, revenue=unit * incomeQ.i_turnover, operating_profit=unit * incomeQ.i_oprofit, pbt=unit * incomeQ.i_probtax, tax=unit * incomeQ.i_taxation, pat=unit * incomeQ.i_proatax, extraord_items=unit * incomeQ.i_extraord, minority_interest=unit * incomeQ.i_mininterest, net_income=unit * incomeQ.i_netproshare, ) except Exception as e: continue incomesQ.append(transform_incomeQ) return incomesQ def transform_ratios(self): # Ratio raw_ratio = self.fiscal_data["Ratio"] raw_summaries = self.fiscal_data["Summary"] raw_income = self.fiscal_data["Income"] ratios = [] for summary in tqdm(raw_summaries): try: s_id = summary.s_id unit = self.unit_dict[summary.s_unit] currency = self.currency_dict[summary.s_cur] if currency != "THB": continue if self.d_ratio.get(s_id) is None: continue income = raw_income[self.d_income.get(s_id)] ratio = raw_ratio[self.d_ratio.get(s_id)] subsec = self.subsec_dict[summary.s_symbol] shortname = self.shortname_dict[summary.s_symbol] longname = self.longname_dict[summary.s_symbol] sector = self.sector_name_dict[summary.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found # transform ratio try: transform_ratio = transformed.Ratio( s_id=summary.s_id, symbol=summary.s_symbol, shortname=shortname, longname=longname, subsec=subsec, sector=sector, year=summary.s_year, exch=summary.s_exch, currency=currency, stock_price=ratio.r_stock, roe=ratio.r_roe, rota=ratio.r_rota, net_profit_margin=ratio.r_ror, pe=ratio.r_pe, dy=ratio.r_dy, dpr=ratio.r_dp * 100, current=ratio.r_current, quick=ratio.r_quick, de=ratio.r_leverage, icr=ratio.r_intcover, debt=ratio.r_debt, eps=ratio.r_eps, dps=ratio.r_dividend, operating_profit_margin=( unit * income.i_oprofit) / (unit * income.i_turnover) * 100, ) except Exception as e: continue ratios.append(transform_ratio) return ratios def transform_ratios_q(self): # RatioQ raw_ratioQ = self.fiscal_data["RatioQ"] raw_summariesQ = self.fiscal_data["SummaryQ"] raw_incomeQ = self.fiscal_data["IncomeQ"] ratiosQ = [] for summaryQ in tqdm(raw_summariesQ): try: s_id = summaryQ.s_id unit = self.unit_dict[summaryQ.s_unit] currency = self.currency_dict[summaryQ.s_cur] if currency != "THB": continue if self.d_ratioQ.get(s_id) is None: continue incomeQ = raw_incomeQ[self.d_incomeQ.get(s_id)] ratioQ = raw_ratioQ[self.d_ratioQ.get(s_id)] subsec = self.subsec_dict[summaryQ.s_symbol] shortname = self.shortname_dict[summaryQ.s_symbol] longname = self.longname_dict[summaryQ.s_symbol] sector = self.sector_name_dict[summaryQ.s_symbol] except KeyError: continue # Skip this iteration if subsec is not found try: transform_ratioQ = transformed.RatioQ( s_id=summaryQ.s_id, symbol=summaryQ.s_symbol, shortname=shortname, longname=longname, subsec=subsec, sector=sector, year=summaryQ.s_year, exch=summaryQ.s_exch, quarter=summaryQ.s_quarter, currency=currency, stock_price=ratioQ.r_stock, roe=ratioQ.r_roe, rota=ratioQ.r_rota, net_profit_margin=ratioQ.r_ror, pe=ratioQ.r_pe, dy=ratioQ.r_dy, dpr=ratioQ.r_dp * 100, current=ratioQ.r_current, quick=ratioQ.r_quick, de=ratioQ.r_leverage, eps=ratioQ.r_eps, dps=ratioQ.r_dividend, operating_profit_margin=( unit * incomeQ.i_oprofit) / (unit * incomeQ.i_turnover) * 100, ) except Exception as e: continue ratiosQ.append(transform_ratioQ) return ratiosQ def transform_all(self): """ Transform all raw data to the desired format. """ return { "balances": self.transform_balances(), "balances_q": self.transform_balances_q(), "cashflows": self.transform_cashflows(), "cashflows_q": self.transform_cashflows_q(), "incomes": self.transform_incomes(), "incomes_q": self.transform_incomes_q(), "ratios": self.transform_ratios(), "ratios_q": self.transform_ratios_q(), } class DataDeleter: """ A class to handle bulk deleting data from the source database. """ def __init__(self, session): self.session = session def delete_transformed_data(self): """ Delete all data from the transformed database. """ # List of models to fetch data from models = [ transformed.Balance, transformed.BalanceQ, transformed.Cashflow, transformed.CashflowQ, transformed.Income, transformed.IncomeQ, transformed.Ratio, transformed.RatioQ, ] for model in models: start = time.time() # Delete all records from the model's table self.session.query(model).delete(synchronize_session=False) print(f"Delete {model.__name__} time: {time.time() - start}") self.session.commit() return True class DataCommitter: """ A class to handle bulk committing transformed data to the target database. """ def __init__(self, session): self.session = session def bulk_commit(self, data, chunk_size=1000): """ Commit data in bulk with specified chunk size. """ for i in range((len(data) // chunk_size) + 1): start = time.time() self.session.add_all(data[i * chunk_size: (i + 1) * chunk_size]) self.session.commit() print(f"Commit time: {time.time() - start}")
Leave a Comment