"""Tableau datasource migration helpers.

Provides XML-level transform directives for .tds datasource files, project
path resolution against a Tableau server, and packaging/publishing of the
transformed datasources.
"""
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from os import environ
from tableauserverclient import DatasourceItem, ServerResponseError, PersonalAccessTokenAuth, Server, Pager
from tabulate import tabulate
from src.logger.logger import *
from src.msystem import sys_variables
from src.utilities import *
import tableauserverclient as TSC
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
# get the environ variables
forward_slash = environ.get("slashreresent", "_fwd_SLASH")
# env load for set_ds_connection_info function
class TransformDirective:
    """XML-level transformations for a parsed Tableau datasource (.tds) tree.

    Every transform method receives the parsed ``ElementTree`` root as
    ``args[0]`` and its rule dictionary under ``kwargs["value"]`` (keys
    prefixed with ``@`` come from the XML-style config file).
    """

    # Dispatch table for transform_unc_path: file extension ->
    # (expected connection class, path attribute).  When the path attribute
    # is None the connection stores ``directory`` and ``filename``
    # separately; otherwise that single attribute holds the full path.
    _UNC_RULES = {
        ".xlsx": ("excel-direct", "filename"),
        ".pdf": ("pdf", "origfilename"),
        ".hyper": ("hyper", "dbname"),
        ".csv": ("textscan", None),
        ".txt": ("textscan", None),
        ".kml": ("ogrdirect", None),
        ".json": ("semistructpassivestore-direct", None),
        ".sas7bdat": ("stat-direct", None),
    }

    def __init__(self, file, rules):
        """
        :param file: path of the datasource file being transformed
        :param rules: rule set for this datasource from the config
        """
        self.name = file
        self.sheets = rules

    def set_ds_connection_info(self, *args, **kwargs):
        """Overwrite connection attributes of a named connection.

        Finds the ``named-connection`` node matching ``@caption`` and its
        child ``connection`` node matching ``@class``, then sets every
        attribute from the rule.  Values found in the password vault take
        precedence over the configured values.
        """
        logger_info(f"\n\t\t-- Applying Set connection info transformation")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]
        # Password vault is optional; fall back to the plain config values.
        try:
            pvault = sys_variables.pvault
        except Exception:
            pvault = None
        # find the node having connection class as database_type
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tNamed Connection node found : {named_connection_node}')
            # Get the children connection node which has class defined in database_type
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection node found : {connection_node}')
                # Change the attributes to the new value now
                for key, value in kwargs["value"][database_type].items():
                    logger_info(f"\t\tChanging the value of attribute {key[1:]} to {value}")
                    if pvault and key[1:] in pvault:
                        pvault_val = pvault[key[1:]]
                        logger_info(f"\t\tChanging the password of {key[1:]} to {pvault_val}")
                        # BUG FIX: the original fell through and immediately
                        # overwrote the vault value with the config value.
                        connection_node.set(key[1:], pvault_val)
                    else:
                        connection_node.set(key[1:], value)

    def replace_table_name(self, *args, **kwargs):
        """Rename every element whose caption or name equals the old table name."""
        logger_info(f"\n\t\t-- Applying Replace table name transformation")
        root = args[0]
        new_table_name = kwargs["value"]["@new_table_name"]
        old_table_name = kwargs["value"]["@old_table_name"]
        for element in root.iter():
            # Element.get returns None for missing attributes, so no explicit
            # membership test is needed.
            if element.get("caption") == old_table_name:
                logger_info(f"\t\tTable Named Matched {old_table_name}")
                element.set("caption", new_table_name)
            if element.get("name") == old_table_name:
                element.set("name", new_table_name)

    def change_custom_sql(self, *args, **kwargs):
        """Replace the text of custom-SQL relation nodes whose name matches
        ``@name`` and whose current SQL equals ``@old_sql``."""
        logger_info(f"\n\t\t-- Applying Set custom sql transformation")
        root = args[0]
        name = kwargs["value"]["@name"]
        old_query = kwargs["value"]["@old_sql"]
        new_query = kwargs["value"]["@new_sql"]
        elements_to_replace = root.findall(f".//*[@name='{name}']")
        for element_to_replace in elements_to_replace:
            if element_to_replace.text == old_query:
                logger_info(f"\t\tOld query Matched")
                element_to_replace.text = new_query
            else:
                logger_info(f"\t\tOld query Doesn't Match")

    def replace_calculation_formula(self, *args, **kwargs):
        """
        Find the node which matches the caption and get the child node of the caption node calculation.
        Replace the calculation formula.
        :param args: (root,) the parsed XML root
        :param kwargs: rule dict under "value" with @caption / @new_formula
        :return:
        """
        logger_info(f"\n\t\t-- Applying Replace Calculation Formula transformation")
        root = args[0]
        caption = kwargs["value"]["@caption"]
        new_calculation_formula = kwargs["value"]["@new_formula"]
        column_nodes = root.findall(f".//*[@caption='{caption}']")
        # Get all the child nodes of the column node which has the calculation tag
        for column_node in column_nodes:
            for calculation_node in column_node.findall(".//calculation"):
                calculation_node.set("formula", new_calculation_formula)
                logger_info(
                    f"\t\tSetted calculation node : {calculation_node}, new formula as : {new_calculation_formula}"
                )

    def set_display_name(self, *args, **kwargs):
        """
        Find the datasource node which matches the caption and set the display name.
        :param args: (root,) the parsed XML root
        :param kwargs: rule dict under "value" with @old_display_name / @new_display_name
        :return:
        """
        logger_info(f"\n\t\t-- Applying Set Display Name transformation", Fore.BLUE)
        root = args[0]
        old_display_name = kwargs["value"]["@old_display_name"]
        new_display_name = kwargs["value"]["@new_display_name"]
        datasource_nodes = root.findall(f".//datasource[@caption='{old_display_name}']")
        if datasource_nodes:
            logger_info(
                f"\t\tFound {len(datasource_nodes)} datasource nodes with caption {old_display_name}."
            )
            for ds_node in datasource_nodes:
                ds_node.set("caption", new_display_name)
                logger_info(
                    f"\t\tSet the caption of the datasource node to {new_display_name}."
                )

    def remove_extract(self, *args, **kwargs):
        """
        Remove the extract from the datasource.

        Sets the ``enabled`` attribute of every ``extract`` node to
        ``@new_value`` rather than deleting the node.
        :return:
        """
        logger_info(f"\n\t\t-- Applying Remove Extract transformation")
        root = args[0]
        new_value = kwargs["value"]["@new_value"]
        extract_nodes = root.findall(".//extract")
        if extract_nodes:
            logger_info(f"\t\tFound {len(extract_nodes)}.")
            for ex_node in extract_nodes:
                ex_node.set("enabled", new_value)
                logger_info(
                    f"\t\tSet the enabled attribute of the extract node to {new_value}."
                )

    def transform_unc_path(self, *args, **kwargs):
        """Repoint a file-based connection to a new (UNC) location.

        The rule supplies the named-connection caption, the new directory
        ``@path`` and the file name ``@file``; the connection is updated only
        when its class matches the file extension (see ``_UNC_RULES``) and
        its current file name matches ``@file``.
        """
        logger_info(f"\n\t\t-- Applying UNC transformation")
        root = args[0]
        caption_name = kwargs["value"]["@caption"]
        path = kwargs["value"]["@path"]
        file = kwargs["value"]["@file"]
        file_name, file_type = os.path.splitext(file)
        unc_rule = self._UNC_RULES.get(file_type)
        # Find the named-connection element with the specified caption
        for named_connection in root.iter('named-connection'):
            caption = named_connection.get('caption')
            logger_info(f"\t\t Named connection caption: {caption}")
            if caption != caption_name:
                continue
            logger_info("\t\t Matching named-connection found")
            # Find the connection element and update its path attributes
            connection = named_connection.find('connection')
            if connection is None:
                continue
            if unc_rule is None:
                # Unsupported extension (original behavior: bare marker).
                print("else")
                continue
            expected_class, path_attr = unc_rule
            class_value = connection.get('class')
            if class_value != expected_class:
                logger_error("\t\t File type is not matched. Please check the file or file type")
                continue
            # BUG FIX: build the full path in a local variable; the original
            # mutated `path` (`path = path + file`) so every subsequent
            # matching connection received a corrupted path, and for
            # .pdf/.hyper it did so even when the class check failed.
            full_path = path + file
            if path_attr is not None:
                # Single-attribute connections hold the full path; compare
                # only the basename.
                current = connection.get(path_attr)
                if current.split('/')[-1] == file:
                    connection.set(path_attr, full_path)
                    logger_info(f"\t\t Updated old path (unknown) to new path {full_path}")
                else:
                    logger_error("\t\t File name is not matched")
            else:
                # Directory-style connections keep directory and filename
                # separate; only the directory is rewritten.
                current = connection.get('filename')
                directory = connection.get('directory')
                if current == file:
                    connection.set('directory', path)
                    logger_info(f"\t\t Updated old path {directory}/(unknown) to new path {full_path}")
                else:
                    logger_error("\t\t File name is not matched")

    def tabulate(self, header, data):
        """
        Print in tabulate format
        :param header: column headers
        :param data: rows
        :return:
        """
        # Resolves to the module-level `tabulate` function — class attributes
        # do not shadow names inside a method body.
        print(tabulate(data, headers=header, tablefmt="grid"))
class Project:
    """Caches the server's project tree and resolves project paths to IDs."""

    def __init__(self, server):
        """
        :param server: an authenticated tableauserverclient Server
        """
        # Materialize every project once and build lookup maps.
        self.projects = list(TSC.Pager(server.projects))
        # project id -> ProjectItem
        self.project_id_parent_id_map = {}
        # project name -> parent project id.  NOTE(review): assumes project
        # names are unique server-wide; a duplicate name overwrites the
        # earlier entry — confirm against the server's naming rules.
        self.project_parent_map = {}
        for project in self.projects:
            self.project_id_parent_id_map[project.id] = project
            self.project_parent_map[project.name] = project.parent_id
        self.server = server

    def get_project_id_nested(self, project_names):
        """
        Get the project ID of the last name in the nested chain.
        :param project_names: ordered list of project names, root first
        :return: project id, or None when the chain does not exist
        """
        status, project = self.check_project_exists(project_names)
        if status:
            for candidate in self.projects:
                if candidate.name.lower() == project.lower():
                    return candidate.id
            # BUG FIX: the original logged "not found" once per non-matching
            # project inside the loop; log once after the search instead.
            logger_info(f"\t\tProject {project} not found")

    def check_project_exists(self, project_names):
        """
        Check that each name in the chain exists and is nested under the
        previous one.
        :param project_names: ordered list of project names, root first
        :return: (True, leaf name) or (False, [missing names])
        """
        parent = None
        logger_info(f"\t\tProject Names for nested checking: {project_names}")
        projects_found = []
        projects_not_found = []
        for each_project_name in project_names:
            try:
                parent_project_id = self.project_parent_map[each_project_name]
            except KeyError:
                projects_not_found.append(each_project_name)
                break
            # BUG FIX: replaced the bare `except` with .get() and guarded
            # against `parent` being None before calling .lower() on it
            # (the original raised AttributeError on the first iteration
            # whenever the first project had a known parent).
            parent_project = self.project_id_parent_id_map.get(parent_project_id)
            if parent_project is None or (
                    parent is not None
                    and parent_project.name.lower() == parent.lower()):
                projects_found.append(each_project_name)
            parent = each_project_name
        if projects_not_found:
            logger_info(f"\t\tProject {projects_not_found} not found")
            return False, projects_not_found
        return True, project_names[-1]

    def get_project_id(self, project_name, path=None, name=None):
        """
        Resolve a project path like "/Parent/Child" to a project id.
        :param project_name: unused; kept for backward compatibility
        :param path: separator-prefixed project path
        :param name: when None the path is split on os.path.sep, otherwise on "/"
        :return: project id or None
        """
        # BUG FIX: `path: None, name: None` were type annotations, not
        # default values; made them real keyword defaults.
        if path is None:
            print("Path is None.")
            return None
        if name is None:
            path_parts = path.split(os.path.sep)
        else:
            path_parts = path.split("/")
        # The component after the leading separator must match a project name.
        current_project = None
        for project in self.projects:
            if project.name == path_parts[1]:
                current_project = project
                break
        # If the project doesn't exist, return None
        if current_project is None:
            return None
        # Walk down the remaining components following parent links.
        for part in path_parts[2:]:
            child_project_found = False
            for child in self.projects:
                if child.parent_id == current_project.id and child.name == part:
                    current_project = child
                    child_project_found = True
                    break
            # If the child project doesn't exist, return None
            if not child_project_found:
                return None
        return current_project.id

    def get_child_project_id(self, projects, project_path):
        """Recursively resolve `project_path` (a list of names) starting from
        the candidate `projects`; returns the leaf project id or None."""
        # Base case: If the project path is empty, return None
        if not project_path:
            return None
        for project in projects:
            # Restore encoded forward slashes before comparing names.
            project_path_ = project_path[0].replace(forward_slash, '/')
            if project.name.lower() == project_path_.lower():
                if len(project_path) == 1:
                    return project.id
                child_projects = [p for p in self.projects if p.parent_id == project.id]
                child_project_id = self.get_child_project_id(child_projects, project_path[1:])
                if child_project_id:
                    return child_project_id
        return None

    def get_project_ids(self, path):
        """Resolve an absolute "/a/b"-style path to a project id using the
        cached project list."""
        main_path = path[1:]
        if '/' in main_path:
            path_parts = main_path.split('/')
            return self.get_child_project_id(self.projects, path_parts)
        # Hoisted the loop-invariant slash restoration out of the loop.
        main_path = main_path.replace(forward_slash, '/')
        for pro in self.projects:
            if pro.name.lower() == main_path.lower() and pro.parent_id is None:
                return pro.id
        return None

    def make_project(self, name, parent_id=None):
        """Create a single project on the server; returns the created item
        or None on a server error."""
        new_project = TSC.ProjectItem(name, description=None, content_permissions=None, parent_id=parent_id)
        try:
            return self.server.projects.create(new_project)
        except TSC.ServerResponseError as e:
            print(f"Error creating project: {e}")
            return None

    def create_project(self, path, all_projects, parent_project=None):
        """Ensure the nested project path exists, creating missing levels.

        :param path: list of project names, root first
        :param all_projects: current list of server projects
        :param parent_project: parent ProjectItem for the current level
        :return: the leaf ProjectItem, or None when creation failed
        """
        # NOTE(review): uses the literal '_fwd_SLASH_' while the module-level
        # `forward_slash` constant defaults to '_fwd_SLASH' — confirm which
        # encoding the config actually emits.
        project_name = path[0].replace('_fwd_SLASH_', '/')
        if parent_project is None:
            existing_project = next(
                (p for p in all_projects if p.name.lower() == project_name.lower() and p.parent_id is None), None)
        else:
            existing_project = next(
                (p for p in all_projects if p.name.lower() == project_name.lower() and p.parent_id == parent_project.id),
                None)
        if existing_project:
            current_project = existing_project
        else:
            # Create the project if it doesn't exist
            new_project = self.make_project(project_name, parent_project.id if parent_project else None)
            if new_project:
                current_project = new_project
            else:
                print(f"Failed to create project: {project_name}")
                return None
        if len(path) > 1:
            return self.create_project(path[1:], all_projects, current_project)
        return current_project

    def get_path_id(self, path):
        """Resolve a separator-prefixed path against a fresh project listing
        fetched from the server; returns the project id or None."""
        path = path[1:]
        all_projects = list(TSC.Pager(self.server.projects))
        if '/' in path:
            path_parts = path.split('/')
            # BUG FIX: the original swapped the arguments
            # (`get_child_project_id(path, all_projects)`), so the path list
            # was iterated as projects and always failed.
            return self.get_child_project_id(all_projects, path_parts)
        for project in all_projects:
            if project.name == path and project.parent_id is None:
                return project.id
class Datasource:
    """One extracted Tableau datasource (.tds/.tdsx).

    Unzips the archive, parses the contained .tds XML, applies the
    configured transform/publish rules, re-zips the result and publishes it
    to the destination server.
    """

    def __init__(self, filepath, filename, content_id, rules, project_id, type, output=None, name_map=None, project_path=None):
        """Extract the archive, parse its .tds XML and prepare output dirs.

        :param filepath: full path to the datasource archive
        :param filename: datasource file name (without directory)
        :param content_id: identifier used when publishing/renaming
        :param rules: rule dict for this datasource from the config
        :param project_id: destination project id (may be falsy)
        :param type: "publish" or "transform" (shadows the builtin; kept
            for backward compatibility — currently unused, see the
            commented-out block below)
        :param output: optional output directory name, relative to home
        :param name_map: name mapping used at publish time (may be None)
        :param project_path: source project path of the datasource
        """
        # Full path to the file
        self.filepath = filepath
        # Name of the file
        self.name = filename
        self.content_id = content_id
        self.rule = rules
        self.transform = TransformDirective(filepath, rules)
        # self.publish = PublishDirective(filepath, rules)
        self.project_id = project_id
        # Destination project; may be overwritten by set_folder/apply_publish.
        self.root_project_id = project_id
        self.schedule_name = ""
        self.rename_path = None
        self.rename_ds = None
        self.project = Project(sys_variables.server)
        self.name_map = name_map
        self.project_path = project_path
        # if type == "publish":
        #     self.project = Project(sys_variables.server)
        #     project_name = self.get_project_path_from_rules()
        #     if project_name:
        #         self.root_project_id = self.project.get_project_id_nested(project_name)
        # Unzip the files in the temp directory.
        # NOTE(review): NamedTemporaryFile().name is used only to obtain a
        # unique path; the temp file/dir is never cleaned up here — confirm
        # cleanup happens elsewhere.
        self.temp_dir = tempfile.NamedTemporaryFile().name
        unzip_to_dir(filepath, self.temp_dir)
        # Get the first .tds file from the temp directory (raises IndexError
        # when the archive contains none).
        filename = [
            file for file in os.listdir(self.temp_dir) if file.endswith(".tds")
        ][0]
        self.twb_file = os.path.join(self.temp_dir, filename)
        # Read the xml file using ElementTree
        self.tree = ET.parse(self.twb_file)
        self.root = self.tree.getroot()
        # Check if output directory exists, if not create it
        output_dir = (
            os.path.join(Path.home(), output) if output else sys_variables.output_dir
        )
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        # self.output_dir = os.path.join(Path.home(), output_dir, self.project_id)
        # if not os.path.exists(self.output_dir):
        #     os.makedirs(self.output_dir)
        if self.project_id:
            # Per-destination-project output folder.
            self.output_dir = os.path.join(Path.home(), output_dir, self.project_id)
            if not os.path.exists(self.output_dir):
                os.makedirs(self.output_dir)
        else:
            self.output_dir = None

    def get_project_path_from_rules(self):
        """Return the project path parts (root first) of the last rule whose
        @name is contained in this datasource's name; [] when none match."""
        project_name = []
        for rule in self.rule:
            if rule["@name"] in self.name:
                project_name = rule["@project"].split("/")[1:]
        if project_name:
            logger_info(
                f"\t\tProject structure for default publish : {project_name} for {self.name}"
            )
        return project_name

    def apply_transformation(self):
        """Run every enabled transform rule against the parsed XML root."""
        rule = self.rule
        # Config key -> bound transform method.
        rules_mapping = {
            "replace_table_name": self.transform.replace_table_name,
            "change_custom_sql": self.transform.change_custom_sql,
            "set_calculation_formula": self.transform.replace_calculation_formula,
            "set_display_name": self.transform.set_display_name,
            "remove_extract": self.transform.remove_extract,
            "set_ds_connection_info": self.transform.set_ds_connection_info,
            "transform_unc_path": self.transform.transform_unc_path
        }
        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    # A repeated rule parses as a list; a single one as a dict.
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        # NOTE(review): rebinds the loop variable `value`
                        # (the mapped method) to the rule dict before calling.
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def apply_publish(self):
        """Resolve the destination project (creating it when the rule targets
        "all") and run every enabled publish rule."""
        # Get the root project ID from the project name in the destination server
        # This will get overwritten if the add Project or change project rule is enabled
        # self.root_project_id = self.project.get_project_id(
        #     project_name=rule["@project"], path=path, name=None
        # )
        rule = self.rule
        # Config key -> bound publish handler.
        rules_mapping = {
            "set_folder": self.set_folder,
            "apply_schedule": self.apply_schedule,
            "rename_datasource": self.rename_datasource,
            "set_connection_secrets": self.set_connection_param,
        }
        if rule["@project"].lower() == "all" and rule["@name"].lower() == "all":
            # logger_info("\n\t\tThe 'Add' and 'Change Project' directives have no effect.")
            # logger_info("\n\t\tThe workbook will be published in the same project as on the source server.")
            path = self.project_path
            project_id = self.project.get_path_id(path)
            all_projects = list(TSC.Pager(self.project.server.projects))
            if project_id is None:
                # Strip the leading separator and create the missing tree.
                path = path[1:]
                if '/' in path:
                    path = path.split('/')
                    project_id = self.project.create_project(path, all_projects)
                    project_id = project_id.id
                else:
                    project_id = self.project.make_project(path)
                    project_id = project_id.id
            else:
                print("Project path already exists:", project_id)
            if project_id:
                self.root_project_id = project_id
                logger_info(f"\t\tProject {path} found")
        try:
            for key, value in rules_mapping.items():
                if key in rule:
                    if isinstance(rule[key], list):
                        for item in rule[key]:
                            if item["@enabled"].lower() == "false":
                                continue
                            rules_mapping[key](self.root, value=item)
                    else:
                        if rule[key]["@enabled"].lower() == "false":
                            continue
                        value = rule[key]
                        rules_mapping[key](self.root, value=value)
        except Exception as e:
            logger_error(f"Error processing rules: {e}")

    def set_connection_param(self, *args, **kwargs):
        """Replace connection attribute values with secrets from the password
        vault, then save and re-zip the datasource in place."""
        logger_info(f"\n\t\t-- Applying Set connection param changes.")
        logger_info(
            f"\n\t\t\x1B[3mBefore applying the transformation, please ensure that all the values are correct and "
            f"matches with the server database connection. Otherwise it will fail while publishing.\x1B[0m",
            color=Fore.YELLOW)
        root = args[0]
        database_type = kwargs["value"]["@class"]
        caption = kwargs["value"]["@caption"]
        # password vault (optional)
        try:
            pvault = sys_variables.pvault
        except Exception:
            pvault = None
            pass
        # find the node having connection class as database_type
        named_connection_node = root.find(f".//named-connection[@caption='{caption}']")
        if named_connection_node is not None:
            logger_info(f'\n\t\tDatabase Connection found with caption : {named_connection_node}')
            # Get the children connection node which has class defined in database_type
            connection_node = named_connection_node.find(f".//connection[@class='{database_type}']")
            if connection_node is not None:
                logger_info(f'\n\t\tConnection found : {connection_node}')
                # Change the attributes to the new value now.  The rule value
                # names a vault key; when it matches, the attribute is set to
                # the vault's secret.
                for key, value in kwargs["value"][database_type].items():
                    if pvault:
                        for key1, val in pvault.items():
                            if value == key1:
                                pvault_val = pvault[key1]
                                logger_info(f"\t\tChanging")
                                connection_node.set(key[1:], pvault_val)
            logger_info(f"\t\tSaving the datasource:")
            self.save_datasource()
            self.zip_datasource("publish")
        else:
            logger_info(f"\n\t\tNo database connection found with the caption : {caption}")

    def apply_rename(self, rule, path):
        """Record the rename directive (new name + current path) when enabled;
        the actual rename happens in update_datasource_name()."""
        rules_mapping = {
            "rename_datasource": self.rename_datasource,
        }
        for key, value in rules_mapping.items():
            if key in rule:
                if rule[key]["@enabled"].lower() == "false":
                    continue
                value = rule[key]
                self.rename_ds = value['@new_ds_name']
                self.rename_path = value['@current_path']

    def set_folder(self, *args, **kwargs):
        """
        Folder To place the datasource:
        :return:
        """
        logger_info(f"\n\t\t-- Setting the folder to place the datasource")
        project_name = kwargs["value"]["@folder_name"]
        path = project_name
        # get the project id of the project name
        # project_id = self.project.get_project_id(project_name, path, name="set_folder")
        project_id = self.project.get_project_ids(path)
        if project_id:
            self.root_project_id = project_id
            logger_info(f"\t\tProject {path} found")
        else:
            # Empty id later blocks publishing in publish_().
            self.root_project_id = ""
            logger_error(f"\t\tProject {project_name} not found")
            return

    def apply_schedule(self, *args, **kwargs):
        """
        Apply schedules on target datasource:
        Only records the schedule name; it is applied after publishing.
        :return:
        """
        logger_info(f"\n\t\t-- Applying schedules on target datasource datasource")
        schedule_name = kwargs["value"]["@schedule_name"]
        self.schedule_name = schedule_name

    def save_datasource(self):
        """
        Save the changes in the datasource to datasource directory.
        :return:
        """
        logger_info(f"\t\tSaving the datasource : {self.twb_file}")
        # NOTE(review): self.twb_file is already an absolute path, so the
        # join is a no-op — the tree is written back over the extracted file.
        self.tree.write(os.path.join(self.temp_dir, self.twb_file))
        # zip the temp directory into .twbx file
        #self.zip_datasource()

    def zip_datasource(self, publish=None):
        """
        Zip the datasource.
        :param publish: when truthy, write the archive next to the source
            file instead of into the output directory
        :return:
        """
        #new_filename = os.path.join()
        filename = f"{os.path.basename(self.name).split('.')[0]}.tdsx"
        if publish:
            output_dir = os.path.dirname(self.filepath)
        else:
            output_dir = self.output_dir
        zip_directory_to_twbx(self.temp_dir, os.path.join(output_dir, filename))

    def publish_(self):
        """
        Publish the workbook.
        :return:
        """
        if ".tdsx" in self.content_id:
            self.content_id = self.content_id.replace(".tdsx", "")
        # Map the sanitized file name back to the original datasource name.
        if self.name_map:
            for key, value in self.name_map.items():
                if value == self.name:
                    self.name = key
        logger_info(f"\t\tPublishing the DataSource : {self.name}")
        if self.root_project_id:
            published_datasource = sys_variables.server.datasources.publish(
                DatasourceItem(
                    name=self.name,
                    project_id=self.root_project_id,
                ),
                self.filepath,
                "Overwrite",
            )
            self.apply_schedules(published_datasource.id)
            self.update_datasource_name()
        else:
            logger_info(
                "\n\t\tNo project found to publish. There could be multiple reasons for this"
                "\n\t\t1. The project structure is not correct in config. "
                "\n\t\t2. The project structure is not available in the destination site. "
                "\n\t\tPlease check the spelling of the names of each project in the project structure "
                "\n\t\tCheck in the destination site whether the project structure exists or not "
                f"\n\t\tDATASOURCE FILENAME : {self.content_id}")

    def apply_schedules(self, data_source_id):
        """
        Apply Schedules.
        :param data_source_id: id of the just-published datasource
        :return:
        """
        # get a datasource item that should be added to a schedule
        datasource_item = sys_variables.server.datasources.get_by_id(data_source_id)
        datasource_schedules = sys_variables.server.schedules.get()
        specific_name = self.schedule_name
        if specific_name:
            matching_schedule_ids = ""
            # schedules.get() returns (items, pagination); scan the items.
            for schedule in datasource_schedules[0]:
                if schedule.name == specific_name:
                    matching_schedule_ids = schedule.id.strip()
            # print("Matching Schedule IDs for '{}': {}".format(specific_name, matching_schedule_ids))
            # retrieve the id of the target schedule
            schedule_id = matching_schedule_ids
            try:
                # Add the data source to the schedule
                sys_variables.server.schedules.add_to_schedule(
                    schedule_id=schedule_id,
                    datasource=datasource_item,
                )
                logger_info(
                    f"\tSuccessfully applied schedules for the {self.name} data source."
                )
            except ServerResponseError as e:
                if e.code == "403078":
                    logger_info(
                        "Permission Denied: Not allowed to create extract refresh tasks on this object."
                    )
                else:
                    # Handle other types of errors if needed
                    logger_error("An error occurred:", e)

    def __repr__(self):
        return f"Datasource : {self.name}"

    def rename_datasource(self, *args, **kwargs):
        """
        Rename the datasource.
        Only updates content_id; the server-side rename happens in
        update_datasource_name().
        :return:
        """
        logger_info(
            f"\n\t\t-- Renaming the datasource to : {kwargs['value']['@new_ds_name']}"
        )
        new_name = kwargs["value"]["@new_ds_name"]
        current_path = kwargs["value"]["@current_path"]
        self.content_id = new_name

    def get_all_projects(self, server):
        """Return a dict of project id -> ProjectItem for the given server."""
        all_projects = {}
        for project in list(TSC.Pager(server.projects)):
            all_projects[project.id] = project
        return all_projects

    def get_full_path(self, datasource, projects_dict):
        """Build the project path of a server datasource ending in its name.

        NOTE(review): each project part is inserted as "/<name>" and the
        parts are then joined with "/", which doubles the separator between
        nested projects (e.g. "/A//B/name"); the configured @current_path
        must match that format — confirm.
        """
        path_parts = [datasource.name]
        project = projects_dict.get(datasource.project_id)
        while project:
            path_parts.insert(0, f"/{project.name}")
            if project.parent_id:
                project = projects_dict.get(project.parent_id)
            else:
                project = None
        return '/'.join(path_parts)

    def update_datasource_name(self):
        """Find the server datasource at rename_path/name and rename it to
        the recorded new name (no-op when no match is found)."""
        full_path = f"{self.rename_path}/{self.name}"
        self.name = self.rename_ds
        matched_datasource = None
        projects_dict = self.get_all_projects(sys_variables.server)
        for datasource in Pager(sys_variables.server.datasources.get):
            if self.get_full_path(datasource, projects_dict) == full_path:
                matched_datasource = datasource
                break
        if matched_datasource:
            matched_datasource.name = self.name
            sys_variables.server.datasources.update(matched_datasource)
            print(f"Datasource '{full_path}' updated to '{self.name}'")
class RecurseDataSourceProjectPath:
    """Expands a rule's project path ("all", recursive, or exact) into the
    list of source projects to operate on, and builds a Datasource for every
    .tds/.tdsx file found under them."""

    def __init__(self, project_path,
                 source_folder_map,
                 publish_folder_map,
                 recurse,
                 source,
                 each_rule,
                 target=None,
                 name_map=None):
        """
        :param project_path: project path from the rule, or "all"
        :param source_folder_map: source project path -> archive folder uuid
        :param publish_folder_map: source project path -> destination project id
        :param recurse: the rule's "@recurse" attribute ("true"/"yes"/other), or None
        :param source: root directory holding the archived project folders
        :param each_rule: the rule dict itself
        :param target: optional output directory forwarded to Datasource
        :param name_map: original-name -> sanitized-name map (may be None)
        """
        self.project_path = project_path
        self.recurse = recurse
        self.source_folder_map = source_folder_map
        self.publish_folder_map = publish_folder_map
        self.source = source
        self.rule = each_rule
        self.target = target
        self.name_map = name_map
        # BUG FIX: the original crashed when name_map was None despite its
        # None default.
        self.reverse_name_map = {value: key for key, value in (name_map or {}).items()}
        # BUG FIX: the original appended every project twice when the path
        # was "all" and @recurse was present; build candidates once and
        # de-duplicate while preserving the map's order.
        if self.project_path.lower() == "all":
            candidates = list(source_folder_map)
        elif recurse and recurse.lower() in ['true', 'yes']:
            # Recursive: every sub-project under the configured path (with a
            # known archive folder).
            candidates = [key for key, value in source_folder_map.items()
                          if key.startswith(project_path) and value]
        elif recurse:
            # Non-recursive: the configured path itself; "/" means every
            # top-level project.
            if project_path == '/':
                candidates = [key for key in source_folder_map if key.count('/') == 1]
            else:
                candidates = [key for key, value in source_folder_map.items()
                              if key == project_path and value]
        else:
            # No @recurse attribute and not "all": nothing to expand
            # (original behavior).
            candidates = []
        self.projects_to_operate = list(dict.fromkeys(candidates))
        self.workbooks = []

    def get_datasources(self):
        """Scan each selected project's archive folder and wrap every
        .tds/.tdsx file in a [file_path, Datasource] pair.

        :return: the accumulated list of [file_path, Datasource] pairs
        """
        for project_path in self.projects_to_operate:
            name_map = None
            try:
                folder_uuid = self.source_folder_map[project_path]
                destination_project_id = self.publish_folder_map[project_path]
                file_dir = os.path.join(self.source, folder_uuid)
                # Iterate through the archived project directory
                for file in os.listdir(file_dir):
                    workbook_name = os.path.splitext(file)[0]
                    actual_name = self.reverse_name_map[workbook_name]
                    if os.path.splitext(file)[1] in ['.tdsx', '.tds']:
                        logger_info(f"\t\tFile exist in the path : {os.path.join(file_dir, file)}")
                        self.workbooks.append(
                            [
                                os.path.join(file_dir, file),
                                Datasource(
                                    # TODO: change the file path read from db
                                    os.path.join(file_dir, file),
                                    workbook_name,
                                    actual_name,
                                    self.rule,
                                    destination_project_id,
                                    "transform",
                                    self.target,
                                    # NOTE(review): the original passes this
                                    # local None rather than self.name_map —
                                    # kept for compatibility; confirm intent.
                                    name_map,
                                    project_path
                                ),
                            ]
                        )
            except Exception as e:
                # BUG FIX: the original swallowed every error silently,
                # skipping whole projects without a trace.
                logger_error(f"\t\tSkipping project {project_path} : {e}")
        return self.workbooks
def get_datasource_objects(rule, source_folder_map, project_archive_map, source, target=None, name_map=None
                           , type=None):
    """Build [file_path, Datasource] pairs for every enabled datasource rule.

    :param rule: full parsed config dict
    :param source_folder_map: source project path -> archive folder name
    :param project_archive_map: source project path -> destination project id
    :param source: root directory of the archived project folders
    :param target: optional output directory forwarded to Datasource
    :param name_map: datasource name -> sanitized file name (may be None)
    :param type: "transform" selects the transform rule section, anything
        else the publish section (shadows the builtin; kept for backward
        compatibility)
    :return: list of [file_path, Datasource] pairs
    """
    datasources = []
    logger_info(f"\n\n-- Going through the config file.")
    logger_info(f"-------------------------------------------------------------")
    if type == "transform":
        rules = rule["directives"]["datasources"]["transform"]["datasource"]
    else:
        rules = rule["directives"]["datasources"]["publish"]["datasource"]
    # A single rule parses as a dict; normalize to a list.
    if isinstance(rules, dict):
        rules = [rules]
    for each_rule in rules:
        # Bind these before the try block so the error handler never touches
        # unbound locals (the original could raise NameError while logging).
        project_path = each_rule.get("@project")
        content_name = each_rule.get("@name")
        try:
            required = each_rule["@enabled"]
            logger_info(f"\n\nProject : {project_path} , Datasource : {content_name} , Enabled : {required}")
            if required.lower() not in ["true", "yes", "ok"]:
                logger_info(f"Disabled for Datasource : {content_name}")
                continue
            # "all"/"*" rules expand to every matching project/datasource.
            if content_name.lower() in ['all', '*']:
                recurse = each_rule.get("@recurse", None)
                rpp = RecurseDataSourceProjectPath(project_path, source_folder_map,
                                                   project_archive_map,
                                                   recurse, source, each_rule, target, name_map)
                datasources = datasources + rpp.get_datasources()
                # Replaced the leftover debug print with a log entry.
                logger_info(f"\t\tCollected {len(datasources)} datasource(s) so far")
                continue
            # Search the project path in source archive map.  BUG FIX: guard
            # name_map so a None map no longer raises TypeError (which the
            # old handler silently mislabelled as "not tagged").
            if project_path in source_folder_map and name_map and content_name in name_map:
                logger_info(f"\t\tProject path exist in destination : {urllib.parse.unquote(project_path)}")
                # Get the folder name where the workbook is lying
                project_id = source_folder_map[project_path]
                destination_project_id = project_archive_map[project_path]
                if destination_project_id is None:
                    logger_error(f"\t\tThe project in destination doesn't exist : {project_path}")
                    # continue
                file_dir = os.path.join(source, project_id)
                # content_id = name_map[content_name]
                sanitized_datasource_name = replace_special_characters(content_name)
                file_path = get_file_path_with_extension(sanitized_datasource_name, file_dir)
                # Check whether the filename exist in the path or not
                if file_path and os.path.exists(file_path):
                    logger_info(f"\t\tFile exist in the path : {file_path}")
                    datasources.append(
                        [
                            file_path,
                            Datasource(
                                # TODO: change the file path read from db
                                file_path,
                                content_name,
                                sanitized_datasource_name,
                                each_rule,
                                destination_project_id,
                                "transform",
                                target,
                                name_map,
                                project_path
                            ),
                        ]
                    )
                else:
                    logger_info(f"File doesn't exist in the path : {file_path}. "
                                f"The datasource might not be tagged.")
            else:
                logger_info(f"\t\tThe datasource : {content_name} is not tagged in project path : {project_path}."
                            f"Please check the config file.")
        except Exception as e:
            # BUG FIX: log the actual failure instead of discarding it with
            # a misleading "not tagged" message.
            logger_error(f"\t\tThe datasource : {content_name} in project : {project_path} "
                         f"failed : {e}")
    # BUG FIX: the original return line was corrupted by pasted editor text
    # ("return datasourcesEditor is loading...").
    return datasources
Leave a Comment