Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
2.0 kB
5
Indexable
Never
import pandas as pd
import json
import ast
import gc


def validateData(value):
    try:
        return ast.literal_eval(value)

    except Exception as e:
        # apilogger.debug(str(e))
        return {}


def fetchData(apilogger, connection, sql, configProperties, dataset, entity):
    # print(sql)
    pdEngagedPartyData = pd.DataFrame()
    dictColumns = json.loads(configProperties.get(entity, dataset + '.dictcolumns'))
    pdEngagedPartyData = pd.read_sql(sql, connection)
    pdEngagedPartyData.fillna('', inplace=True)
    if (len(pdEngagedPartyData) > 0):
        if 'partyId' in list(pdEngagedPartyData.columns):
            pdEngagedPartyData['partyId'] = pdEngagedPartyData['partyId'].astype(str)
        pdEngagedPartyData['stringifiedAddress'] = pdEngagedPartyData['address1'] + ',' + \
                                                   pdEngagedPartyData['address2'] + ',' + \
                                                   pdEngagedPartyData['city'] + ',' + \
                                                   pdEngagedPartyData['state'] + ',' + \
                                                   pdEngagedPartyData['zipcode'] + ',' + \
                                                   pdEngagedPartyData['country']

        pdEngagedPartyData['engagedParty'] = pdEngagedPartyData[dictColumns].to_dict(orient='records')
        pdEngagedPartyData['engagedParty'] = pdEngagedPartyData['engagedParty'].astype(str)
        # print(pdEngagedPartyData['engagedParty'].dtype)
        pdEngagedPartyData.drop_duplicates(inplace=True)

        pdEngagedPartyData['engagedParty'] = pdEngagedPartyData['engagedParty'].apply(validateData)
        # pdEngagedPartyData = pdEngagedPartyData.to_frame()
        pdEngagedPartyData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/pdEngagedPartyData.csv')
        # print(pdEngagedPartyData)
        gc.collect()
        return pdEngagedPartyData


if __name__ == '__main__':
    fetchData(connection)