import logging
import os
from datetime import datetime, timedelta

import awswrangler as wr
import boto3
import pandas as pd

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

# Mandatory attributes expected in the securities source file
req_ss_df_cols = [
    "ISIN",
    "SEDOL",
    "CUSIP",
    "Security Internal Code",
    "Bloomberg",
    "Primary Exchange",
    "Security Description",
    "Security Currency",
    "Maturity date",
]

# Final column order of the transformed source dataframe
transformed_ss_df_cols = [
    "SecurityID",
    "SecurityIDType",
    "InternalSecID",
    "Ticker",
    "ExchangeCode",
    "SecurityDescription",
    "SecurityCurrency",
    "MaturityDate",
    "ExpiryDate",
]

# Source-to-target column rename mapping; ExpiryDate is populated from
# MaturityDate after the rename (see read_securities_source)
mapping_config = {
    "Security Internal Code": "InternalSecID",
    "Bloomberg": "Ticker",
    "Primary Exchange": "ExchangeCode",
    "Security Description": "SecurityDescription",
    "Security Currency": "SecurityCurrency",
    "Maturity date": "MaturityDate",
}

# Columns of the securities master file
sm_df_cols = [
    "CADIS_ID",
    "SecurityID",
    "SecurityIDType",
    "InternalSecID",
    "Ticker",
    "ExchangeCode",
    "SecurityDescription",
    "SecurityCurrency",
    "MaturityDate",
    "ExpiryDate",
]

s3 = boto3.client("s3")


def derive_security_id_and_type(row: pd.Series) -> pd.Series:
    """Derive SecurityID and SecurityIDType based on hierarchy of ISIN > SEDOL > CUSIP > Internal."""
    if pd.notnull(row["ISIN"]):
        row["SecurityID"] = row["ISIN"]
        row["SecurityIDType"] = "ISIN"
    elif pd.notnull(row["SEDOL"]):
        row["SecurityID"] = row["SEDOL"]
        row["SecurityIDType"] = "SEDOL"
    elif pd.notnull(row["CUSIP"]):
        row["SecurityID"] = row["CUSIP"]
        row["SecurityIDType"] = "CUSIP"
    else:
        row["SecurityID"] = row["Security Internal Code"]
        row["SecurityIDType"] = "Internal"
    return row  # the modified row must be returned for DataFrame.apply(axis=1)


def read_securities_source(bucket: str, key_name: str) -> pd.DataFrame:
    """Read and transform the securities source file from S3 for further processing."""
    LOGGER.info("Reading securities source file from S3 bucket.")
    # Read the source file and strip trailing whitespace from all values
    ss_df = wr.s3.read_csv(
        f"s3://{bucket}/{key_name}",
        dtype=str,
        skipinitialspace=True,
        encoding="unicode_escape",
    ).replace(r"\s+$", "", regex=True)

    # Filter the source file to keep only the mandatory attributes
    ss_df_filtered = ss_df[req_ss_df_cols]

    # Derive SecurityID and SecurityIDType based on the hierarchy
    ss_df_filtered = ss_df_filtered.apply(derive_security_id_and_type, axis=1)

    # Rename the remaining attributes according to the mapping config
    ss_df_filtered.rename(columns=mapping_config, inplace=True)

    # Populate ExpiryDate from MaturityDate (both derive from the source
    # "Maturity date" column)
    ss_df_filtered["ExpiryDate"] = ss_df_filtered["MaturityDate"]

    # Re-arrange the attributes in the required order
    ss_df_filtered = ss_df_filtered[transformed_ss_df_cols]
    return ss_df_filtered


def read_securities_master(bucket: str) -> pd.DataFrame:
    """Read the securities master file from S3."""
    LOGGER.info("Reading securities master file from S3 bucket.")
    try:
        # Read the securities_master file
        sm_df = wr.s3.read_csv(
            f"s3://{bucket}/{os.environ['securities_master']}",
            dtype=str,
            skipinitialspace=True,
            encoding="unicode_escape",
            compression="gzip",
        ).replace(r"\s+$", "", regex=True)
    except wr.exceptions.NoFilesFound:
        # No master file yet: start from an empty frame with the expected columns
        sm_df = pd.DataFrame(columns=sm_df_cols)
    return sm_df


def find_delta_securities(ss_df: pd.DataFrame, sm_df: pd.DataFrame) -> pd.DataFrame:
    """Find delta records between source and master."""
    # Left-join the source against the master's SecurityID key only, so the
    # master's other attribute columns are not duplicated into the delta output
    delta_df = ss_df.merge(
        sm_df[["SecurityID"]], on=["SecurityID"], how="left", indicator=True
    )
    # Keep only records that are in the source but not in the master
    delta_df = delta_df[delta_df["_merge"] == "left_only"].drop("_merge", axis=1)
    if delta_df.empty:
        LOGGER.info("No delta records found.")
    else:
        LOGGER.info(f"Found {len(delta_df)} delta records.")
    return delta_df


def process_securities(bucket: str, key_name: str):
    """Main function to process the securities source file and find delta securities."""
    LOGGER.info("Capturing Delta Securities Process Started...")
    # Read the securities source and securities master files
    ss_df = read_securities_source(bucket, key_name)
    sm_df = read_securities_master(bucket)

    # Find delta records
    delta_df = find_delta_securities(ss_df, sm_df)

    if not delta_df.empty:
        # Save the delta records to a file
        delta_file_name = f"/tmp/PIHKSOURCE_Delta_{datetime.strftime(datetime.now(), '%Y%m%d%H%M%S')}.csv"
        delta_df.to_csv(delta_file_name, index=False)
        # # Transfer the file
        # gamft_file_transfer(delta_file_name, secrets)
        # LOGGER.info(f"Delta records file saved and transferred: {delta_file_name}")
    else:
        LOGGER.info("No delta records to process.")
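

# --- Usage sketch (illustrative, not part of the original script) ---
# A minimal example of how process_securities might be wired up as an AWS Lambda
# handler triggered by an S3 "ObjectCreated" notification. The handler name and
# the assumption that the function is S3-triggered are not stated in the original;
# only the standard S3 event record fields are used here.
def lambda_handler(event, context):
    """Assumed entry point: pull bucket/key from the S3 event record and run the delta process."""
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key_name = record["s3"]["object"]["key"]
    process_securities(bucket, key_name)
    return {"status": "ok", "processed_key": key_name}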