Untitled
unknown
plain_text
2 months ago
3.6 kB
2
Indexable
import logging
import os
from string import Formatter

import pandas as pd

from app.base.Constantes import *

logger = logging.getLogger(__name__)


class MDXDataLoader:
    """Extract/transform/load helper built around one MDX query template.

    The template is read from ``app/data_loader/mdx/<mdx_query_name>.mdx`` and
    its ``str.format``-style placeholders are filled in by :meth:`extract`.
    The query result is kept in ``query_result``; :meth:`transform` derives
    ``transformed_df`` from it and :meth:`load` pushes that frame into a store
    obtained from ``catalog.data_store``.
    """

    query_result: pd.DataFrame

    def __init__(self, catalog, mdx_query_name, source_cube):
        """Read the query template and collect the placeholders it uses.

        Args:
            catalog: Object exposing ``query_mdx(...)`` and
                ``data_store.get_table(...)``.
            mdx_query_name: Template file name without the ``.mdx`` suffix.
            source_cube: Value forwarded to ``catalog.query_mdx``.

        A template that cannot be read is logged, not raised: the loader is
        still created, with an empty template and no placeholders.
        """
        self.mdx_query_name = mdx_query_name
        self.parameter_map = {}
        self.mdx_query_template = ''
        self.mdx_query_to_run = ''
        self.catalog = catalog
        self.source_cube = source_cube
        self.query_result = pd.DataFrame()
        self.transformed_df = None
        # Always defined, so later calls do not hit AttributeError when the
        # template file could not be opened.
        self.parameters_to_fill = []

        template_path = os.path.join("app", "data_loader", "mdx", mdx_query_name + '.mdx')
        try:
            with open(template_path) as mdx_query:
                self.mdx_query_template = mdx_query.read()
        except EnvironmentError:
            # One placeholder per argument; logger.exception also records the
            # traceback, replacing the former print(exception).
            logger.exception('Could not create MDX query loader for query: %s', mdx_query_name)
        else:
            # De-duplicate while keeping first-seen order: a placeholder used
            # several times in the template is still a single parameter.
            self.parameters_to_fill = list(dict.fromkeys(
                field_name
                for _, field_name, _, _ in Formatter().parse(self.mdx_query_template)
                if field_name is not None
            ))

    def map_mdx_parameters(self, mdx_parameters):
        """Fill ``parameter_map`` with a value for every template placeholder.

        For each placeholder the value is looked up in ``mdx_parameters`` under
        the name ``DICT_CAMPOS_MDX`` maps it to; when that lookup fails, the
        placeholder name itself is used as the key.
        Ex: {AsOfDate: 2022-09-30} --> {data: 2022-09-30}

        Args:
            mdx_parameters: Mapping of received parameter names to values.

        Raises:
            KeyError: If fewer parameters than placeholders were received, or
                a placeholder cannot be resolved under either name.
        """
        if len(mdx_parameters) < len(self.parameters_to_fill):
            logger.error('MDX query %s expects parameters: %s; but only these were received: %s',
                         self.mdx_query_name, self.parameters_to_fill, mdx_parameters)
            raise KeyError('MDX query %s is missing parameters' % self.mdx_query_name)

        for param in self.parameters_to_fill:
            try:
                value = mdx_parameters[DICT_CAMPOS_MDX[param]]
            except KeyError:
                # Maybe there is no need to convert naming, just use the
                # received parameter name.
                try:
                    value = mdx_parameters[param]
                except KeyError:
                    logger.error('MDX query %s: no value received for parameter %s',
                                 self.mdx_query_name, param)
                    raise
            self.parameter_map[param] = value

    def extract(self, mdx_parameters, *args, **kwargs):
        """Format the template with ``mdx_parameters`` and run the query.

        The result is stored in ``query_result``. Any previously transformed
        frame is discarded so a later :meth:`transform` starts from the new
        result instead of stale data.

        Raises:
            KeyError: Propagated from :meth:`map_mdx_parameters`.
        """
        self.map_mdx_parameters(mdx_parameters)
        self.mdx_query_to_run = self.mdx_query_template.format(**self.parameter_map)
        logger.debug('Running query %s: %s', self.mdx_query_name, self.mdx_query_to_run)
        # NOTE(review): source_cube is passed as both the first and the third
        # argument -- confirm against the query_mdx signature.
        self.query_result = self.catalog.query_mdx(self.source_cube, self.mdx_query_to_run, self.source_cube)
        self.transformed_df = None
        logger.info('Query %s with parameters %s received %d results.',
                    self.mdx_query_name, self.parameter_map, len(self.query_result))

    def transform(self, transf_function=None, *args, **kwargs):
        """Build or update ``transformed_df``.

        The first call after an extract seeds ``transformed_df`` from
        ``query_result.reset_index()``. Every call then applies
        ``transf_function`` (when given) to the current ``transformed_df``,
        so successive calls chain their transformations.
        """
        if self.transformed_df is None:
            self.transformed_df = self.query_result.reset_index()
        if transf_function:
            self.transformed_df = transf_function(self.transformed_df)

    def load(self, destination_store_name):
        """Load ``transformed_df`` into the store named ``destination_store_name``.

        Nothing is loaded, and a warning is logged, when there is no
        transformed frame yet or when it has no rows.
        """
        if self.transformed_df is None:
            # Previously this raised TypeError from len(None).
            logger.warning('No transformed dataframe available for store %s. Skipping load.',
                           destination_store_name)
            return

        if len(self.transformed_df) > 0:
            store = self.catalog.data_store.get_table(destination_store_name)
            initial_size = len(store)
            store.load_pandas(self.transformed_df)
            logger.info('Loaded %d rows into store %s. Current size: %s.',
                        len(store) - initial_size, destination_store_name, len(store))
        else:
            logger.warning('Attempting to load dataframe of size 0 into store %s. Skipping load.',
                           destination_store_name)

    def get_query_result(self):
        """Return the raw query result with its index reset."""
        return self.query_result.reset_index()
Editor is loading...
Leave a Comment