Untitled

 avatar
unknown
plain_text
9 months ago
45 kB
6
Indexable
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from os import environ
from tableauserverclient import DatasourceItem, ServerResponseError, PersonalAccessTokenAuth, Server, Pager
from tabulate import tabulate

from src.logger.logger import *
from src.msystem import sys_variables
from src.utilities import *
import tableauserverclient as TSC
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

# Placeholder token that stands in for a literal "/" inside a project name.
# It is read from the "slashreresent" environment variable and falls back to
# "_fwd_SLASH"; Project.get_child_project_id / get_project_ids turn it back
# into "/" before comparing against server project names.
# NOTE(review): Project.create_project replaces the hard-coded token
# '_fwd_SLASH_' (trailing underscore) instead of this value - confirm which
# spelling is the intended one.
forward_slash = environ.get("slashreresent", "_fwd_SLASH")

# env load for set_ds_connection_info function


class TransformDirective:
    """Apply rule-driven edits to the XML tree of a Tableau datasource.

    Every transformation method is called as ``method(root, value=rule)``:
    ``root`` is the ElementTree element that is modified in place and
    ``rule`` is the mapping holding that transformation's ``@``-prefixed
    settings.  None of the transformations returns a value.
    """

    # File extension -> (expected connection class, attribute that stores the
    # full file path).  ``None`` as attribute means the connection keeps the
    # location split into separate 'directory' and 'filename' attributes.
    _UNC_TARGETS = {
        ".xlsx": ("excel-direct", "filename"),
        ".csv": ("textscan", None),
        ".txt": ("textscan", None),
        ".pdf": ("pdf", "origfilename"),
        ".kml": ("ogrdirect", None),
        ".json": ("semistructpassivestore-direct", None),
        ".hyper": ("hyper", "dbname"),
        ".sas7bdat": ("stat-direct", None),
    }

    def __init__(self, file, rules):
        """Remember the datasource file and the rules that apply to it."""
        self.name = file
        self.sheets = rules

    @staticmethod
    def _descendants(node, tag, attribute, value):
        """Return the descendants of *node* whose *attribute* equals *value*.

        Same result as ``node.findall(f".//{tag}[@{attribute}='{value}']")``
        (document order, *node* itself excluded), but the comparison happens
        in Python so a value containing quote characters cannot break the
        path expression.  ``tag=None`` matches every tag.
        """
        wanted = str(value)
        return [
            child for child in node.iter(tag)
            if child is not node and child.get(attribute) == wanted
        ]

    def set_ds_connection_info(self, *args, **kwargs):
        """Overwrite connection attributes of one named connection.

        The rule supplies ``@caption`` (named-connection to edit), ``@class``
        (connection class below it) and, under the key named after that
        class, a mapping of ``@attribute`` -> new value.  When the password
        vault (``sys_variables.pvault``) has an entry for an attribute name,
        the vault value takes precedence over the value from the rule.
        """
        logger_info(f"\n\t\t-- Applying Set connection info transformation")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        rule = kwargs["value"]

        database_type = rule["@class"]
        caption = rule["@caption"]

        # The vault is optional; treat a missing attribute as "no vault".
        pvault = getattr(sys_variables, "pvault", None)

        named_connections = self._descendants(root, "named-connection", "caption", caption)
        if not named_connections:
            return
        named_connection_node = named_connections[0]
        logger_info(f'\n\t\tNamed Connection node found : {named_connection_node}')

        connections = self._descendants(named_connection_node, "connection", "class", database_type)
        if not connections:
            return
        connection_node = connections[0]
        logger_info(f'\n\t\tConnection node found : {connection_node}')

        for key, value in rule[database_type].items():
            attribute = key[1:]  # drop the leading '@' of the rule key
            if pvault and attribute in pvault:
                # The vault value wins and must not be overwritten by the
                # rule value afterwards; the secret itself is never logged.
                logger_info(f"\t\tSetting {attribute} from the password vault")
                connection_node.set(attribute, pvault[attribute])
                continue
            logger_info(f"\t\tChanging the value of attribute {attribute} to {value}")
            connection_node.set(attribute, value)

    def replace_table_name(self, *args, **kwargs):
        """Rename a table: every ``caption`` or ``name`` attribute equal to
        ``@old_table_name`` (on any element, root included) becomes
        ``@new_table_name``."""
        logger_info(f"\n\t\t-- Applying Replace table name transformation")
        root = args[0]
        new_table_name = kwargs["value"]["@new_table_name"]
        old_table_name = kwargs["value"]["@old_table_name"]
        for element in root.iter():
            if "caption" in element.attrib and element.get("caption") == old_table_name:
                logger_info(f"\t\tTable Named Matched {old_table_name}")
                element.set("caption", new_table_name)
            if "name" in element.attrib and element.get("name") == old_table_name:
                element.set("name", new_table_name)

    def change_custom_sql(self, *args, **kwargs):
        """Replace the text of every element named ``@name`` whose current
        text is exactly ``@old_sql`` with ``@new_sql``."""
        logger_info(f"\n\t\t-- Applying Set custom sql transformation")
        root = args[0]
        name = kwargs["value"]["@name"]
        old_query = kwargs["value"]["@old_sql"]
        new_query = kwargs["value"]["@new_sql"]

        for element_to_replace in self._descendants(root, None, "name", name):
            if element_to_replace.text == old_query:
                logger_info(f"\t\tOld query Matched")
                element_to_replace.text = new_query
            else:
                logger_info(f"\t\tOld query Doesn't Match")

    def replace_calculation_formula(self, *args, **kwargs):
        """
        Find the nodes which match the caption and set the ``formula``
        attribute of every ``calculation`` node below them.
        :param args: ``args[0]`` is the XML root element
        :param kwargs: ``value`` holds ``@caption`` and ``@new_formula``
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Replace Calculation Formula transformation")
        root = args[0]
        caption = kwargs["value"]["@caption"]
        new_calculation_formula = kwargs["value"]["@new_formula"]
        for column_node in self._descendants(root, None, "caption", caption):
            for calculation_node in column_node.findall(".//calculation"):
                calculation_node.set("formula", new_calculation_formula)
                logger_info(
                    f"\t\tSetted calculation node : {calculation_node}, new formula as : {new_calculation_formula}"
                )

    def set_display_name(self, *args, **kwargs):
        """
        Find the datasource nodes which match the caption and set the display name.
        :param args: ``args[0]`` is the XML root element
        :param kwargs: ``value`` holds ``@old_display_name`` and ``@new_display_name``
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Set Display Name transformation", Fore.BLUE)
        root = args[0]
        old_display_name = kwargs["value"]["@old_display_name"]
        new_display_name = kwargs["value"]["@new_display_name"]
        # NOTE(review): only elements below the root are searched, so a root
        # <datasource> element carrying the caption is never renamed - confirm
        # that this is intended.
        datasource_node = self._descendants(root, "datasource", "caption", old_display_name)
        if datasource_node:
            logger_info(
                f"\t\tFound {len(datasource_node)} datasource nodes with caption {old_display_name}."
            )

            for ds_node in datasource_node:
                ds_node.set("caption", new_display_name)
                logger_info(
                    f"\t\tSet the caption of the datasource node to {new_display_name}."
                )

    def remove_extract(self, *args, **kwargs):
        """
        Set the ``enabled`` attribute of every extract node to ``@new_value``.
        :param args: ``args[0]`` is the XML root element
        :param kwargs: ``value`` holds ``@new_value``
        :return: None
        """
        logger_info(f"\n\t\t-- Applying Remove Extract transformation")
        root = args[0]
        new_value = kwargs["value"]["@new_value"]
        extract_nodes = root.findall(".//extract")
        if extract_nodes:
            logger_info(f"\t\tFound {len(extract_nodes)}.")

            for ex_node in extract_nodes:
                ex_node.set("enabled", new_value)
                logger_info(
                    f"\t\tSet the enabled attribute of the extract node to {new_value}."
                )

    @staticmethod
    def _update_full_path(connection, attribute, file, new_path):
        """Point *attribute* (a full file path) at *new_path* when its last
        '/'-separated component is *file*."""
        old_value = connection.get(attribute)
        # A missing attribute is reported as a mismatch instead of crashing.
        if old_value is not None and old_value.split('/')[-1] == file:
            connection.set(attribute, new_path)
            logger_info(f"\t\t Updated old path {old_value} to new path {new_path}")
        else:
            logger_error("\t\t File name is not matched")

    @staticmethod
    def _update_directory(connection, path, file, new_path):
        """Point the 'directory' attribute at *path* when the connection's
        'filename' attribute is *file*."""
        filename = connection.get('filename')
        directory = connection.get('directory')
        if filename == file:
            connection.set('directory', path)
            logger_info(f"\t\t Updated old path {directory}/{filename} to new path {new_path}")
        else:
            logger_error("\t\t File name is not matched")

    def transform_unc_path(self, *args, **kwargs):
        """Re-point the file based connection of a named connection.

        The rule supplies ``@caption`` (named-connection to edit), ``@path``
        (new directory) and ``@file`` (file name).  The file extension
        (compared case-insensitively) selects the connection class and the
        attribute to rewrite, see ``_UNC_TARGETS``.  The new location is
        computed once, so several matching connections all receive the same
        value.
        """
        logger_info(f"\n\t\t-- Applying UNC transformation")
        root = args[0]
        rule = kwargs["value"]
        caption_name = rule["@caption"]
        path = rule["@path"]
        file = rule["@file"]

        file_type = os.path.splitext(file)[1].lower()
        target = self._UNC_TARGETS.get(file_type)
        new_path = path + file

        for named_connection in root.iter('named-connection'):
            caption = named_connection.get('caption')
            logger_info(f"\t\t Named connection caption: {caption}")
            if caption != caption_name:
                continue
            logger_info("\t\t Matching named-connection found")

            # Only the direct <connection> child is considered.
            connection = named_connection.find('connection')
            if connection is None:
                continue
            if target is None:
                logger_error(f"\t\t Unsupported file type: {file_type}")
                continue

            expected_class, attribute = target
            if connection.get('class') != expected_class:
                logger_error(f"\t\t File type is not matched. Please check the file or file type")
            elif attribute is None:
                self._update_directory(connection, path, file, new_path)
            else:
                self._update_full_path(connection, attribute, file, new_path)

    def tabulate(self, header, data):
        """
        Print in tabulate format
        :param header: column headers
        :param data: table rows
        :return: None
        """
        print(tabulate(data, headers=header, tablefmt="grid"))


class Project:
    """Lookup and creation helpers for the project hierarchy of a server."""

    def __init__(self, server):
        """Fetch every project of *server* once and index the result."""
        self.projects = list(TSC.Pager(server.projects))
        # project id -> project item (resolves a parent_id to its item)
        self.project_id_parent_id_map = {}
        # project name -> parent project id; when two projects share a name
        # the one listed later overwrites the earlier entry
        self.project_parent_map = {}
        for project in self.projects:
            self.project_id_parent_id_map[project.id] = project
            self.project_parent_map[project.name] = project.parent_id
        self.server = server

    def get_project_id_nested(self, project_names):
        """
        Get the project ID of the last name in a nested project path.
        :param project_names: project names ordered from outermost to innermost
        :return: id of the first cached project whose name matches the last
                 entry case-insensitively, or None when a name is unknown
        """
        status, project = self.check_project_exists(project_names)
        if not status:
            logger_info(f"\t\tProject {project} not found")
            return None
        wanted = project.lower()
        for candidate in self.projects:
            if candidate.name.lower() == wanted:
                return candidate.id
        return None

    def check_project_exists(self, project_names):
        """
        Check that every name of a nested project path is a known project.
        :param project_names: project names ordered from outermost to innermost
        :return: ``(True, last_name)`` when all names are known, otherwise
                 ``(False, [first_unknown_name])``; an empty list yields
                 ``(False, [])``
        """
        logger_info(f"\t\tProject Names for nested checking: {project_names}")
        if not project_names:
            return False, []

        parent = None
        for each_project_name in project_names:
            if each_project_name not in self.project_parent_map:
                projects_not_found = [each_project_name]
                logger_info(f"\t\tProject {projects_not_found} not found")
                return False, projects_not_found

            parent_project_id = self.project_parent_map[each_project_name]
            parent_project = self.project_id_parent_id_map.get(parent_project_id)
            # The first name has no expected parent; comparing against None
            # must not crash when the server-side project does have one.
            expected_parent = parent.lower() if parent is not None else None
            if parent_project is not None and parent_project.name.lower() != expected_parent:
                # Informational only: the result still depends solely on the
                # names being known.
                logger_info(
                    f"\t\tProject {each_project_name} is not nested under {parent}"
                )
            parent = each_project_name

        return True, project_names[-1]

    def get_project_id(self, project_name, path=None, name=None):
        """
        Resolve a project path to the id of its innermost project.
        :param project_name: unused, kept for compatibility with callers
        :param path: project path with a leading separator, e.g. "/A/B"
        :param name: when None the path is split on ``os.path.sep``,
                     otherwise on "/"
        :return: project id, or None when the path is missing, has no
                 component after the leading separator, or cannot be resolved
        """
        if path is None:
            print("Path is None.")
            return None

        separator = os.path.sep if name is None else "/"
        path_parts = path.split(separator)
        # path_parts[0] is the text before the leading separator.
        if len(path_parts) < 2:
            return None

        current_project = next(
            (project for project in self.projects if project.name == path_parts[1]), None
        )
        if current_project is None:
            return None

        # Walk down the hierarchy one path component at a time.
        for part in path_parts[2:]:
            parent_id = current_project.id
            current_project = next(
                (child for child in self.projects
                 if child.parent_id == parent_id and child.name == part),
                None,
            )
            if current_project is None:
                return None

        return current_project.id

    def get_child_project_id(self, projects, project_path):
        """Resolve *project_path* (list of names) below the candidates in
        *projects*; names are compared case-insensitively after turning the
        ``forward_slash`` placeholder back into "/".  Children are looked up
        in the cached ``self.projects``.  Returns the id or None."""
        if not project_path:
            return None
        wanted = project_path[0].replace(forward_slash, '/').lower()
        for project in projects:
            if project.name.lower() != wanted:
                continue
            if len(project_path) == 1:
                return project.id
            child_projects = [p for p in self.projects if p.parent_id == project.id]
            child_project_id = self.get_child_project_id(child_projects, project_path[1:])
            if child_project_id:
                return child_project_id
        return None

    def get_project_ids(self, path):
        """Resolve *path* (first character is dropped, components separated
        by "/") against the cached projects and return the project id or
        None."""
        main_path = path[1:]
        if '/' in main_path:
            return self.get_child_project_id(self.projects, main_path.split('/'))
        wanted = main_path.replace(forward_slash, '/').lower()
        for pro in self.projects:
            if pro.name.lower() == wanted and pro.parent_id is None:
                return pro.id
        return None

    def make_project(self, name, parent_id=None):
        """Create project *name* below *parent_id* on the server; returns
        the created item, or None when the server rejects the request."""
        new_project = TSC.ProjectItem(name, description=None, content_permissions=None, parent_id=parent_id)
        try:
            return self.server.projects.create(new_project)
        except TSC.ServerResponseError as e:
            print(f"Error creating project: {e}")
            return None

    def create_project(self, path, all_projects, parent_project=None):
        """Make sure every project of *path* exists below *parent_project*.

        :param path: project names ordered from outermost to innermost
        :param all_projects: projects already present on the server
        :param parent_project: project to start below, None for top level
        :return: the innermost project item, None when a project could not
                 be created; an empty *path* returns *parent_project*
        """
        current_project = parent_project
        for raw_name in path:
            # NOTE(review): hard-coded token; other lookups use the
            # configurable ``forward_slash`` value - confirm they agree.
            project_name = raw_name.replace('_fwd_SLASH_', '/')
            parent_id = current_project.id if current_project is not None else None
            wanted = project_name.lower()

            existing_project = next(
                (p for p in all_projects
                 if p.name.lower() == wanted and p.parent_id == parent_id),
                None,
            )
            if existing_project:
                current_project = existing_project
                continue

            new_project = self.make_project(project_name, parent_id)
            if not new_project:
                print(f"Failed to create project: {project_name}")
                return None
            current_project = new_project

        return current_project

    def get_path_id(self, path):
        """Resolve *path* (first character is dropped, components separated
        by "/") against a fresh project listing from the server and return
        the project id or None."""
        path = path[1:]
        all_projects = list(TSC.Pager(self.server.projects))
        if '/' in path:
            # get_child_project_id expects (projects, path parts).
            return self.get_child_project_id(all_projects, path.split('/'))
        for project in all_projects:
            if project.name == path and project.parent_id is None:
                return project.id
        return None

class Datasource:
    def __init__(self, filepath, filename, content_id, rules, project_id, type, output=None, name_map=None, project_path=None):
        """Unpack a packaged datasource and load its .tds document.

        :param filepath: full path to the packaged datasource file
        :param filename: name of the file
        :param content_id: identifier of the content item
        :param rules: rules handed to the transformation / publish steps
        :param project_id: project id; also the initial publish target
        :param type: not used by this constructor
        :param output: optional output directory, joined onto the home directory
        :param name_map: optional mapping consulted when publishing
        :param project_path: optional project path used when publishing
        """
        self.filepath = filepath
        self.name = filename
        self.content_id = content_id

        self.rule = rules
        self.transform = TransformDirective(filepath, rules)

        # root_project_id starts as the given project and may be replaced
        # later by the publish rules.
        self.project_id = project_id
        self.root_project_id = project_id

        self.schedule_name = ""

        self.rename_path = None
        self.rename_ds = None

        self.project = Project(sys_variables.server)
        self.name_map = name_map
        self.project_path = project_path

        # Only the generated name of the temporary file is kept; the archive
        # is then unpacked to that path.
        # NOTE(review): presumably unzip_to_dir creates the directory - verify.
        self.temp_dir = tempfile.NamedTemporaryFile().name
        unzip_to_dir(filepath, self.temp_dir)

        # The first *.tds entry of the unpacked archive is the document.
        tds_entries = [
            entry for entry in os.listdir(self.temp_dir) if entry.endswith(".tds")
        ]
        self.twb_file = os.path.join(self.temp_dir, tds_entries[0])

        self.tree = ET.parse(self.twb_file)
        self.root = self.tree.getroot()

        # Base output directory: explicit override below the home directory,
        # otherwise the configured default.
        if output:
            output_dir = os.path.join(Path.home(), output)
        else:
            output_dir = sys_variables.output_dir
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        # Per-project output directory, only when a project id is known.
        if not self.project_id:
            self.output_dir = None
        else:
            self.output_dir = os.path.join(Path.home(), output_dir, self.project_id)
            if not os.path.exists(self.output_dir):
                os.makedirs(self.output_dir)

    def get_project_path_from_rules(self):
        project_name = []
        for rule in self.rule:
            if rule["@name"] in self.name:
                project_name = rule["@project"].split("/")[1:]
        if project_name:
            logger_info(
                f"\t\tProject structure for default publish : {project_name} for {self.name}"
            )
        return project_name

    def apply_transformation(self):
        rule = self.rule
        rules_mapping = {
            "replace_table_name": self.transform.replace_table_name,
            "change_custom_sql": self.transform.change_custom_sql,
            "set_calculation_formula": self.transform.replace_calculation_formula,
            "set_display_name": self.transform.set_display_name,
            "remove_extract": self.transform.remove_extract,
            "set_ds_connection_info": self.transform.set_ds_connection_info,
            "transform_unc_path": self.transform.transform_unc_path
        }
        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def apply_publish(self):
        # Get the root project ID from the project name in the destination server
        # This will get overwritten if the add Project or change project rule is enabled
        # self.root_project_id = self.project.get_project_id(
        #     project_name=rule["@project"], path=path, name=None
        # )
        rule = self.rule
        rules_mapping = {
            "set_folder": self.set_folder,
            "apply_schedule": self.apply_schedule,
            "rename_datasource": self.rename_datasource,
            "set_connection_secrets": self.set_connection_param,
        }

        if rule["@project"].lower() == "all" and  rule["@name"].lower() == "all":
            # logger_info("\n\t\tThe 'Add' and 'Change Project' directives have no effect.")
            # logger_info("\n\t\tThe workbook will be published in the same project as on the source server.")
            path = self.project_path
            project_id = self.project.get_path_id(path)
            all_projects=list(TSC.Pager(self.project.server.projects))
            if project_id is None:
                path=path[1:]
                if '/' in path:
                    path=path.split('/')
                    project_id=self.project.create_project(path,all_projects)
                    project_id=project_id.id
                else:
                    project_id=self.project.make_project(path)
                    project_id=project_id.id
            else:
                print("Project path already exists:",project_id)

            if project_id:
                self.root_project_id = project_id
                logger_info(f"\t\tProject {path} found")

        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def set_connection_param(self, *args, **kwargs):
        """Fill connection attributes from the password vault, then save and
        re-zip the datasource.

        The rule supplies ``@caption`` (named-connection to edit), ``@class``
        (connection class below it) and, under the key named after that
        class, a mapping of ``@attribute`` -> vault key.  Saving and zipping
        happen whenever the named connection exists, even when no attribute
        was changed.
        """
        logger_info(f"\n\t\t-- Applying Set connection param changes.")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        settings = kwargs["value"]

        database_type = settings["@class"]
        caption = settings["@caption"]

        # The password vault is optional.
        try:
            pvault = sys_variables.pvault
        except Exception:
            pvault = None

        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is None:
            logger_info(f"\n\t\tNo database connection found with the caption : {caption}")
            return

        logger_info(f'\n\t\tDatabase Connection found with caption : {named_connection_node}')
        connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
        if connection_node is not None:
            logger_info(f'\n\t\tConnection found : {connection_node}')
            for key, value in settings[database_type].items():
                if not pvault:
                    continue
                # The rule value names the vault entry to use.
                for vault_key, _unused in pvault.items():
                    if value == vault_key:
                        secret = pvault[vault_key]
                        logger_info(f"\t\tChanging")
                        connection_node.set(key[1:], secret)

        logger_info(f"\t\tSaving the datasource:")
        self.save_datasource()
        self.zip_datasource("publish")

    def apply_rename(self, rule, path):
        rules_mapping = {
            "rename_datasource": self.rename_datasource,
        }
        for key, value in rules_mapping.items():
            if key in rule:
                if rule[key]["@enabled"].lower() == "false":
                    continue
                value = rule[key]
                self.rename_ds = value['@new_ds_name']
                self.rename_path = value['@current_path']

    def set_folder(self, *args, **kwargs):
        """
        Select the project the datasource is published to.

        ``@folder_name`` of the rule is resolved to a project id; when it
        cannot be resolved the publish target is cleared (empty string).
        :return: None
        """
        logger_info(f"\n\t\t-- Setting the folder to place the datasource")
        folder = kwargs["value"]["@folder_name"]
        resolved_id = self.project.get_project_ids(folder)
        if not resolved_id:
            self.root_project_id = ""
            logger_error(f"\t\tProject {folder} not found")
            return
        self.root_project_id = resolved_id
        logger_info(f"\t\tProject {folder} found")

    def apply_schedule(self, *args, **kwargs):
        """
        Remember the schedule named by ``@schedule_name`` of the rule; the
        name is only stored on the instance here.
        :return: None
        """
        logger_info(f"\n\t\t-- Applying schedules on target datasource datasource")
        self.schedule_name = kwargs["value"]["@schedule_name"]

    def save_datasource(self):
        """
        Write the in-memory XML tree back to the unpacked datasource file.
        Re-zipping is a separate step.
        :return: None
        """
        logger_info(f"\t\tSaving the datasource : {self.twb_file}")
        destination = os.path.join(self.temp_dir, self.twb_file)
        self.tree.write(destination)

    def zip_datasource(self, publish=None):
        """
        Zip the unpacked datasource directory into a .tdsx archive.

        The archive is named after ``self.name`` with only its final
        extension replaced by ".tdsx", so names that contain dots are kept
        intact.
        :param publish: when truthy the archive is written next to
                        ``self.filepath``, otherwise into ``self.output_dir``
        :return: None
        """
        stem = os.path.splitext(os.path.basename(self.name))[0]
        filename = f"{stem}.tdsx"
        if publish:
            output_dir = os.path.dirname(self.filepath)
        else:
            output_dir = self.output_dir
        zip_directory_to_twbx(self.temp_dir, os.path.join(output_dir, filename))

    def publish_(self):
        """
        Publish the datasource file to the resolved project.

        ".tdsx" is removed from ``content_id`` and, when ``name_map`` has an
        entry whose value equals the current name, the datasource is
        published under that entry's key.  Publishing overwrites an existing
        datasource; afterwards schedules are applied and the datasource name
        is updated.  Without a resolved project nothing is published and the
        likely causes are logged.
        :return: None
        """
        if ".tdsx" in self.content_id:
            self.content_id = self.content_id.replace(".tdsx", "")

        if self.name_map:
            for key, value in self.name_map.items():
                if value == self.name:
                    self.name = key
                    # Stop at the first match so the freshly assigned name
                    # is not remapped again by a later entry.
                    break

        logger_info(f"\t\tPublishing the DataSource : {self.name}")
        if not self.root_project_id:
            logger_info(
                "\n\t\tNo project found to publish. There could be multiple reasons for this"
                "\n\t\t1. The project structure is not correct in config. "
                "\n\t\t2. The project structure is not available in the destination site. "
                "\n\t\tPlease check the spelling of the names of each project in the project structure "
                "\n\t\tCheck in the destination site whether the project structure exists or not "
                f"\n\t\tDATASOURCE FILENAME : {self.content_id}")
            return

        published_datasource = sys_variables.server.datasources.publish(
            DatasourceItem(
                name=self.name,
                project_id=self.root_project_id,
            ),
            self.filepath,
            "Overwrite",
        )
        self.apply_schedules(published_datasource.id)
        self.update_datasource_name()

    def apply_schedules(self, data_source_id):
        """
        Attach the published datasource to the schedule named in
        ``self.schedule_name``.

        Does nothing (and makes no server call) when no schedule name is
        configured. When the name is configured but no schedule with that name
        exists on the server, an error is logged and nothing is attached.

        :param data_source_id: id of the datasource to add to the schedule.
        :return: None
        """
        schedule_name = self.schedule_name
        if not schedule_name:
            # No schedule requested: skip the server round-trips entirely.
            return

        server = sys_variables.server

        # Pager walks every result page; a bare schedules.get() only returns
        # the first page, so schedules beyond it could never be matched.
        schedule_id = next(
            (
                schedule.id.strip()
                for schedule in Pager(server.schedules)
                if schedule.name == schedule_name
            ),
            None,
        )
        if not schedule_id:
            # Previously an empty schedule id was sent to the server here.
            logger_error(
                f"\tSchedule '{schedule_name}' was not found on the server; "
                f"no schedule applied for the {self.name} data source."
            )
            return

        # get a datasource item that should be added to a schedule
        datasource_item = server.datasources.get_by_id(data_source_id)

        try:
            # Add the data source to the schedule
            server.schedules.add_to_schedule(
                schedule_id=schedule_id,
                datasource=datasource_item,
            )
            logger_info(
                f"\tSuccessfully applied schedules for the {self.name} data source."
            )
        except ServerResponseError as e:
            if e.code == "403078":
                logger_info(
                    "Permission Denied: Not allowed to create extract refresh tasks on this object."
                )
            else:
                # Single-argument call, like every other logger call in this
                # file; the exception text is folded into the message.
                logger_error(f"An error occurred: {e}")

    def __repr__(self):
        """Return a short label built from this object's ``name``."""
        label = "Datasource : {}".format(self.name)
        return label
    

    def rename_datasource(self, *args, **kwargs):
        """
        Record the new name requested by a rename directive.

        The new name is stored in ``self.content_id``.

        :param kwargs: must contain ``value``, a mapping with the keys
                       ``@new_ds_name`` and ``@current_path``.
        :return: None
        """
        directive = kwargs["value"]
        new_name = directive["@new_ds_name"]
        logger_info(
            f"\n\t\t-- Renaming the datasource to : {new_name}"
        )
        # The value is not used below, but the lookup is kept: a directive
        # without @current_path still raises KeyError at this point.
        current_path = directive["@current_path"]
        self.content_id = new_name

    def get_all_projects(self, server):
        """
        Collect every project the pager yields for ``server.projects``.

        :param server: object whose ``projects`` endpoint is handed to
                       ``TSC.Pager``.
        :return: dict mapping each project's ``id`` to the project item.
        """
        return {project.id: project for project in TSC.Pager(server.projects)}

    def get_full_path(self, datasource, projects_dict):
        """
        Build the slash-separated path of a datasource from its project chain.

        The chain is followed from the datasource's project up through
        ``parent_id`` links for as long as the parent is present in
        ``projects_dict``. The result looks like ``/Top/Child/Datasource``.
        When the datasource's project is not in ``projects_dict`` the bare
        datasource name is returned.

        :param datasource: object exposing ``name`` and ``project_id``.
        :param projects_dict: mapping of project id -> object exposing
                              ``name`` and ``parent_id``.
        :return: the path string.
        """
        # Collected leaf-first, reversed afterwards.
        ancestor_names = []
        project = projects_dict.get(datasource.project_id)
        while project is not None:
            ancestor_names.append(project.name)
            project = projects_dict.get(project.parent_id) if project.parent_id else None

        if not ancestor_names:
            return datasource.name

        ancestor_names.reverse()
        # Exactly one separator between parts. The earlier version prefixed
        # every project with "/" and then joined with "/" again, yielding
        # "/Top//Child/name" for nested projects.
        return "/" + "/".join(ancestor_names + [datasource.name])

    def update_datasource_name(self):
        """
        Rename the datasource on the server to ``self.rename_ds``.

        The datasource is located by comparing the path built by
        ``get_full_path`` for each server datasource with
        ``"<self.rename_path>/<self.name>"`` (using the name held before the
        rename). ``self.name`` is replaced by ``self.rename_ds`` whether or not
        a match is found; the server is only updated on a match.

        :return: None
        """
        # Capture the lookup path before self.name is overwritten.
        full_path = f"{self.rename_path}/{self.name}"
        self.name = self.rename_ds

        projects_dict = self.get_all_projects(sys_variables.server)

        # First datasource whose computed path equals the lookup path, if any.
        matched_datasource = next(
            (
                candidate
                for candidate in Pager(sys_variables.server.datasources.get)
                if self.get_full_path(candidate, projects_dict) == full_path
            ),
            None,
        )

        if not matched_datasource:
            return

        matched_datasource.name = self.name
        sys_variables.server.datasources.update(matched_datasource)

        print(f"Datasource '{full_path}' updated to '{self.name}'")

class RecurseDataSourceProjectPath:
    """
    Expand a wildcard datasource rule into the source projects it covers and
    build a ``Datasource`` object for every datasource file found in them.
    """

    def __init__(self, project_path,
                 source_folder_map,
                 publish_folder_map,
                 recurse,
                 source,
                 each_rule,
                 target=None,
                 name_map=None):
        """
        Work out which keys of ``source_folder_map`` the rule applies to.

        Selection rules (result stored in ``projects_to_operate``):
        * ``project_path`` equal to "all" (any case): every key, once each.
        * ``recurse`` is "true"/"yes" (any case): every key that starts with
          ``project_path`` and has a truthy value.
        * ``recurse`` is any other non-empty value: for ``project_path == '/'``
          every key containing exactly one '/', otherwise the key equal to
          ``project_path`` if its value is truthy.
        * ``recurse`` empty/None and ``project_path`` not "all": nothing.

        :param project_path: project path from the rule, or "all".
        :param source_folder_map: mapping of project path -> source folder name.
        :param publish_folder_map: mapping of project path -> destination project id.
        :param recurse: value of the rule's recurse flag (string) or None.
        :param source: root directory holding the source folders.
        :param each_rule: the rule being expanded; passed on to ``Datasource``.
        :param target: passed on to ``Datasource`` unchanged.
        :param name_map: mapping whose inverse (value -> key) is used to look
                         up the name for a file stem; None is treated as empty.
        """
        self.project_path = project_path
        self.recurse = recurse
        self.source_folder_map = source_folder_map
        self.publish_folder_map = publish_folder_map
        self.source = source
        self.rule = each_rule
        self.target = target
        self.name_map = name_map
        self.workbooks = []

        # name_map defaults to None; iterating None.items() used to crash here.
        self.reverse_name_map = {value: key for key, value in (name_map or {}).items()}

        if self.project_path.lower() == "all":
            # Each project exactly once. Previously a set recurse flag appended
            # every key a second time, duplicating every datasource.
            self.projects_to_operate = list(source_folder_map)
            return

        selected = []
        if recurse:
            recursive = str(recurse).lower() in ['true', 'yes']
            for key, value in source_folder_map.items():
                if recursive:
                    # NOTE(review): plain prefix match, so "/Sales" also
                    # selects "/SalesOps" - confirm whether that is intended.
                    if key.startswith(project_path) and value:
                        selected.append(key)
                elif project_path == '/':
                    if key.count('/') == 1:
                        selected.append(key)
                elif project_path == key and value:
                    selected.append(key)
        self.projects_to_operate = selected

    def get_datasources(self):
        """
        Build ``[file_path, Datasource]`` pairs for every ``.tdsx``/``.tds``
        file in the selected projects.

        Problems are logged and skipped rather than raised: a project whose
        folder or destination cannot be resolved is skipped as a whole; a
        datasource file whose stem is missing from the reversed name map, or
        whose ``Datasource`` cannot be constructed, is skipped individually.

        :return: ``self.workbooks``, the accumulated list of pairs.
        """
        for project_path in self.projects_to_operate:
            try:
                folder_uuid = self.source_folder_map[project_path]
                destination_project_id = self.publish_folder_map[project_path]
                file_dir = os.path.join(self.source, folder_uuid)
                file_names = os.listdir(file_dir)
            except (KeyError, TypeError, OSError) as exc:
                logger_error(f"\t\tSkipping project {project_path} : {exc!r}")
                continue

            for file in file_names:
                workbook_name, extension = os.path.splitext(file)
                # Check the extension first so unrelated files in the folder
                # cannot abort the rest of the project.
                if extension not in ('.tdsx', '.tds'):
                    continue

                actual_name = self.reverse_name_map.get(workbook_name)
                if actual_name is None:
                    logger_error(
                        f"\t\tNo name mapping for file : {os.path.join(file_dir, file)}. Skipping."
                    )
                    continue

                file_path = os.path.join(file_dir, file)
                logger_info(f"\t\tFile exist in the path : {file_path}")
                # NOTE(review): None is passed as the Datasource name map here
                # (as before), not self.name_map - confirm this is intended.
                name_map = None
                try:
                    datasource = Datasource(
                        # TODO: change the file path read from db
                        file_path,
                        workbook_name,
                        actual_name,
                        self.rule,
                        destination_project_id,
                        "transform",
                        self.target,
                        name_map,
                        project_path
                    )
                except Exception as exc:
                    # Best effort: one unreadable file must not stop the others.
                    logger_error(f"\t\tCould not load datasource {file_path} : {exc!r}")
                    continue
                self.workbooks.append([file_path, datasource])
        return self.workbooks

def get_datasource_objects(rule, source_folder_map, project_archive_map, source, target=None, name_map=None,
                           type=None):
    """
    Turn the datasource rules of the config into ``[file_path, Datasource]`` pairs.

    Rules are read from ``rule["directives"]["datasources"]["transform"]`` when
    ``type == "transform"`` and from the ``"publish"`` section otherwise. For
    each enabled rule:
    * a name of "all"/"*" is expanded through ``RecurseDataSourceProjectPath``;
    * otherwise the rule is used only when its project is a key of
      ``source_folder_map`` and its name is a key of ``name_map``, and the
      resolved file exists on disk.
    A failure in one rule is logged and does not stop the remaining rules.

    :param rule: parsed config mapping.
    :param source_folder_map: mapping of project path -> source folder name.
    :param project_archive_map: mapping of project path -> destination project id.
    :param source: root directory holding the source folders.
    :param target: passed on to ``Datasource`` unchanged.
    :param name_map: mapping keyed by datasource name; None is treated as empty.
    :param type: "transform" selects the transform rules, anything else the
                 publish rules. (Shadows the builtin; kept for compatibility.)
    :return: list of ``[file_path, Datasource]`` pairs.
    """
    datasources = []
    logger_info(f"\n\n-- Going through the config file.")
    logger_info(f"-------------------------------------------------------------")

    section = "transform" if type == "transform" else "publish"
    rules = rule["directives"]["datasources"][section]["datasource"]
    if isinstance(rules, dict):
        # A single rule is parsed as a mapping rather than a list of mappings.
        rules = [rules]

    # name_map defaults to None; membership tests on None raise TypeError.
    known_names = name_map if name_map is not None else {}

    for each_rule in rules:
        # Bound up front so the error handler never reads an unbound name or a
        # value left over from the previous rule.
        project_path = content_name = None
        try:
            project_path = each_rule["@project"]
            content_name = each_rule["@name"]
            required = each_rule["@enabled"]
            logger_info(f"\n\nProject : {project_path} , Datasource : {content_name} , Enabled : {required}")

            if required.lower() not in ["true", "yes", "ok"]:
                logger_info(f"Disabled for Datasource : {content_name}")
                continue

            if content_name.lower() in ['all', '*']:
                recurse = each_rule.get("@recurse", None)
                rpp = RecurseDataSourceProjectPath(project_path, source_folder_map,
                                                   project_archive_map,
                                                   recurse, source, each_rule, target, known_names)
                datasources.extend(rpp.get_datasources())
                continue

            # Search the project path in source archive map
            if project_path not in source_folder_map or content_name not in known_names:
                logger_info(f"\t\tThe datasource : {content_name} is not tagged in project path : {project_path}."
                            f"Please check the config file.")
                continue

            logger_info(f"\t\tProject path exist in destination : {urllib.parse.unquote(project_path)}")
            # Get the folder name where the datasource file is lying
            project_id = source_folder_map[project_path]
            destination_project_id = project_archive_map[project_path]

            if destination_project_id is None:
                # Logged only; processing deliberately continues.
                logger_error(f"\t\tThe project in destination doesn't exist : {project_path}")

            file_dir = os.path.join(source, project_id)
            sanitized_datasource_name = replace_special_characters(content_name)
            file_path = get_file_path_with_extension(sanitized_datasource_name, file_dir)

            # Check whether the filename exist in the path or not
            if not (file_path and os.path.exists(file_path)):
                logger_info(f"File doesn't exist in the path : {file_path}. "
                            f"The datasource might not be tagged.")
                continue

            logger_info(f"\t\tFile exist in the path : {file_path}")
            datasources.append(
                [
                    file_path,
                    Datasource(
                        # TODO: change the file path read from db
                        file_path,
                        content_name,
                        sanitized_datasource_name,
                        each_rule,
                        destination_project_id,
                        "transform",
                        target,
                        name_map,
                        project_path
                    ),
                ]
            )
        except Exception as e:
            # Best effort per rule; the underlying error is included so the
            # real cause is not hidden behind the generic message.
            logger_error(f"\t\tThe datasource : {content_name} in project : {project_path} "
                         f" is not tagged. Reason : {e!r}")
    return datasources
Editor is loading...
Leave a Comment