Untitled
unknown
plain_text
a month ago
45 kB
2
Indexable
Never
"""Tableau datasource migration helpers.

Transforms exported Tableau datasource files (XML inside .tdsx archives) and
republishes them to a destination server via tableauserverclient (TSC).

NOTE: names such as ``os``, ``urllib``, ``Fore``, ``unzip_to_dir``,
``zip_directory_to_twbx``, ``replace_special_characters``,
``get_file_path_with_extension``, ``logger_info`` and ``logger_error`` come in
through the star imports below (``src.logger.logger`` / ``src.utilities``).
"""

import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from os import environ

from tableauserverclient import DatasourceItem, ServerResponseError, PersonalAccessTokenAuth, Server, Pager
from tabulate import tabulate
from src.logger.logger import *
from src.msystem import sys_variables
from src.utilities import *
import tableauserverclient as TSC
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

# get the environ variables
# Placeholder token used in config to represent a literal '/' inside a
# project name (so it is not confused with a path separator).
forward_slash = environ.get("slashreresent", "_fwd_SLASH")


# env load for set_ds_connection_info function
class TransformDirective:
    """Applies XML-level transformations to a parsed Tableau datasource tree.

    Each ``*_transform`` method receives the XML root as ``args[0]`` and the
    rule payload as ``kwargs["value"]`` (an xmltodict-style dict whose keys
    are prefixed with '@' for attributes).
    """

    def __init__(self, file, rules):
        # Kept attribute names from the original: ``name`` is the file path,
        # ``sheets`` is the rule set.
        self.name = file
        self.sheets = rules

    def set_ds_connection_info(self, *args, **kwargs):
        """Rewrite attributes of a named connection of a given class.

        Values present in the password vault (``sys_variables.pvault``) take
        precedence over the values from the rule config.
        """
        logger_info(f"\n\t\t-- Applying Set connection info transformation")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]
        # password vault — optional; absence simply disables vault lookups
        try:
            pvault = sys_variables.pvault
        except Exception:
            pvault = None
        # find the node having connection class as database_type
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tNamed Connection node found : {named_connection_node}')
            # Get the children connection node which has class defined in database_type
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection node found : {connection_node}')
                # Change the attributes to the new value now
                for key, value in kwargs["value"][database_type].items():
                    logger_info(f"\t\tChanging the value of attribute {key[1:]} to {value}")
                    # NOTE(review): the original set the vault value and then
                    # immediately overwrote it with the config value; the vault
                    # value is now kept when present — confirm intended precedence.
                    if pvault and key[1:] in pvault:
                        pvault_val = pvault[key[1:]]
                        logger_info(f"\t\tChanging the password of {key[1:]} to {pvault_val}")
                        connection_node.set(key[1:], pvault_val)
                    else:
                        connection_node.set(key[1:], value)

    def replace_table_name(self, *args, **kwargs):
        """Rename a table by rewriting matching 'caption'/'name' attributes."""
        logger_info(f"\n\t\t-- Applying Replace table name transformation")
        root = args[0]
        new_table_name = kwargs["value"]["@new_table_name"]
        old_table_name = kwargs["value"]["@old_table_name"]
        for element in root.iter():
            if "caption" in element.attrib and element.get("caption") == old_table_name:
                logger_info(f"\t\tTable Named Matched {old_table_name}")
                element.set("caption", new_table_name)
            if "name" in element.attrib and element.get("name") == old_table_name:
                element.set("name", new_table_name)

    def change_custom_sql(self, *args, **kwargs):
        """Replace the text of custom-SQL nodes whose text equals the old SQL."""
        logger_info(f"\n\t\t-- Applying Set custom sql transformation")
        root = args[0]
        name = kwargs["value"]["@name"]
        old_query = kwargs["value"]["@old_sql"]
        new_query = kwargs["value"]["@new_sql"]
        elements_to_replace = root.findall(f".//*[@name='{name}']")
        for element_to_replace in elements_to_replace:
            if element_to_replace.text == old_query:
                logger_info(f"\t\tOld query Matched")
                element_to_replace.text = new_query
            else:
                logger_info(f"\t\tOld query Doesn't Match")

    def replace_calculation_formula(self, *args, **kwargs):
        """
        Find the node which matches the caption and get the child node of the
        caption node calculation. Replace the calculation formula.
        :param args: args[0] is the XML root
        :param kwargs: kwargs["value"] holds '@caption' and '@new_formula'
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Replace Calculation Formula transformation")
        root = args[0]
        caption = kwargs["value"]["@caption"]
        new_calculation_formula = kwargs["value"]["@new_formula"]
        column_nodes = root.findall(f".//*[@caption='{caption}']")
        # Get all the child nodes of the column node which has the calculation tag
        for column_node in column_nodes:
            calculation_nodes = column_node.findall(".//calculation")
            for calculation_node in calculation_nodes:
                calculation_node.set("formula", new_calculation_formula)
                logger_info(
                    f"\t\tSetted calculation node : {calculation_node}, new formula as : {new_calculation_formula}"
                )

    def set_display_name(self, *args, **kwargs):
        """
        Find the datasource node which matches the caption and set the display name.
        :param args: args[0] is the XML root
        :param kwargs: kwargs["value"] holds '@old_display_name' / '@new_display_name'
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Set Display Name transformation", Fore.BLUE)
        root = args[0]
        old_display_name = kwargs["value"]["@old_display_name"]
        new_display_name = kwargs["value"]["@new_display_name"]
        datasource_node = root.findall(f".//datasource[@caption='{old_display_name}']")
        if datasource_node:
            logger_info(
                f"\t\tFound {len(datasource_node)} datasource nodes with caption {old_display_name}."
            )
            for ds_node in datasource_node:
                ds_node.set("caption", new_display_name)
                logger_info(
                    f"\t\tSet the caption of the datasource node to {new_display_name}."
                )

    def remove_extract(self, *args, **kwargs):
        """
        Remove the extract from the datasource (toggles the 'enabled' attribute).
        :param args: args[0] is the XML root
        :param kwargs: kwargs["value"]["@new_value"] is the new 'enabled' value
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Remove Extract transformation")
        root = args[0]
        new_value = kwargs["value"]["@new_value"]
        extract_nodes = root.findall(f".//extract")
        if extract_nodes:
            logger_info(f"\t\tFound {len(extract_nodes)}.")
            for ex_node in extract_nodes:
                ex_node.set("enabled", new_value)
                logger_info(
                    f"\t\tSet the enabled attribute of the extract node to {new_value}."
                )

    def transform_unc_path(self, *args, **kwargs):
        """Repoint file-based connections (excel/csv/pdf/...) at a new UNC path.

        File-type → connection-class pairs and the attribute rewritten:
          .xlsx → excel-direct ('filename'); .csv/.txt → textscan ('directory');
          .pdf → pdf ('origfilename'); .kml → ogrdirect ('directory');
          .json → semistructpassivestore-direct ('directory');
          .hyper → hyper ('dbname'); .sas7bdat → stat-direct ('directory').
        """
        logger_info(f"\n\t\t-- Applying UNC transformation")
        root = args[0]
        caption_name = kwargs["value"]["@caption"]
        path = kwargs["value"]["@path"]
        file = kwargs["value"]["@file"]
        file_name, file_type = os.path.splitext(file)
        # Find the named-connection element with the specified caption
        for named_connection in root.iter('named-connection'):
            caption = named_connection.get('caption')
            logger_info(f"\t\t Named connection caption: {caption}")
            if caption == caption_name:
                logger_info("\t\t Matching named-connection found")
                # Find the connection element and update its filename attribute
                connection = named_connection.find('connection')
                if connection is not None:
                    class_value = connection.get('class')
                    if file_type == ".xlsx":
                        if class_value == "excel-direct":
                            path = path + file
                            filename = connection.get('filename')
                            name = filename.split('/')[-1]
                            if name == file:
                                connection.set('filename', path)
                                logger_info(f"\t\t Updated old path (unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".csv" or file_type == ".txt":
                        if class_value == "textscan":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".pdf":
                        path = path + file
                        if class_value == "pdf":
                            filename = connection.get('origfilename')
                            name = filename.split('/')[-1]
                            if name == file:
                                connection.set('origfilename', path)
                                logger_info(f"\t\t Updated old path (unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".kml":
                        if class_value == "ogrdirect":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".json":
                        if class_value == "semistructpassivestore-direct":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".hyper":
                        path = path + file
                        if class_value == "hyper":
                            filename = connection.get('dbname')
                            name = filename.split('/')[-1]
                            if name == file:
                                connection.set('dbname', path)
                                logger_info(f"\t\t Updated old path (unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".sas7bdat":
                        if class_value == "stat-direct":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    else:
                        print("else")

    def tabulate(self, header, data):
        """
        Print in tabulate format.
        Note: this method intentionally shadows the imported ``tabulate``
        name on the class; the call below resolves to the module-level import.
        :param header: column headers
        :param data: rows
        :return: None
        """
        print(tabulate(data, headers=header, tablefmt="grid"))


class Project:
    """Caches the destination server's project tree and resolves project IDs
    from slash-separated project paths (creating projects when needed)."""

    def __init__(self, server):
        self.projects = list(TSC.Pager(server.projects))
        # project id -> ProjectItem
        self.project_id_parent_id_map = {}
        # project name -> parent project id
        self.project_parent_map = {}
        for project in self.projects:
            self.project_id_parent_id_map[project.id] = project
            self.project_parent_map[project.name] = project.parent_id
        self.server = server

    def get_project_id_nested(self, project_names):
        """
        Get the project ID of the last project in a nested path.
        :param project_names: ordered list of project names (root first)
        :return: project id or None
        """
        status, project = self.check_project_exists(project_names)
        if status:
            for project_name in self.projects:
                if project_name.name.lower() == project.lower():
                    return project_name.id
        else:
            logger_info(f"\t\tProject {project} not found")

    def check_project_exists(self, project_names):
        """
        Check whether every project in the nested path exists and each one's
        parent matches the previous path component.
        :param project_names: ordered list of project names (root first)
        :return: (True, leaf_name) on success, (False, missing_names) otherwise
        """
        parent = None
        logger_info(f"\t\tProject Names for nested checking: {project_names}")
        projects_found = []
        projects_not_found = []
        for each_project_name in project_names:
            try:
                parent_project_id = self.project_parent_map[each_project_name]
                try:
                    parent_project = self.project_id_parent_id_map[parent_project_id]
                except KeyError:
                    parent_project = None
                if parent_project is None or parent_project.name.lower() == parent.lower():
                    projects_found.append(each_project_name)
            except KeyError as e:
                projects_not_found.append(each_project_name)
                break
            parent = each_project_name
        if projects_not_found:
            logger_info(f"\t\tProject {projects_not_found} not found")
            return False, projects_not_found
        return True, project_names[-1]

    def get_project_id(self, project_name, path=None, name=None):
        """
        Get the project id by walking a path of project names.
        BUG FIX: the original declared ``path: None, name: None`` —
        annotations instead of default values — making both arguments
        required; they are now optional as intended.
        :param project_name: unused (kept for interface compatibility)
        :param path: slash- or os-separator-delimited project path
        :param name: when given, split the path on '/' instead of os.path.sep
        :return: project id or None
        """
        if path is None:
            print("Path is None.")
            return None
        if name is None:
            path_parts = path.split(os.path.sep)
        else:
            path_parts = path.split("/")
        # path_parts[0] is the empty leading segment; [1] is the root project
        current_project = None
        for project in self.projects:
            if project.name == path_parts[1]:
                current_project = project
                break
        # If the project doesn't exist, return None
        if current_project is None:
            return None
        # Traverse through the rest of the path parts
        for part in path_parts[2:]:
            # Find the child project with the given name and parent ID
            child_project_found = False
            for child in self.projects:
                if child.parent_id == current_project.id and child.name == part:
                    current_project = child
                    child_project_found = True
                    break
            # If the child project doesn't exist, return None
            if not child_project_found:
                return None
        return current_project.id

    def get_child_project_id(self, projects, project_path):
        """Recursively resolve ``project_path`` (a list of names) against
        ``projects``, descending one level per recursion. Returns id or None."""
        # Base case: If the project path is empty, return None
        if not project_path:
            return None
        for project in projects:
            project_path_ = project_path[0].replace(forward_slash, '/')
            if project.name.lower() == project_path_.lower():
                if len(project_path) == 1:
                    return project.id
                child_projects = [p for p in self.projects if p.parent_id == project.id]
                child_project_id = self.get_child_project_id(child_projects, project_path[1:])
                if child_project_id:
                    return child_project_id
        return None

    def get_project_ids(self, path):
        """Resolve a '/name[/name...]'-style path to a project id, or None."""
        main_path = path[1:]
        if '/' in main_path:
            path_parts = main_path.split('/')
            return self.get_child_project_id(self.projects, path_parts)
        for pro in self.projects:
            main_path = main_path.replace(forward_slash, '/')
            if pro.name.lower() == main_path.lower() and pro.parent_id is None:
                return pro.id
        return None

    def make_project(self, name, parent_id=None):
        """Create a project on the server; returns the ProjectItem or None."""
        new_project = TSC.ProjectItem(name, description=None, content_permissions=None,
                                      parent_id=parent_id)
        try:
            created_project = self.server.projects.create(new_project)
            return created_project
        except TSC.ServerResponseError as e:
            print(f"Error creating project: {e}")
            return None

    def create_project(self, path, all_projects, parent_project=None):
        """Recursively ensure every project in ``path`` exists; returns the
        leaf ProjectItem, or None on failure.
        NOTE(review): the replacement token here is the literal '_fwd_SLASH_',
        which differs from the env-driven ``forward_slash`` default — confirm
        which is canonical."""
        project_name = path[0].replace('_fwd_SLASH_', '/')
        if parent_project is None:
            existing_project = next(
                (p for p in all_projects
                 if p.name.lower() == project_name.lower() and p.parent_id is None),
                None)
        else:
            existing_project = next(
                (p for p in all_projects
                 if p.name.lower() == project_name.lower() and p.parent_id == parent_project.id),
                None)
        if existing_project:
            current_project = existing_project
        else:
            # Create the project if it doesn't exist
            new_project = self.make_project(project_name, parent_project.id if parent_project else None)
            if new_project:
                current_project = new_project
            else:
                print(f"Failed to create project: {project_name}")
                return None
        if len(path) > 1:
            return self.create_project(path[1:], all_projects, current_project)
        else:
            return current_project

    def get_path_id(self, path):
        """Resolve a '/a/b/c' path to a project id (None when absent)."""
        path = path[1:]
        all_projects = list(TSC.Pager(self.server.projects))
        if '/' in path:
            path = path.split('/')
            # BUG FIX: the original called get_child_project_id(path, all_projects),
            # i.e. with the arguments swapped relative to its signature
            # (projects, project_path).
            project_id = self.get_child_project_id(all_projects, path)
            return project_id
        else:
            for project in all_projects:
                if project.name == path and project.parent_id is None:
                    return project.id


class Datasource:
    """One exported Tableau datasource: unzips the archive, parses its .tds
    XML, applies transform/publish rules, re-zips, and republishes."""

    def __init__(self, filepath, filename, content_id, rules, project_id, type,
                 output=None, name_map=None, project_path=None):
        # Full path to the file
        self.filepath = filepath
        # Name of the file
        self.name = filename
        self.content_id = content_id
        self.rule = rules
        self.transform = TransformDirective(filepath, rules)
        self.project_id = project_id
        self.root_project_id = project_id
        self.schedule_name = ""
        self.rename_path = None
        self.rename_ds = None
        self.project = Project(sys_variables.server)
        self.name_map = name_map
        self.project_path = project_path
        # Unzip the files in the temp directory.
        # NOTE(review): NamedTemporaryFile().name is used only as a unique
        # path for a directory; tempfile.mkdtemp() would be the conventional
        # call — confirm unzip_to_dir's expectations before changing.
        self.temp_dir = tempfile.NamedTemporaryFile().name
        unzip_to_dir(filepath, self.temp_dir)
        # Get the .tds file from the temp directory (raises IndexError if none)
        filename = [
            file for file in os.listdir(self.temp_dir) if file.endswith(".tds")
        ][0]
        self.twb_file = os.path.join(self.temp_dir, filename)
        # Read the xml file using ElementTree
        self.tree = ET.parse(self.twb_file)
        self.root = self.tree.getroot()
        # Check if output directory exists, if not create it
        output_dir = (
            os.path.join(Path.home(), output) if output else sys_variables.output_dir
        )
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        if self.project_id:
            self.output_dir = os.path.join(Path.home(), output_dir, self.project_id)
            if not os.path.exists(self.output_dir):
                os.makedirs(self.output_dir)
        else:
            self.output_dir = None

    def get_project_path_from_rules(self):
        """Return the project path components from the first rule whose
        '@name' matches this datasource, or an empty list."""
        project_name = []
        for rule in self.rule:
            if rule["@name"] in self.name:
                project_name = rule["@project"].split("/")[1:]
        if project_name:
            logger_info(
                f"\t\tProject structure for default publish : {project_name} for {self.name}"
            )
        return project_name

    def _apply_rules(self, rule, rules_mapping):
        """Dispatch every enabled rule entry to its handler (shared by
        apply_transformation / apply_publish)."""
        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def apply_transformation(self):
        """Run every enabled XML transformation rule against the parsed tree."""
        rules_mapping = {
            "replace_table_name": self.transform.replace_table_name,
            "change_custom_sql": self.transform.change_custom_sql,
            "set_calculation_formula": self.transform.replace_calculation_formula,
            "set_display_name": self.transform.set_display_name,
            "remove_extract": self.transform.remove_extract,
            "set_ds_connection_info": self.transform.set_ds_connection_info,
            "transform_unc_path": self.transform.transform_unc_path,
        }
        self._apply_rules(self.rule, rules_mapping)

    def apply_publish(self):
        """Resolve (and if needed create) the destination project, then run
        every enabled publish-time rule."""
        rule = self.rule
        rules_mapping = {
            "set_folder": self.set_folder,
            "apply_schedule": self.apply_schedule,
            "rename_datasource": self.rename_datasource,
            "set_connection_secrets": self.set_connection_param,
        }
        if rule["@project"].lower() == "all" and rule["@name"].lower() == "all":
            # Publish in the same project path as on the source server.
            path = self.project_path
            project_id = self.project.get_path_id(path)
            all_projects = list(TSC.Pager(self.project.server.projects))
            if project_id is None:
                path = path[1:]
                if '/' in path:
                    path = path.split('/')
                    project_id = self.project.create_project(path, all_projects)
                    project_id = project_id.id
                else:
                    project_id = self.project.make_project(path)
                    project_id = project_id.id
            else:
                print("Project path already exists:", project_id)
            if project_id:
                self.root_project_id = project_id
                logger_info(f"\t\tProject {path} found")
        self._apply_rules(rule, rules_mapping)

    def set_connection_param(self, *args, **kwargs):
        """Replace connection attributes with secrets from the password vault,
        then save and re-zip the datasource for publishing."""
        logger_info(f"\n\t\t-- Applying Set connection param changes.")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]
        # password vault — optional
        try:
            pvault = sys_variables.pvault
        except Exception:
            pvault = None
        # find the node having connection class as database_type
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tDatabase Connection found with caption : {named_connection_node}')
            # Get the children connection node which has class defined in database_type
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection found : {connection_node}')
                # Change the attributes to the new value now: rule values act
                # as vault keys; the matching vault entry is written in.
                for key, value in kwargs["value"][database_type].items():
                    if pvault:
                        for key1, val in pvault.items():
                            if value == key1:
                                pvault_val = pvault[key1]
                                logger_info(f"\t\tChanging")
                                connection_node.set(key[1:], pvault_val)
                logger_info(f"\t\tSaving the datasource:")
                self.save_datasource()
                self.zip_datasource("publish")
        else:
            logger_info(f"\n\t\tNo database connection found with the caption : {caption}")

    def apply_rename(self, rule, path):
        """Record the rename target/new name from an enabled rename rule.
        ``path`` is unused (kept for interface compatibility)."""
        rules_mapping = {
            "rename_datasource": self.rename_datasource,
        }
        for key, value in rules_mapping.items():
            if key in rule:
                if rule[key]["@enabled"].lower() == "false":
                    continue
                value = rule[key]
                self.rename_ds = value['@new_ds_name']
                self.rename_path = value['@current_path']

    def set_folder(self, *args, **kwargs):
        """
        Folder To place the datasource: resolves '@folder_name' to a project
        id and stores it as the publish target.
        :return: None
        """
        logger_info(f"\n\t\t-- Setting the folder to place the datasource")
        project_name = kwargs["value"]["@folder_name"]
        path = project_name
        # get the project id of the project name
        project_id = self.project.get_project_ids(path)
        if project_id:
            self.root_project_id = project_id
            logger_info(f"\t\tProject {path} found")
        else:
            self.root_project_id = ""
            logger_error(f"\t\tProject {project_name} not found")
        return

    def apply_schedule(self, *args, **kwargs):
        """
        Apply schedules on target datasource: records the schedule name to
        attach after publishing.
        :return: None
        """
        logger_info(f"\n\t\t-- Applying schedules on target datasource datasource")
        schedule_name = kwargs["value"]["@schedule_name"]
        self.schedule_name = schedule_name

    def save_datasource(self):
        """
        Save the changes in the datasource to the unpacked .tds file.
        :return: None
        """
        logger_info(f"\t\tSaving the datasource : {self.twb_file}")
        # self.twb_file is already an absolute path inside temp_dir; the
        # original re-joined temp_dir onto it, which os.path.join discards.
        self.tree.write(self.twb_file)

    def zip_datasource(self, publish=None):
        """
        Zip the datasource back into a .tdsx (next to the source file when
        publishing, otherwise into the configured output directory).
        :return: None
        """
        filename = f"{os.path.basename(self.name).split('.')[0]}.tdsx"
        if publish:
            output_dir = os.path.dirname(self.filepath)
        else:
            output_dir = self.output_dir
        zip_directory_to_twbx(self.temp_dir, os.path.join(output_dir, filename))

    def publish_(self):
        """
        Publish the datasource (Overwrite mode), then attach schedules and
        apply any pending rename.
        :return: None
        """
        if ".tdsx" in self.content_id:
            self.content_id = self.content_id.replace(".tdsx", "")
        if self.name_map:
            # Map the sanitized name back to the original server name.
            for key, value in self.name_map.items():
                if value == self.name:
                    self.name = key
        logger_info(f"\t\tPublishing the DataSource : {self.name}")
        if self.root_project_id:
            published_datasource = sys_variables.server.datasources.publish(
                DatasourceItem(
                    name=self.name,
                    project_id=self.root_project_id,
                ),
                self.filepath,
                "Overwrite",
            )
            self.apply_schedules(published_datasource.id)
            self.update_datasource_name()
        else:
            logger_info(
                "\n\t\tNo project found to publish. There could be multiple reasons for this"
                "\n\t\t1. The project structure is not correct in config. "
                "\n\t\t2. The project structure is not available in the destination site. "
                "\n\t\tPlease check the spelling of the names of each project in the project structure "
                "\n\t\tCheck in the destination site whether the project structure exists or not "
                f"\n\t\tDATASOURCE FILENAME : {self.content_id}")

    def apply_schedules(self, data_source_id):
        """
        Apply Schedules: attach the published datasource to the schedule whose
        name was recorded by apply_schedule (no-op when none was set).
        :return: None
        """
        # get a datasource item that should be added to a schedule
        datasource_item = sys_variables.server.datasources.get_by_id(data_source_id)
        datasource_schedules = sys_variables.server.schedules.get()
        specific_name = self.schedule_name
        if specific_name:
            matching_schedule_ids = ""
            for schedule in datasource_schedules[0]:
                if schedule.name == specific_name:
                    matching_schedule_ids = schedule.id.strip()
            # retrieve the id of the target schedule
            schedule_id = matching_schedule_ids
            try:
                # Add the data source to the schedule
                sys_variables.server.schedules.add_to_schedule(
                    schedule_id=schedule_id,
                    datasource=datasource_item,
                )
                logger_info(
                    f"\tSuccessfully applied schedules for the {self.name} data source."
                )
            except ServerResponseError as e:
                if e.code == "403078":
                    logger_info(
                        "Permission Denied: Not allowed to create extract refresh tasks on this object."
                    )
                else:
                    # NOTE(review): logger_error is called with two positional
                    # args here — confirm its signature accepts that.
                    logger_error("An error occurred:", e)

    def __repr__(self):
        return f"Datasource : {self.name}"

    def rename_datasource(self, *args, **kwargs):
        """
        Rename the datasource (stores the new name as the content id).
        :return: None
        """
        logger_info(
            f"\n\t\t-- Renaming the datasource to : {kwargs['value']['@new_ds_name']}"
        )
        new_name = kwargs["value"]["@new_ds_name"]
        current_path = kwargs["value"]["@current_path"]
        self.content_id = new_name

    def get_all_projects(self, server):
        """Return {project_id: ProjectItem} for every project on the server."""
        all_projects = {}
        for project in list(TSC.Pager(server.projects)):
            all_projects[project.id] = project
        return all_projects

    def get_full_path(self, datasource, projects_dict):
        """Build '/Proj/Sub/.../datasource_name' by walking parent links."""
        path_parts = [datasource.name]
        project = projects_dict.get(datasource.project_id)
        while project:
            path_parts.insert(0, f"/{project.name}")
            if project.parent_id:
                project = projects_dict.get(project.parent_id)
            else:
                project = None
        return '/'.join(path_parts)

    def update_datasource_name(self):
        """Find the published datasource by its full path and rename it on
        the server to the name recorded by apply_rename."""
        full_path = f"{self.rename_path}/{self.name}"
        self.name = self.rename_ds
        matched_datasource = None
        projects_dict = self.get_all_projects(sys_variables.server)
        for datasource in Pager(sys_variables.server.datasources.get):
            if self.get_full_path(datasource, projects_dict) == full_path:
                matched_datasource = datasource
                break
        if matched_datasource:
            matched_datasource.name = self.name
            sys_variables.server.datasources.update(matched_datasource)
            print(f"Datasource '{full_path}' updated to '{self.name}'")


class RecurseDataSourceProjectPath:
    """Expands an 'all'/recursive project rule into the concrete set of
    project paths to operate on, and collects their datasource files."""

    def __init__(self, project_path, source_folder_map, publish_folder_map, recurse,
                 source, each_rule, target=None, name_map=None):
        self.project_path = project_path
        self.recurse = recurse
        self.source_folder_map = source_folder_map
        self.publish_folder_map = publish_folder_map
        self.source = source
        self.projects_to_operate = []
        self.rule = each_rule
        self.target = target
        self.name_map = name_map
        # NOTE(review): crashes when name_map is None (the default) — every
        # current caller appears to pass one; confirm.
        self.reverse_name_map = {value: key for key, value in self.name_map.items()}
        if self.project_path.lower() == "all":
            for key, value in source_folder_map.items():
                self.projects_to_operate.append(key)
        if recurse:
            for key, value in source_folder_map.items():
                if self.project_path.lower() == "all":
                    self.projects_to_operate.append(key)
                if recurse.lower() in ['true', 'yes']:
                    if key.startswith(project_path) and value:
                        self.projects_to_operate.append(key)
                else:
                    if project_path == '/':
                        if key.count('/') == 1:
                            self.projects_to_operate.append(key)
                    elif project_path == key and value:
                        self.projects_to_operate.append(key)
        self.workbooks = []

    def get_datasources(self):
        """Collect [file_path, Datasource] pairs for every .tds/.tdsx in the
        selected project folders (best-effort per project)."""
        for project_path in self.projects_to_operate:
            # Iterate through the project paths and find the keys
            name_map = None
            try:
                folder_uuid = self.source_folder_map[project_path]
                destination_project_id = self.publish_folder_map[project_path]
                file_dir = os.path.join(self.source, folder_uuid)
                # Iterate through the directory
                for file in os.listdir(file_dir):
                    workbook_name = os.path.splitext(file)[0]
                    actual_name = self.reverse_name_map[workbook_name]
                    if os.path.splitext(file)[1] in ['.tdsx', '.tds']:
                        logger_info(f"\t\tFile exist in the path : {os.path.join(file_dir, file)}")
                        self.workbooks.append(
                            [
                                os.path.join(file_dir, file),
                                Datasource(
                                    # TODO: change the file path read from db
                                    os.path.join(file_dir, file),
                                    workbook_name,
                                    actual_name,
                                    self.rule,
                                    destination_project_id,
                                    "transform",
                                    self.target,
                                    name_map,
                                    project_path
                                ),
                            ]
                        )
            except Exception as e:
                # Best-effort per project, but no longer silent: the original
                # swallowed every exception with a bare pass.
                logger_error(f"\t\tSkipping project {project_path}: {e}")
        return self.workbooks


def get_datasource_objects(rule, source_folder_map, project_archive_map, source,
                           target=None, name_map=None, type=None):
    """Walk the config rules and build [file_path, Datasource] pairs.

    ``type`` (shadows the builtin, kept for interface compatibility) selects
    the 'transform' vs 'publish' rule section.
    """
    datasources = []
    logger_info(f"\n\n-- Going through the config file.")
    logger_info(f"-------------------------------------------------------------")
    if type == "transform":
        rules = rule["directives"]["datasources"]["transform"]["datasource"]
    else:
        rules = rule["directives"]["datasources"]["publish"]["datasource"]
    # A single rule parses as a dict; normalize to a list.
    if isinstance(rules, dict):
        rules = [rules]
    for each_rule in rules:
        try:
            project_path = each_rule["@project"]
            content_name = each_rule["@name"]
            required = each_rule["@enabled"]
            logger_info(f"\n\nProject : {project_path} , Datasource : {content_name} , Enabled : {required}")
            # Search the project path in source archive map
            if required.lower() in ["true", "yes", "ok"]:
                if content_name.lower() in ['all', '*']:
                    recurse = each_rule.get("@recurse", None)
                    rpp = RecurseDataSourceProjectPath(project_path, source_folder_map,
                                                       project_archive_map, recurse, source,
                                                       each_rule, target, name_map)
                    datasources = datasources + rpp.get_datasources()
                    print("datasources", datasources)
                    continue
                if project_path in source_folder_map and content_name in name_map:
                    logger_info(f"\t\tProject path exist in destination : {urllib.parse.unquote(project_path)}")
                    # Get the folder name where the workbook is lying
                    project_id = source_folder_map[project_path]
                    destination_project_id = project_archive_map[project_path]
                    if destination_project_id is None:
                        logger_error(f"\t\tThe project in destination doesn't exist : {project_path}")
                    file_dir = os.path.join(source, project_id)
                    sanitized_datasource_name = replace_special_characters(content_name)
                    file_path = get_file_path_with_extension(sanitized_datasource_name, file_dir)
                    # Check whether the filename exist in the path or not
                    if file_path and os.path.exists(file_path):
                        logger_info(f"\t\tFile exist in the path : {file_path}")
                        datasources.append(
                            [
                                file_path,
                                Datasource(
                                    # TODO: change the file path read from db
                                    file_path,
                                    content_name,
                                    sanitized_datasource_name,
                                    each_rule,
                                    destination_project_id,
                                    "transform",
                                    target,
                                    name_map,
                                    project_path
                                ),
                            ]
                        )
                    else:
                        logger_info(f"File doesn't exist in the path : {file_path}. "
                                    f"The datasource might not be tagged.")
                else:
                    logger_info(f"\t\tThe datasource : {content_name} is not tagged in project path : {project_path}."
                                f"Please check the config file.")
            else:
                logger_info(f"Disabled for Datasource : {content_name}")
        except Exception as e:
            logger_error(f"\t\tThe datasource : {content_name} in project : {project_path} "
                         f" is not tagged.")
    return datasources
Leave a Comment