from datetime import datetime as dt_datetime
from datetime import timedelta as dt_timedelta
from pandas import DataFrame as pd_DataFrame
from pandas import read_sql as pd_read_sql
from sqlalchemy import types as sqlalchemy_types
from json import loads as json_loads
import sqlalchemy
from sys import exc_info as sys_exc_info
import os
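# ---------------------------------------------------------------------------
# Tenant bootstrap helpers: clone the SSO reference / user tables of an
# existing tenant (args['opId'] / args['buId']) into a new tenant
# (args['opIdNew'] / args['buIdNew']) using pandas + SQLAlchemy.
# Oracle and MySQL/MariaDB are supported; the table list and the CLOB/BLOB
# column metadata queries are read from the configuration file.
# ---------------------------------------------------------------------------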
def func_sqlcol(logger, __pv_df_param, __pdf_large_cols):
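    # Build a {column_name: SQLAlchemy type} mapping for DataFrame.to_sql().
    # __pdf_large_cols (one row per table) lists which object columns must be
    # written as CLOB or BLOB; remaining object columns map to NVARCHAR(255),
    # datetimes to DateTime, floats to Float and ints to INT.
    # Returns an empty dict if anything goes wrong (the error is only logged).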
__dtypedict = {}
__cols_clob = []
__cols_blob = []
try:
#logger.debug(("__df_large_cols: \n%s" % __pdf_large_cols))
if len(__pdf_large_cols) > 0:
__cols_clob = __pdf_large_cols['column_name_clob'].values[0]
__cols_clob = __cols_clob.split(',') if __cols_clob else []
__cols_blob = __pdf_large_cols['column_name_blob'].values[0]
__cols_blob = __cols_blob.split(',') if __cols_blob else []
logger.info(("__cols_clob : type{%s} len{%s}" % (type(__cols_clob), len(__cols_clob))))
logger.info(("__cols_blob : type{%s} len{%s}" % (type(__cols_blob), len(__cols_blob))))
for i,j in zip(__pv_df_param.columns, __pv_df_param.dtypes):
# table(s): additionaldet_xml_template.xml_template, tom_batch_order_template_ref.template
if "object" in str(j) and i in __cols_blob:
__dtypedict.update({i: sqlalchemy_types.BLOB(length=None)})
elif "object" in str(j) and i in __cols_clob:
# table(s): tom_service_action_master.extended_rule, generic_template_ref.template_text
__dtypedict.update({i: sqlalchemy_types.CLOB(length=None)})
elif "object" in str(j):
__dtypedict.update({i: sqlalchemy_types.NVARCHAR(length=255)})
if "datetime" in str(j):
__dtypedict.update({i: sqlalchemy_types.DateTime()})
if "float" in str(j):
__dtypedict.update({i: sqlalchemy_types.Float(precision=3, asdecimal=True)})
if "int" in str(j):
__dtypedict.update({i: sqlalchemy_types.INT()})
except Exception as __exp:
logger.error(("func_sqlcol exception. Error - {%s} . Line No - {%s} " % (str(__exp),str(sys_exc_info()[-1].tb_lineno))))
return __dtypedict
# endDef==>func_sqlcol()
def processDataFrame(args, configProperties, logger, db_engine, pv_schema, pv_query, pv_table=None, pk_column=None, pv_sequence=None, pdf_large_cols=None):
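    # Read pv_query into a DataFrame, rewrite audit/tenant columns (dates,
    # operator_id, op_id, bu_id, created_by/updated_by, sso_user defaults),
    # regenerate primary keys from pv_sequence where requested, then append
    # the rows into pv_schema.pv_table via to_sql().
    # Returns ('0000','Success') on success or ('9999', <error code>) on failure.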
__df_data=pd_DataFrame()
__connection_text=''
__pv_query=''
__logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH')
try:
dbType = args['dbType']
__df_data = pd_read_sql(pv_query, con=db_engine)
__df_data.columns =__df_data.columns.str.lower()
if pv_table in ('sso_user','sso_user_details'):
__df_data = __df_data.head(1)
logger.info(("Count: %s " % len(__df_data)))
if len(__df_data) > 0:
if 'start_date' in __df_data.columns:
__df_data['start_date'] = dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__df_data['start_date'] = __df_data['start_date'].astype('datetime64[ns]')
if 'end_date' in __df_data.columns:
__df_data['end_date'] = (dt_datetime.now()+dt_timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S")
__df_data['end_date'] = __df_data['end_date'].astype('datetime64[ns]')
# TIB_SSO.SSO_OPERATOR_REF
if 'operator_id' in __df_data.columns and pv_table=='sso_operator_ref':
__df_data['operator_id']=args['userName']
elif 'operator_id' in __df_data.columns:
__df_data['operator_id']=args['opIdNew']
if 'op_id' in __df_data.columns:
__df_data['op_id']=args['opIdNew']
if 'op_desc' in __df_data.columns:
__df_data['op_desc']=args['opIdNew']
if 'bu_id' in __df_data.columns:
__df_data['bu_id']=args['buIdNew']
if 'updated_by' in __df_data.columns:
__df_data['updated_by']='newTenantInitialSetup'
if 'created_by' in __df_data.columns:
__df_data['created_by']='newTenantInitialSetup'
if 'created_date' in __df_data.columns:
__df_data['created_date']=dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__df_data['created_date'] = __df_data['created_date'].astype('datetime64[ns]')
if 'created_on' in __df_data.columns:
__df_data['created_on']=dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__df_data['created_on'] = __df_data['created_on'].astype('datetime64[ns]')
if 'created_dt' in __df_data.columns:
__df_data['created_dt']=dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__df_data['created_dt'] = __df_data['created_dt'].astype('datetime64[ns]')
if 'custom_created_date' in __df_data.columns:
__df_data['custom_created_date']=str(dt_datetime.now().strftime("%d-%b-%Y %H:%M:%S"))
if 'custom_updated_date' in __df_data.columns:
                __df_data['custom_updated_date']=str(dt_datetime.now().strftime("%d-%b-%Y %H:%M:%S"))
if 'updated_date' in __df_data.columns:
__df_data['updated_date']=dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__df_data['updated_date'] = __df_data['updated_date'].astype('datetime64[ns]')
if 'updated_on' in __df_data.columns:
__df_data['updated_on']=dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__df_data['updated_on'] = __df_data['updated_on'].astype('datetime64[ns]')
if 'updated_dt' in __df_data.columns:
__df_data['updated_dt']=dt_datetime.now().strftime("%Y-%m-%d %H:%M:%S")
__df_data['updated_dt'] = __df_data['updated_dt'].astype('datetime64[ns]')
# TIB_SSO.sso_user
if pv_table == 'sso_user':
__df_data['login']=args['userName']
__df_data['password']='$2a$10$QKzQqwLagJKBkdcITeKsReoEL8Jd2JfMpHFF2695z.LZNiSpsSNXO'
__df_data['first_name']=args['userName']
__df_data['last_name']=args['userName']
__df_data['email']='hobs-support@tcs.com'
__df_data['status']='A'
__df_data['country']=configProperties.get('environment','country',raw=True)
__df_data['language_cd']=configProperties.get('environment','language_cd',raw=True)
__df_data['last_pswd_change'] = (dt_datetime.now()+dt_timedelta(days=30)).strftime("%Y-%m-%d %H:%M:%S")
__df_data['last_pswd_change'] = __df_data['last_pswd_change'].astype('datetime64[ns]')
__df_data['currency_cd']=configProperties.get('environment','currency',raw=True)
__df_data['failed_logins']=0
__df_data['acct_lock']='N'
# TIB_SSO.sso_user_details/sso_user_role
if 'login' in __df_data.columns and pv_table in ('sso_user_details', 'sso_user_role'):
__df_data['login']=args['userName']
# TIB_SSO.SSO_OU_REF
if pv_table == 'sso_ou_ref':
__df_data['ou_id']=args['buIdNew']
__df_data['ou_name']=args['buIdNew']
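            # Regenerate primary keys per database flavour: Oracle draws values
            # from the real sequence (or MAX(circle_nbr)+10 for the hibernate
            # sequence table); MySQL/MariaDB start from the greater of the
            # sequence-table id and the table MAX, then push the sequence forward.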
if dbType=='ORACLE':
if pv_table == 'sso_circle_ref' and pv_sequence == 'sso_hibernate_sequence':
pv_query = ("SELECT MAX(circle_nbr)+10 AS %s FROM %s.%s" % (pk_column
,pv_schema
,pv_table))
pv_query = db_engine.execute(pv_query)
pv_query = pv_query.fetchone()
__start_value = pv_query[pk_column]
#__df_data.apply(lambda col: col.drop_duplicates().reset_index(drop=True))
__df_data[pk_column] = range(__start_value, __start_value+len(__df_data))
pv_query = ("UPDATE %s.%s SET sequence_next_hi_value=%s WHERE sequence_name='CIRCLE_NBR_SEQ'"
% (pv_schema
, pv_sequence
, str(int(__start_value) + len(__df_data) + 100)
))
elif pv_sequence and len(__df_data) > 0:
__df_temp = __df_data.copy()
for index, row in __df_temp.iterrows():
pv_query = ("SELECT %s.%s.NEXTVAL AS %s FROM DUAL" % (pv_schema
, pv_sequence
, pk_column))
pv_query = db_engine.execute(pv_query)
pv_query = pv_query.fetchone()
                    # DataFrame.set_value() was removed in pandas 1.0; .at[] is the supported equivalent
                    __df_data.at[index, pk_column] = pv_query[pk_column]
logger.info(("Oracle pv_sequence reset completed"))
elif dbType in ('MYSQL', 'MARIA'):
if pv_table == 'sso_circle_ref' and pv_sequence == 'sso_hibernate_sequence':
pv_query = ("SELECT MAX(circle_nbr)+10 AS %s FROM %s.%s" % (pk_column
,pv_schema
,pv_table))
pv_query = db_engine.execute(pv_query)
pv_query = pv_query.fetchone()
__start_value = pv_query[pk_column]
#__df_data.apply(lambda col: col.drop_duplicates().reset_index(drop=True))
__df_data[pk_column] = range(__start_value, __start_value+len(__df_data))
pv_query = ("UPDATE %s.%s SET sequence_next_hi_value=%s WHERE sequence_name='CIRCLE_NBR_SEQ'"
% (pv_schema
,pv_sequence
,str(int(__start_value) + len(__df_data) + 100)
))
elif pv_sequence and len(__df_data) > 0:
__query = ("SELECT id AS %s FROM %s.%s" % (pk_column
,pv_schema
,pv_sequence))
__query = db_engine.execute(__query)
__query = __query.fetchone()
__max_seq = __query[pk_column]+1 if __query[pk_column] else 0
# Fix- Table Max
__query = ("SELECT MAX(%s) AS %s FROM %s.%s" % (pk_column, pk_column
,pv_schema
,pv_table))
__query = db_engine.execute(__query)
__query = __query.fetchone()
__max_tbl = __query[pk_column]+1 if __query[pk_column] else 0
# Either of Max (Table/Sequence)
__start_value = max(__max_tbl, __max_seq)
logger.info(("MAX Sequence: %s Table: %s" % (__max_seq, __max_tbl)))
__df_data[pk_column] = range(__start_value, __start_value+len(__df_data))
pv_query = ("UPDATE %s.%s SET id=%s" % (pv_schema
,pv_sequence
,str(int(__start_value) + len(__df_data) + 100)
))
db_engine.execute(pv_query)
logger.info(("Maria/Mysql pv_sequence reset completed"))
db_engine.execute("SET FOREIGN_KEY_CHECKS=0")
db_engine = db_engine.execution_options(autocommit=False)
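            # Map pandas dtypes to SQLAlchemy column types and append the prepared rows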
__outputdict = func_sqlcol(logger, __df_data, pdf_large_cols)
__df_data.to_sql(name= pv_table, con=db_engine, if_exists='append', index=False, dtype=__outputdict)
logger.info("Database Insert got completed successfully")
# endIf==>len(__df_data)
except Exception as __exp:
if 'unique constraint' in str(__exp):
__df_data.to_csv(sep='|', header=True, index=False, path_or_buf=__logpath+'/logs/'+pv_table + '.csv')
return ('9999','uniqueConstraint')
elif 'invalid identifier' in str(__exp):
logger.error("invalid identifier|"+str(__exp))
return ('9999','columnNameMisMatch')
else:
logger.error(("Exception: %s" % str(__exp)))
return ('9999','pyJobException')
return ('0000','Success')
def getData(schema, args, db_engine, configProperties, logger):
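    # Orchestrate the clone for one schema: verify that the new userName,
    # buId and operator do not already exist, load the CLOB/BLOB column
    # metadata, then call processDataFrame() for every table listed in
    # environment.tenant_tables.
    # Returns (status_code, message, successful_tables, failed_tables).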
__query = ''
__sequence = ''
__df_data = pd_DataFrame()
__table_failed = {}
__table_success = {}
tblName = ''
__pk_column = ''
__large_cols = ''
args = dict(args)
__logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH')
try:
tblName = 'INIT'
db_connection = db_engine.connect()
logger.info(("db_engine: %s" % db_engine))
logger.info(("ArgsType: %s" % type(args)))
logger.info(("Args: %s" % args))
logger.info(("Query: %s" % args['opId']+'|'+args['buId']))
try:
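            # Pre-flight checks: fail fast when the target userName, buId or operator already exist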
__query = ("SELECT COUNT(*) as \"user_count\" FROM %s.%s WHERE bu_id='%s' AND op_id='%s' AND login='%s'" %(schema, 'sso_user', args['buIdNew'], args['opIdNew'], args['userName']))
__query = db_connection.execute(__query)
__query = __query.fetchone()
__query = __query['user_count']
logger.info(("user Existence : %s" % __query))
if __query > 0:
return ('0002'
,(('User {%s} already exists. Try with new userName' % args['userName']))
,list(__table_success.keys())
,list(__table_failed.keys())
)
__query = ("SELECT COUNT(*) as \"bu_ref_count\" FROM %s.%s WHERE bu_id='%s' AND op_id='%s'" %(schema, 'sso_bu_ref', args['buIdNew'], args['opIdNew']))
__query = db_connection.execute(__query)
__query = __query.fetchone()
__query = __query['bu_ref_count']
logger.info(("buId Existence : %s" % __query))
if __query > 0:
return ('0002'
,(('buId {%s} already exists. Try with new buId' % args['buIdNew']))
,list(__table_success.keys())
,list(__table_failed.keys())
)
__query = ("SELECT COUNT(*) as \"op_ref_count\" FROM %s.%s WHERE bu_id='%s' AND op_id='%s'" %(schema, 'sso_operator_ref', args['buIdNew'], args['opIdNew']))
__query = db_connection.execute(__query)
__query = __query.fetchone()
__query = __query['op_ref_count']
logger.info(("sso_operator Existence : %s" % __query))
if __query > 0:
return ('0002'
,(('sso_operator {%s} already exists. Try with new sso_operator' % args['opIdNew']))
,list(__table_success.keys())
,list(__table_failed.keys())
)
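            # Table list and large-object column metadata are configuration driven
            # (environment.tenant_tables, database_queries.cols_ora / cols_mysql)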
if configProperties.has_option('environment', 'tenant_tables'):
tenant_tables = json_loads(configProperties.get('environment','tenant_tables',raw=True))
if 'oracle' in str(db_engine) and configProperties.has_option('database_queries', 'cols_ora'):
__large_cols = ((configProperties.get('database_queries', 'cols_ora',raw=True) % (schema, str(tenant_tables).replace('[','').replace(']','').upper())))
elif 'mysql' in str(db_engine) and configProperties.has_option('database_queries', 'cols_mysql'):
__large_cols = ((configProperties.get('database_queries', 'cols_mysql',raw=True) % (schema, str(tenant_tables).replace('[','').replace(']','').upper())))
else:
raise Exception('Undefined database type or large objects query not defined')
__df_large_cols = pd_read_sql(__large_cols
,con=db_engine)
__df_large_cols.columns =__df_large_cols.columns.str.lower()
for tblName in tenant_tables:
logger.info(("--------------------+ tableName: %s.%s " % (schema, tblName)))
__query = ''
__df = pd_DataFrame()
__df = __df_large_cols[['column_name_clob','column_name_blob']].loc[(__df_large_cols['table_name']==tblName)].reset_index(drop=True)
if tblName == 'sso_bu_ref':
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='bu_nbr', pv_sequence='sso_bu_ref_seq', pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_operator_ref':
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_user':
#__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' AND login = '152169' " %(schema, tblName, args['buId'], args['opId'])))
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
#__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' AND login = '%s' " % (schema
# ,tblName
# ,args['buId']
# ,args['opId']
# ,configProperties.get('environment', 'clone_user',raw=True))))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_user_details':
#__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' AND login = '152169' " %(schema, tblName, args['buId'], args['opId'])))
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
#__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' AND login = '%s' " % (schema
# ,tblName
# ,args['buId']
# ,args['opId']
# ,configProperties.get('environment', 'clone_user',raw=True))))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_user_role' or tblName == 'mk_sso_user_role':
# __query = (("SELECT NULL AS id, '%s' AS login, role_name FROM %s.%s WHERE LOGIN = '152169' AND role_name in ('AmsUser','SupportGroup','Non-BillingOps', 'PPM_TRANS_MGR','BillingOps','crms.salesuser','crms.user','crms.superuser','crms.masteruser','PPM_TRANS_MGR','WLIST_GENERIC','HOBS_USER','WLIST_ADMIN_USER','WLIST_CACHE_USER','WLIST_USER') "
# %(args['userName']
# ,schema
# ,tblName
# )))
#__query = (("SELECT NULL AS id, '%s' AS login, role_name FROM %s.%s WHERE role_name in ('AmsUser','SupportGroup','Non-BillingOps', 'PPM_TRANS_MGR','BillingOps','crms.salesuser','crms.user','crms.superuser','crms.masteruser','PPM_TRANS_MGR','WLIST_GENERIC','HOBS_USER','WLIST_ADMIN_USER','WLIST_CACHE_USER','WLIST_USER') "
# %(args['userName']
# ,schema
# ,tblName
# )))
__query = (("SELECT NULL AS id, '%s' AS login, role_name FROM %s.%s WHERE LOGIN = '%s' AND role_name in ('AmsUser','SupportGroup','Non-BillingOps', 'PPM_TRANS_MGR','BillingOps','crms.salesuser','crms.user','crms.superuser','crms.masteruser','PPM_TRANS_MGR','WLIST_GENERIC','HOBS_USER','WLIST_ADMIN_USER','WLIST_CACHE_USER','WLIST_USER') "
%(args['userName']
,schema
,tblName
,configProperties.get('environment', 'clone_user',raw=True)
)))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='id', pv_sequence='sso_user_role_seq', pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_attr_ref':
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='attr_seq_no', pv_sequence='sso_attr_ref_seq', pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_hob_config':
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='hob_config_nbr', pv_sequence='sso_hob_config_seq', pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_ou_ref':
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND op_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column=None, pv_sequence=None, pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
elif tblName == 'sso_circle_ref':
__query = (("SELECT * FROM %s.%s WHERE bu_id='%s' AND operator_id='%s' " %(schema, tblName, args['buId'], args['opId'])))
__execStatus = processDataFrame(args, configProperties, logger, db_engine, pv_schema=schema, pv_query=__query, pv_table=tblName, pk_column='circle_nbr', pv_sequence='sso_hibernate_sequence', pdf_large_cols=__df)
if __execStatus[0] == '0000':
__table_success.__setitem__(tblName,"Success")
else:
__table_failed.__setitem__(tblName, __execStatus[1])
else:
raise Exception("Undefined table in list")
# endFor==>___iterate_tenant_tables___
else:
raise Exception("Table list not defined")
# endIf==>___configProperties_tenant_tables___
except Exception as __exp:
if 'unique constraint' in str(__exp):
__table_failed.__setitem__(tblName,"uniqueConstraint")
__df_data.to_csv(sep='|', header=True, index=False, path_or_buf=__logpath+'/logs/'+tblName + '.csv')
elif 'invalid identifier' in str(__exp):
__table_failed.__setitem__(tblName, "columnNameMisMatch")
else:
__table_failed.__setitem__(tblName, "pyJobException")
logger.error(("Error - {%s} . Line No - {%s} " % (str(__exp),str(sys_exc_info()[-1].tb_lineno))))
# endTry==>___processing___
logger.info(("--------------------+ stats"))
if len(__table_success) == 0 or len(__table_failed) > 0:
logger.error(("__table_failed: %s" % __table_failed))
return ('9999'
,str(__table_failed).replace('\'\'','"')
,list(__table_success.keys())
,list(__table_failed.keys())
)
except Exception as __exp:
logger.error(("Error - {%s} . Line No - {%s} " % (str(__exp),str(sys_exc_info()[-1].tb_lineno))))
return ('9999'
,schema+' ' + str(__exp)
,list(__table_success.keys())
,list(__table_failed.keys())
)
return ('0000'
,'Success'
,list(__table_success.keys())
,list(__table_failed.keys())
)
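# ---------------------------------------------------------------------------
# Illustrative wiring only (not part of the job itself): the connection URL,
# config path, schema name and argument values below are placeholders /
# assumptions and must be replaced with real deployment values. It sketches
# one way the entry point could be invoked.
#
#   import logging
#   from configparser import ConfigParser
#   from sqlalchemy import create_engine
#
#   logging.basicConfig(level=logging.INFO)
#   logger = logging.getLogger('tenantSetup')
#   configProperties = ConfigParser()
#   configProperties.read('/path/to/config.properties')                # assumed path
#   db_engine = create_engine('mysql+pymysql://user:pwd@host:3306/db') # assumed URL
#   schema = 'TIB_SSO'                                                 # assumed schema
#   args = {'dbType': 'MYSQL', 'opId': 'OP1', 'buId': 'BU1',
#           'opIdNew': 'OP2', 'buIdNew': 'BU2', 'userName': 'newuser'}
#   getData(schema, args, db_engine, configProperties, logger)
# ---------------------------------------------------------------------------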
if __name__ == '__main__':
    getData(schema, args, db_engine, configProperties, logger)