import os
import tempfile
import urllib.parse
import xml.etree.ElementTree as ET
from pathlib import Path
from os import environ

# Fore is used for colored log messages below; it may also be re-exported by src.logger.logger
from colorama import Fore
from tableauserverclient import DatasourceItem, ServerResponseError, PersonalAccessTokenAuth, Server, Pager
from tabulate import tabulate

from src.logger.logger import *
from src.msystem import sys_variables
from src.utilities import *
import tableauserverclient as TSC
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

# Placeholder token used to represent '/' inside project names; it is replaced
# back with '/' when project paths are resolved
forward_slash = environ.get("slashreresent", "_fwd_SLASH")

# env load for set_ds_connection_info function


class TransformDirective:
    def __init__(self, file, rules):
        self.name = file
        self.sheets = rules

    def set_ds_connection_info(self, *args, **kwargs):
        logger_info(f"\n\t\t-- Applying Set connection info transformation")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]

        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]

        # password vault (optional; fall back to the plain config values if unavailable)
        try:
            pvault = sys_variables.pvault
        except Exception:
            pvault = None

        # find the node having connection class as database_type
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tNamed Connection node found : {named_connection_node}')
            # Get the children connection node which has class defined in database_type
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection node found : {connection_node}')
                # Change the attributes to the new value now
                for key, value in kwargs["value"][database_type].items():
                    logger_info(f"\t\tChanging the value of attribute {key[1:]} to {value}")
                    if pvault and key[1:] in pvault:
                        # Prefer the password-vault value and skip the config value so it is not overwritten
                        logger_info(f"\t\tUsing the password vault value for {key[1:]}")
                        connection_node.set(key[1:], pvault[key[1:]])
                        continue
                    connection_node.set(key[1:], value)

    def replace_table_name(self, *args, **kwargs):
        logger_info(f"\n\t\t-- Applying Replace table name transformation")
        root = args[0]
        # print(ET.tostring(root, encoding='unicode', method='xml'))

        # Case one: main datasource.
        # Find any element whose caption or name matches the old table name
        # and point it at the new table name.
        new_table_name = kwargs["value"]["@new_table_name"]
        old_table_name = kwargs["value"]["@old_table_name"]
        for element in root.iter():
            if "caption" in element.attrib and element.get("caption") == old_table_name:
                logger_info(f"\t\tTable Named Matched {old_table_name}")
                element.set("caption", new_table_name)
            if "name" in element.attrib and element.get("name") == old_table_name:
                element.set("name", new_table_name)
            # else:
            #     logger_info(f"\t\tTable Named Doesnt Matched")

    def change_custom_sql(self, *args, **kwargs):

        logger_info(f"\n\t\t-- Applying Set custom sql transformation")
        root = args[0]
        name = kwargs["value"]["@name"]
        old_query = kwargs["value"]["@old_sql"]
        new_query = kwargs["value"]["@new_sql"]
        elements_to_replace = root.findall(f".//*[@name='{name}']")

        for element_to_replace in elements_to_replace:
            if element_to_replace.text == old_query:
                logger_info(f"\t\tOld query matched")
                element_to_replace.text = new_query
            else:
                logger_info(f"\t\tOld query does not match")

    def replace_calculation_formula(self, *args, **kwargs):
        """
        Find the node which matches the caption and get the child node of the caption node calculation.
        Replace the calculation formula.
        :param args:
        :param kwargs:
        :return:
        """
        logger_info(f"\n\t\t-- Applying Replace Calculation Formula transformation")
        root = args[0]
        caption = kwargs["value"]["@caption"]
        new_calculation_formula = kwargs["value"]["@new_formula"]
        column_nodes = root.findall(f".//*[@caption='{caption}']")
        # Get all the child nodes of the column node which has the calculation tag
        for column_node in column_nodes:
            calculation_nodes = column_node.findall(".//calculation")
            for calculation_node in calculation_nodes:
                calculation_node.set("formula", new_calculation_formula)
                logger_info(
                    f"\t\tSet calculation node : {calculation_node}, new formula : {new_calculation_formula}"
                )

    def set_display_name(self, *args, **kwargs):
        """
        Find the datasource node which matches the caption and set the display name.
        :param args:
        :param kwargs:
        :return:
        """
        logger_info(f"\n\t\t-- Applying Set Display Name transformation", Fore.BLUE)
        root = args[0]
        old_display_name = kwargs["value"]["@old_display_name"]
        new_display_name = kwargs["value"]["@new_display_name"]
        datasource_node = root.findall(f".//datasource[@caption='{old_display_name}']")
        if datasource_node:
            logger_info(
                f"\t\tFound {len(datasource_node)} datasource nodes with caption {old_display_name}."
            )

            for ds_node in datasource_node:
                ds_node.set("caption", new_display_name)
                logger_info(
                    f"\t\tSet the caption of the datasource node to {new_display_name}."
                )

    def remove_extract(self, *args, **kwargs):
        """
        Remove the extract from the datasource.
        :param args:
        :param kwargs:
        :return:
        """
        logger_info(f"\n\t\t-- Applying Remove Extract transformation")
        root = args[0]
        new_value = kwargs["value"]["@new_value"]
        extract_nodes = root.findall(".//extract")
        if extract_nodes:
            logger_info(f"\t\tFound {len(extract_nodes)} extract node(s).")

            for ex_node in extract_nodes:
                ex_node.set("enabled", new_value)
                logger_info(
                    f"\t\tSet the enabled attribute of the extract node to {new_value}."
                )

    def transform_unc_path(self, *args, **kwargs):
        logger_info(f"\n\t\t-- Applying UNC transformation")
        root = args[0]
        caption_name = kwargs["value"]["@caption"]
        path = kwargs["value"]["@path"]
        file = kwargs["value"]["@file"]
        file_name, file_type = os.path.splitext(file)
        # Find the named-connection element with the specified caption
        for named_connection in root.iter('named-connection'):
            caption = named_connection.get('caption')
            logger_info(f"\t\t Named connection caption: {caption}")

            if caption == caption_name:
                logger_info("\t\t Matching named-connection found")

                # Find the connection element and update its filename attribute
                connection = named_connection.find('connection')
                if connection is not None:
                    class_value = connection.get('class')
                    full_path = path + file
                    # Map each supported file extension to its expected connection class and,
                    # for file-based connections, the attribute that stores the full file path.
                    filename_based = {
                        ".xlsx": ("excel-direct", "filename"),
                        ".pdf": ("pdf", "origfilename"),
                        ".hyper": ("hyper", "dbname"),
                    }
                    directory_based = {
                        ".csv": "textscan",
                        ".txt": "textscan",
                        ".kml": "ogrdirect",
                        ".json": "semistructpassivestore-direct",
                        ".sas7bdat": "stat-direct",
                    }
                    if file_type in filename_based:
                        expected_class, attr = filename_based[file_type]
                        if class_value == expected_class:
                            old_value = connection.get(attr)
                            if old_value.split('/')[-1] == file:
                                connection.set(attr, full_path)
                                logger_info(f"\t\t Updated old path {old_value} to new path {full_path}")
                            else:
                                logger_error("\t\t File name does not match")
                        else:
                            logger_error("\t\t Connection class does not match the file type. Please check the file or file type")
                    elif file_type in directory_based:
                        if class_value == directory_based[file_type]:
                            filename = connection.get('filename')
                            directory = connection.get('directory')
                            if filename == file:
                                connection.set('directory', path)
                                logger_info(f"\t\t Updated old path {directory}/{filename} to new path {full_path}")
                            else:
                                logger_error("\t\t File name does not match")
                        else:
                            logger_error("\t\t Connection class does not match the file type. Please check the file or file type")
                    else:
                        logger_error(f"\t\t Unsupported file type : {file_type}")
                        

        # elif file_type == ".csv":
        #     # Find the named-connection element with the specified caption
        #     for named_connection in root.iter('named-connection'):
        #         caption = named_connection.get('caption')
        #         print("Named connection caption:", caption)
    
        #         if caption == caption_name:
        #             print("Matching named-connection found")
    
        #             # Find the connection element and update its filename attribute
        #             connection = named_connection.find('connection')
        #             if connection is not None:
        #                 connection.set('directory', path)
        #                 print("Updated filename to:", path)
        # else:
        #     print("else")

    def tabulate(self, header, data):
        """
        Print in tabulate format
        :param header:
        :param data:
        :return:
        """
        print(tabulate(data, headers=header, tablefmt="grid"))
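
# Illustrative sketch (not executed anywhere in this module): the transform methods above
# expect the parsed rule to look roughly like the xmltodict-style structure below, with
# "@"-prefixed attribute keys. The exact keys come from the tool's config file; every value
# shown here is hypothetical.
#
#   example_rule = {
#       "@name": "Sales", "@project": "/Finance/Sales", "@enabled": "true",
#       "replace_table_name": {
#           "@enabled": "true",
#           "@old_table_name": "sales_raw",
#           "@new_table_name": "sales_clean",
#       },
#       "set_ds_connection_info": {
#           "@enabled": "true",
#           "@caption": "warehouse",
#           "@class": "postgres",
#           "postgres": {"@server": "db.example.com", "@port": "5432"},
#       },
#   }
#
# With a rule like that, a directive is typically applied to the parsed .tds root element:
#
#   directive = TransformDirective("Sales.tds", example_rule)
#   directive.replace_table_name(root, value=example_rule["replace_table_name"])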


class Project:
    def __init__(self, server):
        self.projects = list(TSC.Pager(server.projects))
        self.project_id_parent_id_map = {}
        self.project_parent_map = {}
        for project in self.projects:
            self.project_id_parent_id_map[project.id] = project
            self.project_parent_map[project.name] = project.parent_id
        self.server = server

    def get_project_id_nested(self, project_names):
        """
        Get the project ID of the project name.
        :param project_name:
        :return:
        """
        status, project = self.check_project_exists(project_names)
        if status:
            for project_name in self.projects:
                if project_name.name.lower() == project.lower():
                    return project_name.id
        else:
            logger_info(f"\t\tProject {project} not found")

    def check_project_exists(self, project_names):
        """
        Check if the project exists
        :param project_name:
        :return:
        """
        parent = None
        logger_info(f"\t\tProject Names for nested checking: {project_names}")
        projects_found = []
        projects_not_found = []
        for each_project_name in project_names:
            try:
                parent_project_id = self.project_parent_map[each_project_name]
                try:
                    parent_project = self.project_id_parent_id_map[parent_project_id]
                except KeyError:
                    parent_project = None

                if parent_project is None or parent_project.name.lower() == parent.lower():
                    projects_found.append(each_project_name)
            except KeyError as e:
                projects_not_found.append(each_project_name)
                break
            parent = each_project_name

        if projects_not_found:
            logger_info(f"\t\tProject {projects_not_found} not found")
            return False, projects_not_found

        return True, project_names[-1]

    def get_project_id(self, project_name, path=None, name=None):
        """
        Get the project ID for the given project path.
        :param project_name:
        :param path:
        :param name:
        :return:
        """
        # loop
        # for project in self.projects:
        #     if project.name == project_name:
        #         return project.id
        if path is None:
            print("Path is None.")
            return None

        if name is None:
            path_parts = path.split(os.path.sep)
        else:
            path_parts = path.split("/")

        # path_parts = path.split("\\")
        current_project = None
        for project in self.projects:
            if project.name == path_parts[1]:
                current_project = project
                break

            # If the project doesn't exist, return None
        if current_project is None:
            return None

        # print("Current Project:", current_project.id)

        # Traverse through the rest of the path parts
        for part in path_parts[2:]:
            # Find the child project with the given name and parent ID
            child_project_found = False
            for child in self.projects:
                if child.parent_id == current_project.id and child.name == part:
                    current_project = child
                    child_project_found = True
                    break

            # If the child project doesn't exist, return None
            if not child_project_found:
                return None

            # print("Current Project:", current_project.id)

        return current_project.id

    def get_child_project_id(self, projects, project_path):
        # Base case: If the project path is empty, return None
        if not project_path:
            return None
        for project in projects:
            project_path_ = project_path[0].replace(forward_slash, '/')
            if project.name.lower() == project_path_.lower():
                if len(project_path) == 1:
                    return project.id
                else:
                    child_projects = [p for p in self.projects if p.parent_id == project.id]
                    child_project_id = self.get_child_project_id(child_projects, project_path[1:])
                    if child_project_id:
                        return child_project_id
        return None

    def get_project_ids(self, path):
        main_path = path[1:]
        if '/' in main_path:
            path_parts = main_path.split('/')
            return self.get_child_project_id(self.projects, path_parts)
        else:
            for pro in self.projects:
                main_path = main_path.replace(forward_slash, '/')
                if pro.name.lower() == main_path.lower() and pro.parent_id is None:
                    return pro.id
        return None

    def make_project(self, name, parent_id=None):
        new_project = TSC.ProjectItem(name, description=None, content_permissions=None, parent_id=parent_id)
        try:
            created_project = self.server.projects.create(new_project)
            return created_project
        except TSC.ServerResponseError as e:
            print(f"Error creating project: {e}")
            return None


    def create_project(self, path, all_projects, parent_project=None):
        project_name = path[0].replace('_fwd_SLASH_', '/')

        if parent_project is None:
            existing_project = next(
                (p for p in all_projects if p.name.lower() == project_name.lower() and p.parent_id is None), None)
        else:
            existing_project = next(
                (p for p in all_projects if p.name.lower() == project_name.lower() and p.parent_id == parent_project.id),
                None)
        if existing_project:
            current_project = existing_project
        else:
            # Create the project if it doesn't exist
            new_project = self.make_project(project_name, parent_project.id if parent_project else None)
            if new_project:
                current_project = new_project
            else:
                print(f"Failed to create project: {project_name}")
                return None

        if len(path) > 1:
            return self.create_project(path[1:], all_projects, current_project)
        else:
            return current_project

    def get_path_id(self, path):
        path = path[1:]
        all_projects = list(TSC.Pager(self.server.projects))
        if '/' in path:
            path_parts = path.split('/')
            # Note: get_child_project_id expects (projects, project_path)
            project_id = self.get_child_project_id(all_projects, path_parts)
            return project_id
        else:
            for project in all_projects:
                if project.name == path and project.parent_id is None:
                    return project.id
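
# Illustrative sketch of how the Project helper is typically used (assumes a signed-in
# tableauserverclient `server` object; the project path below is hypothetical):
#
#   project = Project(server)
#   project_id = project.get_project_ids("/Finance/Sales")   # resolve an existing nested path
#   if project_id is None:
#       # Create the missing hierarchy one level at a time
#       created = project.create_project(["Finance", "Sales"], project.projects)
#       project_id = created.id if created else None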

class Datasource:
    def __init__(self, filepath, filename, content_id, rules, project_id, type, output=None, name_map=None, project_path=None):
        # Full path to the file
        self.filepath = filepath
        # Name of the file
        self.name = filename
        self.content_id = content_id

        self.rule = rules
        self.transform = TransformDirective(filepath, rules)

        # self.publish = PublishDirective(filepath, rules)
        self.project_id = project_id

        self.root_project_id = project_id

        self.schedule_name = ""

        self.rename_path = None
        self.rename_ds = None

        self.project = Project(sys_variables.server)
        self.name_map = name_map
        self.project_path = project_path
        # if type == "publish":
        #     self.project = Project(sys_variables.server)
        #     project_name = self.get_project_path_from_rules()
        #     if project_name:
        #         self.root_project_id = self.project.get_project_id_nested(project_name)

        # Unzip the files in the temp directory
        self.temp_dir = tempfile.NamedTemporaryFile().name
        unzip_to_dir(filepath, self.temp_dir)
        # Get the .tds file from the temp directory
        filename = [
            file for file in os.listdir(self.temp_dir) if file.endswith(".tds")
        ][0]
        self.twb_file = os.path.join(self.temp_dir, filename)

        # Read the xml file using ElementTree
        self.tree = ET.parse(self.twb_file)
        self.root = self.tree.getroot()

        # Check if output directory exists, if not create it
        output_dir = (
            os.path.join(Path.home(), output) if output else sys_variables.output_dir
        )
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        # self.output_dir = os.path.join(Path.home(), output_dir, self.project_id)
        # if not os.path.exists(self.output_dir):
        #     os.makedirs(self.output_dir)
        if self.project_id:
            self.output_dir = os.path.join(Path.home(), output_dir, self.project_id)
            if not os.path.exists(self.output_dir):
                os.makedirs(self.output_dir)
        else:
            self.output_dir = None

    def get_project_path_from_rules(self):
        project_name = []
        for rule in self.rule:
            if rule["@name"] in self.name:
                project_name = rule["@project"].split("/")[1:]
        if project_name:
            logger_info(
                f"\t\tProject structure for default publish : {project_name} for {self.name}"
            )
        return project_name

    def apply_transformation(self):
        rule = self.rule
        rules_mapping = {
            "replace_table_name": self.transform.replace_table_name,
            "change_custom_sql": self.transform.change_custom_sql,
            "set_calculation_formula": self.transform.replace_calculation_formula,
            "set_display_name": self.transform.set_display_name,
            "remove_extract": self.transform.remove_extract,
            "set_ds_connection_info": self.transform.set_ds_connection_info,
            "transform_unc_path": self.transform.transform_unc_path
        }
        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def apply_publish(self):
        # Get the root project ID from the project name in the destination server
        # This will get overwritten if the add Project or change project rule is enabled
        # self.root_project_id = self.project.get_project_id(
        #     project_name=rule["@project"], path=path, name=None
        # )
        rule = self.rule
        rules_mapping = {
            "set_folder": self.set_folder,
            "apply_schedule": self.apply_schedule,
            "rename_datasource": self.rename_datasource,
            "set_connection_secrets": self.set_connection_param,
        }

        if rule["@project"].lower() == "all" and  rule["@name"].lower() == "all":
            # logger_info("\n\t\tThe 'Add' and 'Change Project' directives have no effect.")
            # logger_info("\n\t\tThe workbook will be published in the same project as on the source server.")
            path = self.project_path
            project_id = self.project.get_path_id(path)
            all_projects = list(TSC.Pager(self.project.server.projects))
            if project_id is None:
                # The project path does not exist on the target yet, so create the hierarchy
                path = path[1:]
                if '/' in path:
                    created_project = self.project.create_project(path.split('/'), all_projects)
                else:
                    created_project = self.project.make_project(path)
                project_id = created_project.id if created_project else None
            else:
                logger_info(f"\t\tProject path already exists : {project_id}")

            if project_id:
                self.root_project_id = project_id
                logger_info(f"\t\tProject {path} resolved")

        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def set_connection_param(self, *args, **kwargs):
        logger_info(f"\n\t\t-- Applying Set connection param changes.")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"match the server database connection. Otherwise publishing will fail.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]

        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]

        # password vault (optional; fall back to the plain config values if unavailable)
        try:
            pvault = sys_variables.pvault
        except Exception:
            pvault = None

        # find the node having connection class as database_type
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tDatabase Connection found with caption : {named_connection_node}')
            # Get the children connection node which has class defined in database_type
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection found : {connection_node}')
                # Change the attributes to the new value now
                for key, value in kwargs["value"][database_type].items():
                    if pvault and value in pvault:
                        # The rule value names a password-vault entry; substitute the secret from the vault
                        logger_info(f"\t\tSetting {key[1:]} from the password vault")
                        connection_node.set(key[1:], pvault[value])
            logger_info(f"\t\tSaving the datasource:")
            self.save_datasource()
            self.zip_datasource("publish")
        else:
            logger_info(f"\n\t\tNo database connection found with the caption : {caption}")

    def apply_rename(self, rule, path):
        rules_mapping = {
            "rename_datasource": self.rename_datasource,
        }
        for key, value in rules_mapping.items():
            if key in rule:
                if rule[key]["@enabled"].lower() == "false":
                    continue
                value = rule[key]
                self.rename_ds = value['@new_ds_name']
                self.rename_path = value['@current_path']

    def set_folder(self, *args, **kwargs):
        """
        Folder To place the datasource:
        :return:
        """
        logger_info(f"\n\t\t-- Setting the folder to place the datasource")
        project_name = kwargs["value"]["@folder_name"]
        path = project_name
        # get the project id of the project name
        # project_id = self.project.get_project_id(project_name, path, name="set_folder")
        project_id = self.project.get_project_ids(path)
        if project_id:
            self.root_project_id = project_id
            logger_info(f"\t\tProject {path} found")
        else:
            self.root_project_id = ""
            logger_error(f"\t\tProject {project_name} not found")
            return

    def apply_schedule(self, *args, **kwargs):
        """
        Apply schedules on target datasource:
        :return:
        """
        logger_info(f"\n\t\t-- Applying schedules on target datasource datasource")
        schedule_name = kwargs["value"]["@schedule_name"]
        self.schedule_name = schedule_name

    def save_datasource(self):
        """
        Save the changes in the datasource to datasource directory.
        :return:
        """
        logger_info(f"\t\tSaving the datasource : {self.twb_file}")
        self.tree.write(os.path.join(self.temp_dir, self.twb_file))
        # zip the temp directory into .twbx file
        #self.zip_datasource()

    def zip_datasource(self, publish=None):
        """
        Zip the datasource.
        :return:
        """
        #new_filename = os.path.join()
        filename = f"{os.path.basename(self.name).split('.')[0]}.tdsx"
        if publish:
            output_dir = os.path.dirname(self.filepath)
        else:
            output_dir = self.output_dir
        zip_directory_to_twbx(self.temp_dir, os.path.join(output_dir, filename))

    def publish_(self):
        """
        Publish the workbook.
        :return:
        """
        if ".tdsx" in self.content_id:
            self.content_id = self.content_id.replace(".tdsx", "")
        
        if self.name_map:
            for key, value in self.name_map.items():  
                if value == self.name:
                    self.name = key
        
        logger_info(f"\t\tPublishing the DataSource : {self.name}")
        if self.root_project_id:
            published_datasource = sys_variables.server.datasources.publish(
                DatasourceItem(
                    name=self.name,
                    project_id=self.root_project_id,
                ),
                self.filepath,
                "Overwrite",
            )
            self.apply_schedules(published_datasource.id)
            self.update_datasource_name()
            

        else:
            logger_info(
                "\n\t\tNo project found to publish. There could be multiple reasons for this"
                "\n\t\t1. The project structure is not correct in config. "
                "\n\t\t2. The project structure is not available in the destination site. "
                "\n\t\tPlease check the spelling of the names of each project in the project structure "
                "\n\t\tCheck in the destination site whether the project structure exists or not "
                f"\n\t\tDATASOURCE FILENAME : {self.content_id}")

    def apply_schedules(self, data_source_id):
        """
        Apply Schedules.
        :return:
        """
        # get a datasource item that should be added to a schedule
        datasource_item = sys_variables.server.datasources.get_by_id(data_source_id)

        datasource_schedules = sys_variables.server.schedules.get()
        specific_name = self.schedule_name
        if specific_name:
            matching_schedule_ids = ""

            for schedule in datasource_schedules[0]:
                if schedule.name == specific_name:
                    matching_schedule_ids = schedule.id.strip()

            # print("Matching Schedule IDs for '{}': {}".format(specific_name, matching_schedule_ids))

            # retrieve the id of the target schedule
            schedule_id = matching_schedule_ids

            try:
                # Add the data source to the schedule
                sys_variables.server.schedules.add_to_schedule(
                    schedule_id=schedule_id,
                    datasource=datasource_item,
                )
                logger_info(
                    f"\tSuccessfully applied schedules for the {self.name} data source."
                )
            except ServerResponseError as e:
                if e.code == "403078":
                    logger_info(
                        "Permission Denied: Not allowed to create extract refresh tasks on this object."
                    )
                else:
                    # Handle other types of errors if needed
                    logger_error(f"An error occurred: {e}")

    def __repr__(self):
        return f"Datasource : {self.name}"
    

    def rename_datasource(self, *args, **kwargs):
        """
        Rename the datasource.
        :return:
        """
        logger_info(
            f"\n\t\t-- Renaming the datasource to : {kwargs['value']['@new_ds_name']}"
        )
        new_name = kwargs["value"]["@new_ds_name"]
        current_path = kwargs["value"]["@current_path"]
        self.content_id = new_name

    def get_all_projects(self, server):
        all_projects = {}
        for project in list(TSC.Pager(server.projects)):
            all_projects[project.id] = project
        return all_projects

    def get_full_path(self, datasource, projects_dict):
        # Build the path as /Project/SubProject/DatasourceName
        path_parts = [datasource.name]
        project = projects_dict.get(datasource.project_id)

        while project:
            path_parts.insert(0, project.name)
            if project.parent_id:
                project = projects_dict.get(project.parent_id)
            else:
                project = None

        return '/' + '/'.join(path_parts)

    def update_datasource_name(self):
        # Nothing to rename unless a rename rule supplied both the new name and the current path
        if not (self.rename_ds and self.rename_path):
            return
        full_path = f"{self.rename_path}/{self.name}"
        self.name = self.rename_ds

        matched_datasource = None
        projects_dict = self.get_all_projects(sys_variables.server)

        for datasource in Pager(sys_variables.server.datasources.get):
            if self.get_full_path(datasource, projects_dict) == full_path:
                matched_datasource = datasource
                break

        if matched_datasource:
            matched_datasource.name = self.name
            sys_variables.server.datasources.update(matched_datasource)

            print(f"Datasource '{full_path}' updated to '{self.name}'")

class RecurseDataSourceProjectPath:
    def __init__(self, project_path,
                 source_folder_map,
                 publish_folder_map,
                 recurse,
                 source,
                 each_rule,
                 target=None,
                 name_map=None):
        self.project_path = project_path
        self.recurse = recurse
        self.source_folder_map = source_folder_map
        self.publish_folder_map = publish_folder_map
        self.source = source
        self.projects_to_operate = []
        self.rule = each_rule
        self.target = target
        self.name_map = name_map

        self.reverse_name_map = {value: key for key, value in self.name_map.items()} if self.name_map else {}

        if self.project_path.lower() == "all":
            # Operate on every project in the source map
            for key, value in source_folder_map.items():
                self.projects_to_operate.append(key)
        elif recurse:
            for key, value in source_folder_map.items():
                if recurse.lower() in ['true', 'yes']:
                    if key.startswith(project_path) and value:
                        self.projects_to_operate.append(key)
                else:
                    if project_path == '/':
                        if key.count('/') == 1:
                            self.projects_to_operate.append(key)
                    elif project_path == key and value:
                        self.projects_to_operate.append(key)

        self.workbooks = []

    def get_datasources(self):
        for project_path in self.projects_to_operate:
            # Iterate through the project paths
            # find the keys
            name_map = None
            try:
                folder_uuid = self.source_folder_map[project_path]
                destination_project_id = self.publish_folder_map[project_path]
                file_dir = os.path.join(self.source, folder_uuid)
                # Iterate through the directory
                for file in os.listdir(file_dir):
                    workbook_name = os.path.splitext(file)[0]
                    actual_name = self.reverse_name_map[workbook_name]
                    if os.path.splitext(file)[1] in ['.tdsx', '.tds']:
                        logger_info(f"\t\tFile exist in the path : {os.path.join(file_dir, file)}")
                        self.workbooks.append(
                            [
                                os.path.join(file_dir, file),
                                Datasource(
                                    # TODO: change the file path read from db
                                    os.path.join(file_dir, file),
                                    workbook_name,
                                    actual_name,
                                    self.rule,
                                    destination_project_id,
                                    "transform",
                                    self.target,
                                    name_map,
                                    project_path
                                ),
                            ]
                        )
            except Exception as e:
                logger_error(f"\t\tSkipping project {project_path} : {e}")
        return self.workbooks
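
# Illustrative sketch of the inputs this class expects (every value below is hypothetical):
# the folder maps key project paths to archive folder ids on disk and to project ids on the
# target server, and name_map maps original datasource names to their sanitized file names.
#
#   source_folder_map  = {"/Finance": "a1b2c3", "/Finance/Sales": "d4e5f6"}
#   publish_folder_map = {"/Finance": "<target-project-id>", "/Finance/Sales": "<target-project-id>"}
#   name_map           = {"Sales Datasource": "Sales_Datasource"}
#
#   recurse_dir = RecurseDataSourceProjectPath(
#       "/Finance", source_folder_map, publish_folder_map,
#       recurse="true", source="/tmp/archive", each_rule=example_rule, name_map=name_map,
#   )
#   datasource_pairs = recurse_dir.get_datasources()   # -> [[file_path, Datasource(...)], ...]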

def get_datasource_objects(rule, source_folder_map, project_archive_map, source, target=None, name_map=None,
                           type=None):
    datasources = []
    logger_info(f"\n\n-- Going through the config file.")
    logger_info(f"-------------------------------------------------------------")
    if type == "transform":
        rules = rule["directives"]["datasources"]["transform"]["datasource"]
    else:
        rules = rule["directives"]["datasources"]["publish"]["datasource"]
    if isinstance(rules, dict):
        rules = [rules]
    for each_rule in rules:
        try:
            project_path = each_rule["@project"]
            content_name = each_rule["@name"]
            required = each_rule["@enabled"]
            logger_info(f"\n\nProject : {project_path} , Datasource : {content_name} , Enabled : {required}")
            # Search the project path in source archive map
            if required.lower() in ["true", "yes", "ok"]:
                if content_name.lower() in ['all', '*']:
                    recurse = each_rule.get("@recurse", None)
                    rpp = RecurseDataSourceProjectPath(project_path, source_folder_map,
                                             project_archive_map,
                                             recurse, source, each_rule, target, name_map)
                    datasources = datasources + rpp.get_datasources()
                    print("datasources",datasources)
                    continue

                if project_path in source_folder_map and name_map and content_name in name_map:
                    logger_info(f"\t\tProject path exists in destination : {urllib.parse.unquote(project_path)}")
                    # Get the folder name where the datasource file is located
                    project_id = source_folder_map[project_path]
                    destination_project_id = project_archive_map[project_path]

                    if destination_project_id is None:
                        logger_error(f"\t\tThe project in destination doesn't exist : {project_path}")
                        # continue

                    file_dir = os.path.join(source, project_id)

                    # content_id = name_map[content_name]
                    sanitized_datasource_name = replace_special_characters(content_name)
                    file_path = get_file_path_with_extension(sanitized_datasource_name, file_dir)

                    # if type == "transform":
                    #     content_name = content_id

                    # Check whether the file exists in the path or not
                    if file_path and os.path.exists(file_path):
                        logger_info(f"\t\tFile exists in the path : {file_path}")
                        datasources.append(
                            [
                                # os.path.join(dirpath, file),
                                file_path,
                                Datasource(
                                    # TODO: change the file path read from db
                                    file_path,
                                    content_name,
                                    sanitized_datasource_name,
                                    each_rule,
                                    destination_project_id,
                                    "transform",
                                    target,
                                    name_map,
                                    project_path
                                ),
                            ]
                        )
                    else:
                        logger_info(f"File doesn't exist in the path : {file_path}. "
                                    f"The datasource might not be tagged.")
                else:
                    logger_info(f"\t\tThe datasource : {content_name} is not tagged in project path : {project_path}. "
                                f"Please check the config file.")
            else:
                logger_info(f"Disabled for Datasource : {content_name}")
        except Exception as e:
            logger_error(f"\t\tThe datasource : {each_rule.get('@name')} in project : {each_rule.get('@project')} "
                         f"is not tagged. Error : {e}")
    return datasources
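
# Illustrative top-level usage (all names are hypothetical; `rule` is the full parsed config
# containing the directives/datasources/transform|publish/datasource section used above):
#
#   datasource_pairs = get_datasource_objects(
#       rule, source_folder_map, project_archive_map,
#       source="/tmp/archive", target="tableau_output", name_map=name_map, type="transform",
#   )
#   for file_path, ds in datasource_pairs:
#       ds.apply_transformation()
#       ds.save_datasource()
#       ds.zip_datasource()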