Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
26 kB
1
Indexable
Never
# ================================================================================================================================================================================
# HOBS MULTI-TENANCY FUNCTIONALITY REQUEST APIS
#   DATE     AUTHOR     VER   CHANGE DESCRIPTION
# --------  ---------  -----  ------------------
# 30.03.23  Lakkshman  1.0    Init version
# ================================================================================================================================================================================
from datetime import datetime, timezone
from sys import exc_info as sys_exc_info
from json import loads as json_loads
from json import dumps as json_dumps
from pandas import read_sql as pd_read_sql
import uuid
from sqlalchemy import create_engine as sqlalchemy_create_engine

from models.hos_event_logs import hos_event_logs
from models.hos_event_logs import *
from models.hos_event_log_by_businesskeys import hos_event_log_by_businesskeys


# define Python user-defined exceptions
class Error(Exception):
    """Root of this module's exception hierarchy.

    The more specific exceptions defined in this module derive from it, so
    catching ``Error`` handles any of them.
    """

# define Python user-defined exceptions
class VALIDATION_ERRORS(Error):
    """Raised by postData when the configuration or the request payload fails validation."""

# define Python user-defined exceptions
class DAG_ERRORS(Error):
    """Raised by postData when the Airflow DAG run could not be triggered."""


def apilogfile(api):
    """Create a DEBUG-level file logger for one API invocation.

    The log is written to ``<ANALYTICSDATASERVICELOGPATH>/logs/<api>.log``.
    The file is opened in ``w+`` mode, so every call starts it afresh.

    Args:
        api: API name; used as the log file's base name.

    Returns:
        A ``logging.Logger`` with a unique name and one file handler attached.

    Raises:
        EnvironmentError: if ``ANALYTICSDATASERVICELOGPATH`` is not set.
        OSError: if the log directory or file cannot be created.
    """
    import logging
    import os

    log_root = os.environ.get('ANALYTICSDATASERVICELOGPATH')
    if log_root is None:
        # Fail with a clear message instead of an opaque "NoneType + str" TypeError.
        raise EnvironmentError(
            "Environment variable ANALYTICSDATASERVICELOGPATH is not set; "
            "cannot create the log file for '%s'." % api)

    log_dir = os.path.join(log_root, 'logs')
    os.makedirs(log_dir, exist_ok=True)

    # NOTE(review): this flushes and closes EVERY handler registered in the
    # process, not only those created here. It is kept because it is what
    # releases the file handles opened by earlier calls.
    logging.shutdown()

    log_handler = logging.FileHandler(os.path.join(log_dir, api + '.log'), mode='w+')
    log_handler.setFormatter(logging.Formatter(
        fmt='%(asctime)s [Process:%(process)d] [%(levelname)-8s] [%(filename)s - %(module)s] : %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))

    # The name is unique per call, so the logger is always new and never has
    # a handler yet.
    # NOTE(review): these loggers stay registered in the logging module for
    # the life of the process; consider reusing one logger per API.
    LOGGER = logging.getLogger('apilog' + str(uuid.uuid1()))
    LOGGER.addHandler(log_handler)
    LOGGER.setLevel(logging.DEBUG)
    LOGGER.info('LOGGER activated for %s', api)
    return LOGGER



def _as_string_literal(value):
    """Render *value* as a quoted, fully escaped Python string literal."""
    return repr(str(value))


def _as_status_literal(pv_event_status):
    """Render the event status as a validated Python literal.

    A string status must itself be the text of a Python literal (the callers
    in this module pass dict literals). It is parsed with ``ast.literal_eval``
    so nothing executable can be smuggled into the generated statement.
    Non-string values are rendered with ``repr``.
    """
    import ast

    if isinstance(pv_event_status, str):
        pv_event_status = ast.literal_eval(pv_event_status)
    return repr(pv_event_status)


def update_event_log(apilogger, pv_mode, pd_hosel_date, pd_hosel_event_date, pv_batchId, pv_event_status, pv_args_keys, pv_enriched_data = None):
    """Update the event-log row of a createUser request in both event-log models.

    The same change is applied to ``hos_event_logs`` (filtered by date, event
    date and event sequence) and to ``hos_event_log_by_businesskeys``
    (filtered by event value ``createUser``, event date and event sequence).

    Args:
        apilogger: logger that receives the generated statements and errors.
        pv_mode: one of
            ``"init"`` - set event type, status, trigger and event data;
            ``"interim"`` - set bu/op ids from ``pv_args_keys["nameType"]`` and
            ``pv_args_keys["name"]``;
            ``"update"`` - set the event status only;
            ``"updateEnrichedData"`` - set the enriched data from
            ``pv_enriched_data`` (non-string values are stringified).
        pd_hosel_date: value for the ``hosel_date`` column.
        pd_hosel_event_date: timestamp text in ``%Y-%m-%dT%H:%M:%S.%f%z`` form.
        pv_batchId: event sequence identifying the row.
        pv_event_status: status as a dict, or as the text of a Python literal;
            used by the ``init`` and ``update`` modes.
        pv_args_keys: request payload; JSON-encoded as event data in ``init``.
        pv_enriched_data: mapping stored by ``updateEnrichedData``.

    Raises:
        Exception: any failure is logged and re-raised as a plain ``Exception``
            whose message carries the original error and its line number.
    """
    try:
        # Row filters shared by every mode. Values are embedded as escaped
        # literals so quotes or backslashes in them cannot alter the statement.
        event_date = ("datetime.strptime(" + _as_string_literal(pd_hosel_event_date)
                      + ",'%Y-%m-%dT%H:%M:%S.%f%z')")
        event_seq = _as_string_literal(pv_batchId)
        log_rows = ("hos_event_logs.objects("
                    "hosel_date=" + _as_string_literal(pd_hosel_date) + ","
                    "hosel_event_date=" + event_date + ","
                    "hosel_event_seq=" + event_seq + ")")
        key_rows = ("hos_event_log_by_businesskeys.objects("
                    "hosel_event_value='createUser',"
                    "hosel_event_date=" + event_date + ","
                    "hosel_event_seq=" + event_seq + ")")

        if pv_mode == "init":
            # QUERIES: EventLogInsert
            status = _as_status_literal(pv_event_status)
            event_data = _as_string_literal(json_dumps(pv_args_keys))
            log_changes = ("hosel_event_type='createUser'"
                           ", hosel_event_status = " + status +
                           ", hosel_triggered_by = 'multiTenantJob'"
                           ", hosel_event_data = " + event_data)
            key_changes = ("hosel_date=" + _as_string_literal(pd_hosel_date) + ","
                           "hosel_event_data=" + event_data + ","
                           "hosel_event_type='createUser',"
                           "hosel_event_status = " + status + ","
                           "hosel_triggered_by = 'multiTenantJob'")
        elif pv_mode == "interim":
            # QUERIES: EventLogInsert - opID/buID replace
            log_changes = (" hosel_bu_id = " + _as_string_literal(pv_args_keys["nameType"]) +
                           ",hosel_op_id = " + _as_string_literal(pv_args_keys["name"]))
            key_changes = log_changes
        elif pv_mode == "update":
            log_changes = "hosel_event_status = " + _as_status_literal(pv_event_status) + " "
            key_changes = log_changes
        elif pv_mode == "updateEnrichedData":
            # Every value is stored as text.
            enriched = {k: (v if isinstance(v, str) else str(v))
                        for k, v in pv_enriched_data.items()}
            apilogger.debug("%s|tt   : %s" % (pv_mode.upper(), enriched))
            log_changes = "hosel_enriched_data = " + repr(enriched)
            key_changes = log_changes
        else:
            raise ValueError("Unsupported pv_mode '%s'" % pv_mode)

        query = "global queryString;queryString = " + log_rows + ".update(" + log_changes + ")"
        queryBS = "global queryString;queryString = " + key_rows + ".update(" + key_changes + ")"
        apilogger.debug("%s|query   : %s" % (pv_mode.upper(), query))
        apilogger.debug("%s|queryBS : %s" % (pv_mode.upper(), queryBS))

        # SECURITY NOTE(review): exec() is kept only because the statements
        # publish their result in the module-level ``queryString`` global.
        # Every interpolated value is now an escaped/validated literal.
        # Prefer calling ``<model>.objects(...).update(...)`` directly once no
        # other code relies on ``queryString``.
        exec(query)
        exec(queryBS)
    except Exception as cupexp:
        statusMessage = "Error - {%s} . Line No - {%s} " % (str(cupexp),str(sys_exc_info()[-1].tb_lineno))
        apilogger.error(statusMessage)
        raise Exception(statusMessage) from cupexp



def _config_raw(configProperties, section, option):
    """Return the raw (uninterpolated) value of a config option, or None when it is absent."""
    if configProperties.has_option(section, option):
        return configProperties.get(section, option, raw=True)
    return None


def _config_json(configProperties, section, option):
    """Return the JSON-decoded value of a config option, or [] when it is absent."""
    raw_value = _config_raw(configProperties, section, option)
    return [] if raw_value is None else json_loads(raw_value)


def _failure_status(status, error):
    """Build the event-status text (a dict literal) recorded for a failed request.

    Rendering through ``str(dict)`` keeps the text a valid Python literal even
    when the error message contains backslashes or newlines.
    """
    return str({'status': status,
                'validation': status,
                'userCreation': status,
                'errorDesc': str(error).replace("'", '"')})


def postData(api, payLoad, configProperties):
    """Validate a createUser request and trigger the ``createNewUser`` Airflow DAG.

    Steps: record the request in the event log, validate configuration and
    payload, check the SSO database for an existing login/email/contact, store
    the enriched data, start (or, on HTTP 409, clear) the DAG run, and finally
    record the outcome in the event log.

    Args:
        api: API name, used for the log file name.
        payLoad: request body as a dict. An optional ``id`` key supplies the
            batch id; otherwise a UUID is generated.
        configProperties: ConfigParser-like object. Sections read here:
            ``config`` (optional ``convert_to_UTC``), ``apiconfiguration``
            (valid-key lists as JSON) and ``job_details`` (``airflowURL``,
            ``certificate_path``).

    Returns:
        Tuple ``(statusCode, response)``. ``statusCode`` is a string:
        ``"202"`` accepted, ``"422"`` payload validation failure, ``"409"``
        user already exists, ``"500"`` DAG/unexpected failure, and ``"403"``
        (the initial value) for configuration validation failures.
        ``response`` holds ``statusDescription``, ``id`` and ``href``.

    Raises:
        Exception: if the final event-log update fails.
    """
    import re
    import requests
    from sqlalchemy import text as sqlalchemy_text

    statusCode = "403"
    statusMessage = ""
    response = {}
    hosel_date = None
    hosel_event_date = None
    # Always text: update_event_log splices the status into a statement.
    log_event_result = "{'status': 'failed', 'validation': 'notInitialized'}"

    # A non-dict payload is treated as empty; it is rejected with 422 below.
    args_keys = payLoad.copy() if isinstance(payLoad, dict) else {}
    batchId = str(args_keys["id"]) if 'id' in args_keys else str(uuid.uuid1())

    # Created before the try block so the except handlers can always log.
    apilogger = apilogfile(api)

    try:
        apilogger.debug("args_keys/payLoad: %s" % json_dumps(args_keys))

        # Initialization: honour the configured flag (defaults to UTC).
        convert_to_UTC = True
        utc_flag = _config_raw(configProperties, 'config', 'convert_to_UTC')
        if utc_flag and utc_flag.capitalize() in ('True', 'False'):
            convert_to_UTC = utc_flag.capitalize() == 'True'

        # Millisecond precision with the UTC offset always present, e.g.
        # 2023-09-01T18:33:00.962+05:30.
        now = datetime.now(tz=timezone.utc) if convert_to_UTC else datetime.now().astimezone()
        hosel_event_date = now.isoformat(timespec='milliseconds')
        hosel_date = hosel_event_date[:10]
        apilogger.debug("convert_to_UTC  : %s" % convert_to_UTC)
        apilogger.debug("hosel_event_date: %s" % hosel_event_date)
        apilogger.debug("hosel_date      : %s" % hosel_date)

        # hos_event_log - initialized
        log_event_result = "{'status': 'initialized', 'validation': 'initialized'}"
        update_event_log(apilogger=apilogger
                        , pv_mode = "init"
                        , pd_hosel_date = hosel_date
                        , pd_hosel_event_date = hosel_event_date
                        , pv_batchId = batchId
                        , pv_event_status = log_event_result
                        , pv_args_keys = args_keys)

        # Validation: Configuration
        if not configProperties.has_section('apiconfiguration'):
            raise VALIDATION_ERRORS("Configuration for validation not properly set.")

        # Validation: airflow DAG details
        if not configProperties.has_section('job_details'):
            raise VALIDATION_ERRORS("DAG execution details not provided.")
        airflowURL = _config_raw(configProperties, 'job_details', 'airflowURL')
        certificate_path = _config_raw(configProperties, 'job_details', 'certificate_path')

        if not airflowURL:
            raise VALIDATION_ERRORS("Airflow URL not set for execution.")

        if "https://" in airflowURL and not certificate_path:
            raise VALIDATION_ERRORS("Certificate path not specified.")

        apilogger.debug("Airflow end point: %s" % airflowURL)
        apilogger.debug("certificate_path : %s" % certificate_path)

        # Configuration: Read the validation
        validkeys = _config_json(configProperties, 'apiconfiguration', 'validkeys')
        relatedPartyValidKeys = _config_json(configProperties, 'apiconfiguration', 'relatedPartyValidKeys')
        contactMediumValidKeys = _config_json(configProperties, 'apiconfiguration', 'contactMediumValidKeys')
        contactPreferredValidKeys = _config_json(configProperties, 'apiconfiguration', 'contactPreferredValidKeys')
        postalAddressValidKeys = _config_json(configProperties, 'apiconfiguration', 'postalAddressValidKeys')

        # VALIDATION: parent keys
        apilogger.info("args_keys: %s" % type(args_keys))
        apilogger.info(args_keys)
        apilogger.info("validkeys: %s" % type(validkeys))
        apilogger.info(validkeys)

        if not isinstance(payLoad, dict) or not args_keys:
            statusCode = "422"
            raise VALIDATION_ERRORS("Missing required attribute in payLoad. " + ','.join(validkeys) + ".")

        payload_keys = set(args_keys) - {"id"}
        if payload_keys != set(validkeys):
            statusCode = "422"
            missing_keys = sorted(set(validkeys) - payload_keys)
            apilogger.debug("t1: %s" % missing_keys)
            if missing_keys:
                raise VALIDATION_ERRORS("Validation failed on parent keys. '%s'" % ("','".join(missing_keys)))
            unexpected_keys = sorted(payload_keys - set(validkeys))
            apilogger.debug("t2: %s" % unexpected_keys)
            if unexpected_keys:
                raise VALIDATION_ERRORS("Unprocessable parent keys found. '%s'" % ("','".join(unexpected_keys)))

        # hos_event_log - interim (opId, buId)
        update_event_log(apilogger=apilogger
                        , pv_mode = "interim"
                        , pd_hosel_date = hosel_date
                        , pd_hosel_event_date = hosel_event_date
                        , pv_batchId = batchId
                        , pv_event_status = log_event_result
                        , pv_args_keys = args_keys)

        # VALIDATION: relatedParty - exactly one dict entry
        related_party = args_keys.get('relatedParty')
        if (not isinstance(related_party, list)
                or len(related_party) != 1
                or not isinstance(related_party[0], dict)):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on relatedParty. Invalid construct.")

        if set(related_party[0]) != set(relatedPartyValidKeys):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on relatedParty: {'%s'}" % ("','".join(list(set(relatedPartyValidKeys)-set(related_party[0])))))

        # VALIDATION: contactMedium - exactly three dict entries
        contact_media = args_keys.get('contactMedium')
        if (not isinstance(contact_media, list)
                or len(contact_media) != 3
                or not all(isinstance(v, dict) for v in contact_media)):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. Invalid construct.")

        # VALIDATION: contactMedium - preferred contact either one
        if not any(v.get('preferred') == 'true' for v in contact_media):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. Either of one preferred contact to be set true.")

        # VALIDATION: contactMedium - core attributes
        if any(set(v) != set(contactMediumValidKeys) for v in contact_media):
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. Either of one contact missing core attribute. {'%s'}" % "','".join(contactMediumValidKeys))

        # VALIDATION: contactMedium - attributes postal, contact, email
        if sum(1 for v in contact_media if 'mediumType' in v and v['mediumType'] in contactPreferredValidKeys) != 3:
            statusCode = "422"
            raise VALIDATION_ERRORS("Validation failed on contactMedium. Either of one contact missing core attribute. {'%s'}" % "','".join(contactPreferredValidKeys))

        # VALIDATION: contactMedium - email
        regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'
        email_parts = [v['characteristic'] for v in contact_media if v.get('mediumType') == "email"]
        # A non-text email value is a validation failure, not a server error.
        emailID = "".join(email_parts) if all(isinstance(p, str) for p in email_parts) else ""
        if not re.fullmatch(regex, emailID):
            statusCode = "422"
            raise VALIDATION_ERRORS("Invalid email id")

        # VALIDATION: contactMedium - address
        for medium in contact_media:
            if medium.get('mediumType') != "postalAddress":
                continue
            address = medium['characteristic']
            if (not isinstance(address, list)
                    or not address
                    or not isinstance(address[0], dict)
                    or set(address[0]) != set(postalAddressValidKeys)):
                statusCode = "422"
                raise VALIDATION_ERRORS("Validation failed on postalAddress. Missing core attribute. {'%s'}" % "','".join(postalAddressValidKeys))

        # DAG: Json construction for airflow
        op_args = {"opID" : args_keys["name"]
                   ,"buID" : args_keys["nameType"]
                   ,"eventID" : batchId
                  }

        # DAG: Json data - include relatedParty
        op_args.update(related_party[0])

        # DAG: Json data - include email, contact
        contact = "".join(str(v['characteristic']) for v in contact_media if v.get('mediumType') == "contact")
        op_args.update({v['mediumType']: v['characteristic'] for v in contact_media if v.get('mediumType') in ("contact", "email")})

        # DAG: Json data - include postalAddress
        op_args.update({v['mediumType']: v['characteristic'][0] for v in contact_media if v.get('mediumType') == "postalAddress"})

        # Validate User
        # SECURITY NOTE(review): database credentials are hard-coded here;
        # move them to configuration / a secret store.
        user = "tib_sso"
        password = "tib_sso"
        host = "10.16.16.128"
        port = "3308"
        schema = "tib_sso"
        connection_text = 'mysql+mysqlconnector://%s:%s@%s:%s/%s' % (user, password, host, port, schema)
        cnx = sqlalchemy_create_engine(connection_text
                                       , encoding="utf-8"
                                       , connect_args={'connect_timeout': 600})

        # Bound parameters: the contact number comes straight from the payload.
        query_sso = ("SELECT * FROM " + schema + ".sso_user"
                     " WHERE lower(login)=:email OR lower(email)=:email"
                     " OR primary_contact_number=:contact")
        sso_params = {"email": emailID.lower(), "contact": contact}
        apilogger.debug("query(sso): %s" % query_sso)
        apilogger.debug("params(sso): %s" % sso_params)
        try:
            df_sso = pd_read_sql(sqlalchemy_text(query_sso), con=cnx, params=sso_params)
        finally:
            # Release pooled connections; the engine is not reused.
            cnx.dispose()
        apilogger.debug("df_sso: %s" % len(df_sso))
        # Any matching row means the login/email/contact is already taken.
        # NOTE(review): was ">1", which let a single existing user through.
        if len(df_sso) > 0:
            statusCode = "409"
            raise VALIDATION_ERRORS("Login/email id/contact requested already exists in the system.")

        # hos_event_log - validationCompleted and userCreation process started
        log_event_result = "{'validation': 'success', 'userCreation': 'DAGinitialized'}"
        update_event_log(apilogger=apilogger
                        , pv_mode = "updateEnrichedData"
                        , pd_hosel_date = hosel_date
                        , pd_hosel_event_date = hosel_event_date
                        , pv_batchId = batchId
                        , pv_event_status = log_event_result
                        , pv_args_keys = args_keys
                        , pv_enriched_data = op_args)

        apilogger.debug("INTRIM|op_args: %s" % op_args)
        dag_data = json_dumps({"dag_run_id": batchId, "conf": op_args.copy()})
        apilogger.debug("FINAL|dag_data: %s" % dag_data)
        airflowURL_new = airflowURL+"createNewUser/dagRuns"
        apilogger.debug("airflowURL    : %s" % airflowURL_new)

        # SECURITY NOTE(review): the Basic credential is hard-coded; load it
        # from configuration / a secret store.
        dag_headers = {"Content-Type": "application/json", "Authorization": "Basic aG9ic2FkbWluOmhvYnNhZG1pbg=="}
        dag_response = requests.post(airflowURL_new
                                     , headers = dag_headers
                                     , data = dag_data
                                     , verify=certificate_path)

        apilogger.debug("dag_response.status_code: %s" % dag_response.status_code)
        if str(dag_response.status_code) == "409":
            apilogger.debug("DAG already exists, retrying the DAG")
            airflowURL_new = airflowURL+"createNewUser/dagRuns/"+batchId+"/clear"
            # NOTE(review): 'true' is the value the original code selected;
            # confirm whether a dry run is really intended for a retry.
            payload = {'dry_run': 'true'}
            apilogger.debug("airflowURL    : %s" % airflowURL_new)
            apilogger.debug("payload       : %s" % payload)
            dag_response = requests.post(airflowURL_new
                                         , headers = dag_headers
                                         , json = payload
                                         , verify=certificate_path)

        dag_response = json_loads(dag_response.text)
        apilogger.debug(str(dag_response))
        if not isinstance(dag_response, dict) or 'id' not in dag_response:
            statusCode = "500"
            raise DAG_ERRORS("DAG Invoke failed")

        # Response
        statusCode = "202"
        statusMessage = "Request under process, use getCreateUserStatus for tracking"
        log_event_result = "{'status': 'success', 'validation': 'success', 'userCreation': 'success'}"
    except DAG_ERRORS as cupverrd:
        log_event_result = _failure_status('failed', cupverrd)
        statusMessage = str(cupverrd)
        apilogger.error(statusMessage)
    except VALIDATION_ERRORS as cupverr:
        log_event_result = _failure_status('failed', cupverr)
        statusMessage = str(cupverr)
        apilogger.error(statusMessage)
    except Exception as cupexp:
        log_event_result = _failure_status('exception', cupexp)
        statusCode = "500"
        statusMessage = "Error - {%s} . Line No - {%s} " % (str(cupexp),str(sys_exc_info()[-1].tb_lineno))
        apilogger.error(statusMessage)

    apilogger.info("statusCode   : %s" % statusCode)
    apilogger.info("statusMessage: %s" % statusMessage)

    # ##update##eventLog##
    # Skipped only when the failure happened before the event timestamp was
    # computed, i.e. no row exists to update.
    if hosel_event_date is not None:
        update_event_log(apilogger=apilogger
                        , pv_mode = "update"
                        , pd_hosel_date = hosel_date
                        , pd_hosel_event_date = hosel_event_date
                        , pv_batchId = batchId
                        , pv_event_status = log_event_result
                        , pv_args_keys = args_keys)

    # NOTE(review): the status endpoint host is hard-coded.
    response.update({"statusDescription": statusMessage
                    ,"id": batchId
                    ,"href": "http://172.16.177.58:11395//multitenant/getCreateUserStatus?id="+batchId
                    })

    return (statusCode, response)

Remember the code pattern above; in the same way, I want you to design an API for the given requirements.