DaFileName:'/home/windmill/adm-471307/projects/dividends-uploader/dividends_uploader/utils/sql_collection.py'
import os

import yaml

SQL_YAML = "sql.yaml"


class DictWrapper(dict):
    """Converts multi layered dict to dot accessible object."""

    def __getattr__(self, name):
        value = self.get(name)
        if isinstance(value, dict):
            return DictWrapper(value)

        else:
            return value
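
# A minimal usage sketch of DictWrapper (illustrative values only): nested
# keys become chained attribute lookups, and absent keys resolve to None
# rather than raising AttributeError.
#
#   queries = DictWrapper({"trades": {"select_all": "SELECT 1"}})
#   queries.trades.select_all  # -> "SELECT 1"
#   queries.trades.missing     # -> None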


class SqlCollection:
    def __init__(self):
        sql_yaml = os.path.join(os.path.dirname(__file__), SQL_YAML)
        with open(sql_yaml, encoding="utf-8") as sql_file:
            self.yaml_content = yaml.safe_load(sql_file)
        self.collection = DictWrapper(self.yaml_content)


collection = SqlCollection().collection
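
# A usage sketch, assuming sql.yaml holds nested query templates. The layout
# below is hypothetical, not taken from the real file:
#
#   # sql.yaml
#   # dividends:
#   #   select_pending: SELECT * FROM dividends WHERE status = 'pending'
#
#   from dividends_uploader.utils.sql_collection import collection
#   sql = collection.dividends.select_pending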
DaFileName:'/home/windmill/adm-471307/projects/mifid/tests/test_mifid.py'
"""
    Tests to ensure the mifid package works as expected
"""

import os.path
import pathlib
import tempfile
import datetime
from unittest import mock
import yaml
import pandas as pd
import numpy as np

import pytest

from pandas.testing import assert_series_equal, assert_frame_equal
from python_common import log as logger
from mifid import database as db
from mifid import settings
from mifid import mifid

logger.init_logger(__file__)
db_connector = db.get_database_connector(logger)

processing_date = datetime.datetime.now().strftime("%Y%m%d")


def test_get_output_file_name():
    base_file_name = mifid.get_base_output_file_name(
        logger,
        processing_date,
    )

    number_of_revisions = 5
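
    # The naming rule exercised here, as implied by this test: with revisions
    # <base>1 .. <base>4 already on disk, get_output_file_name should return
    # <base>5.csv, i.e. the highest existing revision number plus one.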

    with tempfile.TemporaryDirectory() as temp_output_dir:
        for counter in range(1, number_of_revisions):
            with open(
                pathlib.Path(temp_output_dir, base_file_name + str(counter)),
                "w+",
                encoding="utf-8",
            ) as test_file:
                test_file.write("Writing contents into the test file.")
        output_file_name = mifid.get_output_file_name(
            logger,
            temp_output_dir,
            processing_date,
        )

        expected_file_name = "".join(
            [
                base_file_name,
                str(number_of_revisions),
                ".csv",
            ]
        )

        assert expected_file_name == output_file_name


def test_populate_business_date():
    # mock up the trades data
    trades_df = pd.DataFrame(
        {
            "etc": pd.Series(
                [
                    "one",
                    "two",
                    "three",
                ],
            ),
        }
    )

    updated_trades_df = mifid.populate_business_date(
        logger,
        processing_date,
        trades_df,
    )

    expected_identification_code_series = pd.Series(
        np.array(
            [
                processing_date,
                processing_date,
                processing_date,
            ],
        ),
        name="Business Date",
    )

    assert_series_equal(
        expected_identification_code_series,
        updated_trades_df["Business Date"],
    )


def test_populate_bcc_entities():
    mocked_external_numbers = [
        33427818,
        33428905,
        90783989,
        445041,
        90783637,
        90783725,
        90784037,
        90784075,
        90783234,
        236622,
        746090713003234119,
    ]
    mocked_system_codes_from_feeds = [
        "MG",
        "MG",
        "",
        "MH",
        "MH",
        "MH",
        "MG",
        "MH",
        None,
        "MH",
        None,
    ]

    mics_df = pd.DataFrame(
        {
            "external_number": pd.Series(mocked_external_numbers),
            "system_code_to": pd.Series(mocked_system_codes_from_feeds),
        }
    )

    # mock up the trades data
    trades_df = pd.DataFrame(
        {
            "tsnumber": pd.Series(mocked_external_numbers),
        }
    )

    trades_df = mifid.populate_bcc_entities(
        logger,
        trades_df,
        mics_df,
    )

    expected_bcc_entities = pd.Series(
        [
            "FCG",
            "FCG",
            np.NaN,
            "FCH",
            "FCH",
            "FCH",
            "FCG",
            "FCH",
            np.NaN,
            "FCH",
            np.NaN,
        ],
        name="BCC Entity",
    )

    assert_series_equal(expected_bcc_entities, trades_df["BCC Entity"])


def test_populate_product_ids_negative_test():
    database_args = {
        "MSFIL_LIB": "vd03MSFIL",
    }

    mocked_external_numbers = [
        33427818,
        33428905,
        746090713003234119,
    ]

    # mock up the trades data
    trades_df = pd.DataFrame(
        {
            "tsnumber": pd.Series(mocked_external_numbers),
            "etc": pd.Series(
                [
                    "one",
                    "two",
                    "three",
                ],
            ),
        }
    )

    # Negative test with a non-existent processing date
    non_existent_processing_date = 0
    sql_template = None
    with open(settings.SQL_FILE, "r", encoding="utf-8") as sql_file_handler:
        sql_template = yaml.load(sql_file_handler, Loader=yaml.FullLoader)

    assert sql_template is not None

    trades_df = mifid.populate_product_ids(
        logger,
        db_connector,
        sql_template,
        trades_df,
        non_existent_processing_date,
        **database_args,
    )

    assert trades_df is not None

    expected_identification_code_series = pd.Series(
        [
            np.NaN,
            np.NaN,
            np.NaN,
        ],
        name="Product Id",
    )

    assert_series_equal(
        expected_identification_code_series, trades_df["Product Id"], check_dtype=False
    )


@mock.patch.object(mifid, "get_df_from_sql")
def test_populate_instrument_identification_codes_positive_test(mock_get_df_from_sql):

    # mock up the trades data
    mocked_external_numbers = [
        1110001545,
        1110001545,
    ]
    trades_df = pd.DataFrame(
        {
            "tsnumber": pd.Series(mocked_external_numbers),
            "etc": pd.Series(
                [
                    "random-John",
                    "random-Mayer",
                ],
            ),
        }
    )

    sql_template = None
    with open(settings.SQL_FILE, "r", encoding="utf-8") as sql_file_handler:
        sql_template = yaml.load(sql_file_handler, Loader=yaml.FullLoader)

    assert sql_template is not None
    database_args = {
        "MSFIL_LIB": "vd03MSFIL",
    }

    # This simulates a data frame passed back as the result of a db sql
    # query for the instrument identification code. It demonstrates the
    # problem we had with the MICS db table column
    # ALTERNATE_PRODUCT_ID.identification: it is declared as 40 chars, but
    # the output file for this MIFID data file only allows 30 characters.
    mocked_df_return_value = pd.DataFrame(
        data={
            "instrument_identification_code": ["AU000000BHP4                   "],
        }
    )
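
    # An illustration of the cleanup this test expects (inferred from the
    # expected series below, not from the implementation itself):
    #   "AU000000BHP4                   "   # padded value as returned by the db
    #   -> "AU000000BHP4"                   # trailing whitespace stripped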

    mock_get_df_from_sql.return_value = mocked_df_return_value
    # positive test
    processing_date_that_exists_in_db = 1220303

    trades_df = mifid.populate_instrument_identification_codes(
        logger,
        db_connector,
        sql_template,
        trades_df,
        processing_date_that_exists_in_db,
        **database_args,
    )

    expected_identification_code_series = pd.Series(
        np.array(
            [
                "AU000000BHP4",
                "AU000000BHP4",
            ],
        ),
        name="Instrument Identification Code",
    )

    assert expected_identification_code_series.equals(
        other=trades_df["Instrument Identification Code"]
    )

    assert_series_equal(
        expected_identification_code_series,
        trades_df["Instrument Identification Code"],
    )


def test_write_all_data_to_csv():
    mocked_external_numbers = [
        "33427818           ",
        "33428905           ",
        "90783989           ",
        "445041             ",
        "90783637           ",
        "90783725           ",
        "90784037           ",
        "90784075           ",
        "90783234           ",
        "236622             ",
        "746090713003234119 ",
    ]
    # mock up the trades data
    trades_df = pd.DataFrame(
        {
            "tsnumber": pd.Series(mocked_external_numbers, dtype=object),
            "etc": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                    "four       ",
                    "five       ",
                    "six        ",
                    "seven      ",
                    "eight      ",
                    "nine       ",
                    "ten        ",
                    "eleven     ",
                ],
            ),
        }
    )

    with tempfile.NamedTemporaryFile() as output_path:
        mifid.write_data_to_csv(
            logger,
            output_path.name,
            trades_df,
        )

        assert os.path.exists(output_path.name)

        written_df = pd.read_csv(
            output_path.name,
            dtype=str,
        )

        assert_frame_equal(
            trades_df,
            written_df,
            check_names=False,
            check_like=True,
            check_index_type=False,
        )

        assert_series_equal(
            trades_df["tsnumber"],
            written_df["tsnumber"],
            check_series_type=False,
            check_dtype=False,
        )


def test_write_selected_column_data_to_csv():
    # mock up the trades data
    trades_df = pd.DataFrame(
        {
            "i should not exist": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Record Code": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Release Code": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Processing Date": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Processing Time": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "BCC Entity": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Source Transaction ID/External Number": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Client ID Short Code": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Execution within Firm": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Trading Date Time": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Instrument Identification Code": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Product ID": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Business Date": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
            "Investment Decision within Firm": pd.Series(
                [
                    "one        ",
                    "two        ",
                    "three      ",
                ],
            ),
        }
    )

    # We refer to the columns by the "attribute" element.
    # The displayed headers are controlled by the "header_label".
    columns_to_write_to_file = [
        "Record Code",
        "Release Code",
        "Processing Date",
        "Processing Time",
        "BCC Entity",
        "Source Transaction ID/External Number",
        "Client ID Short Code",
        "Execution within Firm",
        "Trading Date Time",
        "Instrument Identification Code",
        "Product ID",
        "Business Date",
        "Investment Decision within Firm",
    ]

    with tempfile.NamedTemporaryFile() as output_path:
        mifid.write_data_to_csv(
            logger, output_path.name, trades_df, output_columns=columns_to_write_to_file
        )

        # After writing the trades df, we drop the unwanted column and use
        # the result to compare against the written csv, which we read back
        # in as a dataframe.
        trades_df.drop(
            "i should not exist",
            inplace=True,
            axis=1,
        )

        assert os.path.exists(output_path.name)

        written_df = pd.read_csv(
            output_path.name,
            dtype=str,
        )

        additional_column_no_longer_detected_in_written_file = (
            "i should not exist" not in trades_df.columns
        )

        assert additional_column_no_longer_detected_in_written_file is True

        assert_frame_equal(
            trades_df,
            written_df,
            check_names=False,
            check_like=True,
            check_index_type=False,
        )

        logger.debug(
            "Columns in the written DF: " + ", ".join(written_df.columns)
        )


def test_format_data_value():
    test_scenarios = [
        # source string is the same as the max length
        {"source": "Mayer", "expected": "Mayer", "max_length": 5},
        # source string exceeds max length
        {"source": "Mayer", "expected": "M", "max_length": 1},
        # source string exceeds max length
        {"source": "FENDER", "expected": "FEN", "max_length": 3},
        # source string is less than max length
        {"source": "Mayer", "expected": "Mayer", "max_length": 6},
    ]

    for test_scenario in test_scenarios:
        cleaned_value = mifid.format_data_value(
            logger,
            test_scenario["max_length"],
            test_scenario["source"],
        )

        assert cleaned_value == test_scenario["expected"]
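
# A reference sketch of the truncation rule the scenarios above exercise
# (inferred from those scenarios, not the actual mifid implementation):
#
#   def format_data_value(logger, max_length, source):
#       return source[:max_length]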


def test_format_data_values():
    data = __get_test_data_for_format_data()

    # mock up the trades data
    with open(
        settings.CSV_OUTPUT_CONFIG_FILE, "r", encoding="utf-8"
    ) as config_file_handler:
        csv_output_config_file = yaml.load(config_file_handler, Loader=yaml.FullLoader)

    assert csv_output_config_file is not None

    formatted_df = mifid.format_data_values(
        logger,
        csv_output_config_file,
        data["trades_df"],
    )

    assert_frame_equal(
        formatted_df,
        data["expected_df"],
        check_names=False,
        check_like=True,
        check_index_type=False,
    )


def __get_test_data_for_format_data():

    raw = {
        "Record Code": pd.Series(
            [
                "ASX       ",
                "TFX        ",
                "JCM",
                "JM",
            ],
        ),
        "Release Code": pd.Series(
            [
                "ASX       ",
                "KWS        ",
                "JCM",
                "EC",
            ],
        ),
        "Processing Date": pd.Series(
            [
                "20221225          ",
                "20211225     ",
                "20211225",
                "202012",
            ],
        ),
        "Processing time": pd.Series(
            [
                "202212          ",
                "20211225       ",
                "20211225",
                "202012",
            ],
        ),
        "Business date": pd.Series(
            [
                "20221225          ",
                "20211225       ",
                "20211225",
                "202012",
            ],
        ),
        "BCC Entity": pd.Series(
            [
                "ASX       ",
                "KWS        ",
                "JCM",
                "EC",
            ],
        ),
        "Exchange ID": pd.Series(
            [
                "ASX       ",
                "TFKX            ",
                "JCM",
                "JM",
            ],
        ),
        "Transaction Type": pd.Series(
            [
                "WALT GRACE SUBMARINE TEST PART 1 FOLLOWED BY WILD BLUE",
                "BELIEF        ",
                "WAIT TIL TOMORROW WHY GEORGiAneonBELIEF",
                "NEON",
            ],
        ),
        "Source Transaction ID/External Number": pd.Series(
            ["123456", "two        ", "three      ", "X"],
        ),
        "Investment Decision within Firm": pd.Series(
            [
                "20220131,AU,ASX,IEQ,200,20220                      ",
                "20220131,AU,ASX,IEQ,200,202207529422658841495929   ",
                "20220131,AU,ASX,IEQ,200,20220",  # exactly
                "Wait til Tomorrow",
            ],
        ),
        "Execution within Firm": pd.Series(
            [
                "20220131,AU,ASX,IEQ,200,20220                      ",
                "20220131,AU,ASX,IEQ,200,202207529422658841495929   ",
                "20220131,AU,ASX,IEQ,200,20220",  # exactly
                "Wait til Tomorrow",
            ],
        ),
        "Trading Date Time": pd.Series(
            [
                "7529422658841489334742211 Wait til Tomorrow",
                "7529422658841489334742211",
                "two",
                "M",
            ],
        ),
        "Instrument Identification Code": pd.Series(
            [
                "7529422658841489334742211 Wait til Tomorrow",
                "7529422658841489334742211",
                "two",
                "M",
            ],
        ),
        "Product ID": pd.Series(
            ["WAIT TIL TOMORROW", "WHY GEORGIA", "      three", "X      "],
        ),
    }

    expected_df = pd.DataFrame(
        {
            "Record Code": pd.Series(
                [
                    "ASX",
                    "TFX",
                    "JCM",
                    "JM",
                ],
            ),
            "Release Code": pd.Series(
                [
                    "ASX",
                    "KWS",
                    "JCM",
                    "EC",
                ],
            ),
            "Processing Date": pd.Series(
                [
                    "20221225",
                    "20211225",
                    "20211225",
                    "202012",
                ],
            ),
            "Processing Time": pd.Series(
                [
                    "202212",
                    "202112",
                    "202112",
                    "202012",
                ],
            ),
            "Business Date": pd.Series(
                [
                    "20221225",
                    "20211225",
                    "20211225",
                    "202012",
                ],
            ),
            "BCC Entity": pd.Series(
                [
                    "ASX",
                    "KWS",
                    "JCM",
                    "EC",
                ],
            ),
            "Exchange ID": pd.Series(
                [
                    "ASX ",
                    "TFKX",
                    "JCM",
                    "JM",
                ],
            ),
            "Transaction Type": pd.Series(
                [
                    "WALT GRACE SUBMARINE",
                    "BELIEF        ",
                    "WAIT TIL TOMORROW WH",
                    "NEON",
                ],
            ),
            "Source Transaction ID/External Number": pd.Series(
                [
                    "123456",
                    "two        ",
                    "three      ",
                    "X",
                ],
            ),
            "Product ID": pd.Series(
                ["WAIT TIL TOM", "WHY GEORGIA", "      three", "X      "],
            ),
        }
    )

    trades_df = pd.DataFrame(
        {
            # Give strings which have extra white spaces,
            # exactly the same length as expected and less.
            # For the latter, we expect the padding being
            # added to ensure it matches the number of characters
            # defined in the format
            "Record Code": pd.Series(
                raw["Record Code"],
            ),
            "Release Code": pd.Series(
                raw["Release Code"],
            ),
            "Processing Date": pd.Series(
                raw["Processing Date"],
            ),
            "Processing Time": pd.Series(
                raw["Processing time"],
            ),
            "Business Date": pd.Series(
                raw["Business date"],
            ),
            "BCC Entity": pd.Series(
                raw["BCC Entity"],
            ),
            "Exchange ID": pd.Series(
                raw["Exchange ID"],
            ),
            "Transaction Type": pd.Series(
                raw["Transaction Type"],
            ),
            "Source Transaction ID/External Number": pd.Series(
                raw["Source Transaction ID/External Number"],
            ),
            "Product ID": pd.Series(
                raw["Product ID"],
            ),
        }
    )

    return {
        "trades_df": trades_df,
        "expected_df": expected_df,
    }


@mock.patch.object(mifid, "get_df_from_sql")
def test_populate_product_ids_positive_test(mock_get_df_from_sql):

    # mock up the trades data
    mocked_external_numbers = [
        1110001545,
        1110001545,
    ]

    trades_df = pd.DataFrame(
        {
            "tsnumber": pd.Series(mocked_external_numbers),
            "etc": pd.Series(
                [
                    "random-John",
                    "random-Mayer",
                ],
            ),
        }
    )

    sql_template = None
    with open(settings.SQL_FILE, "r", encoding="utf-8") as sql_file_handler:
        sql_template = yaml.load(sql_file_handler, Loader=yaml.FullLoader)

    assert sql_template is not None
    database_args = {
        "MSFIL_LIB": "vd03MSFIL",
    }

    # This simulates a data frame passed back as the result of a db sql
    # query for the product id. As with the instrument identification code,
    # the MICS db table column ALTERNATE_PRODUCT_ID.identification is
    # declared as 40 chars, but the output file for this MIFID data file
    # only allows 30 characters.
    mocked_df_return_value = pd.DataFrame(
        data={
            "product_id": ["AU000000BHP4                   "],
        }
    )

    mock_get_df_from_sql.return_value = mocked_df_return_value

    trades_df = mifid.populate_product_ids(
        logger,
        db_connector,
        sql_template,
        trades_df,
        processing_date,
        **database_args,
    )

    expected_identification_code_series = pd.Series(
        np.array(
            [
                "AU000000BHP4",
                "AU000000BHP4",
            ],
        ),
        name="Product Id",
    )

    assert_series_equal(
        expected_identification_code_series,
        trades_df["Product Id"],
    )
DaFileName:'/home/windmill/adm-471307/projects/python-common/tests/test_utils.py'
"""Unit tests for utils.__init__ module."""
import os
from datetime import datetime

import freezegun
import mock
from parameterized import parameterized

from python_common import settings, utils


class TestUtils:
    """Unit tests for classes and functions in utils module."""

    @staticmethod
    def datetime_to_date(datetime_obj):
        """A helper function to convert datetime object to a string."""
        return datetime.strftime(datetime_obj, "%Y-%m-%d")

    @parameterized.expand(
        [
            (-1, None, "2021-10-11"),
            (-1, datetime(2021, 10, 18), "2021-10-15"),
            (-2, None, "2021-10-08"),
            (0, None, "2021-10-12"),
        ]
    )
    def test_get_business_day(self, offset, other_time, expected_date):
        """Tests get_business_day function."""
        with freezegun.freeze_time("2021-10-12 10:00:00"):
            # Freeze time to a Tuesday.
            assert expected_date == self.datetime_to_date(
                utils.get_business_day(offset, other_time)
            )
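
    # A minimal sketch of the offset rule these cases imply (an assumption,
    # not python_common's implementation): step one calendar day at a time in
    # the sign of `offset`, counting only Mon-Fri, starting from `other_time`
    # or "now"; offset 0 returns the start day unchanged.
    #
    #   from datetime import timedelta
    #
    #   def business_day(offset, other_time=None):
    #       day = other_time or datetime.now()
    #       step = timedelta(days=1 if offset > 0 else -1)
    #       remaining = abs(offset)
    #       while remaining:
    #           day += step
    #           if day.weekday() < 5:  # Mon-Fri
    #               remaining -= 1
    #       return day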

    @parameterized.expand(
        [(-1, "-1"), ("", ""), (" 39.0", "39"), (39.123, "39.123"), (3999.0, "3999")]
    )
    def test_safe_strip(self, text_or_number, expected_result):
        """Tests safe_strip function."""
        assert expected_result == utils.safe_strip(text_or_number)

    @parameterized.expand(
        [
            (None, ""),
            ([], ""),
            (["foo", "bar", 123], "'foo','bar','123'"),
        ]
    )
    def test_add_quotes_to_list(self, string_list, expected_result):
        """Tests add_quotes_to_list function."""
        assert utils.add_quotes_to_list(string_list) == expected_result

    @parameterized.expand([(None, 0), ("-2.0", -2.0), ("foo", 0), (12.12, 12.12)])
    def test_safe_float(self, test_data, expected_result):
        """Tests add_quotes_to_list function."""
        assert utils.safe_float(test_data) == expected_result

    def test_get_mics_mapping_db_schema(self):
        """Tests get_mics_mapping_db_schema function."""

        # Please note: before running this test, it is assumed the following
        # has been run:
        # $ . /fcs/global/`hostname`/common
        prod_envs = {"MICS_DEFAULT": "PROD"}
        with mock.patch.dict(os.environ, prod_envs):
            assert "VP01MHFIL" == utils.get_mics_mapping_db_schema("MH")

        uat_envs = {"MICS_DEFAULT": "ACCEPT"}
        with mock.patch.dict(os.environ, uat_envs):
            assert "VA01MHFIL" == utils.get_mics_mapping_db_schema("MH")

    def test_get_mics_host_name(self):
        prod_envs = {"MICS_DEFAULT": "PROD"}
        with mock.patch.dict(os.environ, prod_envs):
            assert "mics" == utils.get_mics_host_name()

        uat_envs = {"MICS_DEFAULT": "ACCEPT"}
        with mock.patch.dict(os.environ, uat_envs):
            assert "mics-uat" == utils.get_mics_host_name()

    def test_convert_list_to_dict(self):
        assert {"foo": 1, "bar": 2} == utils.convert_list_to_dict(["foo", 1, "bar", 2])

    def test_get_all_templates(self):
        tmpl_dir = os.path.join(settings.SOURCE_ROOT, "templates")
        tmpl_files = utils.get_all_templates(tmpl_dir)
        assert {"done": "done.yaml", "error": "error.yaml"} == tmpl_files

    def test_return_value(self):
        response = utils.Response(utils.Status.NOT_AVAILABLE)
        assert response.sys_return_value() == settings.DEPENDS_RETRY

        response.status = utils.Status.OK
        assert response.sys_return_value() == settings.DEPENDS_SUCCESS

        response.status = "any"
        assert response.sys_return_value() == "any"
DaFileName:'/home/windmill/adm-471307/projects/tfx/tfx/reconciliation_and_exchange_files.py'
import argparse
import os
import sys
from pathlib import Path

# from our local libraries
from python_common import password as password
from python_common import log as LOGGER
import tfx


# TFX 36
is_abn_ftp = False


def __manually_delete_file__(logger, filename):
    message = "Deleting temporary file, " + filename
    logger.info(message)
    os.remove(filename)


def acquire_arguments():
    # initialize parser
    parser = argparse.ArgumentParser(
        description="Trade, Cash & Position reconciliation vs exchange files"
    )

    parser.add_argument(
        "-operation",
        required=True,
        type=tfx.validate_operation,
        help="Operation: 'download' or 'validate'. When \"-operation download\""
        + " is provided the process will "
        + " download the reconciliation and exchange files."
        + " \nWhen \"-operation validate\", the"
        + " process will do a post validation of the reconciliation and"
        + " exchange files to ensure they have been downloaded to the expected"
        + " paths."
    )

    parser.add_argument(
        "-t",
        action="store_true",
        help="Test mode. Only applies to operation=download. It will download"
        + " the margin reference file, report if the zip file is corrupted."
        + " No file copy to occur to the target host.",
    )

    parser.add_argument(
        "-LAST_PROC_DATE",
        required=True,
        type=tfx.validate_date,
        help="Last processing date in format of YYYYMMDD",
    )
    parser.add_argument(
        "-source_host",
        required=True,
        help="FTP ip or hostname of the source server to get the files from",
    )
    parser.add_argument("-rde_dbi_name", required=True, help="RDE DBI Name")
    parser.add_argument(
        "-rde_schema", default="reference_data", help="RDE DBI Schema Name"
    )
    parser.add_argument("-rde_port", default="5444", help="RDE DBI port")
    parser.add_argument(
        "-destination_host",
        type=str,
        required=True,
        help="destination server's ip address or hostname. This is where the files will be stored in ultimately",
    )
    parser.add_argument(
        "-destination_host_user_id",
        type=str,
        default="jpauto",
        help="Output server's user id to use when transferring the output"
        + " files over",
    )
    parser.add_argument(
        "-destination_output_dir",
        type=str,
        default="/tmp",
        help="Output directory to store the downloaded files in. This would be the directory of the destination server",
    )

    parser.add_argument(
        "-margin_file_host",
        type=str,
        help="The server hostname or IP for the margin file to be copied to"
        + " files over",
    )
    parser.add_argument(
        "-margin_file_host_user_id",
        type=str,
        default="jpauto",
        help="The user name to be used to copy the margin file to the margin file host"
        + " files over",
    )
    parser.add_argument(
        "-margin_file_output_dir",
        type=str,
        help="The directory path for the margin file to be copied to",
    )
    parser.add_argument(
        "-validation_email_list",
        type=str,
        help='When a validation (i.e. -operation "validate") is to be done, an'
        " email list is required so that failures can be reported by email",
    )

    return parser.parse_args()
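
# An illustrative invocation (hostnames, DBI name and paths below are
# placeholders, not real values):
#
#   python reconciliation_and_exchange_files.py \
#       -operation download \
#       -LAST_PROC_DATE 20220131 \
#       -source_host ftp.example.com \
#       -rde_dbi_name rde_example \
#       -destination_host filestore.example.com \
#       -destination_output_dir /data/tfx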


def main():
    return_code = None
    LOGGER.init_logger(__file__)

    # fit in the credentials for the source ftp server
    args = acquire_arguments()

    args.t = True  # TODO: temporary - forces test mode on
    if args.t:
        LOGGER.info("RUNNING ON TEST MODE.")

    # Connect to the database using command line arguments for -rde_dbi_name,
    # -rde_schema, -rde_port and database credentials.
    # Get the database credentials by querying the pgp module with the
    # value of -rde_dbi_name.
    # credential = password.get_password(args.rde_dbi_name)  # Temp removed

    # if tfx.credentials_defined(credential) == 1:  # Temp removed
    if True:
        # args.rde_db_user = credential[0]  # Temp removed
        # args.rde_db_password = credential[1]  # Temp removed
        args.rde_db_user = "reference_data_user"  # Temp removed
        args.rde_db_password = "F0vcMRyF"  # Temp removed

        ftp_accounts = tfx.get_ftp_accounts(LOGGER, args, is_abn_ftp)

        for ftp_account in ftp_accounts:
            print(f" Currently working with ftp profile: {ftp_account}")
            args.source_path = ""
            args.source_user_id = ftp_account["ftp_user_id"]
            args.source_user_password = ftp_account["ftp_password"]
            args.LAST_PROC_DATE = str(args.LAST_PROC_DATE)


            ftp_key = tfx.get_ftp_accounts(LOGGER, args, is_abn_ftp)

            print(f"I got the ftp key(s):\n {ftp_key}")

            if args.operation == "download":
                target_directory = tfx.get_target_directory(
                    {
                        "member_code": str(ftp_account["tfx_member_code"]),
                        "last_processing_date": args.LAST_PROC_DATE,
                    }
                )

                LOGGER.info("Performing download of reconciliation and exchange files.")
                return_code = tfx.perform_reconciliation_and_exchange_files_download(
                    LOGGER, ftp_account, args, target_directory
                )
            elif args.operation == "validate":
                target_directory = tfx.get_target_directory(
                    {
                        "target_base_dir": args.destination_output_dir,
                        "member_code": str(ftp_account["tfx_member_code"]),
                        "last_processing_date": args.LAST_PROC_DATE,
                    }
                )

                if args.validation_email_list is None:
                    message = "Error: -validation_email_list argument must be provided"
                    LOGGER.error(message)

                    return_code = tfx.get_failure_return_code()
                else:
                    LOGGER.info(
                        "Performing validation of reconciliation and exchange files."
                    )

                    # Return success if all checks complete without failures
                    # (i.e. the ftp can be logged onto and checked for files).
                    return_code = tfx.perform_reconciliation_and_exchange_files_validation(
                        LOGGER, ftp_account, args, target_directory, args.validation_email_list
                    )
    else:
        LOGGER.error(
            "No ftp credentials could be retrieved for the database name"
            " (rde_dbi_name argument), '"
            + args.rde_dbi_name
            + "'"
        )
        return_code = tfx.update_return_code(return_code, tfx.get_failure_return_code())

    tfx.log_overall_status(LOGGER, return_code)
    sys.exit(return_code)


if __name__ == "__main__":
    main()