Untitled
unknown
plain_text
a year ago
4.9 kB
5
Indexable
import logging
import os
from datetime import datetime, timedelta
import boto3
import awswrangler as wr
import pandas as pd
# Module-wide logger. getLogger() with no name returns the root logger, so the
# INFO threshold set here applies to the root logger itself.
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
# Columns that are selected from the securities source file; every other
# column in the file is dropped. Must remain a list because it is used
# directly for DataFrame column selection.
req_ss_df_cols = [
    # Identifier candidates
    "ISIN",
    "SEDOL",
    "CUSIP",
    "Security Internal Code",
    # Descriptive attributes
    "Bloomberg",
    "Primary Exchange",
    "Security Description",
    "Security Currency",
    "Maturity date",
]
# Column layout (names and order) of the transformed securities source frame.
transformed_ss_df_cols = [
    # Derived identifier
    "SecurityID",
    "SecurityIDType",
    "InternalSecID",
    # Renamed source attributes
    "Ticker",
    "ExchangeCode",
    "SecurityDescription",
    "SecurityCurrency",
    "MaturityDate",
    "ExpiryDate",
]
# Source-file column name -> transformed column name, for DataFrame.rename.
#
# Each source column may appear only once: a dict literal silently keeps just
# the last value of a repeated key, and a rename can only give a column one
# new name. "ExpiryDate" shares the "Maturity date" source column with
# "MaturityDate", so it has to be populated by copying that column rather
# than through this mapping.
mapping_config = {
    "Security Internal Code": "InternalSecID",
    "Bloomberg": "Ticker",
    "Primary Exchange": "ExchangeCode",
    "Security Description": "SecurityDescription",
    "Security Currency": "SecurityCurrency",
    "Maturity date": "MaturityDate",
}
# Column layout used to build an empty securities master frame when no
# master file is found in S3.
sm_df_cols = [
    "CADIS_ID",
    # Same layout as the transformed securities source frame
    "SecurityID",
    "SecurityIDType",
    "InternalSecID",
    "Ticker",
    "ExchangeCode",
    "SecurityDescription",
    "SecurityCurrency",
    "MaturityDate",
    "ExpiryDate",
]
# Low-level boto3 S3 client, created once at import time.
# NOTE(review): not referenced by any function in the visible code (reads go
# through awswrangler) - confirm whether it is still needed.
s3 = boto3.client("s3")
def derive_security_id_and_type(row: pd.Series) -> pd.Series:
    """Derive SecurityID and SecurityIDType based on hierarchy of ISIN > SEDOL > CUSIP > Internal.

    The first non-null identifier in the hierarchy becomes ``SecurityID`` and
    its name becomes ``SecurityIDType``. When ISIN, SEDOL and CUSIP are all
    null, ``Security Internal Code`` is used with the type ``"Internal"``.

    Args:
        row: A single securities record containing the ``ISIN``, ``SEDOL``,
            ``CUSIP`` and ``Security Internal Code`` fields.

    Returns:
        The same row, with ``SecurityID`` and ``SecurityIDType`` added. The
        row must be returned so that ``DataFrame.apply(..., axis=1)`` can
        assemble a DataFrame from the results; without a return value every
        row of the apply result would be ``None``.
    """
    for id_type in ("ISIN", "SEDOL", "CUSIP"):
        if pd.notnull(row[id_type]):
            row["SecurityID"] = row[id_type]
            row["SecurityIDType"] = id_type
            break
    else:
        # No market identifier available: fall back to the internal code.
        row["SecurityID"] = row["Security Internal Code"]
        row["SecurityIDType"] = "Internal"
    return row
def read_securities_source(bucket: str, key_name: str) -> pd.DataFrame:
    """Read and transform securities source file from S3 for further processing.

    The file is read with every column as a string and trailing whitespace
    stripped from values. Only the columns in ``req_ss_df_cols`` are kept,
    ``SecurityID``/``SecurityIDType`` are derived per row, the remaining
    columns are renamed via ``mapping_config`` and the result is returned in
    ``transformed_ss_df_cols`` order.

    Args:
        bucket: Name of the S3 bucket holding the source file.
        key_name: Object key of the source file inside ``bucket``.

    Returns:
        A DataFrame whose columns are exactly ``transformed_ss_df_cols``.

    Raises:
        KeyError: If the source file lacks any column in ``req_ss_df_cols``.
    """
    LOGGER.info("Reading securities source file from S3 bucket.")

    # Read the source file
    ss_df = wr.s3.read_csv(
        f"s3://{bucket}/{key_name}",
        dtype=str,
        skipinitialspace=True,
        encoding="unicode_escape",
    ).replace(r"\s+$", "", regex=True)

    missing_cols = [col for col in req_ss_df_cols if col not in ss_df.columns]
    if missing_cols:
        raise KeyError(
            f"Securities source file is missing required columns: {missing_cols}"
        )

    # Keep only mandatory attributes; copy so that nothing below writes into
    # a slice of the frame that was read.
    source = ss_df[req_ss_df_cols].copy()

    # A row-wise apply over an empty frame cannot add the derived columns,
    # so short-circuit with an empty frame of the expected layout.
    if source.empty:
        return pd.DataFrame(columns=transformed_ss_df_cols)

    def _with_security_id(row: pd.Series) -> pd.Series:
        # Work on a private copy and accept both calling styles of the
        # derivation function: returning the row, or mutating it in place
        # and returning None.
        row = row.copy()
        derived = derive_security_id_and_type(row)
        return row if derived is None else derived

    # Derive SecurityID and SecurityIDType based on hierarchy
    transformed = source.apply(_with_security_id, axis=1)

    # Rename remaining attributes according to mapping config
    transformed = transformed.rename(columns=mapping_config)

    # A rename gives each source column exactly one new name, so any target
    # column the mapping did not produce is copied from its source column.
    fallback_sources = {
        "InternalSecID": "Security Internal Code",
        "MaturityDate": "Maturity date",
        "ExpiryDate": "Maturity date",
    }
    for target_col, source_col in fallback_sources.items():
        if target_col not in transformed.columns:
            transformed[target_col] = source[source_col]

    # Re-arrange the attributes in the required order
    return transformed[transformed_ss_df_cols]
def read_securities_master(bucket: str) -> pd.DataFrame:
    """Read the securities_master from S3.

    The object key is taken from the ``securities_master`` environment
    variable. The file is read as a gzip-compressed CSV with every column as
    a string and trailing whitespace stripped from values.

    Args:
        bucket: Name of the S3 bucket holding the securities master file.

    Returns:
        The securities master as a DataFrame, or an empty DataFrame with the
        ``sm_df_cols`` columns when no master file exists.

    Raises:
        KeyError: If the ``securities_master`` environment variable is unset.
    """
    LOGGER.info("Reading securities master file from S3 bucket.")
    master_path = f"s3://{bucket}/{os.environ['securities_master']}"
    try:
        # Read the securities_master file
        sm_df = wr.s3.read_csv(
            master_path,
            dtype=str,
            skipinitialspace=True,
            encoding="unicode_escape",
            compression="gzip",
        )
    except wr.exceptions.NoFilesFound:
        # No master file: every source security will count as a delta.
        LOGGER.info("Securities master file not found; using an empty master.")
        return pd.DataFrame(columns=sm_df_cols)
    return sm_df.replace(r"\s+$", "", regex=True)
def find_delta_secuities(ss_df: pd.DataFrame, sm_df: pd.DataFrame) -> pd.DataFrame:
    """Find delta records between source and master.

    A source record is a delta when its ``SecurityID`` does not occur in the
    master.

    Args:
        ss_df: Transformed securities source records; must contain
            ``SecurityID``.
        sm_df: Securities master records; must contain ``SecurityID``.

    Returns:
        The rows of ``ss_df`` that are absent from ``sm_df``, with exactly
        the columns of ``ss_df``.
    """
    # Join against the master's distinct SecurityIDs only. Merging the full
    # master would suffix every shared column with _x/_y and append all-NaN
    # master columns to the delta.
    master_ids = sm_df[["SecurityID"]].drop_duplicates()
    merged = ss_df.merge(master_ids, on="SecurityID", how="left", indicator=True)

    # Keep only records that are in source but not in master
    delta_df = merged.loc[merged["_merge"] == "left_only", list(ss_df.columns)]

    if delta_df.empty:
        LOGGER.info("No delta records found.")
    else:
        LOGGER.info("Found %s delta records.", len(delta_df))
    return delta_df
def process_securities(bucket: str, key_name: str):
    """Run the delta-securities capture for one securities source file.

    Reads the securities source and securities master, determines which
    source records are missing from the master and, when there are any,
    writes them to a timestamped CSV under ``/tmp``.

    Args:
        bucket: Name of the S3 bucket holding both files.
        key_name: Object key of the securities source file.
    """
    LOGGER.info("Capturing Delta Securities Process Started...")

    # Load both inputs, then compute the records missing from the master
    source_df = read_securities_source(bucket, key_name)
    master_df = read_securities_master(bucket)
    delta_df = find_delta_secuities(source_df, master_df)

    if delta_df.empty:
        LOGGER.info("No delta records to process.")
        return

    # Save the delta records to a file named after the current local time
    timestamp = datetime.strftime(datetime.now(), "%Y%m%d%H%M%S")
    delta_file_name = f"/tmp/PIHKSOURCE_Delta_{timestamp}.csv"
    delta_df.to_csv(delta_file_name, index=None)

    # The transfer step is currently disabled:
    # gamft_file_transfer(delta_file_name, secrets)
    # LOGGER.info(f"Delta records file saved and transferred: {delta_file_name}")
Editor is loading...
Leave a Comment