from datetime import datetime as dt_datetime
from datetime import timedelta as dt_timedelta
from pandas import DataFrame as pd_DataFrame
from pandas import read_sql as pd_read_sql
from sqlalchemy import types as sqlalchemy_types
from json import loads as json_loads
import sqlalchemy
from sys import exc_info as sys_exc_info
import os


def func_sqlcol(logger, __pv_df_param, __pdf_large_cols):
    """Build the column -> SQLAlchemy type mapping used by DataFrame.to_sql()."""
    __dtypedict = {}
    __cols_clob = []
    __cols_blob = []
    try:
        #logger.debug(("__df_large_cols: \n%s" % __pdf_large_cols))
        if len(__pdf_large_cols) > 0:
            __cols_clob = __pdf_large_cols['column_name_clob'].values[0]
            __cols_clob = __cols_clob.split(',') if __cols_clob else []
            __cols_blob = __pdf_large_cols['column_name_blob'].values[0]
            __cols_blob = __cols_blob.split(',') if __cols_blob else []
        logger.info(("__cols_clob : type{%s} len{%s}" % (type(__cols_clob), len(__cols_clob))))
        logger.info(("__cols_blob : type{%s} len{%s}" % (type(__cols_blob), len(__cols_blob))))
        for i, j in zip(__pv_df_param.columns, __pv_df_param.dtypes):
            # table(s): additionaldet_xml_template.xml_template, tom_batch_order_template_ref.template
            if "object" in str(j) and i in __cols_blob:
                __dtypedict.update({i: sqlalchemy_types.BLOB(length=None)})
            # table(s): tom_service_action_master.extended_rule, generic_template_ref.template_text
            elif "object" in str(j) and i in __cols_clob:
                __dtypedict.update({i: sqlalchemy_types.CLOB(length=None)})
            elif "object" in str(j):
                __dtypedict.update({i: sqlalchemy_types.NVARCHAR(length=255)})
            if "datetime" in str(j):
                __dtypedict.update({i: sqlalchemy_types.DateTime()})
            if "float" in str(j):
                __dtypedict.update({i: sqlalchemy_types.Float(precision=3, asdecimal=True)})
            if "int" in str(j):
                __dtypedict.update({i: sqlalchemy_types.INT()})
    except Exception as __exp:
        logger.error(("func_sqlcol exception. Error - {%s} . Line No - {%s} " % (str(__exp), str(sys_exc_info()[-1].tb_lineno))))
    return __dtypedict
# endDef==>func_sqlcol()


def processDataFrame(args, configProperties, logger, db_engine, pv_schema, pv_query, pv_table=None, pk_column=None, pv_sequence=None, pdf_large_cols=None):
    __df_data = pd_DataFrame()
    __connection_text = ''
    __pv_query = ''
    __logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH')
    try:
        dbType = args['dbType']
        __df_data = pd_read_sql(pv_query, con=db_engine)
        __df_data.columns = __df_data.columns.str.lower()
        if pv_table in ('sso_user', 'sso_user_details'):
            __df_data = __df_data.head(1)
        logger.info(("Count: %s " % len(__df_data)))
        if len(__df_data) > 0:
            if 'start_date' in __df_data.columns:
                __df_data['start_date'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                __df_data['start_date'] = __df_data['start_date'].astype('datetime64[ns]')
            if 'end_date' in __df_data.columns:
                __df_data['end_date'] = (dt_datetime.now() + dt_timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S")
                __df_data['end_date'] = __df_data['end_date'].astype('datetime64[ns]')
            # TIB_SSO.SSO_OPERATOR_REF
            if 'operator_id' in __df_data.columns and pv_table == 'sso_operator_ref':
                __df_data['operator_id'] = args['userName']
            elif 'operator_id' in __df_data.columns:
                __df_data['operator_id'] = args['opIdNew']
            if 'op_id' in __df_data.columns:
                __df_data['op_id'] = args['opIdNew']
            if 'op_desc' in __df_data.columns:
                __df_data['op_desc'] = args['opIdNew']
            if 'bu_id' in __df_data.columns:
                __df_data['bu_id'] = args['buIdNew']
            if 'updated_by' in __df_data.columns:
                __df_data['updated_by'] = 'newTenantInitialSetup'
            if 'created_by' in __df_data.columns:
                __df_data['created_by'] = 'newTenantInitialSetup'
            if 'created_date' in __df_data.columns:
                __df_data['created_date'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                __df_data['created_date'] = __df_data['created_date'].astype('datetime64[ns]')
            if 'created_on' in __df_data.columns:
                __df_data['created_on'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                __df_data['created_on'] = __df_data['created_on'].astype('datetime64[ns]')
            if 'created_dt' in __df_data.columns:
                __df_data['created_dt'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                __df_data['created_dt'] = __df_data['created_dt'].astype('datetime64[ns]')
            if 'custom_created_date' in __df_data.columns:
                __df_data['custom_created_date'] = str(dt_datetime.now().strftime("%d-%b-%Y %H:%M:%S"))
            if 'custom_updated_date' in __df_data.columns:
                __df_data['custom_updated_date'] = str(dt_datetime.now().strftime("%d-%b-%Y %H:%M:%S"))
            if 'updated_date' in __df_data.columns:
                __df_data['updated_date'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                __df_data['updated_date'] = __df_data['updated_date'].astype('datetime64[ns]')
            if 'updated_on' in __df_data.columns:
                __df_data['updated_on'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                __df_data['updated_on'] = __df_data['updated_on'].astype('datetime64[ns]')
            if 'updated_dt' in __df_data.columns:
                __df_data['updated_dt'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                __df_data['updated_dt'] = __df_data['updated_dt'].astype('datetime64[ns]')
            # TIB_SSO.sso_user
            if pv_table == 'sso_user':
                __df_data['login'] = args['userName']
                __df_data['password'] = '$2a$10$QKzQqwLagJKBkdcITeKsReoEL8Jd2JfMpHFF2695z.LZNiSpsSNXO'
                __df_data['first_name'] = args['userName']
                __df_data['last_name'] = args['userName']
                __df_data['email'] = 'hobs-support@tcs.com'
                __df_data['status'] = 'A'
                __df_data['country'] = configProperties.get('environment', 'country', raw=True)
                __df_data['language_cd'] = configProperties.get('environment', 'language_cd', raw=True)
                __df_data['last_pswd_change'] = (dt_datetime.now() + dt_timedelta(days=30)).strftime("%Y-%m-%d %H:%M:%S")
                __df_data['last_pswd_change'] = __df_data['last_pswd_change'].astype('datetime64[ns]')
                __df_data['currency_cd'] = configProperties.get('environment', 'currency', raw=True)
                __df_data['failed_logins'] = 0
                __df_data['acct_lock'] = 'N'
            # TIB_SSO.sso_user_details/sso_user_role
            if 'login' in __df_data.columns and pv_table in ('sso_user_details', 'sso_user_role'):
                __df_data['login'] = args['userName']
            # TIB_SSO.SSO_OU_REF
            if pv_table == 'sso_ou_ref':
                __df_data['ou_id'] = args['buIdNew']
                __df_data['ou_name'] = args['buIdNew']
            if dbType == 'ORACLE':
                if pv_table == 'sso_circle_ref' and pv_sequence == 'sso_hibernate_sequence':
                    pv_query = ("SELECT MAX(circle_nbr)+10 AS %s FROM %s.%s" % (pk_column, pv_schema, pv_table))
                    pv_query = db_engine.execute(pv_query)
                    pv_query = pv_query.fetchone()
                    __start_value = pv_query[pk_column]
                    #__df_data.apply(lambda col: col.drop_duplicates().reset_index(drop=True))
                    __df_data[pk_column] = range(__start_value, __start_value + len(__df_data))
                    pv_query = ("UPDATE %s.%s SET sequence_next_hi_value=%s WHERE sequence_name='CIRCLE_NBR_SEQ'" % (pv_schema, pv_sequence, str(int(__start_value) + len(__df_data) + 100)))
                elif pv_sequence and len(__df_data) > 0:
                    __df_temp = __df_data.copy()
                    for index, row in __df_temp.iterrows():
                        pv_query = ("SELECT %s.%s.NEXTVAL AS %s FROM DUAL" % (pv_schema, pv_sequence, pk_column))
                        pv_query = db_engine.execute(pv_query)
                        pv_query = pv_query.fetchone()
                        # assign the fetched sequence value (.at replaces the removed DataFrame.set_value)
                        __df_data.at[index, pk_column] = pv_query[pk_column]
                logger.info(("Oracle pv_sequence reset completed"))
            elif dbType in ('MYSQL', 'MARIA'):
                if pv_table == 'sso_circle_ref' and pv_sequence == 'sso_hibernate_sequence':
                    pv_query = ("SELECT MAX(circle_nbr)+10 AS %s FROM %s.%s" % (pk_column, pv_schema, pv_table))
                    pv_query = db_engine.execute(pv_query)
                    pv_query = pv_query.fetchone()
                    __start_value = pv_query[pk_column]
                    #__df_data.apply(lambda col: col.drop_duplicates().reset_index(drop=True))
                    __df_data[pk_column] = range(__start_value, __start_value + len(__df_data))
                    pv_query = ("UPDATE %s.%s SET sequence_next_hi_value=%s WHERE sequence_name='CIRCLE_NBR_SEQ'" % (pv_schema, pv_sequence, str(int(__start_value) + len(__df_data) + 100)))
                elif pv_sequence and len(__df_data) > 0:
                    __query = ("SELECT id AS %s FROM %s.%s" % (pk_column, pv_schema, pv_sequence))
                    __query = db_engine.execute(__query)
                    __query = __query.fetchone()
                    __max_seq = __query[pk_column] + 1 if __query[pk_column] else 0
                    # Fix- Table Max
                    __query = ("SELECT MAX(%s) AS %s FROM %s.%s" % (pk_column, pk_column, pv_schema, pv_table))
                    __query = db_engine.execute(__query)
                    __query = __query.fetchone()
                    __max_tbl = __query[pk_column] + 1 if __query[pk_column] else 0
                    # Either of Max (Table/Sequence)
                    __start_value = max(__max_tbl, __max_seq)
                    logger.info(("MAX Sequence: %s Table: %s" % (__max_seq, __max_tbl)))
                    __df_data[pk_column] = range(__start_value, __start_value + len(__df_data))
                    pv_query = ("UPDATE %s.%s SET id=%s" % (pv_schema, pv_sequence, str(int(__start_value) + len(__df_data) + 100)))
                    db_engine.execute(pv_query)
                logger.info(("Maria/Mysql pv_sequence reset completed"))
                db_engine.execute("SET FOREIGN_KEY_CHECKS=0")
            db_engine = db_engine.execution_options(autocommit=False)
            __outputdict = func_sqlcol(logger, __df_data, pdf_large_cols)
            __df_data.to_sql(name=pv_table, con=db_engine, if_exists='append', index=False, dtype=__outputdict)
            logger.info("Database Insert got completed successfully")
        # endIf==>len(__df_data)
    except Exception as __exp:
        if 'unique constraint' in str(__exp):
            __df_data.to_csv(sep='|', header=True, index=False, path_or_buf=__logpath + '/logs/' + pv_table + '.csv')
            return ('9999', 'uniqueConstraint')
        elif 'invalid identifier' in str(__exp):
            logger.error("invalid identifier|" + str(__exp))
            return ('9999', 'columnNameMisMatch')
        else:
            logger.error(("Exception: %s" % str(__exp)))
            return ('9999', 'pyJobException')
    return ('0000', 'Success')
# endDef==>processDataFrame()


def getData(schema, args, db_engine, configProperties, logger):
    __query = ''
    __sequence = ''
    __df_data = pd_DataFrame()
    __table_failed = {}
    __table_success = {}
    tblName = ''
    __pk_column = ''
    __large_cols = ''
    args = dict(args)
    __logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH')
    try:
        tblName = 'INIT'
        db_connection = db_engine.connect()
        logger.info(("db_engine: %s" % db_engine))
        logger.info(("ArgsType: %s" % type(args)))
        logger.info(("Args: %s" % args))
        logger.info(("Query: %s" % args['opId'] + '|' + args['buId']))
        try:
            __query = ("SELECT COUNT(*) as \"user_count\" FROM %s.%s WHERE bu_id='%s' AND op_id='%s' AND login='%s'"
                       % (schema, 'sso_user', args['buIdNew'], args['opIdNew'], args['userName']))
            __query = db_connection.execute(__query)
            __query = __query.fetchone()
            __query = __query['user_count']
            logger.info(("user Existence : %s" % __query))
            if __query > 0:
                return ('0002'
                        , ('User {%s} already exists. Try with new userName' % args['userName'])
                        , list(__table_success.keys())
                        , list(__table_failed.keys()))
            __query = ("SELECT COUNT(*) as \"bu_ref_count\" FROM %s.%s WHERE bu_id='%s' AND op_id='%s'"
                       % (schema, 'sso_bu_ref', args['buIdNew'], args['opIdNew']))
            __query = db_connection.execute(__query)
            __query = __query.fetchone()
            __query = __query['bu_ref_count']
            logger.info(("buId Existence : %s" % __query))
            if __query > 0:
                return ('0002'
                        , ('buId {%s} already exists. Try with new buId' % args['buIdNew'])
                        , list(__table_success.keys())
                        , list(__table_failed.keys()))
            __query = ("SELECT COUNT(*) as \"op_ref_count\" FROM %s.%s WHERE bu_id='%s' AND op_id='%s'"
                       % (schema, 'sso_operator_ref', args['buIdNew'], args['opIdNew']))
            __query = db_connection.execute(__query)
            __query = __query.fetchone()
            __query = __query['op_ref_count']
            logger.info(("sso_operator Existence : %s" % __query))
            if __query > 0:
                return ('0002'
                        , ('sso_operator {%s} already exists. Try with new sso_operator' % args['opIdNew'])
                        , list(__table_success.keys())
                        , list(__table_failed.keys()))
            if configProperties.has_option('environment', 'tenant_tables'):
                tenant_tables = json_loads(configProperties.get('environment', 'tenant_tables', raw=True))
                if 'oracle' in str(db_engine) and configProperties.has_option('database_queries', 'cols_ora'):
                    __large_cols = (configProperties.get('database_queries', 'cols_ora', raw=True)
                                    % (schema, str(tenant_tables).replace('[', '').replace(']', '').upper()))
                elif 'mysql' in str(db_engine) and configProperties.has_option('database_queries', 'cols_mysql'):
                    __large_cols = (configProperties.get('database_queries', 'cols_mysql', raw=True)
                                    % (schema, str(tenant_tables).replace('[', '').replace(']', '').upper()))
                else:
                    raise Exception('Undefined database type or large objects query not defined')
                __df_large_cols = pd_read_sql(__large_cols, con=db_engine)
                __df_large_cols.columns = __df_large_cols.columns.str.lower()
                for tblName in tenant_tables:
                    logger.info(("--------------------+ tableName: %s.%s " % (schema, tblName)))
                    __query = ''
                    __df = pd_DataFrame()
                    __df = __df_large_cols[['column_name_clob', 'column_name_blob']].loc[(__df_large_cols['table_name'] == tblName)].reset_index(drop=True)
                    if tblName == 'sso_bu_ref':
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='bu_nbr', pv_sequence='sso_bu_ref_seq', pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_operator_ref':
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_user':
                        #__query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' AND login = '152169' " % (schema, tblName, args['buId'], args['opId']))
                        #__query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' AND login = '%s' " % (schema, tblName, args['buId'], args['opId'], configProperties.get('environment', 'clone_user', raw=True)))
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_user_details':
                        #__query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' AND login = '152169' " % (schema, tblName, args['buId'], args['opId']))
                        #__query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' AND login = '%s' " % (schema, tblName, args['buId'], args['opId'], configProperties.get('environment', 'clone_user', raw=True)))
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_user_role' or tblName == 'mk_sso_user_role':
                        #__query = ("SELECT NULL AS id, '%s' AS login, role_name FROM %s.%s WHERE LOGIN = '152169' AND role_name in ('AmsUser','SupportGroup','Non-BillingOps', 'PPM_TRANS_MGR','BillingOps','crms.salesuser','crms.user','crms.superuser','crms.masteruser','PPM_TRANS_MGR','WLIST_GENERIC','HOBS_USER','WLIST_ADMIN_USER','WLIST_CACHE_USER','WLIST_USER') " % (args['userName'], schema, tblName))
                        #__query = ("SELECT NULL AS id, '%s' AS login, role_name FROM %s.%s WHERE role_name in ('AmsUser','SupportGroup','Non-BillingOps', 'PPM_TRANS_MGR','BillingOps','crms.salesuser','crms.user','crms.superuser','crms.masteruser','PPM_TRANS_MGR','WLIST_GENERIC','HOBS_USER','WLIST_ADMIN_USER','WLIST_CACHE_USER','WLIST_USER') " % (args['userName'], schema, tblName))
                        __query = ("SELECT NULL AS id, '%s' AS login, role_name FROM %s.%s WHERE LOGIN = '%s' AND role_name in ('AmsUser','SupportGroup','Non-BillingOps', 'PPM_TRANS_MGR','BillingOps','crms.salesuser','crms.user','crms.superuser','crms.masteruser','PPM_TRANS_MGR','WLIST_GENERIC','HOBS_USER','WLIST_ADMIN_USER','WLIST_CACHE_USER','WLIST_USER') "
                                   % (args['userName'], schema, tblName, configProperties.get('environment', 'clone_user', raw=True)))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='id', pv_sequence='sso_user_role_seq', pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_attr_ref':
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='attr_seq_no', pv_sequence='sso_attr_ref_seq', pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_hob_config':
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='hob_config_nbr', pv_sequence='sso_hob_config_seq', pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_ou_ref':
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    elif tblName == 'sso_circle_ref':
                        __query = ("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' " % (schema, tblName, args['buId'], args['opId']))
                        __execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='circle_nbr', pv_sequence='sso_hibernate_sequence', pdf_large_cols=__df)
                        if __execStatus[0] == '0000':
                            __table_success.__setitem__(tblName, "Success")
                        else:
                            __table_failed.__setitem__(tblName, __execStatus[1])
                    else:
                        raise Exception("Undefined table in list")
                # endFor==>___iterate_tenant_tables___
            else:
                raise Exception("Table list not defined")
            # endIf==>___configProperties_tenant_tables___
        except Exception as __exp:
            if 'unique constraint' in str(__exp):
                __table_failed.__setitem__(tblName, "uniqueConstraint")
                __df_data.to_csv(sep='|', header=True, index=False, path_or_buf=__logpath + '/logs/' + tblName + '.csv')
            elif 'invalid identifier' in str(__exp):
                __table_failed.__setitem__(tblName, "columnNameMisMatch")
            else:
                __table_failed.__setitem__(tblName, "pyJobException")
            logger.error(("Error - {%s} . Line No - {%s} " % (str(__exp), str(sys_exc_info()[-1].tb_lineno))))
        # endTry==>___processing___
        logger.info(("--------------------+ stats"))
        if len(__table_success) == 0 or len(__table_failed) > 0:
            logger.error(("__table_failed: %s" % __table_failed))
            return ('9999'
                    , str(__table_failed).replace('\'\'', '"')
                    , list(__table_success.keys())
                    , list(__table_failed.keys()))
    except Exception as __exp:
        logger.error(("Error - {%s} . Line No - {%s} " % (str(__exp), str(sys_exc_info()[-1].tb_lineno))))
        return ('9999'
                , schema + ' ' + str(__exp)
                , list(__table_success.keys())
                , list(__table_failed.keys()))
    return ('0000'
            , 'Success'
            , list(__table_success.keys())
            , list(__table_failed.keys()))


if __name__ == '__main__':
    # schema, args, db_engine, configProperties and logger are expected to be
    # supplied by the invoking service; they are not defined in this module.
    getData(schema, args, db_engine, configProperties, logger)
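# Minimal usage sketch (an assumption, not part of the original job): the connection
# URL, schema name, properties file and tenant arguments below are hypothetical
# placeholders showing how getData() could be wired up for a manual run against the
# SQLAlchemy 1.x style engine (engine.execute) this script already relies on.
def _example_invocation():
    import logging
    from configparser import ConfigParser
    from sqlalchemy import create_engine

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('tenant_setup')

    configProperties = ConfigParser()
    configProperties.read('tenant_setup.properties')  # hypothetical config file

    # hypothetical MySQL connection string; swap in an Oracle URL for dbType='ORACLE'
    db_engine = create_engine('mysql+pymysql://user:password@localhost:3306/TIB_SSO')

    args = {'dbType': 'MYSQL',
            'opId': 'OP1', 'buId': 'BU1',        # existing tenant to copy from
            'opIdNew': 'OP2', 'buIdNew': 'BU2',  # new tenant to create
            'userName': 'newadmin'}

    status, message, ok_tables, failed_tables = getData('TIB_SSO', args, db_engine, configProperties, logger)
    logger.info("status=%s message=%s ok=%s failed=%s", status, message, ok_tables, failed_tables)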