# Paste-site metadata (not part of the program):
# Untitled
# unknown
# plain_text
# a year ago
# 27 kB
# 15
# Indexable
import time
from tqdm import tqdm
from models import fiscal_data, fund_data, transformed
class DataFetcher:
    """
    Fetch raw rows for the fiscal and fund models through a database session.

    The session only needs to provide ``query(model).all()``.
    """

    def __init__(self, session):
        """Store the session used for every query issued by this fetcher."""
        self.session = session

    def _fetch_models(self, models):
        """
        Load every row of each model and report how long each query took.

        Args:
            models: Iterable of model classes to query.

        Returns:
            A dict mapping ``model.__name__`` to the list returned by
            ``session.query(model).all()``, in the order the models were given.
        """
        data = {}
        for model in models:
            # perf_counter is monotonic, so the reported duration cannot be
            # skewed by wall-clock adjustments.
            start = time.perf_counter()
            data[model.__name__] = self.session.query(model).all()
            print(f"Query {model.__name__} time: {time.perf_counter() - start}")
        return data

    def fetch_fiscal_data(self):
        """
        Fetch all rows of every fiscal model.

        Returns:
            A dict keyed by model class name (for example ``"Summary"`` or
            ``"TblCur"``) whose values are the lists of fetched rows.
        """
        return self._fetch_models([
            fiscal_data.Summary,
            fiscal_data.SummaryQ,
            fiscal_data.Balance,
            fiscal_data.BalanceQ,
            fiscal_data.Cashflow,
            fiscal_data.CashflowQ,
            fiscal_data.Income,
            fiscal_data.IncomeQ,
            fiscal_data.Ratio,
            fiscal_data.RatioQ,
            fiscal_data.TblCur,
            fiscal_data.TblFormat,
        ])

    def fetch_fund_data(self):
        """
        Fetch all rows of every fund model.

        Returns:
            A dict keyed by model class name (``"FPrimaryinfo"`` and
            ``"FSector"``) whose values are the lists of fetched rows.
        """
        return self._fetch_models([
            fund_data.FPrimaryinfo,
            fund_data.FSector,
        ])
class DataTransformer:
    """
    Turn raw fiscal rows into ``transformed`` model instances.

    Every ``transform_*`` method walks the annual (``Summary``) or quarterly
    (``SummaryQ``) summaries, joins each summary to its statement row by
    ``s_id``, attaches the fund names/sector for the summary's symbol and
    multiplies monetary columns by the summary's unit value.

    A summary is skipped when:
      * its unit or currency id is unknown,
      * its currency code is not ``"THB"``,
      * no statement row shares its ``s_id``,
      * its symbol has no sub-sector, short name, long name or sector entry,
      * computing the fields or constructing the model raises.
    """

    def __init__(self, fiscal_data, fund_data):
        """
        Build all lookup tables needed by the ``transform_*`` methods.

        Args:
            fiscal_data: Dict of fetched fiscal rows keyed by model name; the
                keys read here and later are ``Summary``, ``SummaryQ``,
                ``Balance``, ``BalanceQ``, ``Cashflow``, ``CashflowQ``,
                ``Income``, ``IncomeQ``, ``Ratio``, ``RatioQ``, ``TblCur``
                and ``TblFormat``.
            fund_data: Dict of fetched fund rows with the keys
                ``FPrimaryinfo`` and ``FSector``.
        """
        self.fiscal_data = fiscal_data
        self.currency_dict = self.build_currency_dict(fiscal_data["TblCur"])
        self.unit_dict = self.build_unit_dict(fiscal_data["TblFormat"])

        primary_info = fund_data["FPrimaryinfo"]
        self.longname_dict = self.build_longname_dict(primary_info)
        self.shortname_dict = self.build_shortname_dict(primary_info)
        self.subsec_dict = self.build_subsec_dict(primary_info)
        self.sector_dict, self.sector_name_dict = self.build_sector_dict(
            primary_info, fund_data["FSector"])

        # s_id -> position of the row inside the matching fiscal_data list.
        self.d_balance = {}
        self.d_balanceQ = {}
        self.d_cashflow = {}
        self.d_cashflowQ = {}
        self.d_income = {}
        self.d_incomeQ = {}
        self.d_ratio = {}
        self.d_ratioQ = {}
        for key, index_map in (
            ("Balance", self.d_balance),
            ("BalanceQ", self.d_balanceQ),
            ("Cashflow", self.d_cashflow),
            ("CashflowQ", self.d_cashflowQ),
            ("Income", self.d_income),
            ("IncomeQ", self.d_incomeQ),
            ("Ratio", self.d_ratio),
            ("RatioQ", self.d_ratioQ),
        ):
            self.map_ids_to_indices(self.fiscal_data[key], index_map)

    def map_ids_to_indices(self, source, dest_dict):
        """
        Record, for each row in ``source``, its list position under its s_id.

        ``dest_dict`` is filled in place; when several rows share an ``s_id``
        the last one wins.
        """
        for i, item in tqdm(enumerate(source)):
            dest_dict[item.s_id] = i

    def build_unit_dict(self, fiscal_data_units):
        """
        Build a dictionary of unit ids to unit values (``f_value``).
        """
        return {unit.id: unit.f_value for unit in fiscal_data_units}

    def build_currency_dict(self, fiscal_data_currency):
        """
        Build a dictionary of currency ids to currency codes (``cur_code``).
        """
        return {
            currency.id: currency.cur_code
            for currency in fiscal_data_currency
        }

    def build_longname_dict(self, fund_data_primaryinfo):
        """
        Build a dictionary of symbols to long names.
        """
        return {
            info.f_symbol: info.f_longname
            for info in tqdm(fund_data_primaryinfo)
        }

    def build_shortname_dict(self, fund_data_primaryinfo):
        """
        Build a dictionary of symbols to short names.
        """
        return {
            info.f_symbol: info.f_shortname
            for info in tqdm(fund_data_primaryinfo)
        }

    def build_subsec_dict(self, fund_data_primaryinfo):
        """
        Build a dictionary of symbols to subsector values (``f_subsec``).
        """
        return {
            info.f_symbol: info.f_subsec
            for info in tqdm(fund_data_primaryinfo)
        }

    def build_sector_dict(self, fund_data_primaryinfo, fund_data_sector):
        """
        Build the sector lookups.

        Returns:
            A ``(d_sector, d_sector_name)`` tuple: ``d_sector`` maps a sector
            ``s_id`` to its ``s_name``; ``d_sector_name`` maps a fund symbol
            to the name of the sector referenced by the fund's ``f_sid``.
            Funds whose sector id is unknown, or whose sector name is falsy,
            get no entry in ``d_sector_name``.
        """
        d_sector = {
            sector.s_id: sector.s_name for sector in tqdm(fund_data_sector)
        }
        d_sector_name = {}
        for fund in fund_data_primaryinfo:
            sector_name = d_sector.get(fund.f_sid)
            if sector_name:
                d_sector_name[fund.f_symbol] = sector_name
        return d_sector, d_sector_name

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _lookup_row(s_id, index_map, rows):
        """Return the row indexed under ``s_id``, or None when there is none."""
        position = index_map.get(s_id)
        if position is None:
            return None
        return rows[position]

    def _resolve_summary(self, summary, index_map, rows):
        """
        Resolve every lookup a summary needs before it can be transformed.

        Returns:
            ``(unit, currency, row, labels)`` where ``labels`` holds the
            ``subsec``, ``shortname``, ``longname`` and ``sector`` keyword
            arguments, or None when the summary must be skipped (unknown
            unit/currency id, non-THB currency, no statement row, or a symbol
            missing from any of the name/sector dictionaries).
        """
        try:
            unit = self.unit_dict[summary.s_unit]
            currency = self.currency_dict[summary.s_cur]
            if currency != "THB":
                return None
            row = self._lookup_row(summary.s_id, index_map, rows)
            if row is None:
                return None
            symbol = summary.s_symbol
            labels = {
                "subsec": self.subsec_dict[symbol],
                "shortname": self.shortname_dict[symbol],
                "longname": self.longname_dict[symbol],
                "sector": self.sector_name_dict[symbol],
            }
        except KeyError:
            return None
        return unit, currency, row, labels

    @staticmethod
    def _common_fields(summary, currency, labels, quarterly):
        """Return the keyword arguments shared by every transformed model."""
        fields = {
            "s_id": summary.s_id,
            "symbol": summary.s_symbol,
            "year": summary.s_year,
            "exch": summary.s_exch,
            "currency": currency,
        }
        fields.update(labels)
        if quarterly:
            fields["quarter"] = summary.s_quarter
        return fields

    def _transform_statements(self, summaries, index_map, rows, model,
                              make_fields, quarterly=False):
        """
        Build one ``model`` instance per usable summary.

        Args:
            summaries: Summary rows to walk (wrapped in a tqdm progress bar).
            index_map: ``s_id`` -> position lookup for ``rows``.
            rows: Statement rows joined to the summaries.
            model: Class called with the combined keyword arguments.
            make_fields: ``callable(summary, unit, row)`` returning the
                statement-specific keyword arguments, or None to skip.
            quarterly: When True, ``quarter=summary.s_quarter`` is passed too.

        Returns:
            The list of constructed model instances.
        """
        records = []
        for summary in tqdm(summaries):
            resolved = self._resolve_summary(summary, index_map, rows)
            if resolved is None:
                continue
            unit, currency, row, labels = resolved
            try:
                common = self._common_fields(
                    summary, currency, labels, quarterly)
                fields = make_fields(summary, unit, row)
                if fields is None:
                    continue
                record = model(**common, **fields)
            except Exception:
                # Deliberate best effort: a row whose values cannot be
                # combined (e.g. a missing column value or a zero divisor)
                # is dropped instead of aborting the whole run.
                continue
            records.append(record)
        return records

    @staticmethod
    def _balance_fields(unit, balance):
        """Map a raw balance row to transformed balance keyword arguments."""
        return {
            "cce": unit * balance.b_cacbb,
            "shortterm_invest": unit * balance.b_casti,
            "inventory": unit * balance.b_castock,
            "accounts_receivables": unit * balance.b_cadebt,
            "other_current_assets": unit * balance.b_caoca,
            "total_current_assets": unit * balance.b_catotal,
            "shortterm_debt": unit * balance.b_clloans,
            "accounts_payable": unit * balance.b_clcredit,
            "taxes_payable": unit * balance.b_cltaxation,
            "dividend_payable": unit * balance.b_cldividend,
            "other_current_liabilities": unit * balance.b_clocl,
            "total_current_liabilities": unit * balance.b_cltotal,
            "net_current_assets": unit * balance.b_netasset,
            "ppe": unit * balance.b_lappe,
            "longterm_invest": unit * balance.b_lainvest,
            "intangible_assets": unit * balance.b_laintasset,
            "other_longterm_assets": unit * balance.b_laolta,
            "total_long_termassets": unit * balance.b_latotal,
            "share_capital": unit * balance.b_sfshare,
            "treasury_shares": unit * balance.b_sftreasury,
            "reserves": unit * balance.b_sfreserve,
            "total_shareholders_equity": unit * balance.b_sftotal,
            "minority_interest": unit * balance.b_mininterest,
            "longterm_liabilities": unit * balance.b_longterm,
            # Total debt = current liabilities + long-term liabilities.
            "total_debt": unit * balance.b_cltotal + unit * balance.b_longterm,
        }

    @staticmethod
    def _cashflow_fields(unit, cashflow):
        """Map a raw cashflow row to transformed cashflow keyword arguments."""
        return {
            "cfo": unit * cashflow.c_opeact,
            "cfi": unit * cashflow.c_investact,
            "cff": unit * cashflow.c_financeact,
            "net_change_in_cash": unit * cashflow.c_netchange,
            "balance_bf": unit * cashflow.c_brought,
            "balance_cf": unit * cashflow.c_carried,
        }

    @staticmethod
    def _income_fields(unit, income):
        """Map a raw income row to transformed income keyword arguments."""
        return {
            "revenue": unit * income.i_turnover,
            "operating_profit": unit * income.i_oprofit,
            "pbt": unit * income.i_probtax,
            "tax": unit * income.i_taxation,
            "pat": unit * income.i_proatax,
            "extraord_items": unit * income.i_extraord,
            "minority_interest": unit * income.i_mininterest,
            "net_income": unit * income.i_netproshare,
        }

    @staticmethod
    def _ratio_fields(unit, ratio, income, annual):
        """
        Map a raw ratio row (plus its income row) to ratio keyword arguments.

        ``icr`` and ``debt`` are only produced for annual ratios; the
        quarterly model is never given them.
        """
        fields = {
            "stock_price": ratio.r_stock,
            "roe": ratio.r_roe,
            "rota": ratio.r_rota,
            "net_profit_margin": ratio.r_ror,
            "pe": ratio.r_pe,
            "dy": ratio.r_dy,
            "dpr": ratio.r_dp * 100,
            "current": ratio.r_current,
            "quick": ratio.r_quick,
            "de": ratio.r_leverage,
            "eps": ratio.r_eps,
            "dps": ratio.r_dividend,
            # Operating profit as a percentage of turnover.
            "operating_profit_margin": (
                unit * income.i_oprofit) / (unit * income.i_turnover) * 100,
        }
        if annual:
            fields["icr"] = ratio.r_intcover
            fields["debt"] = ratio.r_debt
        return fields

    def _transform_ratio_statements(self, summary_key, ratio_key, ratio_index,
                                    income_key, income_index, model,
                                    quarterly):
        """
        Build ratio records, joining each summary to its ratio and income row.

        A summary without an income row is skipped: the operating profit
        margin cannot be computed for it, and indexing the income list with
        a missing position would otherwise raise and abort the run.
        """
        income_rows = self.fiscal_data[income_key]

        def make_fields(summary, unit, ratio):
            income = self._lookup_row(summary.s_id, income_index, income_rows)
            if income is None:
                return None
            return self._ratio_fields(
                unit, ratio, income, annual=not quarterly)

        return self._transform_statements(
            self.fiscal_data[summary_key],
            ratio_index,
            self.fiscal_data[ratio_key],
            model,
            make_fields,
            quarterly=quarterly,
        )

    # ------------------------------------------------------------------
    # Public transforms
    # ------------------------------------------------------------------

    def transform_balances(self):
        """Return ``transformed.Balance`` records for the annual summaries."""
        return self._transform_statements(
            self.fiscal_data["Summary"],
            self.d_balance,
            self.fiscal_data["Balance"],
            transformed.Balance,
            lambda _summary, unit, row: self._balance_fields(unit, row),
        )

    def transform_balances_q(self):
        """Return ``transformed.BalanceQ`` records for quarterly summaries."""
        return self._transform_statements(
            self.fiscal_data["SummaryQ"],
            self.d_balanceQ,
            self.fiscal_data["BalanceQ"],
            transformed.BalanceQ,
            lambda _summary, unit, row: self._balance_fields(unit, row),
            quarterly=True,
        )

    def transform_cashflows(self):
        """Return ``transformed.Cashflow`` records for the annual summaries."""
        return self._transform_statements(
            self.fiscal_data["Summary"],
            self.d_cashflow,
            self.fiscal_data["Cashflow"],
            transformed.Cashflow,
            lambda _summary, unit, row: self._cashflow_fields(unit, row),
        )

    def transform_cashflows_q(self):
        """Return ``transformed.CashflowQ`` records for quarterly summaries."""
        return self._transform_statements(
            self.fiscal_data["SummaryQ"],
            self.d_cashflowQ,
            self.fiscal_data["CashflowQ"],
            transformed.CashflowQ,
            lambda _summary, unit, row: self._cashflow_fields(unit, row),
            quarterly=True,
        )

    def transform_incomes(self):
        """Return ``transformed.Income`` records for the annual summaries."""
        return self._transform_statements(
            self.fiscal_data["Summary"],
            self.d_income,
            self.fiscal_data["Income"],
            transformed.Income,
            lambda _summary, unit, row: self._income_fields(unit, row),
        )

    def transform_incomes_q(self):
        """Return ``transformed.IncomeQ`` records for quarterly summaries."""
        return self._transform_statements(
            self.fiscal_data["SummaryQ"],
            self.d_incomeQ,
            self.fiscal_data["IncomeQ"],
            transformed.IncomeQ,
            lambda _summary, unit, row: self._income_fields(unit, row),
            quarterly=True,
        )

    def transform_ratios(self):
        """Return ``transformed.Ratio`` records for the annual summaries."""
        return self._transform_ratio_statements(
            "Summary", "Ratio", self.d_ratio,
            "Income", self.d_income,
            transformed.Ratio, quarterly=False,
        )

    def transform_ratios_q(self):
        """Return ``transformed.RatioQ`` records for quarterly summaries."""
        return self._transform_ratio_statements(
            "SummaryQ", "RatioQ", self.d_ratioQ,
            "IncomeQ", self.d_incomeQ,
            transformed.RatioQ, quarterly=True,
        )

    def transform_all(self):
        """
        Transform all raw data to the desired format.

        Returns:
            A dict with the keys ``balances``, ``balances_q``, ``cashflows``,
            ``cashflows_q``, ``incomes``, ``incomes_q``, ``ratios`` and
            ``ratios_q``, each holding the list from the matching
            ``transform_*`` method.
        """
        return {
            "balances": self.transform_balances(),
            "balances_q": self.transform_balances_q(),
            "cashflows": self.transform_cashflows(),
            "cashflows_q": self.transform_cashflows_q(),
            "incomes": self.transform_incomes(),
            "incomes_q": self.transform_incomes_q(),
            "ratios": self.transform_ratios(),
            "ratios_q": self.transform_ratios_q(),
        }
class DataDeleter:
    """
    A class to handle bulk deleting rows of the transformed models.
    """

    def __init__(self, session):
        """Store the session used to issue the deletes."""
        self.session = session

    def delete_transformed_data(self):
        """
        Delete every row of all transformed models in one transaction.

        The deletes are committed together after the last one. If any delete
        or the commit raises, the session is rolled back so it stays usable,
        and the exception is re-raised to the caller.

        Returns:
            True once the commit has succeeded.
        """
        # Models whose tables are emptied.
        models = [
            transformed.Balance,
            transformed.BalanceQ,
            transformed.Cashflow,
            transformed.CashflowQ,
            transformed.Income,
            transformed.IncomeQ,
            transformed.Ratio,
            transformed.RatioQ,
        ]
        try:
            for model in models:
                start = time.perf_counter()
                # Bulk delete; synchronize_session=False is passed through
                # unchanged to the query's delete().
                self.session.query(model).delete(synchronize_session=False)
                print(
                    f"Delete {model.__name__} time: "
                    f"{time.perf_counter() - start}")
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        return True
class DataCommitter:
    """
    A class to handle bulk committing transformed data to the target database.
    """

    def __init__(self, session):
        """Store the session used to add and commit records."""
        self.session = session

    def bulk_commit(self, data, chunk_size=1000):
        """
        Add ``data`` to the session and commit it chunk by chunk.

        Args:
            data: Sequence of records; it must support ``len()`` and slicing.
            chunk_size: Maximum number of records per commit; must be
                positive.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
            Exception: Whatever ``add_all`` or ``commit`` raises; the session
                is rolled back first. Chunks committed earlier stay committed.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        # Stepping by chunk_size visits only non-empty slices, so no empty
        # trailing commit is issued when len(data) is an exact multiple of
        # chunk_size (or when data is empty).
        for offset in range(0, len(data), chunk_size):
            start = time.perf_counter()
            try:
                self.session.add_all(data[offset:offset + chunk_size])
                self.session.commit()
            except Exception:
                self.session.rollback()
                raise
            print(f"Commit time: {time.perf_counter() - start}")
# Paste-site footer (not part of the program):
# Editor is loading...
# Leave a Comment