Untitled

 avatar
unknown
plain_text
2 years ago
15 kB
4
Indexable
if over_ride == 'NO':
    # Full (non-override) run: read staged interaction rows from Cassandra,
    # enrich them with datasets pulled from the source RDBMS per the entity
    # configuration, bulk-load the merged result into the ODS, and update the
    # migration-master table with odsStaged / odsFailed statuses.

    # Target (Cassandra) connection. Helper returns a
    # (success_flag, session_or_error_message) pair.
    dbConnectRetVal = setCassandraDBConnection(pandas_enabled=True)
    if not dbConnectRetVal[0]:
        raise Exception("Cassandra database connection failed: " + dbConnectRetVal[1])
    cassDbSession = dbConnectRetVal[1]
    print("Cassandra database connection established")
    print(over_ride)

    # Source database connection
    # dbConnectRetVal = setSourceConnection(configMainProperties)
    # --Invoke utility based function
    dbConnectRetVal = setDbEngine()
    if not dbConnectRetVal[0]:
        raise Exception("Source database connection failed: " + dbConnectRetVal[1])
    sourceDbEngine = dbConnectRetVal[1]
    print("Source database connection established")

    print("opId                : %s" % opId)
    print("buId                : %s" % buId)
    print("cassPath            : %s" % cassPath)

    # Extract and merge every configured dataset, entity by entity.
    for indx in entityType:
        if configProperties.has_section(indx):
            print("-------- PROCESSING ENTITY: %s ----------------" % indx)
            if configProperties.has_option(indx, "datasets"):
                dataset = json.loads(configProperties.get(indx, "datasets"))
                print("dataset: %s" % dataset)
                for indxDS in dataset:
                    print("--- Processing dataset: %s" % indxDS)
                    # Mandatory per-dataset options.
                    query = configProperties.get(indx, indxDS + ".query", raw=True)
                    columnsForComparison = json.loads(configProperties.get(indx, indxDS + ".columnsForComparison"))
                    lkupColumns = json.loads(configProperties.get(indx, indxDS + ".lkupColumns"))
                    joinCondition = configProperties.get(indx, indxDS + ".joinCondition", raw=True)
                    # Optional per-dataset options (default None / empty list).
                    primaryKey = (configProperties.get(indx, indxDS + ".primaryKey", raw=True)
                                  if configProperties.has_option(indx, indxDS + ".primaryKey") else None)
                    dateTimeColumns = (json.loads(configProperties.get(indx, indxDS + ".dateTimeColumns"))
                                       if configProperties.has_option(indx, indxDS + ".dateTimeColumns") else [])
                    dateColumns = (json.loads(configProperties.get(indx, indxDS + ".dateColumns"))
                                   if configProperties.has_option(indx, indxDS + ".dateColumns") else [])
                    columnsToRemove = (json.loads(configProperties.get(indx, indxDS + ".columnsToRemove"))
                                       if configProperties.has_option(indx, indxDS + ".columnsToRemove") else [])
                    connDatabase = (configProperties.get(indx, indxDS + ".database", raw=True)
                                    if configProperties.has_option(indx, indxDS + ".database") else None)
                    columnsToInt = (json.loads(configProperties.get(indx, indxDS + ".columnsToInt"))
                                    if configProperties.has_option(indx, indxDS + ".columnsToInt") else [])
                    columnsToText = (json.loads(configProperties.get(indx, indxDS + ".columnsToText"))
                                     if configProperties.has_option(indx, indxDS + ".columnsToText") else [])

                    # Expand %s placeholders in the configured query template.
                    if 'homm_batch' in query.lower():
                        query = (configProperties.get(indx, indxDS + ".query", raw=True) % (batchId))
                    elif "op_id='%s'" in query.lower().replace(' ', '') and "bu_id='%s'" in query.lower().replace(
                        ' ', ''):
                        query = (configProperties.get(indx, indxDS + ".query", raw=True) % (opId, buId))

                    print("Query               : %s" % query)
                    print("columnsForComparison: %s" % columnsForComparison)
                    print("lkupColumns         : %s" % lkupColumns)
                    print("joinCondition       : %s" % joinCondition)
                    print("primaryKey          : %s" % primaryKey)
                    print("dateTimeColumns     : %s" % dateTimeColumns)
                    print("dateColumns         : %s" % dateColumns)
                    print("columnsToInt        : %s" % columnsToInt)
                    print("columnsToText       : %s" % columnsToText)
                    print("connDatabase        : %s" % connDatabase)

                    if indxDS == "stageData":
                        # Base dataset: staged rows read from Cassandra.
                        # NOTE(review): later datasets merge onto dfDataSet, so
                        # the config apparently must list "stageData" first —
                        # confirm; otherwise dfDataSet is unbound at merge time.
                        dfSrcData = cassDbSession.execute(query, timeout=1000)
                        dfSrcData = dfSrcData._current_rows  # ResultSet -> pandas DataFrame
                        dfSrcData.columns = dfSrcData.columns.str.lower()
                        print("dfSrcData", type(dfSrcData))
                        print("Extracted           : %s [%s]" % (len(dfSrcData), indxDS))
                        if len(dfSrcData) == 0:
                            raise Exception("Entity Interaction not stagged for batch[%s]" % batchId)
                        dfSrcData['homm_entity_value'] = dfSrcData['homm_entity_value'].astype(str)
                        # Only rows still in 'staged' state are processed further.
                        dfDataSet = dfSrcData.copy().loc[(dfSrcData['homm_entity_status'] == 'staged')]
                        print("homm_entity_status  : %s [Filter]" % (len(dfDataSet)))
                        if len(dfDataSet) == 0:
                            raise Exception(
                                "Entity Interaction not stagged for batch[%s] after filter=staged" % batchId)
                        # Keep an unfiltered copy to derive the 'odsFailed' rows later.
                        dfMigMasterFailed = dfSrcData.copy()
                    else:
                        if not (len(lkupColumns) > 0 and len(columnsForComparison) > 0 and joinCondition):
                            raise Exception(
                                "Merge entity lkupColumns/columnsForComparison/joinCondition not defined properly")

                        dfSrcData = pd.read_sql(query, con=sourceDbEngine)
                        dfSrcData.columns = dfSrcData.columns.str.lower()
                        print("Extracted           : %s [%s]" % (len(dfSrcData), indxDS))
                        if len(dfSrcData) == 0:
                            raise Exception("Entity Interaction failed in traversing datasets.")

                        # Flatten the JSON extension-attribute list into a plain
                        # {attributeName: attributeValue} dict per row.
                        if 'hosi_account_extn_attributes' in list(dfSrcData):
                            # BUGFIX: the row-index variable used to be named
                            # 'indx', clobbering the outer entity loop variable
                            # and corrupting all configProperties lookups for
                            # subsequent datasets of this entity.
                            for rowIndx, row in dfSrcData.copy().iterrows():
                                try:
                                    extnVal = {}
                                    temp = json.loads(row['hosi_account_extn_attributes']) if row[
                                        'hosi_account_extn_attributes'] else []
                                    for v in temp:
                                        try:
                                            if 'attributeName' in v and 'attributeValue' in v and v[
                                                'attributeValue']:
                                                extnVal.update({v['attributeName']: v['attributeValue']})
                                        except Exception as errx:
                                            # Best-effort: skip malformed attribute entries.
                                            print("Error - {} . Line No - {} ".format(str(errx), str(
                                                sys.exc_info()[-1].tb_lineno)))
                                    if len(extnVal) > 0:
                                        dfSrcData.at[rowIndx, 'hosi_account_extn_attributes'] = extnVal
                                except Exception as err:
                                    # Best-effort: leave the row's raw value untouched.
                                    print("Error - {} . Line No - {} ".format(str(err),
                                                                          str(sys.exc_info()[-1].tb_lineno)))

                        if primaryKey:
                            # One fresh UUID per row. (The old code first assigned
                            # a single shared UUID to the whole column and then
                            # immediately overwrote it row by row.)
                            dfSrcData[primaryKey] = [uuid.uuid4() for _ in range(len(dfSrcData))]

                        # Normalise column types before the merge; NaT/NaN become None.
                        for indxDateTimCol in dateTimeColumns:
                            dfSrcData[indxDateTimCol] = dfSrcData[indxDateTimCol].dt.strftime(dateTimeFormat).where(
                                dfSrcData[indxDateTimCol].notnull(), None)
                        for indxDateCol in dateColumns:
                            dfSrcData[indxDateCol] = dfSrcData[indxDateCol].dt.strftime(dateFormat).where(
                                dfSrcData[indxDateCol].notnull(), None)
                        for indxCharCol in columnsToInt:
                            dfSrcData[indxCharCol] = pd.to_numeric(dfSrcData[indxCharCol], errors='coerce')
                        for indxMiscelCol in columnsToText:
                            dfSrcData[indxMiscelCol] = dfSrcData[indxMiscelCol].astype(str)

                        print("dfDataSet           : %s [Before Merge]" % len(dfDataSet))
                        dfDataSet = dfDataSet.copy().merge(dfSrcData
                                                           , how=joinCondition
                                                           , left_on=lkupColumns
                                                           , right_on=columnsForComparison
                                                           )

                        print("dfDataSet           : %s [After Merge]" % len(dfDataSet))
                        print("Columns             : %s" % (list(dfDataSet)))
                        if len(dfDataSet) == 0:
                            raise Exception("LOST RECORDS POST MERGE")

                        dfDataSet.drop_duplicates(subset=None, keep='first', inplace=True)
                        if len(columnsToRemove) > 0:
                            print("columnsToRemove     : %s {%s}" % (columnsToRemove, type(columnsToRemove)))
                            dfDataSet.drop(columnsToRemove, axis=1, inplace=True)

            # endFor==dataset
            else:
                failedEntities.update({indx: "datasets section not found"})
        else:
            failedEntities.update({indx: "entityInteraction section not found"})
    # endFor==>entityType

    # Data loading and migration-master status updates.
    if len(failedEntities):
        raise Exception("Failed entities: " + str(failedEntities))
    elif len(dfDataSet) == 0:
        raise Exception("NO RECORDS TO PROCESS")
    else:
        print("dfDataSet : type{%s}" % type(list(dfDataSet)))
        print(list(dfDataSet))
        print("")

        print("Total               : %s" % len(dfDataSet))

        # 1) Load the merged interactions into the ODS table.
        tableName = 'hos_interaction'
        if not loadDataFrameToCassandra(connection_id="cassandrahost"
                , table_name=tableName
                , df_data=dfDataSet
                , load_type_cqlldr=True
                , dag_name=kwargs['dag_run'].dag_id
                , file_suffix="_" + str(batchId)):
            raise Exception("ODS data store load failed.")
        print("Cassandra dataload submitted")

        # 2) Mark the successfully loaded rows as 'odsStaged' in the master.
        tableName = 'hos_migration_master'
        dfDataSet['homm_entity_value'] = dfDataSet['hosi_interaction_id']
        dfDataSet['homm_batch'] = batchId
        dfDataSet['homm_entity_id'] = 'interaction'
        dfDataSet['homm_entity_status'] = 'odsStaged'
        # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated
        # since Python 3.12 — consider datetime.now(timezone.utc).
        dfDataSet['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
        if not loadDataFrameToCassandra(connection_id="cassandrahost"
                , table_name=tableName
                , df_data=dfDataSet[
                    ['homm_entity_id', 'homm_entity_status', 'homm_batch', 'homm_updated_on',
                     'homm_entity_value']]
                , load_type_cqlldr=True
                , dag_name=kwargs['dag_run'].dag_id
                , file_suffix="_" + str(batchId)):
            raise Exception("Staging master odsStaged update failed")

        # 3) Anti-join: staged rows that did NOT survive the merges become 'odsFailed'.
        dfDataSet['homm_entity_value'] = dfDataSet['homm_entity_value'].astype(str)
        dfMigMasterFailed = dfMigMasterFailed[['homm_entity_value']].merge(dfDataSet[['homm_entity_value']]
                                                                           , how='left'
                                                                           , on=['homm_entity_value']
                                                                           , indicator=True
                                                                           ).loc[lambda x: x['_merge'] != 'both']
        print("dfMigMasterFailed   : %s" % len(dfMigMasterFailed))
        dfMigMasterFailed['homm_batch'] = batchId
        dfMigMasterFailed['homm_entity_id'] = 'interaction'
        dfMigMasterFailed['homm_entity_status'] = 'odsFailed'
        dfMigMasterFailed['homm_updated_on'] = datetime.strftime(datetime.utcnow(), dateTimeFormat)
        # NOTE(review): dag_name is hard-coded here but taken from kwargs in the
        # two loads above — confirm this asymmetry is intentional.
        if not loadDataFrameToCassandra(connection_id="cassandrahost"
                , table_name=tableName
                , df_data=dfMigMasterFailed[
                    ['homm_entity_id', 'homm_entity_status', 'homm_batch', 'homm_updated_on',
                     'homm_entity_value']]
                , load_type_cqlldr=True
                , dag_name='odsBulkMigration_Interaction'
                , file_suffix="_" + str(batchId) + "_Failed"):
            raise Exception("Staging master odsFailed update failed")
    # endIf
    print("SUCCESS")
Editor is loading...