# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# 5 months ago
# 27 kB
# 3
# Indexable
import time
from tqdm import tqdm
from models import fiscal_data, fund_data, transformed


class DataFetcher:
    """
    Loads every row of the raw fiscal and fund tables through one session.
    """

    def __init__(self, session):
        # The session is used as-is for every query issued by this object.
        self.session = session

    def _query_everything(self, model_classes):
        """
        Run one unfiltered query per model and print how long each took.

        Returns a dict keyed by the model class name; each value is the
        list produced by ``session.query(model).all()``.
        """
        results = {}
        for model_cls in model_classes:
            began = time.time()
            rows = self.session.query(model_cls).all()
            results[model_cls.__name__] = rows
            print(f"Query {model_cls.__name__} time: {time.time() - began}")
        return results

    def fetch_fiscal_data(self):
        """
        Return all rows of the fiscal tables, keyed by model class name.
        """
        fiscal_models = (
            fiscal_data.Summary,
            fiscal_data.SummaryQ,
            fiscal_data.Balance,
            fiscal_data.BalanceQ,
            fiscal_data.Cashflow,
            fiscal_data.CashflowQ,
            fiscal_data.Income,
            fiscal_data.IncomeQ,
            fiscal_data.Ratio,
            fiscal_data.RatioQ,
            fiscal_data.TblCur,
            fiscal_data.TblFormat,
        )
        return self._query_everything(fiscal_models)

    def fetch_fund_data(self):
        """
        Return all rows of the fund tables, keyed by model class name.
        """
        fund_models = (
            fund_data.FPrimaryinfo,
            fund_data.FSector,
        )
        return self._query_everything(fund_models)


class DataTransformer:
    """
    Join raw fiscal rows with fund metadata and build ``transformed`` records.

    Only summaries whose currency code resolves to "THB" are transformed.
    Balance, cashflow and income figures are multiplied by the summary's
    unit value (the ``f_value`` of its ``TblFormat`` row); ratio figures are
    copied unscaled. A summary is skipped, never fatal, when a lookup it
    needs is missing or when building the target record raises.
    """

    # Target keyword -> source column. Balance, cashflow and income figures
    # are multiplied by the unit value before being stored.
    _BALANCE_FIELDS = {
        "cce": "b_cacbb",
        "shortterm_invest": "b_casti",
        "inventory": "b_castock",
        "accounts_receivables": "b_cadebt",
        "other_current_assets": "b_caoca",
        "total_current_assets": "b_catotal",
        "shortterm_debt": "b_clloans",
        "accounts_payable": "b_clcredit",
        "taxes_payable": "b_cltaxation",
        "dividend_payable": "b_cldividend",
        "other_current_liabilities": "b_clocl",
        "total_current_liabilities": "b_cltotal",
        "net_current_assets": "b_netasset",
        "ppe": "b_lappe",
        "longterm_invest": "b_lainvest",
        "intangible_assets": "b_laintasset",
        "other_longterm_assets": "b_laolta",
        "total_long_termassets": "b_latotal",
        "share_capital": "b_sfshare",
        "treasury_shares": "b_sftreasury",
        "reserves": "b_sfreserve",
        "total_shareholders_equity": "b_sftotal",
        "minority_interest": "b_mininterest",
        "longterm_liabilities": "b_longterm",
    }
    _CASHFLOW_FIELDS = {
        "cfo": "c_opeact",
        "cfi": "c_investact",
        "cff": "c_financeact",
        "net_change_in_cash": "c_netchange",
        "balance_bf": "c_brought",
        "balance_cf": "c_carried",
    }
    _INCOME_FIELDS = {
        "revenue": "i_turnover",
        "operating_profit": "i_oprofit",
        "pbt": "i_probtax",
        "tax": "i_taxation",
        "pat": "i_proatax",
        "extraord_items": "i_extraord",
        "minority_interest": "i_mininterest",
        "net_income": "i_netproshare",
    }
    # Ratio columns are copied without unit scaling.
    _RATIO_FIELDS = {
        "stock_price": "r_stock",
        "roe": "r_roe",
        "rota": "r_rota",
        "net_profit_margin": "r_ror",
        "pe": "r_pe",
        "dy": "r_dy",
        "current": "r_current",
        "quick": "r_quick",
        "de": "r_leverage",
        "eps": "r_eps",
        "dps": "r_dividend",
    }
    # The annual ratio record carries two columns the quarterly one lacks.
    _RATIO_FIELDS_ANNUAL = {
        **_RATIO_FIELDS,
        "icr": "r_intcover",
        "debt": "r_debt",
    }

    def __init__(self, fiscal_data, fund_data):
        """
        Build every lookup table needed by the ``transform_*`` methods.

        fiscal_data: dict of row lists keyed by fiscal model name
            ("Summary", "Balance", "TblCur", "TblFormat", ...).
        fund_data: dict of row lists keyed by "FPrimaryinfo" and "FSector".
        """
        self.fiscal_data = fiscal_data
        self.currency_dict = self.build_currency_dict(fiscal_data["TblCur"])
        self.unit_dict = self.build_unit_dict(fiscal_data["TblFormat"])

        primary_info = fund_data["FPrimaryinfo"]
        self.longname_dict = self.build_longname_dict(primary_info)
        self.shortname_dict = self.build_shortname_dict(primary_info)
        self.subsec_dict = self.build_subsec_dict(primary_info)
        self.sector_dict, self.sector_name_dict = self.build_sector_dict(
            primary_info, fund_data["FSector"])

        # s_id -> position of the matching row inside each raw list.
        self.d_balance = {}
        self.d_balanceQ = {}
        self.d_cashflow = {}
        self.d_cashflowQ = {}
        self.d_income = {}
        self.d_incomeQ = {}
        self.d_ratio = {}
        self.d_ratioQ = {}

        for table, index in (
            ("Balance", self.d_balance),
            ("BalanceQ", self.d_balanceQ),
            ("Cashflow", self.d_cashflow),
            ("CashflowQ", self.d_cashflowQ),
            ("Income", self.d_income),
            ("IncomeQ", self.d_incomeQ),
            ("Ratio", self.d_ratio),
            ("RatioQ", self.d_ratioQ),
        ):
            self.map_ids_to_indices(self.fiscal_data[table], index)

    def map_ids_to_indices(self, source, dest_dict):
        """
        Record in ``dest_dict`` the list position of every row's ``s_id``.

        When several rows share an ``s_id`` the last position wins.
        """
        # Wrapping the list (not the enumerate object) lets tqdm see len().
        for position, item in enumerate(tqdm(source)):
            dest_dict[item.s_id] = position

    def build_unit_dict(self, fiscal_data_units):
        """
        Build a dictionary of unit ids to unit values (``f_value``).
        """
        return {unit.id: unit.f_value for unit in fiscal_data_units}

    def build_currency_dict(self, fiscal_data_currency):
        """
        Build a dictionary of currency ids to currency codes.
        """
        return {cur.id: cur.cur_code for cur in fiscal_data_currency}

    def build_longname_dict(self, fund_data_primaryinfo):
        """
        Build a dictionary of symbols to long names.
        """
        return {
            fund.f_symbol: fund.f_longname
            for fund in tqdm(fund_data_primaryinfo)
        }

    def build_shortname_dict(self, fund_data_primaryinfo):
        """
        Build a dictionary of symbols to short names.
        """
        return {
            fund.f_symbol: fund.f_shortname
            for fund in tqdm(fund_data_primaryinfo)
        }

    def build_subsec_dict(self, fund_data_primaryinfo):
        """
        Build a dictionary of symbols to subsector values (``f_subsec``).
        """
        return {
            fund.f_symbol: fund.f_subsec
            for fund in tqdm(fund_data_primaryinfo)
        }

    def build_sector_dict(self, fund_data_primaryinfo, fund_data_sector):
        """
        Build the sector lookups.

        Returns ``(d_sector, d_sector_name)``: sector id -> sector name, and
        symbol -> sector name. Symbols whose sector id is unknown, or whose
        sector name is empty, are left out of the second dictionary.
        """
        d_sector = {
            sector.s_id: sector.s_name for sector in tqdm(fund_data_sector)
        }

        d_sector_name = {}
        for fund in fund_data_primaryinfo:
            sector_name = d_sector.get(fund.f_sid)
            if sector_name:
                d_sector_name[fund.f_symbol] = sector_name

        return d_sector, d_sector_name

    def _lookup(self, summary, rows, row_index):
        """
        Resolve everything a summary needs before a record can be built.

        Returns ``(unit, row, labels)`` or ``None`` when the summary must be
        skipped: unknown unit or currency id, currency other than "THB", no
        row in ``rows`` for this ``s_id``, or a symbol missing from any of
        the name/subsector/sector dictionaries.
        """
        try:
            unit = self.unit_dict[summary.s_unit]
            currency = self.currency_dict[summary.s_cur]
            if currency != "THB":
                return None
            position = row_index.get(summary.s_id)
            if position is None:
                return None
            symbol = summary.s_symbol
            labels = {
                "currency": currency,
                "subsec": self.subsec_dict[symbol],
                "shortname": self.shortname_dict[symbol],
                "longname": self.longname_dict[symbol],
                "sector": self.sector_name_dict[symbol],
            }
        except KeyError:
            return None
        return unit, rows[position], labels

    def _transform(self, summaries, rows, row_index, model, values_for,
                   quarterly=False):
        """
        Build one ``model`` record per usable summary.

        values_for: callable ``(summary, unit, row)`` returning the figure
            keywords for the record, or ``None`` to skip the summary.
        quarterly: when true, ``summary.s_quarter`` is passed as ``quarter``.
        """
        records = []
        for summary in tqdm(summaries):
            resolved = self._lookup(summary, rows, row_index)
            if resolved is None:
                continue
            unit, row, labels = resolved
            try:
                values = values_for(summary, unit, row)
                if values is None:
                    continue
                identity = {
                    "s_id": summary.s_id,
                    "symbol": summary.s_symbol,
                    "year": summary.s_year,
                    "exch": summary.s_exch,
                }
                if quarterly:
                    identity["quarter"] = summary.s_quarter
                record = model(**identity, **labels, **values)
            except Exception:
                # Deliberately broad: one unusable row (such as arithmetic
                # on a None figure) drops that record, not the whole run.
                continue
            records.append(record)
        return records

    @staticmethod
    def _scaled(unit, row, field_map):
        """Return ``{target: unit * row.<source>}`` for every mapped field."""
        return {
            target: unit * getattr(row, source)
            for target, source in field_map.items()
        }

    def _balance_values(self, _summary, unit, balance):
        """Figure keywords for a balance record, including ``total_debt``."""
        values = self._scaled(unit, balance, self._BALANCE_FIELDS)
        values["total_debt"] = (
            unit * balance.b_cltotal + unit * balance.b_longterm)
        return values

    def _cashflow_values(self, _summary, unit, cashflow):
        """Figure keywords for a cashflow record."""
        return self._scaled(unit, cashflow, self._CASHFLOW_FIELDS)

    def _income_values(self, _summary, unit, income):
        """Figure keywords for an income record."""
        return self._scaled(unit, income, self._INCOME_FIELDS)

    def _ratio_values(self, summary, unit, ratio, income_rows, income_index,
                      field_map):
        """
        Figure keywords for a ratio record, or ``None`` without income data.

        The operating profit margin is derived from the income row that
        shares the summary's ``s_id``. A summary with no such row is skipped
        rather than indexing ``income_rows`` with ``None``.
        """
        position = income_index.get(summary.s_id)
        if position is None:
            return None
        income = income_rows[position]

        values = {
            target: getattr(ratio, source)
            for target, source in field_map.items()
        }
        values["dpr"] = ratio.r_dp * 100
        values["operating_profit_margin"] = (
            unit * income.i_oprofit) / (unit * income.i_turnover) * 100
        return values

    def transform_balances(self):
        """Return ``transformed.Balance`` records for the annual summaries."""
        return self._transform(
            self.fiscal_data["Summary"],
            self.fiscal_data["Balance"],
            self.d_balance,
            transformed.Balance,
            self._balance_values,
        )

    def transform_balances_q(self):
        """Return ``transformed.BalanceQ`` records for quarterly summaries."""
        return self._transform(
            self.fiscal_data["SummaryQ"],
            self.fiscal_data["BalanceQ"],
            self.d_balanceQ,
            transformed.BalanceQ,
            self._balance_values,
            quarterly=True,
        )

    def transform_cashflows(self):
        """Return ``transformed.Cashflow`` records for the annual summaries."""
        return self._transform(
            self.fiscal_data["Summary"],
            self.fiscal_data["Cashflow"],
            self.d_cashflow,
            transformed.Cashflow,
            self._cashflow_values,
        )

    def transform_cashflows_q(self):
        """Return ``transformed.CashflowQ`` records for quarterly summaries."""
        return self._transform(
            self.fiscal_data["SummaryQ"],
            self.fiscal_data["CashflowQ"],
            self.d_cashflowQ,
            transformed.CashflowQ,
            self._cashflow_values,
            quarterly=True,
        )

    def transform_incomes(self):
        """Return ``transformed.Income`` records for the annual summaries."""
        return self._transform(
            self.fiscal_data["Summary"],
            self.fiscal_data["Income"],
            self.d_income,
            transformed.Income,
            self._income_values,
        )

    def transform_incomes_q(self):
        """Return ``transformed.IncomeQ`` records for quarterly summaries."""
        return self._transform(
            self.fiscal_data["SummaryQ"],
            self.fiscal_data["IncomeQ"],
            self.d_incomeQ,
            transformed.IncomeQ,
            self._income_values,
            quarterly=True,
        )

    def transform_ratios(self):
        """Return ``transformed.Ratio`` records for the annual summaries."""
        income_rows = self.fiscal_data["Income"]

        def values_for(summary, unit, ratio):
            return self._ratio_values(
                summary, unit, ratio, income_rows, self.d_income,
                self._RATIO_FIELDS_ANNUAL)

        return self._transform(
            self.fiscal_data["Summary"],
            self.fiscal_data["Ratio"],
            self.d_ratio,
            transformed.Ratio,
            values_for,
        )

    def transform_ratios_q(self):
        """Return ``transformed.RatioQ`` records for quarterly summaries."""
        income_rows = self.fiscal_data["IncomeQ"]

        def values_for(summary, unit, ratio):
            return self._ratio_values(
                summary, unit, ratio, income_rows, self.d_incomeQ,
                self._RATIO_FIELDS)

        return self._transform(
            self.fiscal_data["SummaryQ"],
            self.fiscal_data["RatioQ"],
            self.d_ratioQ,
            transformed.RatioQ,
            values_for,
            quarterly=True,
        )

    def transform_all(self):
        """
        Transform all raw data to the desired format.

        Returns a dict of record lists keyed by "balances", "balances_q",
        "cashflows", "cashflows_q", "incomes", "incomes_q", "ratios" and
        "ratios_q".
        """
        return {
            "balances": self.transform_balances(),
            "balances_q": self.transform_balances_q(),
            "cashflows": self.transform_cashflows(),
            "cashflows_q": self.transform_cashflows_q(),
            "incomes": self.transform_incomes(),
            "incomes_q": self.transform_incomes_q(),
            "ratios": self.transform_ratios(),
            "ratios_q": self.transform_ratios_q(),
        }


class DataDeleter:
    """
    Empties the tables that hold transformed records.
    """

    def __init__(self, session):
        # Every delete and the final commit go through this session.
        self.session = session

    def delete_transformed_data(self):
        """
        Delete every row of each transformed table, then commit once.

        The elapsed time of each table's delete is printed. Always returns
        True when no exception is raised.
        """
        targets = (
            transformed.Balance,
            transformed.BalanceQ,
            transformed.Cashflow,
            transformed.CashflowQ,
            transformed.Income,
            transformed.IncomeQ,
            transformed.Ratio,
            transformed.RatioQ,
        )
        for target in targets:
            began = time.time()
            everything = self.session.query(target)
            # Bulk delete issued without synchronizing in-session objects.
            everything.delete(synchronize_session=False)
            print(f"Delete {target.__name__} time: {time.time() - began}")
        self.session.commit()
        return True


class DataCommitter:
    """
    A class to handle bulk committing transformed data to the target database.
    """

    def __init__(self, session):
        self.session = session

    def bulk_commit(self, data, chunk_size=1000):
        """
        Add ``data`` to the session and commit it chunk by chunk.

        data: a sliceable sequence of objects accepted by ``add_all``.
        chunk_size: maximum number of objects per commit; must be positive.

        Each non-empty chunk is added and committed on its own, and the
        elapsed time of each commit is printed. Nothing is added or
        committed when ``data`` is empty.

        Raises ValueError if ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        # Stepping by chunk_size yields exactly the non-empty chunks, so no
        # empty trailing commit happens when len(data) is an exact multiple.
        for offset in range(0, len(data), chunk_size):
            start = time.time()
            self.session.add_all(data[offset:offset + chunk_size])
            self.session.commit()
            print(f"Commit time: {time.time() - start}")
# Leave a Comment