Untitled
unknown
plain_text
9 months ago
45 kB
6
Indexable
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from os import environ

from tableauserverclient import DatasourceItem, ServerResponseError, PersonalAccessTokenAuth, Server, Pager
from tabulate import tabulate
from src.logger.logger import *
from src.msystem import sys_variables
from src.utilities import *
import tableauserverclient as TSC
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

# Placeholder token that stands in for a literal '/' inside project names
# (so names containing slashes survive path splitting).
# Env override key: "slashreresent".
forward_slash = environ.get("slashreresent", "_fwd_SLASH")


class TransformDirective:
    """XML-level transformations applied to the ElementTree of an extracted
    Tableau datasource (.tds) file.

    Every ``*_...`` transform takes the tree root as ``args[0]`` and the rule
    payload as ``kwargs["value"]`` (keys prefixed with ``@`` come straight
    from the parsed config).
    """

    def __init__(self, file, rules):
        # `file` is the datasource file path; `rules` is the rule dict/list
        # from the config for this datasource.
        self.name = file
        self.sheets = rules

    def set_ds_connection_info(self, *args, **kwargs):
        """Rewrite the attributes of a <connection> node (server, db, user, ...)
        for the named connection matching the configured caption/class.

        Values found in the password vault (``sys_variables.pvault``) take
        precedence over the plain values from the config.
        """
        logger_info(f"\n\t\t-- Applying Set connection info transformation")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]

        # Password vault is optional; absence is not an error.
        try:
            pvault = sys_variables.pvault
        except AttributeError:
            pvault = None

        # Find the named-connection node carrying the configured caption.
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tNamed Connection node found : {named_connection_node}')
            # Its child <connection> node must match the configured class.
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection node found : {connection_node}')
                # Apply each "@attr": value pair from the rule. key[1:] strips
                # the leading '@' of the config key.
                for key, value in kwargs["value"][database_type].items():
                    logger_info(f"\t\tChanging the value of attribute {key[1:]} to {value}")
                    if pvault and key[1:] in pvault:
                        pvault_val = pvault[key[1:]]
                        logger_info(f"\t\tChanging the password of {key[1:]} to {pvault_val}")
                        connection_node.set(key[1:], pvault_val)
                    else:
                        # NOTE(review): the original (linearized) code set the
                        # plain value unconditionally after the vault value,
                        # which would clobber the vault override; the else
                        # here preserves the vault value — confirm intended.
                        connection_node.set(key[1:], value)

    def replace_table_name(self, *args, **kwargs):
        """Rename a table everywhere it appears: any element whose `caption`
        or `name` attribute equals the old table name gets the new name."""
        logger_info(f"\n\t\t-- Applying Replace table name transformation")
        root = args[0]
        new_table_name = kwargs["value"]["@new_table_name"]
        old_table_name = kwargs["value"]["@old_table_name"]
        for element in root.iter():
            if "caption" in element.attrib and element.get("caption") == old_table_name:
                logger_info(f"\t\tTable Named Matched {old_table_name}")
                element.set("caption", new_table_name)
            if "name" in element.attrib and element.get("name") == old_table_name:
                element.set("name", new_table_name)

    def change_custom_sql(self, *args, **kwargs):
        """Replace the text of a custom-SQL relation: every element with the
        configured `name` whose text equals the old SQL gets the new SQL."""
        logger_info(f"\n\t\t-- Applying Set custom sql transformation")
        root = args[0]
        name = kwargs["value"]["@name"]
        old_query = kwargs["value"]["@old_sql"]
        new_query = kwargs["value"]["@new_sql"]
        elements_to_replace = root.findall(f".//*[@name='{name}']")
        for element_to_replace in elements_to_replace:
            if element_to_replace.text == old_query:
                logger_info(f"\t\tOld query Matched")
                element_to_replace.text = new_query
            else:
                logger_info(f"\t\tOld query Doesn't Match")

    def replace_calculation_formula(self, *args, **kwargs):
        """
        Find the node which matches the caption and get the child node of
        the caption node calculation. Replace the calculation formula.
        :param args: args[0] is the XML root
        :param kwargs: kwargs["value"] carries @caption / @new_formula
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Replace Calculation Formula transformation")
        root = args[0]
        caption = kwargs["value"]["@caption"]
        new_calculation_formula = kwargs["value"]["@new_formula"]
        column_nodes = root.findall(f".//*[@caption='{caption}']")
        # All <calculation> children under each matching column get the
        # new formula.
        for column_node in column_nodes:
            calculation_nodes = column_node.findall(".//calculation")
            for calculation_node in calculation_nodes:
                calculation_node.set("formula", new_calculation_formula)
                logger_info(
                    f"\t\tSetted calculation node : {calculation_node}, new formula as : {new_calculation_formula}"
                )

    def set_display_name(self, *args, **kwargs):
        """
        Find the datasource node which matches the caption and set the
        display name (caption attribute).
        :param args: args[0] is the XML root
        :param kwargs: kwargs["value"] carries @old_display_name / @new_display_name
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Set Display Name transformation", Fore.BLUE)
        root = args[0]
        old_display_name = kwargs["value"]["@old_display_name"]
        new_display_name = kwargs["value"]["@new_display_name"]
        datasource_node = root.findall(f".//datasource[@caption='{old_display_name}']")
        if datasource_node:
            logger_info(
                f"\t\tFound {len(datasource_node)} datasource nodes with caption {old_display_name}."
            )
            for ds_node in datasource_node:
                ds_node.set("caption", new_display_name)
                logger_info(
                    f"\t\tSet the caption of the datasource node to {new_display_name}."
                )

    def remove_extract(self, *args, **kwargs):
        """
        Enable/disable the extract on the datasource by setting the
        `enabled` attribute of every <extract> node.
        :param args: args[0] is the XML root
        :param kwargs: kwargs["value"]["@new_value"] is the new `enabled` value
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Remove Extract transformation")
        root = args[0]
        new_value = kwargs["value"]["@new_value"]
        extract_nodes = root.findall(f".//extract")
        if extract_nodes:
            logger_info(f"\t\tFound {len(extract_nodes)}.")
            for ex_node in extract_nodes:
                ex_node.set("enabled", new_value)
                logger_info(
                    f"\t\tSet the enabled attribute of the extract node to {new_value}."
                )

    def transform_unc_path(self, *args, **kwargs):
        """Repoint a file-based connection (excel/csv/pdf/kml/json/hyper/sas)
        at a new UNC path, dispatching on the file extension and verifying
        the connection class and current file name match before rewriting."""
        logger_info(f"\n\t\t-- Applying UNC transformation")
        root = args[0]
        caption_name = kwargs["value"]["@caption"]
        path = kwargs["value"]["@path"]
        file = kwargs["value"]["@file"]
        file_name, file_type = os.path.splitext(file)
        # Find the named-connection element with the specified caption.
        for named_connection in root.iter('named-connection'):
            caption = named_connection.get('caption')
            logger_info(f"\t\t Named connection caption: {caption}")
            if caption == caption_name:
                logger_info("\t\t Matching named-connection found")
                # Find the connection element and update its path attribute.
                connection = named_connection.find('connection')
                if connection is not None:
                    class_value = connection.get('class')
                    if file_type == ".xlsx":
                        if class_value == "excel-direct":
                            path = path + file
                            filename = connection.get('filename')
                            name = filename.split('/')[-1]
                            if name == file:
                                connection.set('filename', path)
                                logger_info(f"\t\t Updated old path (unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".csv" or file_type == ".txt":
                        if class_value == "textscan":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".pdf":
                        path = path + file
                        if class_value == "pdf":
                            filename = connection.get('origfilename')
                            name = filename.split('/')[-1]
                            if name == file:
                                connection.set('origfilename', path)
                                logger_info(f"\t\t Updated old path (unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".kml":
                        if class_value == "ogrdirect":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".json":
                        if class_value == "semistructpassivestore-direct":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".hyper":
                        path = path + file
                        if class_value == "hyper":
                            filename = connection.get('dbname')
                            name = filename.split('/')[-1]
                            if name == file:
                                connection.set('dbname', path)
                                logger_info(f"\t\t Updated old path (unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    elif file_type == ".sas7bdat":
                        if class_value == "stat-direct":
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                path = path + file
                                logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {path}")
                            else:
                                logger_error("\t\t File name is not matched")
                        else:
                            logger_error(f"\t\t File type is not matched. Please check the file or file type")
                    else:
                        # Unsupported extension.
                        print("else")

    def tabulate(self, header, data):
        """
        Print in tabulate format.
        :param header: column headers
        :param data: rows
        :return: None
        """
        # Resolves to the module-level `tabulate` import, not this method.
        print(tabulate(data, headers=header, tablefmt="grid"))


class Project:
    """Caches all server projects and resolves project paths / nested
    parent-child structures to project IDs."""

    def __init__(self, server):
        self.projects = list(TSC.Pager(server.projects))
        # project id -> ProjectItem (name kept from original code).
        self.project_id_parent_id_map = {}
        # project name -> parent project id.
        self.project_parent_map = {}
        for project in self.projects:
            self.project_id_parent_id_map[project.id] = project
            self.project_parent_map[project.name] = project.parent_id
        self.server = server

    def get_project_id_nested(self, project_names):
        """
        Get the project ID of the last project in a nested project path.
        :param project_names: ordered list of project names, root first
        :return: project id or None
        """
        status, project = self.check_project_exists(project_names)
        if status:
            for project_name in self.projects:
                if project_name.name.lower() == project.lower():
                    return project_name.id
        else:
            logger_info(f"\t\tProject {project} not found")

    def check_project_exists(self, project_names):
        """
        Check that every project in the nested path exists and that each
        project's recorded parent matches the preceding path element.
        :param project_names: ordered list of project names, root first
        :return: (True, leaf_name) or (False, missing_names)
        """
        parent = None
        logger_info(f"\t\tProject Names for nested checking: {project_names}")
        projects_found = []
        projects_not_found = []
        for each_project_name in project_names:
            try:
                parent_project_id = self.project_parent_map[each_project_name]
                try:
                    parent_project = self.project_id_parent_id_map[parent_project_id]
                except KeyError:
                    parent_project = None
                # NOTE(review): if parent_project is set while `parent` is
                # still None (first element with a parent), .lower() on None
                # would raise — confirm path lists always start at a root.
                if parent_project is None or parent_project.name.lower() == parent.lower():
                    projects_found.append(each_project_name)
            except KeyError:
                projects_not_found.append(each_project_name)
                break
            parent = each_project_name
        if projects_not_found:
            logger_info(f"\t\tProject {projects_not_found} not found")
            return False, projects_not_found
        return True, project_names[-1]

    def get_project_id(self, project_name, path=None, name=None):
        """
        Get the project id by walking a path of project names.
        :param project_name: unused here; kept for caller compatibility
        :param path: slash- or os.sep-separated project path
        :param name: when set, split `path` on '/'; otherwise on os.path.sep
        :return: project id or None
        """
        # Fix: `path: None, name: None` were annotations in the original;
        # defaults were clearly intended (backward compatible).
        if path is None:
            print("Path is None.")
            return None
        if name is None:
            path_parts = path.split(os.path.sep)
        else:
            path_parts = path.split("/")
        current_project = None
        # path_parts[0] is empty for a leading separator; start at [1].
        for project in self.projects:
            if project.name == path_parts[1]:
                current_project = project
                break
        if current_project is None:
            return None
        # Traverse through the rest of the path parts.
        for part in path_parts[2:]:
            child_project_found = False
            for child in self.projects:
                if child.parent_id == current_project.id and child.name == part:
                    current_project = child
                    child_project_found = True
                    break
            if not child_project_found:
                return None
        return current_project.id

    def get_child_project_id(self, projects, project_path):
        """Recursively resolve `project_path` (list of names, root first)
        among `projects`, descending into children. Returns id or None."""
        if not project_path:
            return None
        for project in projects:
            # Restore literal slashes hidden behind the placeholder token.
            project_path_ = project_path[0].replace(forward_slash, '/')
            if project.name.lower() == project_path_.lower():
                if len(project_path) == 1:
                    return project.id
                child_projects = [p for p in self.projects if p.parent_id == project.id]
                child_project_id = self.get_child_project_id(child_projects, project_path[1:])
                if child_project_id:
                    return child_project_id
        return None

    def get_project_ids(self, path):
        """Resolve a '/Project/Sub/...' path string to a project id."""
        main_path = path[1:]
        if '/' in main_path:
            path_parts = main_path.split('/')
            return self.get_child_project_id(self.projects, path_parts)
        # Single top-level project: match by name among parentless projects.
        main_path = main_path.replace(forward_slash, '/')
        for pro in self.projects:
            if pro.name.lower() == main_path.lower() and pro.parent_id is None:
                return pro.id
        return None

    def make_project(self, name, parent_id=None):
        """Create a project on the server; returns the created item or None."""
        new_project = TSC.ProjectItem(name,
                                      description=None,
                                      content_permissions=None,
                                      parent_id=parent_id)
        try:
            created_project = self.server.projects.create(new_project)
            return created_project
        except TSC.ServerResponseError as e:
            print(f"Error creating project: {e}")
            return None

    def create_project(self, path, all_projects, parent_project=None):
        """Ensure the nested project `path` (list of names) exists, creating
        missing levels; returns the leaf ProjectItem or None on failure."""
        # NOTE(review): '_fwd_SLASH_' (trailing underscore) differs from the
        # `forward_slash` default '_fwd_SLASH' — confirm which token is real.
        project_name = path[0].replace('_fwd_SLASH_', '/')
        if parent_project is None:
            existing_project = next(
                (p for p in all_projects
                 if p.name.lower() == project_name.lower() and p.parent_id is None),
                None)
        else:
            existing_project = next(
                (p for p in all_projects
                 if p.name.lower() == project_name.lower() and p.parent_id == parent_project.id),
                None)
        if existing_project:
            current_project = existing_project
        else:
            # Create the project if it doesn't exist.
            new_project = self.make_project(project_name, parent_project.id if parent_project else None)
            if new_project:
                current_project = new_project
            else:
                print(f"Failed to create project: {project_name}")
                return None
        if len(path) > 1:
            return self.create_project(path[1:], all_projects, current_project)
        return current_project

    def get_path_id(self, path):
        """Resolve a '/A/B/...' path to a project id using a fresh project
        listing from the server."""
        path = path[1:]
        all_projects = list(TSC.Pager(self.server.projects))
        if '/' in path:
            path = path.split('/')
            # Fix: original passed (path, all_projects) — swapped versus the
            # get_child_project_id(projects, project_path) signature.
            project_id = self.get_child_project_id(all_projects, path)
            return project_id
        for project in all_projects:
            if project.name == path and project.parent_id is None:
                return project.id


class Datasource:
    """One datasource file plus the transform/publish rules that apply to it.

    On construction the .tdsx is unpacked to a temp directory and its inner
    .tds XML is parsed; transforms mutate that tree, which is then re-saved
    and re-zipped for publishing.
    """

    def __init__(self, filepath, filename, content_id, rules, project_id, type,
                 output=None, name_map=None, project_path=None):
        # Full path to the file.
        self.filepath = filepath
        # Name of the file.
        self.name = filename
        self.content_id = content_id
        self.rule = rules
        self.transform = TransformDirective(filepath, rules)
        self.project_id = project_id
        self.root_project_id = project_id
        self.schedule_name = ""
        self.rename_path = None
        self.rename_ds = None
        self.project = Project(sys_variables.server)
        self.name_map = name_map
        self.project_path = project_path

        # Unzip the files in a temp directory.
        # NOTE(review): NamedTemporaryFile().name only borrows a name; the
        # backing file may be deleted/raced — tempfile.mkdtemp() would be
        # safer if unzip_to_dir tolerates an existing dir. Kept as-is.
        self.temp_dir = tempfile.NamedTemporaryFile().name
        unzip_to_dir(filepath, self.temp_dir)

        # Pick the inner .tds file out of the unpacked archive.
        filename = [
            file for file in os.listdir(self.temp_dir) if file.endswith(".tds")
        ][0]
        self.twb_file = os.path.join(self.temp_dir, filename)

        # Parse the datasource XML.
        self.tree = ET.parse(self.twb_file)
        self.root = self.tree.getroot()

        # Check if output directory exists, if not create it.
        output_dir = (
            os.path.join(Path.home(), output) if output else sys_variables.output_dir
        )
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        if self.project_id:
            self.output_dir = os.path.join(Path.home(), output_dir, self.project_id)
            if not os.path.exists(self.output_dir):
                os.makedirs(self.output_dir)
        else:
            self.output_dir = None

    def get_project_path_from_rules(self):
        """Return the project path parts configured for this datasource,
        or an empty list when no rule matches the file name."""
        project_name = []
        for rule in self.rule:
            if rule["@name"] in self.name:
                project_name = rule["@project"].split("/")[1:]
        if project_name:
            logger_info(
                f"\t\tProject structure for default publish : {project_name} for {self.name}"
            )
        return project_name

    def apply_transformation(self):
        """Run every enabled transform rule against the parsed XML tree."""
        rule = self.rule
        rules_mapping = {
            "replace_table_name": self.transform.replace_table_name,
            "change_custom_sql": self.transform.change_custom_sql,
            "set_calculation_formula": self.transform.replace_calculation_formula,
            "set_display_name": self.transform.set_display_name,
            "remove_extract": self.transform.remove_extract,
            "set_ds_connection_info": self.transform.set_ds_connection_info,
            "transform_unc_path": self.transform.transform_unc_path
        }
        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    # A rule key can hold a single dict or a list of them.
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def apply_publish(self):
        """Resolve the destination project (creating it for the all/all
        wildcard case) and run every enabled publish-side rule."""
        rule = self.rule
        rules_mapping = {
            "set_folder": self.set_folder,
            "apply_schedule": self.apply_schedule,
            "rename_datasource": self.rename_datasource,
            "set_connection_secrets": self.set_connection_param,
        }
        if rule["@project"].lower() == "all" and rule["@name"].lower() == "all":
            # Wildcard: mirror the source project path on the destination,
            # creating it when missing.
            path = self.project_path
            project_id = self.project.get_path_id(path)
            all_projects = list(TSC.Pager(self.project.server.projects))
            if project_id is None:
                path = path[1:]
                if '/' in path:
                    path = path.split('/')
                    project_id = self.project.create_project(path, all_projects)
                    project_id = project_id.id
                else:
                    project_id = self.project.make_project(path)
                    project_id = project_id.id
            else:
                print("Project path already exists:", project_id)
            if project_id:
                self.root_project_id = project_id
                logger_info(f"\t\tProject {path} found")
        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def set_connection_param(self, *args, **kwargs):
        """Substitute connection attribute values with secrets from the
        password vault (matching by value, not key), then save and re-zip
        the datasource for publishing."""
        logger_info(f"\n\t\t-- Applying Set connection param changes.")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]
        try:
            pvault = sys_variables.pvault
        except AttributeError:
            pvault = None
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tDatabase Connection found with caption : {named_connection_node}')
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection found : {connection_node}')
                # The configured value is itself a vault key; when it matches
                # a vault entry, the attribute gets the vault's secret.
                for key, value in kwargs["value"][database_type].items():
                    if pvault and value in pvault:
                        pvault_val = pvault[value]
                        logger_info(f"\t\tChanging")
                        connection_node.set(key[1:], pvault_val)
                # NOTE(review): placement of save/zip reconstructed from the
                # linearized original — confirm it should run only when the
                # connection node was found.
                logger_info(f"\t\tSaving the datasource:")
                self.save_datasource()
                self.zip_datasource("publish")
        else:
            logger_info(f"\n\t\tNo database connection found with the caption : {caption}")

    def apply_rename(self, rule, path):
        """Record the rename target (new name + current path) for a later
        post-publish rename; does not touch the server."""
        rules_mapping = {
            "rename_datasource": self.rename_datasource,
        }
        for key, value in rules_mapping.items():
            if key in rule:
                if rule[key]["@enabled"].lower() == "false":
                    continue
                value = rule[key]
                self.rename_ds = value['@new_ds_name']
                self.rename_path = value['@current_path']

    def set_folder(self, *args, **kwargs):
        """
        Folder to place the datasource in: resolves @folder_name to a
        project id and stores it as the publish target.
        :return: None
        """
        logger_info(f"\n\t\t-- Setting the folder to place the datasource")
        project_name = kwargs["value"]["@folder_name"]
        path = project_name
        project_id = self.project.get_project_ids(path)
        if project_id:
            self.root_project_id = project_id
            logger_info(f"\t\tProject {path} found")
        else:
            self.root_project_id = ""
            logger_error(f"\t\tProject {project_name} not found")
        return

    def apply_schedule(self, *args, **kwargs):
        """
        Record the schedule name to attach to the datasource after publish.
        :return: None
        """
        logger_info(f"\n\t\t-- Applying schedules on target datasource datasource")
        schedule_name = kwargs["value"]["@schedule_name"]
        self.schedule_name = schedule_name

    def save_datasource(self):
        """
        Write the (possibly transformed) XML tree back to the unpacked
        .tds file in the temp directory.
        :return: None
        """
        logger_info(f"\t\tSaving the datasource : {self.twb_file}")
        # twb_file is already absolute, so the join is a no-op kept for
        # fidelity with the original.
        self.tree.write(os.path.join(self.temp_dir, self.twb_file))

    def zip_datasource(self, publish=None):
        """
        Zip the temp directory back into a .tdsx, either next to the
        source file (publish) or into the configured output dir.
        :return: None
        """
        filename = f"{os.path.basename(self.name).split('.')[0]}.tdsx"
        if publish:
            output_dir = os.path.dirname(self.filepath)
        else:
            output_dir = self.output_dir
        zip_directory_to_twbx(self.temp_dir, os.path.join(output_dir, filename))

    def publish_(self):
        """
        Publish the datasource (Overwrite mode), then attach schedules and
        apply any pending rename.
        :return: None
        """
        if ".tdsx" in self.content_id:
            self.content_id = self.content_id.replace(".tdsx", "")
        # Map the sanitized file name back to the original datasource name.
        if self.name_map:
            for key, value in self.name_map.items():
                if value == self.name:
                    self.name = key
        logger_info(f"\t\tPublishing the DataSource : {self.name}")
        if self.root_project_id:
            published_datasource = sys_variables.server.datasources.publish(
                DatasourceItem(
                    name=self.name,
                    project_id=self.root_project_id,
                ),
                self.filepath,
                "Overwrite",
            )
            self.apply_schedules(published_datasource.id)
            self.update_datasource_name()
        else:
            logger_info(
                "\n\t\tNo project found to publish. There could be multiple reasons for this"
                "\n\t\t1. The project structure is not correct in config. "
                "\n\t\t2. The project structure is not available in the destination site. "
                "\n\t\tPlease check the spelling of the names of each project in the project structure "
                "\n\t\tCheck in the destination site whether the project structure exists or not "
                f"\n\t\tDATASOURCE FILENAME : {self.content_id}")

    def apply_schedules(self, data_source_id):
        """
        Attach the configured refresh schedule (by name) to the published
        datasource.
        :param data_source_id: id of the freshly published datasource
        :return: None
        """
        datasource_item = sys_variables.server.datasources.get_by_id(data_source_id)
        datasource_schedules = sys_variables.server.schedules.get()
        specific_name = self.schedule_name
        if specific_name:
            matching_schedule_ids = ""
            for schedule in datasource_schedules[0]:
                if schedule.name == specific_name:
                    matching_schedule_ids = schedule.id.strip()
            schedule_id = matching_schedule_ids
            try:
                # Add the data source to the schedule.
                sys_variables.server.schedules.add_to_schedule(
                    schedule_id=schedule_id,
                    datasource=datasource_item,
                )
                logger_info(
                    f"\tSuccessfully applied schedules for the {self.name} data source."
                )
            except ServerResponseError as e:
                if e.code == "403078":
                    logger_info(
                        "Permission Denied: Not allowed to create extract refresh tasks on this object."
                    )
                else:
                    # Handle other types of errors if needed.
                    logger_error("An error occurred:", e)

    def __repr__(self):
        return f"Datasource : {self.name}"

    def rename_datasource(self, *args, **kwargs):
        """
        Rename the datasource: the new name becomes the content id used
        at publish time.
        :return: None
        """
        logger_info(
            f"\n\t\t-- Renaming the datasource to : {kwargs['value']['@new_ds_name']}"
        )
        new_name = kwargs["value"]["@new_ds_name"]
        current_path = kwargs["value"]["@current_path"]
        self.content_id = new_name

    def get_all_projects(self, server):
        """Return a dict of project id -> ProjectItem for the whole site."""
        all_projects = {}
        for project in list(TSC.Pager(server.projects)):
            all_projects[project.id] = project
        return all_projects

    def get_full_path(self, datasource, projects_dict):
        """Build the '/Parent//Child/name'-style path of a datasource by
        walking its project ancestry upward."""
        path_parts = [datasource.name]
        project = projects_dict.get(datasource.project_id)
        while project:
            path_parts.insert(0, f"/{project.name}")
            project = projects_dict.get(project.parent_id) if project.parent_id else None
        return '/'.join(path_parts)

    def update_datasource_name(self):
        """Find the published datasource by its full path and rename it on
        the server to the pending rename target (if any)."""
        full_path = f"{self.rename_path}/{self.name}"
        self.name = self.rename_ds
        matched_datasource = None
        projects_dict = self.get_all_projects(sys_variables.server)
        for datasource in Pager(sys_variables.server.datasources.get):
            if self.get_full_path(datasource, projects_dict) == full_path:
                matched_datasource = datasource
                break
        if matched_datasource:
            matched_datasource.name = self.name
            sys_variables.server.datasources.update(matched_datasource)
            print(f"Datasource '{full_path}' updated to '{self.name}'")


class RecurseDataSourceProjectPath:
    """Expands a project-path rule ('all', recursive, or exact) into the
    set of source project folders to read datasource files from."""

    def __init__(self, project_path, source_folder_map, publish_folder_map,
                 recurse, source, each_rule, target=None, name_map=None):
        self.project_path = project_path
        self.recurse = recurse
        self.source_folder_map = source_folder_map
        self.publish_folder_map = publish_folder_map
        self.source = source
        self.projects_to_operate = []
        self.rule = each_rule
        self.target = target
        self.name_map = name_map
        # NOTE(review): raises TypeError when name_map is None — callers
        # appear to always pass a dict; confirm.
        self.reverse_name_map = {value: key for key, value in self.name_map.items()}
        if self.project_path.lower() == "all":
            for key, value in source_folder_map.items():
                self.projects_to_operate.append(key)
        if recurse:
            # NOTE(review): nesting of this branch reconstructed from the
            # linearized original; 'all' paths may be appended twice here,
            # matching the original statement order.
            for key, value in source_folder_map.items():
                if self.project_path.lower() == "all":
                    self.projects_to_operate.append(key)
                if recurse.lower() in ['true', 'yes']:
                    if key.startswith(project_path) and value:
                        self.projects_to_operate.append(key)
                else:
                    if project_path == '/':
                        if key.count('/') == 1:
                            self.projects_to_operate.append(key)
                    elif project_path == key and value:
                        self.projects_to_operate.append(key)
        self.workbooks = []

    def get_datasources(self):
        """Collect [file_path, Datasource] pairs for every .tds/.tdsx found
        under the selected project folders."""
        for project_path in self.projects_to_operate:
            # Per-folder failures are deliberately best-effort: a missing
            # map entry or unreadable dir skips that project only.
            name_map = None
            try:
                folder_uuid = self.source_folder_map[project_path]
                destination_project_id = self.publish_folder_map[project_path]
                file_dir = os.path.join(self.source, folder_uuid)
                for file in os.listdir(file_dir):
                    workbook_name = os.path.splitext(file)[0]
                    actual_name = self.reverse_name_map[workbook_name]
                    if os.path.splitext(file)[1] in ['.tdsx', '.tds']:
                        logger_info(f"\t\tFile exist in the path : {os.path.join(file_dir, file)}")
                        self.workbooks.append(
                            [
                                os.path.join(file_dir, file),
                                Datasource(
                                    # TODO: change the file path read from db
                                    os.path.join(file_dir, file),
                                    workbook_name,
                                    actual_name,
                                    self.rule,
                                    destination_project_id,
                                    "transform",
                                    self.target,
                                    name_map,
                                    project_path
                                ),
                            ]
                        )
            except Exception:
                pass
        return self.workbooks


def get_datasource_objects(rule, source_folder_map, project_archive_map, source,
                           target=None, name_map=None, type=None):
    """Walk the config's transform/publish datasource rules and build
    [file_path, Datasource] pairs for every enabled, resolvable entry.

    :param rule: parsed config dict
    :param source_folder_map: project path -> source folder uuid
    :param project_archive_map: project path -> destination project id
    :param source: root directory of the archived files
    :param target: optional output directory name
    :param name_map: datasource name -> sanitized file name
    :param type: "transform" selects the transform rules, else publish
    :return: list of [file_path, Datasource]
    """
    datasources = []
    logger_info(f"\n\n-- Going through the config file.")
    logger_info(f"-------------------------------------------------------------")
    if type == "transform":
        rules = rule["directives"]["datasources"]["transform"]["datasource"]
    else:
        rules = rule["directives"]["datasources"]["publish"]["datasource"]
    # A single rule parses as a dict; normalize to a list.
    if isinstance(rules, dict):
        rules = [rules]
    for each_rule in rules:
        try:
            project_path = each_rule["@project"]
            content_name = each_rule["@name"]
            required = each_rule["@enabled"]
            logger_info(f"\n\nProject : {project_path} , Datasource : {content_name} , Enabled : {required}")
            if required.lower() in ["true", "yes", "ok"]:
                if content_name.lower() in ['all', '*']:
                    # Wildcard name: expand via the recursive path helper.
                    recurse = each_rule.get("@recurse", None)
                    rpp = RecurseDataSourceProjectPath(project_path, source_folder_map,
                                                       project_archive_map, recurse,
                                                       source, each_rule, target, name_map)
                    datasources = datasources + rpp.get_datasources()
                    print("datasources", datasources)
                    continue
                if project_path in source_folder_map and content_name in name_map:
                    logger_info(f"\t\tProject path exist in destination : {urllib.parse.unquote(project_path)}")
                    # Get the folder name where the workbook is lying.
                    project_id = source_folder_map[project_path]
                    destination_project_id = project_archive_map[project_path]
                    if destination_project_id is None:
                        logger_error(f"\t\tThe project in destination doesn't exist : {project_path}")
                    file_dir = os.path.join(source, project_id)
                    sanitized_datasource_name = replace_special_characters(content_name)
                    file_path = get_file_path_with_extension(sanitized_datasource_name, file_dir)
                    # Check whether the filename exists in the path or not.
                    if file_path and os.path.exists(file_path):
                        logger_info(f"\t\tFile exist in the path : {file_path}")
                        datasources.append(
                            [
                                file_path,
                                Datasource(
                                    # TODO: change the file path read from db
                                    file_path,
                                    content_name,
                                    sanitized_datasource_name,
                                    each_rule,
                                    destination_project_id,
                                    "transform",
                                    target,
                                    name_map,
                                    project_path
                                ),
                            ]
                        )
                    else:
                        logger_info(f"File doesn't exist in the path : {file_path}. "
                                    f"The datasource might not be tagged.")
                else:
                    logger_info(f"\t\tThe datasource : {content_name} is not tagged in project path : {project_path}."
                                f"Please check the config file.")
            else:
                logger_info(f"Disabled for Datasource : {content_name}")
        except Exception:
            logger_error(f"\t\tThe datasource : {content_name} in project : {project_path} "
                         f" is not tagged.")
    return datasources
Editor is loading...
Leave a Comment