Untitled

 avatar
unknown
plain_text
5 months ago
4.8 kB
3
Indexable
import logging
import os
from datetime import datetime, timedelta
import boto3
import awswrangler as wr
import pandas as pd
from src.notify import send_mail

# Root logger (getLogger() is called without a name), emitting INFO and above.
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

# Columns selected from the raw securities source file by
# read_securities_source; any other source columns are dropped.
req_ss_df_cols = [
    "ISIN",
    "SEDOL",
    "CUSIP",
    "Security Internal Code",
    "Bloomberg",
    "Primary Exchange",
    "Security Description",
    "Security Currency",
    "Maturity date",
]

# Columns, in order, of the frame returned by read_securities_source.
transformed_ss_df_cols = [
    "SecurityID",
    "SecurityIDType",
    "InternalSecID",
    "Ticker",
    "ExchangeCode",
    "SecurityDescription",
    "SecurityCurrency",
    "MaturityDate",
    "ExpiryDate",
]

# Source column name -> transformed column name, passed to DataFrame.rename.
# NOTE(review): "Maturity date" appears twice below. In a dict literal the
# later entry wins, so at runtime this dict maps "Maturity date" only to
# "ExpiryDate"; the "MaturityDate" entry is silently discarded.
# NOTE(review): there is no entry producing "InternalSecID", which
# transformed_ss_df_cols expects - presumably it should come from
# "Security Internal Code"; confirm.
mapping_config = {
    "Bloomberg": "Ticker",
    "Primary Exchange": "ExchangeCode",
    "Security Description": "SecurityDescription",
    "Security Currency": "SecurityCurrency",
    "Maturity date": "MaturityDate",
    "Maturity date": "ExpiryDate",
}

# Columns of the empty frame that read_securities_master returns when no
# securities master file is found in S3.
sm_df_cols = [
    "CADIS_ID",
    "SecurityID",
    "SecurityIDType",
    "InternalSecID",
    "Ticker",
    "ExchangeCode",
    "SecurityDescription",
    "SecurityCurrency",
    "MaturityDate",
    "ExpiryDate",
]

# Module-level S3 client, created at import time. It is not referenced by any
# of the code visible in this section.
s3 = boto3.client("s3")


def derive_security_id_and_type(row: pd.Series) -> pd.Series:
    """Derive SecurityID and SecurityIDType for a single security row.

    The identifier is chosen by the hierarchy ISIN > SEDOL > CUSIP > Internal:
    the first of ISIN, SEDOL and CUSIP that is not null is used, and if all
    three are null the "Security Internal Code" value is used with the type
    "Internal".

    Args:
        row: One row of the securities source frame. Must contain the labels
            "ISIN", "SEDOL", "CUSIP" and "Security Internal Code".

    Returns:
        A copy of ``row`` with "SecurityID" and "SecurityIDType" added. The
        input row is left unmodified.
    """
    # Work on a copy: pandas does not support functions passed to
    # DataFrame.apply mutating the object they receive.
    result = row.copy()

    for id_type in ("ISIN", "SEDOL", "CUSIP"):
        if pd.notnull(row[id_type]):
            result["SecurityID"] = row[id_type]
            result["SecurityIDType"] = id_type
            return result

    # No market identifier available; fall back to the internal code.
    result["SecurityID"] = row["Security Internal Code"]
    result["SecurityIDType"] = "Internal"
    return result


def read_securities_source(bucket: str, key_name: str) -> pd.DataFrame:
    """Read the securities source file from S3 and transform it.

    The file is read as strings with trailing whitespace stripped, reduced to
    the columns in ``req_ss_df_cols``, enriched with SecurityID and
    SecurityIDType, renamed via ``mapping_config`` and returned with the
    columns ordered as in ``transformed_ss_df_cols``.

    Args:
        bucket: Name of the S3 bucket holding the source file.
        key_name: Object key of the source file within ``bucket``.

    Returns:
        The transformed securities frame.

    Raises:
        KeyError: If a required column is missing from the source file or
            from the transformed frame.
    """
    LOGGER.info("Reading securities source file from S3 bucket.")

    # Read the source file
    ss_df = wr.s3.read_csv(
        f"s3://{bucket}/{key_name}",
        dtype=str,
        skipinitialspace=True,
        encoding="unicode_escape",
    ).replace(r"\s+$", "", regex=True)

    # Filter source file to keep only mandatory attributes
    ss_df_filtered = ss_df[req_ss_df_cols]

    # A row-wise apply adds no columns to an empty frame, so a source file
    # with no data rows would fail on the column selection below.
    if ss_df_filtered.empty:
        return pd.DataFrame(columns=transformed_ss_df_cols)

    # Derive SecurityID and SecurityIDType based on hierarchy
    ss_df_filtered = ss_df_filtered.apply(derive_security_id_and_type, axis=1)

    # Rename remaining attributes according to mapping config
    ss_df_filtered = ss_df_filtered.rename(columns=mapping_config)

    # "Maturity date" feeds both MaturityDate and ExpiryDate, but a rename
    # mapping can give a source column only one target name. Fill in
    # whichever of the two the rename did not produce.
    for date_col in ("MaturityDate", "ExpiryDate"):
        if date_col not in ss_df_filtered.columns:
            ss_df_filtered[date_col] = ss_df["Maturity date"]

    # Re-arrange the attributes in the required order
    return ss_df_filtered[transformed_ss_df_cols]


def read_securities_master(bucket: str) -> pd.DataFrame:
    """Read the gzip-compressed securities master file from S3.

    The object key is taken from the ``securities_master`` environment
    variable. All values are read as strings and trailing whitespace is
    stripped.

    Args:
        bucket: Name of the S3 bucket holding the securities master file.

    Returns:
        The securities master frame, or an empty frame with the columns in
        ``sm_df_cols`` when no master file is found.

    Raises:
        KeyError: If the ``securities_master`` environment variable is unset.
    """
    LOGGER.info("Reading securities master file from S3 bucket.")

    master_path = f"s3://{bucket}/{os.environ['securities_master']}"

    try:
        # Read the securities_master file
        sm_df = wr.s3.read_csv(
            master_path,
            dtype=str,
            skipinitialspace=True,
            encoding="unicode_escape",
            compression="gzip",
        ).replace(r"\s+$", "", regex=True)
    except wr.exceptions.NoFilesFound:
        # No master file yet: every source security will count as a delta.
        sm_df = pd.DataFrame(columns=sm_df_cols)

    return sm_df


def find_delta_secuities(ss_df: pd.DataFrame, sm_df: pd.DataFrame) -> pd.DataFrame:
    """Return the source securities whose SecurityID is absent from the master.

    Args:
        ss_df: Securities source frame; must have a "SecurityID" column.
        sm_df: Securities master frame; must have a "SecurityID" column.

    Returns:
        The rows of ``ss_df`` whose "SecurityID" does not occur in ``sm_df``.
    """
    known_ids = sm_df["SecurityID"]
    already_in_master = ss_df["SecurityID"].isin(known_ids)
    new_securities = ss_df[~already_in_master]

    if not new_securities.empty:
        LOGGER.info(f"Found {len(new_securities)} delta securities.")
    else:
        LOGGER.info("No delta securities found.")

    return new_securities


def process_securities(bucket: str, key_name: str):
    """Run the delta-securities capture for one securities source file.

    Reads the source and master files, determines which source securities are
    not in the master and, if there are any, writes them to a timestamped CSV
    under ``/tmp``. A mail is sent when the delta has 400 or more rows.

    Args:
        bucket: Name of the S3 bucket holding both files.
        key_name: Object key of the securities source file.
    """
    LOGGER.info("Capturing Delta Securities Process Started...")

    # Load both inputs, then work out which securities are new.
    source_df = read_securities_source(bucket, key_name)
    master_df = read_securities_master(bucket)
    delta_df = find_delta_secuities(source_df, master_df)

    if delta_df.empty:
        LOGGER.info("No Delta Securities To Process.")
        return

    # Persist the delta to a timestamped file.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    delta_file_name = f"/tmp/PIHKSecurities_Delta_{timestamp}.csv"
    delta_df.to_csv(delta_file_name, index=None)

    # Large deltas trigger a mail carrying the bare file name.
    if len(delta_df) >= 400:
        send_mail(f"File Name: {delta_file_name.split('/')[-1]}", msg_type="inform")

    LOGGER.info("Delta Securities Capture Completed.")
Editor is loading...
Leave a Comment