Untitled

 avatar
unknown
plain_text
2 months ago
3.6 kB
2
Indexable
import logging
import os
from string import Formatter

import pandas as pd

from app.base.Constantes import *

logger = logging.getLogger(__name__)


class MDXDataLoader:
    """Run a templated MDX query against a catalog and load the result into a store.

    The template is read from ``<MDX_QUERY_DIR>/<mdx_query_name>.mdx`` and uses
    ``str.format`` replacement fields (``{name}``) for its parameters.  The
    usual call order is ``extract`` -> ``transform`` -> ``load``.

    Attributes:
        mdx_query_template: Raw template text ('' if the file could not be read).
        parameters_to_fill: Distinct replacement-field names found in the
            template, in order of first appearance.
        parameter_map: Field name -> value mapping built by the last successful
            ``map_mdx_parameters`` call.
        mdx_query_to_run: Template rendered with ``parameter_map``.
        query_result: DataFrame returned by the last ``extract``.
        transformed_df: Result of ``transform``; ``None`` until it is called.
        template_load_error: The ``OSError`` raised while reading the template,
            or ``None`` when the template was read successfully.
    """

    # Directory holding the ``.mdx`` template files.  The path is relative, so
    # it is resolved against the process working directory.
    MDX_QUERY_DIR = os.path.join("app", "data_loader", "mdx")

    query_result: pd.DataFrame

    def __init__(self, catalog, mdx_query_name, source_cube):
        """Read the query template and collect its replacement-field names.

        A template that cannot be read does not raise here (callers may build
        loaders eagerly); the error is logged, kept in ``template_load_error``
        and reported by ``extract``.

        Args:
            catalog: Object providing ``query_mdx`` and ``data_store.get_table``.
            mdx_query_name: Template file name without the ``.mdx`` extension.
            source_cube: Cube identifier forwarded to ``catalog.query_mdx``.
        """
        self.mdx_query_name = mdx_query_name
        self.parameter_map = {}
        self.mdx_query_template = ''
        self.mdx_query_to_run = ''
        self.catalog = catalog
        self.source_cube = source_cube
        self.query_result = pd.DataFrame()
        self.transformed_df = None
        # Always defined, so a failed template load cannot cause an unrelated
        # AttributeError later on.
        self.parameters_to_fill = []
        self.template_load_error = None

        template_path = os.path.join(self.MDX_QUERY_DIR, mdx_query_name + '.mdx')
        try:
            with open(template_path) as mdx_query:
                self.mdx_query_template = mdx_query.read()
        except OSError as exception:
            self.template_load_error = exception
            logger.exception('Could not create MDX query loader for query: %s (template file: %s)',
                             mdx_query_name, template_path)
        else:
            self.parameters_to_fill = self._placeholder_names(self.mdx_query_template)

    @staticmethod
    def _placeholder_names(template):
        """Return the distinct replacement-field names of ``template`` in first-seen order."""
        names = (field for _, field, _, _ in Formatter().parse(template) if field is not None)
        # dict.fromkeys de-duplicates while keeping order: a field used several
        # times in the query still needs only one value.
        return list(dict.fromkeys(names))

    def map_mdx_parameters(self, mdx_parameters):
        """
        Maps input mdx params' values with params' keys from mdx formatting.
        Ex: {AsOfDate: 2022-09-30} --> {data: 2022-09-30}

        Each template field is looked up in ``mdx_parameters`` under the name
        ``DICT_CAMPOS_MDX`` translates it to and, failing that, under the field
        name itself.  ``parameter_map`` is replaced only when every field was
        resolved.

        Args:
            mdx_parameters: Mapping of received parameter names to values;
                ``None`` is treated as an empty mapping.

        Raises:
            KeyError: If at least one template field has no matching value.
        """
        if mdx_parameters is None:
            mdx_parameters = {}

        resolved = {}
        missing = []
        for param in self.parameters_to_fill:
            try:
                resolved[param] = mdx_parameters[DICT_CAMPOS_MDX[param]]
            except KeyError:  # Maybe there is no need to convert naming, just use the received parameter name
                try:
                    resolved[param] = mdx_parameters[param]
                except KeyError:
                    missing.append(param)

        if missing:
            logger.error('MDX query %s expects parameters: %s; but only these were received: %s',
                         self.mdx_query_name, self.parameters_to_fill, mdx_parameters)
            raise KeyError('MDX query %r is missing parameters: %s' % (self.mdx_query_name, missing))

        self.parameter_map = resolved
        logger.debug('MDX query %s parameter mapping: %s', self.mdx_query_name, self.parameter_map)

    def extract(self, mdx_parameters, *args, **kwargs):
        """Render the template with ``mdx_parameters`` and run it on the catalog.

        The result is stored in ``query_result``; any previous
        ``transformed_df`` is discarded because it belongs to the old result.

        Args:
            mdx_parameters: Mapping of received parameter names to values.
            *args: Accepted for interface compatibility; unused.
            **kwargs: Accepted for interface compatibility; unused.

        Raises:
            RuntimeError: If the template file could not be read at construction.
            KeyError: If a template parameter has no value (see
                ``map_mdx_parameters``).
        """
        load_error = getattr(self, 'template_load_error', None)
        if load_error is not None:
            # Fail loudly instead of sending an empty query to the catalog.
            raise RuntimeError(
                'MDX template for query %r could not be loaded' % (self.mdx_query_name,)
            ) from load_error

        self.map_mdx_parameters(mdx_parameters)
        self.mdx_query_to_run = self.mdx_query_template.format(**self.parameter_map)
        logger.debug('Running query %s:  %s', self.mdx_query_name, self.mdx_query_to_run)
        # NOTE(review): source_cube is passed as both the first and the third
        # argument -- confirm against the signature of catalog.query_mdx.
        self.query_result = self.catalog.query_mdx(self.source_cube, self.mdx_query_to_run, self.source_cube)
        # Invalidate the stale transform so the next transform()/load() works on
        # this result rather than on the previous extraction.
        self.transformed_df = None
        logger.info('Query %s with parameters %s received %d results.', self.mdx_query_name,
                    self.parameter_map, len(self.query_result))

    def transform(self, transf_function=None, *args, **kwargs):
        """Prepare ``transformed_df`` and optionally apply ``transf_function`` to it.

        The first call after an ``extract`` starts from
        ``query_result.reset_index()``; later calls keep transforming the
        current ``transformed_df``, so transformations can be chained.

        Args:
            transf_function: Optional callable taking a DataFrame and returning
                the DataFrame to keep.
            *args: Accepted for interface compatibility; unused.
            **kwargs: Accepted for interface compatibility; unused.
        """
        if self.transformed_df is None:
            self.transformed_df = self.query_result.reset_index()
        if transf_function:
            self.transformed_df = transf_function(self.transformed_df)

    def load(self, destination_store_name):
        """Load ``transformed_df`` into the named table of the catalog data store.

        If ``transform`` has not been called since the last ``extract``, the
        default (index-reset only) transform is applied first.  An empty
        DataFrame is not loaded; a warning is logged instead.

        Args:
            destination_store_name: Name passed to
                ``catalog.data_store.get_table``.
        """
        if self.transformed_df is None:
            self.transform()

        if len(self.transformed_df) > 0:
            store = self.catalog.data_store.get_table(destination_store_name)
            initial_size = len(store)
            store.load_pandas(self.transformed_df)
            logger.info('Loaded %d rows into store %s. Current size: %s.', len(store)-initial_size,
                        destination_store_name, len(store))
        else:
            logger.warning('Attempting to load dataframe of size 0 into store %s. Skipping load.',
                           destination_store_name)

    def get_query_result(self):
        """Return the last query result with its index reset into columns."""
        return self.query_result.reset_index()
Editor is loading...
Leave a Comment