# ================================================================================================================================================================================
# HOBS MULTI-TENANCY FUNCTIONALITY REQUEST APIS
# DATE     AUTHOR    VER   CHANGE DESCRIPTION
# -------- --------- ----- ------------------
# 30.03.23 Lakkshman 1.0   Init version
# ================================================================================================================================================================================
from datetime import datetime, timezone
from sys import exc_info as sys_exc_info
from json import loads as json_loads
from json import dumps as json_dumps
from pandas import read_sql as pd_read_sql
import uuid
from sqlalchemy import create_engine as sqlalchemy_create_engine

from models.hos_event_logs import hos_event_logs
from models.hos_event_logs import *
from models.hos_event_log_by_businesskeys import hos_event_log_by_businesskeys


# Define Python user-defined exceptions
class Error(Exception):
    """Base class for other exceptions"""
    pass


class VALIDATION_ERRORS(Error):
    """Raised when payload or configuration validation fails."""
    pass


class DAG_ERRORS(Error):
    """Raised when the airflow DAG could not be triggered."""
    pass


def apilogfile(api):
    """Create a per-request file logger under ANALYTICSDATASERVICELOGPATH/logs/<api>.log."""
    import logging
    from os import environ as os_environ

    log_file_path = os_environ.get('ANALYTICSDATASERVICELOGPATH')
    logging.shutdown()
    log_handler = logging.FileHandler(log_file_path + '/logs/' + api + '.log', mode='w+')
    formatter = logging.Formatter(fmt='%(asctime)s [Process:%(process)d] [%(levelname)-8s] [%(filename)s - %(module)s] : %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')
    # formatter.converter = time.gmtime  # if you want UTC time
    log_handler.setFormatter(formatter)
    LOGGER = logging.getLogger('apilog' + str(uuid.uuid1()))
    if not LOGGER.handlers:
        LOGGER.addHandler(log_handler)
        LOGGER.setLevel(logging.DEBUG)
    LOGGER.info('LOGGER activated for %s' % api)
    return LOGGER


def update_event_log(apilogger, pv_mode, pd_hosel_date, pd_hosel_event_date, pv_batchId, pv_event_status, pv_args_keys, pv_enriched_data=None):
    """Insert/update the event-log rows (hos_event_logs and hos_event_log_by_businesskeys) for one batch."""
    try:
        if pv_mode == "init":
            # QUERIES: EventLogInsert
            query = "global queryString;queryString = hos_event_logs.objects(" \
                    "hosel_date='" + pd_hosel_date + "'," \
                    "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                    "hosel_event_seq='" + pv_batchId + "')" \
                    ".update(hosel_event_type='createUser'" \
                    ", hosel_event_status = #replaceEventStatus#" \
                    ", hosel_triggered_by = 'multiTenantJob'" \
                    ", hosel_event_data = '" + json_dumps(pv_args_keys) + "'" \
                    + ")"
            queryBS = "global queryString;queryString = hos_event_log_by_businesskeys.objects(" \
                      "hosel_event_value='createUser'," \
                      "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                      "hosel_event_seq='" + pv_batchId + "')" \
                      ".update(" \
                      "hosel_date='" + pd_hosel_date + "'," \
                      "hosel_event_data='" + json_dumps(pv_args_keys) + "'," \
                      "hosel_event_type='createUser'," \
                      "hosel_event_status = #replaceEventStatus#," \
                      "hosel_triggered_by = 'multiTenantJob')"
        elif pv_mode == "interim":
            # QUERIES: EventLogInsert - opID/buID replace
            query = "global queryString;queryString = hos_event_logs.objects(" \
                    "hosel_date='" + pd_hosel_date + "'," \
                    "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                    "hosel_event_seq='" + pv_batchId + "')" \
                    ".update(" \
                    " hosel_bu_id = '" + pv_args_keys["nameType"] + "'" \
                    ",hosel_op_id = '" + pv_args_keys["name"] + "'" \
                    ")"
            queryBS = "global queryString;queryString = hos_event_log_by_businesskeys.objects(" \
                      "hosel_event_value='createUser'," \
                      "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                      "hosel_event_seq='" + pv_batchId + "')" \
                      ".update(" \
                      " hosel_bu_id = '" + pv_args_keys["nameType"] + "'" \
                      ",hosel_op_id = '" + pv_args_keys["name"] + "'" \
                      ")"
        elif pv_mode == "update":
            query = "global queryString;queryString = hos_event_logs.objects(" \
                    "hosel_date='" + pd_hosel_date + "'," \
                    "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                    "hosel_event_seq='" + pv_batchId + "')" \
                    ".update(hosel_event_status = #replaceEventStatus# )"
            queryBS = "global queryString;queryString = hos_event_log_by_businesskeys.objects(" \
                      "hosel_event_value='createUser'," \
                      "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                      "hosel_event_seq='" + pv_batchId + "')" \
                      ".update(hosel_event_status = #replaceEventStatus# )"
        elif pv_mode == "updateEnrichedData":
            # Stringify non-string values so the dict literal embeds cleanly into the query text
            tt = {}
            for k, v in pv_enriched_data.items():
                if not isinstance(v, str):
                    tt.update({k: str(v)})
                else:
                    tt.update({k: v})
            apilogger.debug("%s|tt : %s" % (pv_mode.upper(), tt))
            query = "global queryString;queryString = hos_event_logs.objects(" \
                    "hosel_date='" + pd_hosel_date + "'," \
                    "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                    "hosel_event_seq='" + pv_batchId + "')" \
                    ".update(hosel_enriched_data = " + str(tt) + ")"
            queryBS = "global queryString;queryString = hos_event_log_by_businesskeys.objects(" \
                      "hosel_event_value='createUser'," \
                      "hosel_event_date=datetime.strptime('" + pd_hosel_event_date + "','%Y-%m-%dT%H:%M:%S.%f%z')," \
                      "hosel_event_seq='" + pv_batchId + "')" \
                      ".update(hosel_enriched_data = " + str(tt) + ")"
        # endIf

        query = query.replace("#replaceEventStatus#", pv_event_status)
        queryBS = queryBS.replace("#replaceEventStatus#", pv_event_status)
        apilogger.debug("%s|query  : %s" % (pv_mode.upper(), query))
        apilogger.debug("%s|queryBS: %s" % (pv_mode.upper(), queryBS))
        exec(query)
        exec(queryBS)
    except Exception as cupexp:
        statusMessage = "Error - {%s} . Line No - {%s} " % (str(cupexp), str(sys_exc_info()[-1].tb_lineno))
        apilogger.error(statusMessage)
        raise Exception(statusMessage)
    return
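
# --------------------------------------------------------------------------------------------------------------------------------------
# Hedged sketch (illustrative, not part of the original flow): the exec()-based string building
# above is injection-prone and hard to debug. Assuming the two model classes expose the same
# objects(...).update(...) interface the generated strings already target, the "update" mode
# could be expressed as a direct call. The helper name below is hypothetical; pv_event_status
# here is the dict that the unquoted #replaceEventStatus# literal evaluates to in the exec path.
def update_event_log_direct(pd_hosel_date, pd_hosel_event_date, pv_batchId, pv_event_status):
    """Sketch of pv_mode='update' without exec(): update both log tables for one batch."""
    event_date = datetime.strptime(pd_hosel_event_date, '%Y-%m-%dT%H:%M:%S.%f%z')
    hos_event_logs.objects(hosel_date=pd_hosel_date
                           , hosel_event_date=event_date
                           , hosel_event_seq=pv_batchId).update(hosel_event_status=pv_event_status)
    hos_event_log_by_businesskeys.objects(hosel_event_value='createUser'
                                          , hosel_event_date=event_date
                                          , hosel_event_seq=pv_batchId).update(hosel_event_status=pv_event_status)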


def postData(api, payLoad, configProperties):
    import requests
    import re

    statusCode = "403"
    statusMessage = ""
    response = {}
    batchId = None
    update_query = None
    update_queryBS = None
    # Kept as a string: this value is spliced verbatim into the exec'd event-log query
    log_event_result = "{'status': 'failed', 'validation': 'notInitialized'}"
    try:
        # Received parameters
        args_keys = payLoad.copy()
        convert_to_UTC = True

        # Initialize logger
        apilogger = apilogfile(api)
        apilogger.debug("args_keys #### : %s" % type(json_dumps(args_keys)))
        apilogger.debug("args_keys/payLoad: %s" % json_dumps(args_keys))

        # Initialization: read the configured flag value itself
        if configProperties.has_section('config') \
                and configProperties.has_option('config', 'convert_to_UTC') \
                and configProperties.get('config', 'convert_to_UTC', raw=True) \
                and configProperties.get('config', 'convert_to_UTC', raw=True).capitalize() in ('True', 'False'):
            convert_to_UTC = configProperties.get('config', 'convert_to_UTC', raw=True).capitalize() == 'True'

        # withoutTimeZoneForUTC# date_info1 = datetime.utcnow().isoformat() if convert_to_UTC else datetime.now().astimezone().isoformat()
        date_info = datetime.now(tz=timezone.utc).isoformat() if convert_to_UTC else datetime.now().astimezone().isoformat()
        # date_info = '2023-09-01T18:33:00.962+05:30'  # hardcoded test timestamp; left disabled
        hosel_event_date = (date_info[:date_info.find(".") + 4]
                            + (date_info[date_info.find("+"):] if date_info.find("+") > 0 else ""))
        hosel_date = date_info[:10]
        apilogger.debug("convert_to_UTC  : %s" % convert_to_UTC)
        apilogger.debug("hosel_event_date: %s" % hosel_event_date)
        apilogger.debug("hosel_date      : %s" % hosel_date)

        # batchId
        if 'id' not in args_keys:
            batchId = str(uuid.uuid1())
        else:
            batchId = args_keys["id"]

        # --------------------------------------------------------------------------------------------------------------------------------------
        # hos_event_log - initialized
        log_event_result = "{'status': 'initialized', 'validation': 'initialized'}"
        update_event_log(apilogger=apilogger
                         , pv_mode="init"
                         , pd_hosel_date=hosel_date
                         , pd_hosel_event_date=hosel_event_date
                         , pv_batchId=batchId
                         , pv_event_status=log_event_result
                         , pv_args_keys=args_keys)

        # Validation: Configuration
        if not configProperties.has_section('apiconfiguration'):
            raise VALIDATION_ERRORS("Configuration for validation not properly set.")

        # Validation: airflow DAG / job details
        if not configProperties.has_section('job_details'):
            raise VALIDATION_ERRORS("DAG execution details not provided.")
        airflowURL = configProperties.get('job_details', 'airflowURL', raw=True) if configProperties.has_option('job_details', 'airflowURL') else None
        certificate_path = configProperties.get('job_details', 'certificate_path', raw=True) if configProperties.has_option('job_details', 'certificate_path') else None
        if not airflowURL:
            raise VALIDATION_ERRORS("Airflow URL not set for execution.")
        if "https://" in airflowURL and not certificate_path:
            raise VALIDATION_ERRORS("Certificate path not specified.")
        # airflowURL = "https://tcs-hob-sir-env01:11400/datapipeline/api/v1/dags/createNewUser/dagRuns"
        apilogger.debug("Airflow end point: %s" % airflowURL)
        apilogger.debug("certificate_path : %s" % certificate_path)
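
        # Illustration (assumed layout, not from the paste): the sections/options read above and
        # below imply an .ini file along these lines; the list values are JSON because they are
        # parsed with json_loads. Elided key lists are left as [...]:
        #   [config]
        #   convert_to_UTC = True
        #
        #   [job_details]
        #   airflowURL = https://tcs-hob-sir-env01:11400/datapipeline/api/v1/dags/
        #   certificate_path = /path/to/ca.pem
        #
        #   [apiconfiguration]
        #   validkeys = ["name", "nameType", "relatedParty", "contactMedium"]
        #   relatedPartyValidKeys = [...]
        #   contactMediumValidKeys = [...]
        #   contactPreferredValidKeys = ["postalAddress", "contact", "email"]
        #   postalAddressValidKeys = [...]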

        # Configuration: Read the validation key lists
        validkeys = json_loads(configProperties.get('apiconfiguration', 'validkeys', raw=True)) if configProperties.has_option('apiconfiguration', 'validkeys') else []
        relatedPartyValidKeys = json_loads(configProperties.get('apiconfiguration', 'relatedPartyValidKeys', raw=True)) if configProperties.has_option('apiconfiguration', 'relatedPartyValidKeys') else []
        contactMediumValidKeys = json_loads(configProperties.get('apiconfiguration', 'contactMediumValidKeys', raw=True)) if configProperties.has_option('apiconfiguration', 'contactMediumValidKeys') else []
        contactPreferredValidKeys = json_loads(configProperties.get('apiconfiguration', 'contactPreferredValidKeys', raw=True)) if configProperties.has_option('apiconfiguration', 'contactPreferredValidKeys') else []
        postalAddressValidKeys = json_loads(configProperties.get('apiconfiguration', 'postalAddressValidKeys', raw=True)) if configProperties.has_option('apiconfiguration', 'postalAddressValidKeys') else []

        # VALIDATION: parent keys
        apilogger.info("args_keys: %s" % type(args_keys))
        apilogger.info(args_keys)
        apilogger.info("validkeys: %s" % type(validkeys))
        apilogger.info(validkeys)
        if args_keys is None or not isinstance(args_keys, dict) or len(args_keys) == 0:
            statusCode = "422"
            raise VALIDATION_ERRORS("Missing required attribute in payLoad. " + ','.join(validkeys) + ".")
        if (set(list(args_keys.keys())) - set(["id"])) != set(validkeys):
            statusCode = "422"
            # Keys required by config but missing from the payload
            t1 = list(set(validkeys) - (set(list(args_keys.keys())) - set(["id"])))
            apilogger.debug("t1: %s" % t1)
            if len(t1) > 0:
                raise VALIDATION_ERRORS("Validation failed on parent keys. '%s'" % ("','".join(t1)))
            # Keys present in the payload but not allowed by config
            t2 = list(set(list(args_keys.keys())) - set(["id"]) - set(validkeys))
            apilogger.debug("t2: %s" % t2)
            if len(t2) > 0:
                raise VALIDATION_ERRORS("Unprocessable parent keys found. '%s'" % ("','".join(t2)))

        # hos_event_log - interim (opId, buId)
        update_event_log(apilogger=apilogger
                         , pv_mode="interim"
                         , pd_hosel_date=hosel_date
                         , pd_hosel_event_date=hosel_event_date
                         , pv_batchId=batchId
                         , pv_event_status=log_event_result
                         , pv_args_keys=args_keys)

        # VALIDATION: relatedParty - a list with exactly one entry
        if 'relatedParty' not in list(args_keys.keys()) \
                or (not isinstance(args_keys['relatedParty'], list)) \
                or (len(args_keys['relatedParty']) == 0 or len(args_keys['relatedParty']) > 1):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on relatedParty. Invalid construct.")
        if set(list(args_keys['relatedParty'][0].keys())) != set(relatedPartyValidKeys):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on relatedParty: {'%s'}" % ("','".join(list(set(relatedPartyValidKeys) - set(list(args_keys['relatedParty'][0].keys()))))))

        # VALIDATION: contactMedium - a list with exactly three entries
        if 'contactMedium' not in list(args_keys.keys()) \
                or (not isinstance(args_keys['contactMedium'], list)) \
                or (len(args_keys['contactMedium']) == 0 or len(args_keys['contactMedium']) != 3):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. Invalid construct.")

        # VALIDATION: contactMedium - at least one preferred contact
        if len([v['preferred'] for v in args_keys['contactMedium'] if 'preferred' in v and v['preferred'] == 'true']) == 0:
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. One of the contacts must have preferred set to true.")

        # VALIDATION: contactMedium - core attributes
        if len(['False' for v in args_keys['contactMedium'] if set(list(v.keys())) != set(contactMediumValidKeys)]) > 0:
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. One of the contacts is missing a core attribute. {'%s'}" % "','".join(contactMediumValidKeys))
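
        # Illustration (reconstructed from the checks above; the exact key lists come from the
        # .ini and are elided): a payLoad that passes validation has this shape:
        #   {
        #     "id": "<optional batch id>",
        #     "name": "<opID>", "nameType": "<buID>",
        #     "relatedParty": [ { ...relatedPartyValidKeys... } ],
        #     "contactMedium": [
        #       {"mediumType": "email",   "preferred": "true",  "characteristic": "user@example.com", ...},
        #       {"mediumType": "contact", "preferred": "false", "characteristic": "<phone number>", ...},
        #       {"mediumType": "postalAddress", "preferred": "false",
        #        "characteristic": [ { ...postalAddressValidKeys... } ], ...}
        #     ]
        #   }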

        # VALIDATION: contactMedium - attributes postal, contact, email must all be present
        if len(['True' for v in args_keys['contactMedium'] if 'mediumType' in v and v['mediumType'] in contactPreferredValidKeys]) != 3:
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. One of the contacts is missing a core attribute. {'%s'}" % "','".join(contactPreferredValidKeys))

        # VALIDATION: contactMedium - email
        # Regular expression for validating an email address
        regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'
        emailID = "".join([v['characteristic'] for v in args_keys['contactMedium'] if 'mediumType' in v and v['mediumType'] == "email"])
        if not re.fullmatch(regex, emailID):
            statusCode = "422"
            raise VALIDATION_ERRORS("Invalid email id")

        # VALIDATION: contactMedium - address
        if len(['False' for v in args_keys['contactMedium'] if 'mediumType' in v and v['mediumType'] == "postalAddress" and set(list(v['characteristic'][0].keys())) != set(postalAddressValidKeys)]) != 0:
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on postalAddress. Missing core attribute. {'%s'}" % "','".join(postalAddressValidKeys))

        # DAG: Json construction for airflow
        op_args = {"opID": args_keys["name"]
                   , "buID": args_keys["nameType"]
                   , "eventID": batchId
                   }
        # DAG: Json data - include relatedParty
        op_args.update(args_keys['relatedParty'][0])
        # DAG: Json data - include email, contact
        contact = "".join([v['characteristic'] for v in args_keys['contactMedium'] if 'mediumType' in v and v['mediumType'] == "contact"])
        op_args.update({v['mediumType']: v['characteristic'] for v in args_keys['contactMedium'] if 'mediumType' in v and v['mediumType'] in ("contact", "email")})
        # DAG: Json data - include postalAddress
        op_args.update({v['mediumType']: v['characteristic'][0] for v in args_keys['contactMedium'] if 'mediumType' in v and v['mediumType'] == ("postalAddress")})

        # Validate user against the SSO store
        user = "tib_sso"
        password = "tib_sso"
        host = "10.16.16.128"
        port = "3308"
        schema = "tib_sso"
        connection_text = ('mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user, password, host, port, schema))
        cnx = sqlalchemy_create_engine(connection_text
                                       , encoding="utf-8"
                                       , connect_args={'connect_timeout': 600})
        cursor = cnx.connect()
        query_sso = f"SELECT * FROM {schema}.sso_user WHERE lower(login)='{emailID}' OR lower(email)='{emailID}' OR primary_contact_number='{contact}'"
        apilogger.debug("query(sso): %s" % query_sso)
        df_sso = pd_read_sql(query_sso, con=cnx)
        apilogger.debug("df_sso: %s" % len(df_sso))
        if len(df_sso) > 0:  # any existing row means the login/email/contact is already taken
            statusCode = "409"
            raise VALIDATION_ERRORS("Login/email id/contact requested already exists in the system.")
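
        # Hedged alternative (sketch): the interpolated query above is open to SQL injection.
        # A parameterized equivalent over the same engine would be:
        #   from sqlalchemy import text
        #   query_sso = text("SELECT * FROM sso_user WHERE lower(login)=:em OR lower(email)=:em "
        #                    "OR primary_contact_number=:ct")
        #   df_sso = pd_read_sql(query_sso, con=cnx, params={"em": emailID, "ct": contact})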

        # hos_event_log - validation completed and userCreation process started
        log_event_result = "{'validation': 'success', 'userCreation': 'DAGinitialized'}"
        update_event_log(apilogger=apilogger
                         , pv_mode="updateEnrichedData"
                         , pd_hosel_date=hosel_date
                         , pd_hosel_event_date=hosel_event_date
                         , pv_batchId=batchId
                         , pv_event_status=log_event_result
                         , pv_args_keys=args_keys
                         , pv_enriched_data=op_args)

        apilogger.debug("INTERIM|op_args: %s" % op_args)
        dag_data = {"dag_run_id": batchId}
        dag_data.update({"conf": op_args.copy()})
        dag_data = json_dumps(dag_data.copy())
        apilogger.debug("FINAL|dag_data: %s" % dag_data)
        airflowURL_new = airflowURL + "createNewUser/dagRuns"
        apilogger.debug("airflowURL : %s" % airflowURL_new)

        from requests.auth import HTTPBasicAuth
        # dag_response = requests.post(airflowURL, headers={'Content-Type': 'application/json'}, auth=HTTPBasicAuth('hobsadmin', 'hobsadmin'), data=dag_data, verify=False)
        dag_response = requests.post(airflowURL_new
                                     , headers={"Content-Type": "application/json", "Authorization": "Basic aG9ic2FkbWluOmhvYnNhZG1pbg=="}
                                     , data=dag_data
                                     , verify=certificate_path)
        apilogger.debug("dag_response.status_code: %s" % dag_response.status_code)
        apilogger.debug("dag_response.status_code: %s" % type(dag_response.status_code))

        if str(dag_response.status_code) == "409":
            # A dagRun with this run_id already exists: clear it so it can be retried
            apilogger.debug("DAG already exists, retrying the DAG")
            airflowURL_new = airflowURL + "createNewUser/dagRuns/" + batchId + "/clear"
            dl = ['true', 'false']
            payload = {'dry_run': dl[0]}
            apilogger.debug("airflowURL : %s" % airflowURL_new)
            apilogger.debug("payload    : %s" % payload)
            dag_response = requests.post(airflowURL_new
                                         , headers={"Content-Type": "application/json", "Authorization": "Basic aG9ic2FkbWluOmhvYnNhZG1pbg=="}
                                         , json=payload
                                         , verify=certificate_path)

        apilogger.debug(str(type(dag_response.text)))
        dag_response = json_loads(dag_response.text)
        apilogger.debug(str(dag_response))
        if 'id' in list(dag_response.keys()):
            apilogger.debug(str(type(dag_response)))
            apilogger.debug(dag_response)
            statusMessage = str(dag_response)
        else:
            statusCode = "500"
            raise DAG_ERRORS("DAG Invoke failed")

        # Response
        statusCode = "202"
        statusMessage = "Request under process, use getCreateUserStatus for tracking"
        log_event_result = "{'status': 'success', 'validation': 'success', 'userCreation': 'success'}"
    except DAG_ERRORS as cupverrd:
        log_event_result = "{'status': 'failed', 'validation': 'failed', 'userCreation': 'failed', 'errorDesc': '" + str(cupverrd).replace("'", '"') + "'}"
        statusMessage = str(cupverrd)
        apilogger.error(statusMessage)
    except VALIDATION_ERRORS as cupverr:
        log_event_result = "{'status': 'failed', 'validation': 'failed', 'userCreation': 'failed', 'errorDesc': '" + str(cupverr).replace("'", '"') + "'}"
        statusMessage = str(cupverr)
        apilogger.error(statusMessage)
    except Exception as cupexp:
        log_event_result = "{'status': 'exception', 'validation': 'exception', 'userCreation': 'exception', 'errorDesc': '" + str(cupexp).replace("'", '"') + "'}"
        statusCode = "500"
        statusMessage = "Error - {%s} . Line No - {%s} " % (str(cupexp), str(sys_exc_info()[-1].tb_lineno))
        apilogger.error(statusMessage)

    apilogger.info("statusCode   : %s" % statusCode)
    apilogger.info("statusMessage: %s" % statusMessage)

    # ##update##eventLog##
    update_event_log(apilogger=apilogger
                     , pv_mode="update"
                     , pd_hosel_date=hosel_date
                     , pd_hosel_event_date=hosel_event_date
                     , pv_batchId=batchId
                     , pv_event_status=log_event_result
                     , pv_args_keys=args_keys)

    response.update({"statusDescription": statusMessage
                     , "id": batchId
                     , "href": "http://172.16.177.58:11395/multitenant/getCreateUserStatus?id=" + batchId
                     })
    # if statusCode == "202":
    #     response.update({"response": op_args})
    return ((statusCode, response))

Remember the above pattern of code; in the same way, I want you to design an API for the given requirements.