# ================================================================================================================================================================================
# HOBS MULTI-TENANCY FUNCTIONALITY REQUEST APIS
# DATE AUTHOR VER CHANGE DESCRIPTION
# -------- --------- ----- ------------------
# 30.03.23 Lakkshman 1.0 Init version
# ================================================================================================================================================================================
from datetime import datetime, timezone
from sys import exc_info as sys_exc_info
from json import loads as json_loads
from json import dumps as json_dumps
from pandas import read_sql as pd_read_sql
import uuid
from sqlalchemy import create_engine as sqlalchemy_create_engine
from models.hos_event_logs import hos_event_logs
from models.hos_event_logs import *
from models.hos_event_log_by_businesskeys import hos_event_log_by_businesskeys
# define Python user-defined exceptions
class Error(Exception):
    """Base class for the user-defined exceptions raised by this module.

    Catching ``Error`` catches every module-specific failure
    (``VALIDATION_ERRORS`` and ``DAG_ERRORS`` both derive from it).
    """
# define Python user-defined exceptions
class VALIDATION_ERRORS(Error):
    """Raised by postData when the configuration or the request payload is rejected.

    The exception message is returned to the API caller as the status
    description, so it should be safe to expose.
    """
# define Python user-defined exceptions
class DAG_ERRORS(Error):
    """Raised by postData when invoking the Airflow DAG does not succeed."""
def apilogfile(api):
    """Create a DEBUG-level file logger for one API invocation.

    The log file is ``<ANALYTICSDATASERVICELOGPATH>/logs/<api>.log``.  It is
    opened in ``w+`` mode, so each call starts the file afresh.  The ``logs``
    directory is created when it does not exist yet.

    Args:
        api: API name; used as the log file stem and in the activation message.

    Returns:
        logging.Logger: a uniquely named logger (``apilog<uuid1>``) with a
        single file handler attached.

    Raises:
        RuntimeError: if the ANALYTICSDATASERVICELOGPATH environment variable
            is unset or empty.
        OSError: if the log directory or file cannot be created/opened.
    """
    import logging
    import os

    log_root = os.environ.get('ANALYTICSDATASERVICELOGPATH')
    if not log_root:
        # Previously this surfaced as "unsupported operand type(s) for +:
        # 'NoneType' and 'str'", which hid the real cause.
        raise RuntimeError("Environment variable ANALYTICSDATASERVICELOGPATH is not set; "
                           "cannot open the log file for '%s'." % api)

    log_dir = os.path.join(log_root, 'logs')
    os.makedirs(log_dir, exist_ok=True)

    # Flushes and closes the handlers created by earlier calls, so file
    # descriptors do not accumulate (every call registers a new logger).
    # NOTE(review): logging.shutdown() is documented for use at application
    # exit; it also closes handlers still used by concurrent requests - confirm
    # this is acceptable for the deployment model.
    logging.shutdown()

    log_handler = logging.FileHandler(os.path.join(log_dir, api + '.log'), mode='w+')
    formatter = logging.Formatter(fmt='%(asctime)s [Process:%(process)d] [%(levelname)-8s] [%(filename)s - %(module)s] : %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')
    log_handler.setFormatter(formatter)

    # The uuid suffix makes the logger name unique per call, so the handler
    # list is empty unless the name happens to be reused.
    LOGGER = logging.getLogger('apilog' + str(uuid.uuid1()))
    if not LOGGER.handlers:
        LOGGER.addHandler(log_handler)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.info('LOGGER activated for %s', api)
    return LOGGER
def update_event_log(apilogger, pv_mode, pd_hosel_date, pd_hosel_event_date, pv_batchId, pv_event_status, pv_args_keys, pv_enriched_data = None):
    """Upsert one createUser event into both event-log tables.

    The same change is applied to ``hos_event_logs`` (row selected by
    hosel_date / hosel_event_date / hosel_event_seq) and to
    ``hos_event_log_by_businesskeys`` (row selected by
    hosel_event_value='createUser' / hosel_event_date / hosel_event_seq).

    Values are handed to the model API directly.  Earlier revisions assembled
    Python source text from these values and ran it through ``exec``, which
    let payload content execute as code and mangled JSON containing quotes or
    backslashes.  The module-level ``queryString`` name that the generated
    code assigned is no longer set.

    Args:
        apilogger: logger used for debug/error output.
        pv_mode: one of
            "init"               - event type, status, trigger and payload JSON
                                   (plus hosel_date on the business-keys table);
            "interim"            - hosel_bu_id / hosel_op_id taken from
                                   pv_args_keys["nameType"] / pv_args_keys["name"];
            "update"             - event status only;
            "updateEnrichedData" - hosel_enriched_data from pv_enriched_data,
                                   non-string values converted with str().
        pd_hosel_date: value stored/matched as hosel_date.
        pd_hosel_event_date: timestamp text in '%Y-%m-%dT%H:%M:%S.%f%z' form.
        pv_batchId: value matched as hosel_event_seq.
        pv_event_status: event status, either a dict or the text of a Python
            dict literal; only used by the "init" and "update" modes.
        pv_args_keys: request payload (dict).
        pv_enriched_data: dict of enriched values ("updateEnrichedData" only).

    Raises:
        Exception: any failure is logged and re-raised as a plain Exception
            whose message carries the original error and the line number.
    """
    from ast import literal_eval

    try:
        if pv_mode not in ("init", "interim", "update", "updateEnrichedData"):
            raise ValueError("Unsupported pv_mode '%s'" % (pv_mode,))

        event_date = datetime.strptime(pd_hosel_event_date, '%Y-%m-%dT%H:%M:%S.%f%z')

        event_status = pv_event_status
        if pv_mode in ("init", "update") and isinstance(event_status, str):
            # Callers pass the status as dict-literal text; literal_eval
            # accepts literals only, so nothing in it can be executed.
            try:
                event_status = literal_eval(event_status)
            except (ValueError, SyntaxError, TypeError) as parse_error:
                raise ValueError("pv_event_status is not a valid literal: %s" % parse_error) from parse_error

        if pv_mode == "init":
            log_values = {'hosel_event_type': 'createUser',
                          'hosel_event_status': event_status,
                          'hosel_triggered_by': 'multiTenantJob',
                          'hosel_event_data': json_dumps(pv_args_keys)}
            # hosel_date is not part of the business-keys selection, so it is
            # written as a regular value on that table.
            business_values = dict(log_values, hosel_date=pd_hosel_date)
        elif pv_mode == "interim":
            log_values = {'hosel_bu_id': pv_args_keys["nameType"],
                          'hosel_op_id': pv_args_keys["name"]}
            business_values = dict(log_values)
        elif pv_mode == "update":
            log_values = {'hosel_event_status': event_status}
            business_values = dict(log_values)
        else:  # updateEnrichedData
            enriched = {key: value if isinstance(value, str) else str(value)
                        for key, value in pv_enriched_data.items()}
            log_values = {'hosel_enriched_data': enriched}
            business_values = {'hosel_enriched_data': enriched}

        log_key = {'hosel_date': pd_hosel_date,
                   'hosel_event_date': event_date,
                   'hosel_event_seq': pv_batchId}
        business_key = {'hosel_event_value': 'createUser',
                        'hosel_event_date': event_date,
                        'hosel_event_seq': pv_batchId}

        apilogger.debug("%s|hos_event_logs %s <- %s", pv_mode.upper(), log_key, log_values)
        apilogger.debug("%s|hos_event_log_by_businesskeys %s <- %s", pv_mode.upper(), business_key, business_values)

        hos_event_logs.objects(**log_key).update(**log_values)
        hos_event_log_by_businesskeys.objects(**business_key).update(**business_values)
    except Exception as cupexp:
        statusMessage = "Error - {%s} . Line No - {%s} " % (str(cupexp), str(sys_exc_info()[-1].tb_lineno))
        apilogger.error(statusMessage)
        raise Exception(statusMessage) from cupexp
    return
def _postdata_reject(message, status_code="422"):
    """Return a VALIDATION_ERRORS tagged with the status code postData must report."""
    rejection = VALIDATION_ERRORS(message)
    rejection.status_code = status_code
    return rejection


def _postdata_failure_status(outcome, description):
    """Render the event-status dict literal stored for a failed or errored request."""
    # str() of a dict is always a well-formed literal, whatever the message
    # contains; single quotes are still mapped to double quotes so stored
    # descriptions keep the form used by earlier rows.
    return str({'status': outcome,
                'validation': outcome,
                'userCreation': outcome,
                'errorDesc': description.replace("'", '"')})


def _postdata_config_value(configProperties, section, option):
    """Return the raw value of an option, or None when the option is absent."""
    if configProperties.has_option(section, option):
        return configProperties.get(section, option, raw=True)
    return None


def _postdata_key_rule(configProperties, option):
    """Load one JSON key list from [apiconfiguration]; an absent option means no keys."""
    raw_value = _postdata_config_value(configProperties, 'apiconfiguration', option)
    return [] if raw_value is None else json_loads(raw_value)


def _postdata_check_parent_keys(apilogger, args_keys, validkeys):
    """Reject payloads whose top-level keys (ignoring 'id') differ from validkeys."""
    if not isinstance(args_keys, dict) or not args_keys:
        raise _postdata_reject("Missing required attribute in payLoad. " + ','.join(validkeys) + ".")
    supplied = set(args_keys) - {"id"}
    expected = set(validkeys)
    missing = sorted(expected - supplied)
    apilogger.debug("missing parent keys: %s", missing)
    if missing:
        raise _postdata_reject("Validation failed on parent keys. '%s'" % "','".join(missing))
    unexpected = sorted(supplied - expected)
    apilogger.debug("unexpected parent keys: %s", unexpected)
    if unexpected:
        raise _postdata_reject("Unprocessable parent keys found. '%s'" % "','".join(unexpected))


def _postdata_check_details(args_keys, relatedPartyValidKeys, contactMediumValidKeys,
                            contactPreferredValidKeys, postalAddressValidKeys):
    """Validate relatedParty and contactMedium; return (emailID, contact)."""
    import re

    # relatedParty: exactly one entry carrying exactly the configured keys.
    related_party = args_keys.get('relatedParty')
    if not isinstance(related_party, list) or len(related_party) != 1 \
            or not isinstance(related_party[0], dict):
        raise _postdata_reject("Validation failed on relatedParty. Invalid construct.")
    if set(related_party[0]) != set(relatedPartyValidKeys):
        absent = sorted(set(relatedPartyValidKeys) - set(related_party[0]))
        raise _postdata_reject("Validation failed on relatedParty: {'%s'}" % "','".join(absent))

    # contactMedium: exactly three entries.
    media = args_keys.get('contactMedium')
    if not isinstance(media, list) or len(media) != 3 \
            or not all(isinstance(medium, dict) for medium in media):
        raise _postdata_reject("Validation failed on contactMedium. Invalid construct.")
    if not any(medium.get('preferred') == 'true' for medium in media):
        raise _postdata_reject("Validation failed on contactMedium. Either of one preferred contact to be set true.")
    if any(set(medium) != set(contactMediumValidKeys) for medium in media):
        raise _postdata_reject("Validation failed on contactMedium. Either of one contact missing core attribute. {'%s'}" % "','".join(contactMediumValidKeys))
    recognised = [medium for medium in media
                  if 'mediumType' in medium and medium['mediumType'] in contactPreferredValidKeys]
    if len(recognised) != 3:
        raise _postdata_reject("Validation failed on contactMedium. Either of one contact missing core attribute. {'%s'}" % "','".join(contactPreferredValidKeys))

    # email: the top-level domain class used to be [A-Z|a-z], which also
    # admitted a literal '|'.
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'
    email_values = [medium.get('characteristic') for medium in media
                    if medium.get('mediumType') == "email"]
    if not all(isinstance(value, str) for value in email_values):
        raise _postdata_reject("Invalid email id")
    emailID = "".join(email_values)
    if not re.fullmatch(email_pattern, emailID):
        raise _postdata_reject("Invalid email id")

    # postalAddress: first characteristic entry must carry exactly the configured keys.
    for medium in media:
        if medium.get('mediumType') != "postalAddress":
            continue
        address = medium.get('characteristic')
        well_formed = isinstance(address, list) and len(address) > 0 and isinstance(address[0], dict)
        if not well_formed or set(address[0]) != set(postalAddressValidKeys):
            raise _postdata_reject("Validation failed on postalAddress. Missing core attribute. {'%s'}" % "','".join(postalAddressValidKeys))

    contact_values = [medium.get('characteristic') for medium in media
                      if medium.get('mediumType') == "contact"]
    if not all(isinstance(value, str) for value in contact_values):
        raise _postdata_reject("Validation failed on contactMedium. Invalid construct.")
    return emailID, "".join(contact_values)


def _postdata_build_dag_conf(args_keys, batchId):
    """Flatten the validated payload into the 'conf' dict sent to the DAG."""
    op_args = {"opID": args_keys["name"],
               "buID": args_keys["nameType"],
               "eventID": batchId}
    op_args.update(args_keys['relatedParty'][0])
    media = args_keys['contactMedium']
    op_args.update({medium['mediumType']: medium['characteristic'] for medium in media
                    if medium.get('mediumType') in ("contact", "email")})
    op_args.update({medium['mediumType']: medium['characteristic'][0] for medium in media
                    if medium.get('mediumType') == "postalAddress"})
    return op_args


def _postdata_user_exists(apilogger, emailID, contact):
    """Return True when sso_user already holds the login, email or contact number."""
    from sqlalchemy import text as sqlalchemy_text

    # TODO(security): connection details are hard-coded; they belong in
    # configuration or a secret store.
    user = "tib_sso"
    password = "tib_sso"
    host = "10.16.16.128"
    port = "3308"
    schema = "tib_sso"
    connection_text = 'mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user, password, host, port, schema)
    # NOTE(review): the 'encoding' argument is kept from the earlier revision;
    # SQLAlchemy 2.0 no longer accepts it - confirm the installed version.
    cnx = sqlalchemy_create_engine(connection_text,
                                   encoding="utf-8",
                                   connect_args={'connect_timeout': 600})
    try:
        # Bound parameters: the contact number is not format-checked, so it
        # must never be spliced into the SQL text.  The e-mail is lower-cased
        # because it is compared against lower(login) / lower(email).
        query_sso = sqlalchemy_text(
            f"SELECT * FROM {schema}.sso_user "
            "WHERE lower(login)=:email OR lower(email)=:email OR primary_contact_number=:contact")
        parameters = {"email": emailID.lower(), "contact": contact}
        apilogger.debug("query(sso): %s | params: %s", query_sso, parameters)
        df_sso = pd_read_sql(query_sso, con=cnx, params=parameters)
    finally:
        # Release the pooled connections of this per-request engine.
        cnx.dispose()
    apilogger.debug("df_sso: %s", len(df_sso))
    return len(df_sso) > 0


def _postdata_trigger_dag(apilogger, airflowURL, certificate_path, batchId, op_args):
    """Start (or, on HTTP 409, clear) the createNewUser DAG run; return the decoded reply."""
    import requests

    # TODO(security): the Basic credential is hard-coded; move it to configuration.
    # NOTE(review): no timeout is passed, so a stalled Airflow endpoint blocks
    # this call indefinitely.
    headers = {"Content-Type": "application/json",
               "Authorization": "Basic aG9ic2FkbWluOmhvYnNhZG1pbg=="}
    dag_data = json_dumps({"dag_run_id": batchId, "conf": op_args.copy()})
    apilogger.debug("FINAL|dag_data: %s", dag_data)

    airflowURL_new = airflowURL + "createNewUser/dagRuns"
    apilogger.debug("airflowURL : %s", airflowURL_new)
    dag_response = requests.post(airflowURL_new,
                                 headers=headers,
                                 data=dag_data,
                                 verify=certificate_path)
    apilogger.debug("dag_response.status_code: %s", dag_response.status_code)

    if str(dag_response.status_code) == "409":
        apilogger.debug("DAG already exists, retrying the DAG")
        airflowURL_new = airflowURL + "createNewUser/dagRuns/" + batchId + "/clear"
        # NOTE(review): 'true' is the value the earlier revision selected
        # (through a misspelled variable that raised NameError); with dry_run
        # enabled the clear request may only be simulated - confirm the intent.
        payload = {'dry_run': 'true'}
        apilogger.debug("airflowURL : %s", airflowURL_new)
        apilogger.debug("payload : %s", payload)
        dag_response = requests.post(airflowURL_new,
                                     headers=headers,
                                     json=payload,
                                     verify=certificate_path)

    try:
        reply = json_loads(dag_response.text)
    except ValueError:
        apilogger.error("DAG response is not JSON: %s", dag_response.text)
        raise DAG_ERRORS("DAG Invoke failed")
    apilogger.debug(str(reply))
    if not isinstance(reply, dict) or 'id' not in reply:
        raise DAG_ERRORS("DAG Invoke failed")
    return reply


def postData(api, payLoad, configProperties):
    """Validate a createUser request, record it in the event log and start the DAG.

    Steps: write the "init" event-log entry, check the configuration, validate
    the payload against the key lists in [apiconfiguration], make sure the
    login/e-mail/contact is not already present in sso_user, store the
    enriched data, trigger the createNewUser DAG and finally store the
    resulting status in the event log.

    Args:
        api: API name, used for the log file (see apilogfile).
        payLoad: request body as a dict; an optional 'id' becomes the batch id,
            otherwise a uuid1 is generated.
        configProperties: parser object offering has_section / has_option /
            get(section, option, raw=True).  Sections read: 'config'
            (convert_to_UTC), 'apiconfiguration' (key lists as JSON) and
            'job_details' (airflowURL, certificate_path).

    Returns:
        tuple: (statusCode, response) where statusCode is a string -
        "202" accepted, "403" configuration rejected, "409" user already
        exists, "422" payload rejected, "500" DAG or unexpected failure - and
        response is a dict with 'statusDescription', 'id' and 'href'.
    """
    import logging

    statusCode = "403"
    statusMessage = ""
    response = {}
    hosel_date = None
    hosel_event_date = None
    log_event_result = str({'status': 'failed', 'validation': 'notInitialized'})
    # Fallback so the error handlers can log even if apilogfile() itself fails.
    apilogger = logging.getLogger(__name__)

    args_keys = payLoad.copy() if isinstance(payLoad, dict) else payLoad
    if isinstance(args_keys, dict) and 'id' in args_keys:
        batchId = str(args_keys["id"])
    else:
        batchId = str(uuid.uuid1())

    try:
        apilogger = apilogfile(api)
        apilogger.debug("args_keys/payLoad: %s", json_dumps(args_keys))

        # Timestamp of this event; UTC unless [config] convert_to_UTC says False.
        convert_to_UTC = True
        if configProperties.has_section('config'):
            utc_flag = (_postdata_config_value(configProperties, 'config', 'convert_to_UTC') or "").capitalize()
            if utc_flag in ('True', 'False'):
                convert_to_UTC = utc_flag == 'True'
        event_moment = datetime.now(tz=timezone.utc) if convert_to_UTC else datetime.now().astimezone()
        # Millisecond precision with the UTC offset kept, whatever its sign.
        hosel_event_date = event_moment.isoformat(timespec='milliseconds')
        hosel_date = hosel_event_date[:10]
        apilogger.debug("convert_to_UTC : %s", convert_to_UTC)
        apilogger.debug("hosel_event_date: %s", hosel_event_date)
        apilogger.debug("hosel_date : %s", hosel_date)

        # hos_event_log - initialized
        log_event_result = str({'status': 'initialized', 'validation': 'initialized'})
        update_event_log(apilogger=apilogger,
                         pv_mode="init",
                         pd_hosel_date=hosel_date,
                         pd_hosel_event_date=hosel_event_date,
                         pv_batchId=batchId,
                         pv_event_status=log_event_result,
                         pv_args_keys=args_keys)

        # Validation: configuration (status code stays at its "403" default)
        if not configProperties.has_section('apiconfiguration'):
            raise VALIDATION_ERRORS("Configuration for validation not properly set.")
        if not configProperties.has_section('job_details'):
            raise VALIDATION_ERRORS("DAG execution details not provided.")
        airflowURL = _postdata_config_value(configProperties, 'job_details', 'airflowURL')
        certificate_path = _postdata_config_value(configProperties, 'job_details', 'certificate_path')
        if not airflowURL:
            raise VALIDATION_ERRORS("Airflow URL not set for execution.")
        if "https://" in airflowURL and not certificate_path:
            raise VALIDATION_ERRORS("Certificate path not specified.")
        apilogger.debug("Airflow end point: %s", airflowURL)
        apilogger.debug("certificate_path : %s", certificate_path)

        # Configuration: key lists used by the payload validation
        validkeys = _postdata_key_rule(configProperties, 'validkeys')
        relatedPartyValidKeys = _postdata_key_rule(configProperties, 'relatedPartyValidKeys')
        contactMediumValidKeys = _postdata_key_rule(configProperties, 'contactMediumValidKeys')
        contactPreferredValidKeys = _postdata_key_rule(configProperties, 'contactPreferredValidKeys')
        postalAddressValidKeys = _postdata_key_rule(configProperties, 'postalAddressValidKeys')
        apilogger.info("args_keys: %s", args_keys)
        apilogger.info("validkeys: %s", validkeys)

        # Validation: parent keys
        _postdata_check_parent_keys(apilogger, args_keys, validkeys)

        # hos_event_log - interim (opId, buId)
        update_event_log(apilogger=apilogger,
                         pv_mode="interim",
                         pd_hosel_date=hosel_date,
                         pd_hosel_event_date=hosel_event_date,
                         pv_batchId=batchId,
                         pv_event_status=log_event_result,
                         pv_args_keys=args_keys)

        # Validation: relatedParty / contactMedium
        emailID, contact = _postdata_check_details(args_keys,
                                                   relatedPartyValidKeys,
                                                   contactMediumValidKeys,
                                                   contactPreferredValidKeys,
                                                   postalAddressValidKeys)

        # DAG: configuration passed to the run
        op_args = _postdata_build_dag_conf(args_keys, batchId)

        # Validation: the user must not exist yet (one matching row is a conflict)
        if _postdata_user_exists(apilogger, emailID, contact):
            raise _postdata_reject("Login/email id/contact requested already exists in the system.", "409")

        # hos_event_log - validation completed, user creation about to start
        log_event_result = str({'validation': 'success', 'userCreation': 'DAGinitialized'})
        update_event_log(apilogger=apilogger,
                         pv_mode="updateEnrichedData",
                         pd_hosel_date=hosel_date,
                         pd_hosel_event_date=hosel_event_date,
                         pv_batchId=batchId,
                         pv_event_status=log_event_result,
                         pv_args_keys=args_keys,
                         pv_enriched_data=op_args)
        apilogger.debug("INTRIM|op_args: %s", op_args)

        _postdata_trigger_dag(apilogger, airflowURL, certificate_path, batchId, op_args)

        # Response
        statusCode = "202"
        statusMessage = "Request under process, use getCreateUserStatus for tracking"
        log_event_result = str({'status': 'success', 'validation': 'success', 'userCreation': 'success'})
    except DAG_ERRORS as cupverrd:
        statusCode = "500"
        statusMessage = str(cupverrd)
        log_event_result = _postdata_failure_status('failed', statusMessage)
        apilogger.error(statusMessage)
    except VALIDATION_ERRORS as cupverr:
        # Payload rejections carry their own code; configuration rejections
        # keep the default.
        statusCode = getattr(cupverr, 'status_code', statusCode)
        statusMessage = str(cupverr)
        log_event_result = _postdata_failure_status('failed', statusMessage)
        apilogger.error(statusMessage)
    except Exception as cupexp:
        statusCode = "500"
        log_event_result = _postdata_failure_status('exception', str(cupexp))
        statusMessage = "Error - {%s} . Line No - {%s} " % (str(cupexp), str(sys_exc_info()[-1].tb_lineno))
        apilogger.exception(statusMessage)

    apilogger.info("statusCode : %s", statusCode)
    apilogger.info("statusMessage: %s", statusMessage)

    # Final status in the event log.  Skipped when the failure happened before
    # the event timestamp existed (there is no row to address), and never
    # allowed to replace the response with an exception.
    if hosel_event_date is not None:
        try:
            update_event_log(apilogger=apilogger,
                             pv_mode="update",
                             pd_hosel_date=hosel_date,
                             pd_hosel_event_date=hosel_event_date,
                             pv_batchId=batchId,
                             pv_event_status=log_event_result,
                             pv_args_keys=args_keys)
        except Exception as logexp:
            apilogger.error("Event log status update failed: %s", logexp)

    # TODO(review): the tracking host/port is hard-coded.
    response.update({"statusDescription": statusMessage,
                     "id": batchId,
                     "href": "http://172.16.177.58:11395//multitenant/getCreateUserStatus?id=" + batchId})
    return (statusCode, response)
Remember the code pattern above; following the same pattern, design an API for the given requirements.