Untitled
unknown
plain_text
a year ago
3.6 kB
15
Indexable
import logging
import os
from string import Formatter
import pandas as pd
from app.base.Constantes import *
# Module-level logger; it is named after this module via __name__.
logger = logging.getLogger(__name__)
class MDXDataLoader:
    """Run a named MDX query template and push its result into a data store.

    The template is read from ``app/data_loader/mdx/<mdx_query_name>.mdx`` and
    its ``str.format`` placeholders are collected in ``parameters_to_fill``.
    Typical usage is ``extract`` -> ``transform`` -> ``load``.

    Attributes:
        mdx_query_name: Name of the template file (without the ``.mdx`` suffix).
        mdx_query_template: Raw template text; stays ``''`` if the file could
            not be read.
        parameters_to_fill: Unique placeholder names found in the template, in
            order of first appearance.
        parameter_map: Placeholder name -> value used for the last formatting.
        mdx_query_to_run: The last fully formatted query.
        query_result: Result of the last ``extract`` call.
        transformed_df: Working dataframe built by ``transform``; ``None``
            until ``transform`` is called after an ``extract``.
    """

    query_result: pd.DataFrame

    def __init__(self, catalog, mdx_query_name, source_cube):
        """Read the query template and discover its placeholders.

        A template file that cannot be opened is logged and otherwise ignored:
        the loader is still created, with an empty template and no parameters.

        Args:
            catalog: Object providing ``query_mdx`` and ``data_store``.
            mdx_query_name: Template file name without the ``.mdx`` suffix.
            source_cube: Cube identifier forwarded to ``catalog.query_mdx``.
        """
        self.mdx_query_name = mdx_query_name
        self.parameter_map = {}
        self.mdx_query_template = ''
        self.mdx_query_to_run = ''
        self.catalog = catalog
        self.source_cube = source_cube
        self.query_result = pd.DataFrame()
        self.transformed_df = None
        # Always defined, so later calls never hit an AttributeError when the
        # template file is missing.
        self.parameters_to_fill = []

        template_path = os.path.join("app", "data_loader", "mdx", mdx_query_name + '.mdx')
        try:
            with open(template_path) as mdx_query:
                self.mdx_query_template = mdx_query.read()
        except OSError:
            # logger.exception records the traceback, replacing the former print().
            logger.exception('Could not create MDX query loader for query: %s', mdx_query_name)
            return
        self.parameters_to_fill = self._placeholder_names(self.mdx_query_template)

    @staticmethod
    def _placeholder_names(template):
        """Return the unique ``str.format`` field names of *template*, in order."""
        names = (name for _, name, _, _ in Formatter().parse(template) if name is not None)
        # dict.fromkeys de-duplicates while keeping first-seen order, so a
        # placeholder used several times in the template counts only once.
        return list(dict.fromkeys(names))

    def map_mdx_parameters(self, mdx_parameters):
        """
        Maps input mdx params' values with params' keys from mdx formatting.
        Ex: {AsOfDate: 2022-09-30} --> {data: 2022-09-30}

        For every placeholder the value is looked up under the name given by
        ``DICT_CAMPOS_MDX[placeholder]``; if that lookup fails, the placeholder
        name itself is used as the key.

        Args:
            mdx_parameters: Mapping of received parameter names to values.

        Raises:
            KeyError: If fewer parameters than distinct placeholders were
                received, or no value can be found for a placeholder.
        """
        required = list(dict.fromkeys(self.parameters_to_fill))
        if len(mdx_parameters) < len(required):
            logger.error('MDX query %s expects parameters: %s; but only these were received: %s',
                         self.mdx_query_name, required, mdx_parameters)
            raise KeyError('MDX query %s expects parameters %s' % (self.mdx_query_name, required))

        for param in required:
            try:
                value = mdx_parameters[DICT_CAMPOS_MDX[param]]
            except KeyError:
                # Maybe there is no need to convert naming, just use the received parameter name
                try:
                    value = mdx_parameters[param]
                except KeyError:
                    logger.error('MDX query %s received no value for parameter %s',
                                 self.mdx_query_name, param)
                    raise
            else:
                logger.debug('MDX parameter %s filled from received key %s',
                             param, DICT_CAMPOS_MDX[param])
            self.parameter_map[param] = value

    def extract(self, mdx_parameters, *args, **kwargs):
        """Format the template with *mdx_parameters* and run it on the catalog.

        The result is stored in ``query_result``. Any dataframe left over from
        a previous ``transform`` is discarded so the next ``transform`` starts
        from the fresh result. Extra positional/keyword arguments are accepted
        and ignored.

        Raises:
            KeyError: Propagated from ``map_mdx_parameters``.
        """
        self.map_mdx_parameters(mdx_parameters)
        self.mdx_query_to_run = self.mdx_query_template.format(**self.parameter_map)
        logger.debug('Running query %s: %s', self.mdx_query_name, self.mdx_query_to_run)
        # NOTE(review): source_cube is passed as both the first and the third
        # argument; confirm against the query_mdx signature.
        self.query_result = self.catalog.query_mdx(self.source_cube, self.mdx_query_to_run, self.source_cube)
        # A new result invalidates whatever transform() built from the old one.
        self.transformed_df = None
        logger.info('Query %s with parameters %s received %d results.', self.mdx_query_name,
                    self.parameter_map, len(self.query_result))

    def transform(self, transf_function=None, *args, **kwargs):
        """Prepare ``transformed_df`` and optionally apply *transf_function*.

        On the first call after an ``extract`` the working dataframe is
        ``query_result.reset_index()``. Later calls keep working on the same
        dataframe, so several transformations can be chained.

        Args:
            transf_function: Optional callable taking and returning a
                dataframe. Extra arguments are accepted and ignored.
        """
        if self.transformed_df is None:
            self.transformed_df = self.query_result.reset_index()
        if transf_function:
            self.transformed_df = transf_function(self.transformed_df)

    def load(self, destination_store_name):
        """Load ``transformed_df`` into the table named *destination_store_name*.

        Nothing is loaded, and a warning is logged, when there is no
        transformed dataframe yet or when it is empty.
        """
        if self.transformed_df is None:
            logger.warning('No transformed dataframe available for store %s. Skipping load.',
                           destination_store_name)
            return
        if len(self.transformed_df) == 0:
            logger.warning('Attempting to load dataframe of size 0 into store %s. Skipping load.',
                           destination_store_name)
            return

        store = self.catalog.data_store.get_table(destination_store_name)
        initial_size = len(store)
        store.load_pandas(self.transformed_df)
        final_size = len(store)
        logger.info('Loaded %d rows into store %s. Current size: %s.', final_size - initial_size,
                    destination_store_name, final_size)

    def get_query_result(self):
        """Return the last query result with its index reset to columns."""
        return self.query_result.reset_index()
Editor is loading...
Leave a Comment