Untitled

mail@pastecode.io avatar
unknown
plain_text
2 years ago
16 kB
1
Indexable
python-common=mayer

DaFileName:'/home/windmill/adm-471307/projects/python-common/tests/test_utils.py'
"""Unit tests for utils.__init__ module."""
import os
from datetime import datetime

import freezegun
import mock
from parameterized import parameterized

from python_common import settings, utils


class TestUtils:
    """Test suite covering the helpers exposed by the utils module."""

    @staticmethod
    def datetime_to_date(datetime_obj):
        """Render a datetime object as a ``YYYY-MM-DD`` string."""
        date_format = "%Y-%m-%d"
        return datetime.strftime(datetime_obj, date_format)

    @parameterized.expand(
        [
            (-1, None, "2021-10-11"),
            (-1, datetime(2021, 10, 18), "2021-10-15"),
            (-2, None, "2021-10-08"),
            (0, None, "2021-10-12"),
        ]
    )
    def test_get_business_day(self, offset, other_time, expected_date):
        """Checks get_business_day while the clock is frozen."""
        # 2021-10-12 falls on a Tuesday.
        with freezegun.freeze_time("2021-10-12 10:00:00"):
            business_day = utils.get_business_day(offset, other_time)
            actual_date = self.datetime_to_date(business_day)
            assert expected_date == actual_date

    @parameterized.expand(
        [
            (-1, "-1"),
            ("", ""),
            (" 39.0", "39"),
            (39.123, "39.123"),
            (3999.0, "3999"),
        ]
    )
    def test_safe_strip(self, text_or_number, expected_result):
        """Checks safe_strip for numeric and string inputs."""
        stripped = utils.safe_strip(text_or_number)
        assert expected_result == stripped

    @parameterized.expand(
        [
            (None, ""),
            ([], ""),
            (["foo", "bar", 123], "'foo','bar','123'"),
        ]
    )
    def test_add_quotes_to_list(self, string_list, expected_result):
        """Checks add_quotes_to_list for None, empty and mixed-type lists."""
        quoted = utils.add_quotes_to_list(string_list)
        assert quoted == expected_result

    @parameterized.expand(
        [
            (None, 0),
            ("-2.0", -2.0),
            ("foo", 0),
            (12.12, 12.12),
        ]
    )
    def test_safe_float(self, test_data, expected_result):
        """Checks safe_float for None, numeric text, junk text and floats."""
        converted = utils.safe_float(test_data)
        assert converted == expected_result

    def test_get_mics_mapping_db_schema(self):
        """Checks get_mics_mapping_db_schema for each MICS_DEFAULT value."""

        # Please note: before running this test, assume following has been run:
        # $ . /fcs/global/`hostname`/common
        #
        schema_by_env = (("PROD", "VP01MHFIL"), ("ACCEPT", "VA01MHFIL"))
        for mics_default, expected_schema in schema_by_env:
            with mock.patch.dict(os.environ, {"MICS_DEFAULT": mics_default}):
                assert expected_schema == utils.get_mics_mapping_db_schema("MH")

    def test_get_mics_host_name(self):
        """Checks get_mics_host_name for each MICS_DEFAULT value."""
        host_by_env = (("PROD", "mics"), ("ACCEPT", "mics-uat"))
        for mics_default, expected_host in host_by_env:
            with mock.patch.dict(os.environ, {"MICS_DEFAULT": mics_default}):
                assert expected_host == utils.get_mics_host_name()

    def test_convert_list_to_dict(self):
        """Checks that a flat key/value list is folded into a dict."""
        flat_pairs = ["foo", 1, "bar", 2]
        expected_mapping = {"foo": 1, "bar": 2}
        assert expected_mapping == utils.convert_list_to_dict(flat_pairs)

    def test_get_all_templates(self):
        """Checks the template mapping found under SOURCE_ROOT/templates."""
        expected_templates = {"done": "done.yaml", "error": "error.yaml"}
        template_root = os.path.join(settings.SOURCE_ROOT, "templates")
        assert expected_templates == utils.get_all_templates(template_root)

    def test_return_value(self):
        """Checks Response.sys_return_value for several status values."""
        response = utils.Response(utils.Status.NOT_AVAILABLE)
        assert response.sys_return_value() == settings.DEPENDS_RETRY

        # Re-use the same Response and swap its status in place.
        later_statuses = (
            (utils.Status.OK, settings.DEPENDS_SUCCESS),
            ("any", "any"),
        )
        for status, expected_value in later_statuses:
            response.status = status
            assert response.sys_return_value() == expected_value



dividends-uploader: mayer
DaFileName:'/home/windmill/adm-471307/projects/dividends-uploader/dividends_uploader/utils/sql_collection.py'




import os

import yaml

# File name of the YAML document loaded by SqlCollection; it is resolved
# relative to the directory that contains this module.
SQL_YAML = "sql.yaml"


class DictWrapper(dict):
    """Dict subclass whose keys can also be read as attributes.

    Nested plain dicts are wrapped on access, so chained dotted lookups such
    as ``wrapper.outer.inner`` work. A key that is absent resolves to
    ``None`` rather than raising.
    """

    def __getattr__(self, name):
        """Return the value stored under key ``name``.

        Python only calls this hook after normal attribute lookup has failed,
        so real ``dict`` methods and attributes are never shadowed.

        Args:
            name: attribute name being looked up; used as the dict key.

        Returns:
            A ``DictWrapper`` around the value when it is a dict, otherwise
            the value itself, or ``None`` when the key is missing.

        Raises:
            AttributeError: for missing dunder names (``__x__``). Such keys
                remain reachable through normal item access.
        """
        # Code that probes for optional special methods with hasattr/getattr
        # must see a real failure; answering with None would make every
        # such hook look present.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)

        value = self.get(name)
        if isinstance(value, dict):
            return DictWrapper(value)
        return value


class SqlCollection:
    """Loads the YAML file named by ``SQL_YAML`` from this module's directory.

    Attributes:
        yaml_content: the object returned by ``yaml.safe_load`` for the file.
        collection: ``yaml_content`` wrapped in a ``DictWrapper`` so entries
            can be read with dotted attribute access.
    """

    def __init__(self):
        """Read and parse the YAML file, then wrap the parsed content."""
        sql_yaml = os.path.join(os.path.dirname(__file__), SQL_YAML)
        # Context manager guarantees the file handle is closed even when
        # parsing raises; the bare open() call used previously leaked it.
        with open(sql_yaml) as yaml_file:
            self.yaml_content = yaml.safe_load(yaml_file)
        self.collection = DictWrapper(self.yaml_content)


# Module-level instance built at import time: importing this module reads and
# parses the YAML file once and exposes the wrapped content as `collection`.
collection = SqlCollection().collection
Mifid - mayer

DaFileName:'/home/windmill/adm-471307/projects/python-common/tests/test_utils.py'
"""Unit tests for utils.__init__ module."""
import os
from datetime import datetime

import freezegun
import mock
from parameterized import parameterized

from python_common import settings, utils


class TestUtils:
    """Test suite covering the helpers exposed by the utils module."""

    @staticmethod
    def datetime_to_date(datetime_obj):
        """Render a datetime object as a ``YYYY-MM-DD`` string."""
        date_format = "%Y-%m-%d"
        return datetime.strftime(datetime_obj, date_format)

    @parameterized.expand(
        [
            (-1, None, "2021-10-11"),
            (-1, datetime(2021, 10, 18), "2021-10-15"),
            (-2, None, "2021-10-08"),
            (0, None, "2021-10-12"),
        ]
    )
    def test_get_business_day(self, offset, other_time, expected_date):
        """Checks get_business_day while the clock is frozen."""
        # 2021-10-12 falls on a Tuesday.
        with freezegun.freeze_time("2021-10-12 10:00:00"):
            business_day = utils.get_business_day(offset, other_time)
            actual_date = self.datetime_to_date(business_day)
            assert expected_date == actual_date

    @parameterized.expand(
        [
            (-1, "-1"),
            ("", ""),
            (" 39.0", "39"),
            (39.123, "39.123"),
            (3999.0, "3999"),
        ]
    )
    def test_safe_strip(self, text_or_number, expected_result):
        """Checks safe_strip for numeric and string inputs."""
        stripped = utils.safe_strip(text_or_number)
        assert expected_result == stripped

    @parameterized.expand(
        [
            (None, ""),
            ([], ""),
            (["foo", "bar", 123], "'foo','bar','123'"),
        ]
    )
    def test_add_quotes_to_list(self, string_list, expected_result):
        """Checks add_quotes_to_list for None, empty and mixed-type lists."""
        quoted = utils.add_quotes_to_list(string_list)
        assert quoted == expected_result

    @parameterized.expand(
        [
            (None, 0),
            ("-2.0", -2.0),
            ("foo", 0),
            (12.12, 12.12),
        ]
    )
    def test_safe_float(self, test_data, expected_result):
        """Checks safe_float for None, numeric text, junk text and floats."""
        converted = utils.safe_float(test_data)
        assert converted == expected_result

    def test_get_mics_mapping_db_schema(self):
        """Checks get_mics_mapping_db_schema for each MICS_DEFAULT value."""

        # Please note: before running this test, assume following has been run:
        # $ . /fcs/global/`hostname`/common
        #
        schema_by_env = (("PROD", "VP01MHFIL"), ("ACCEPT", "VA01MHFIL"))
        for mics_default, expected_schema in schema_by_env:
            with mock.patch.dict(os.environ, {"MICS_DEFAULT": mics_default}):
                assert expected_schema == utils.get_mics_mapping_db_schema("MH")

    def test_get_mics_host_name(self):
        """Checks get_mics_host_name for each MICS_DEFAULT value."""
        host_by_env = (("PROD", "mics"), ("ACCEPT", "mics-uat"))
        for mics_default, expected_host in host_by_env:
            with mock.patch.dict(os.environ, {"MICS_DEFAULT": mics_default}):
                assert expected_host == utils.get_mics_host_name()

    def test_convert_list_to_dict(self):
        """Checks that a flat key/value list is folded into a dict."""
        flat_pairs = ["foo", 1, "bar", 2]
        expected_mapping = {"foo": 1, "bar": 2}
        assert expected_mapping == utils.convert_list_to_dict(flat_pairs)

    def test_get_all_templates(self):
        """Checks the template mapping found under SOURCE_ROOT/templates."""
        expected_templates = {"done": "done.yaml", "error": "error.yaml"}
        template_root = os.path.join(settings.SOURCE_ROOT, "templates")
        assert expected_templates == utils.get_all_templates(template_root)

    def test_return_value(self):
        """Checks Response.sys_return_value for several status values."""
        response = utils.Response(utils.Status.NOT_AVAILABLE)
        assert response.sys_return_value() == settings.DEPENDS_RETRY

        # Re-use the same Response and swap its status in place.
        later_statuses = (
            (utils.Status.OK, settings.DEPENDS_SUCCESS),
            ("any", "any"),
        )
        for status, expected_value in later_statuses:
            response.status = status
            assert response.sys_return_value() == expected_value
tfx - mayer


DaFileName:'/home/windmill/adm-471307/projects/tfx/tfx/reconciliation_and_exchange_files.py'
import argparse
import os
import sys
from pathlib import Path

# from our local libraries
from python_common import password as password
from python_common import log as LOGGER
import tfx


# TFX 36 (presumably a ticket reference -- TODO confirm).
# Module-level flag that main() forwards to tfx.get_ftp_accounts().
is_abn_ftp = False

def __manually_delete_file__(LOGGER, filename):
    """Log that a temporary file is being deleted, then remove it.

    Args:
        LOGGER: object exposing ``info(message)``. The parameter shadows the
            module-level ``LOGGER`` inside this function.
        filename: path of the file to delete; it is concatenated onto the
            log text, so it must be a ``str``.

    Raises:
        OSError: propagated from ``os.remove`` when the file cannot be
            removed.
    """
    LOGGER.info("Deleting temporary file, " + filename)
    os.remove(filename)


def acquire_arguments(argv=None):
    """Build the command-line parser and parse the arguments.

    Args:
        argv: optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, which is the behaviour existing
            callers rely on.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args``.
        ``-operation`` and ``-LAST_PROC_DATE`` are converted through
        ``tfx.validate_operation`` and ``tfx.validate_date`` respectively.
    """
    parser = argparse.ArgumentParser(
        description="Trade, Cash & Position reconciliation vs exchange files"
    )

    parser.add_argument(
        "-operation",
        required=True,
        type=tfx.validate_operation,
        help=(
            "Operation: 'download' or 'validate'."
            ' When "-operation download" is provided the process will'
            " download the reconciliation and exchange files."
            ' When "-operation validate", the process will do a post'
            " validation of the reconciliation and exchange files to ensure"
            " they have been downloaded to the expected paths."
        ),
    )

    parser.add_argument(
        "-t",
        action="store_true",
        help=(
            "Test mode. Only applies to operation=download. It will download"
            " the margin reference file, report if the zip file is corrupted."
            " No file copy to occur to the target host."
        ),
    )

    parser.add_argument(
        "-LAST_PROC_DATE",
        required=True,
        type=tfx.validate_date,
        help="Last processing date in format of YYYYMMDD",
    )
    parser.add_argument(
        "-source_host",
        required=True,
        help="FTP ip or hostname of the source server to get the files from",
    )
    parser.add_argument("-rde_dbi_name", required=True, help="RDE DBI Name")
    parser.add_argument(
        "-rde_schema", default="reference_data", help="RDE DBI Schema Name"
    )
    parser.add_argument("-rde_port", default="5444", help="RDE DBI port")
    parser.add_argument(
        "-destination_host",
        type=str,
        required=True,
        help=(
            "destination server's ip address or hostname. This is where the"
            " files will be stored in ultimately"
        ),
    )
    parser.add_argument(
        "-destination_host_user_id",
        type=str,
        default="jpauto",
        help=(
            "Output server's user id to use when transferring the output"
            " files over"
        ),
    )
    parser.add_argument(
        "-destination_output_dir",
        type=str,
        default="/tmp",
        help=(
            "Output directory to store the downloaded files in. This would be"
            " the directory of the destination server"
        ),
    )

    # The three margin-file help texts previously ended with a stray
    # " files over" fragment copied from -destination_host_user_id.
    parser.add_argument(
        "-margin_file_host",
        type=str,
        help="The server hostname or IP for the margin file to be copied to",
    )
    parser.add_argument(
        "-margin_file_host_user_id",
        type=str,
        default="jpauto",
        help=(
            "The user name to be used to copy the margin file to the margin"
            " file host"
        ),
    )
    parser.add_argument(
        "-margin_file_output_dir",
        type=str,
        help="The directory path for the margin file to be copied to",
    )
    parser.add_argument(
        "-validation_email_list",
        type=str,
        # The operation value documented under -operation is "validate".
        help=(
            'When a validation (ie. -operation "validate") is to be done, an'
            " email list is required so that failures can be emailed to"
        ),
    )

    return parser.parse_args(argv)


def main():
    """Download or validate reconciliation and exchange files, then exit.

    Parses the command line, looks up the RDE database credentials for
    ``-rde_dbi_name``, fetches the FTP accounts and runs the requested
    operation for each account. Finally logs the overall status and calls
    ``sys.exit`` with the resulting return code.

    Changes from the previous revision:
      * the database user/password are read through ``password.get_password``
        again instead of being hard-coded in source (the user was also
        accidentally assigned as a one-element tuple);
      * test mode is no longer forced on; it follows the ``-t`` flag;
      * FTP account dictionaries, which include the password, are no longer
        printed to stdout, and the redundant per-iteration
        ``tfx.get_ftp_accounts`` call that only fed that print is removed.
    """
    return_code = None
    LOGGER.init_logger(__file__)

    args = acquire_arguments()

    if args.t:
        LOGGER.info("RUNNING ON TEST MODE.")

    # Connect to the database using command line arguments for -rde_dbi_name,
    # -rde_schema, -rde_port and database credentials. The credentials are
    # obtained from the password module using the value of -rde_dbi_name.
    credential = password.get_password(args.rde_dbi_name)

    if tfx.credentials_defined(credential) == 1:
        args.rde_db_user = credential[0]
        args.rde_db_password = credential[1]

        ftp_accounts = tfx.get_ftp_accounts(LOGGER, args, is_abn_ftp)

        # NOTE(review): return_code is overwritten on every iteration, so only
        # the last account's result reaches sys.exit; an empty account list
        # leaves it as None. Confirm whether tfx.update_return_code should be
        # used here to accumulate failures.
        for ftp_account in ftp_accounts:
            # Log the user id only; the account dict also holds the password.
            LOGGER.info(
                "Currently working with ftp profile for user id: "
                + str(ftp_account["ftp_user_id"])
            )
            args.source_path = ""
            args.source_user_id = ftp_account["ftp_user_id"]
            args.source_user_password = ftp_account["ftp_password"]
            args.LAST_PROC_DATE = str(args.LAST_PROC_DATE)

            if args.operation == "download":
                target_directory = tfx.get_target_directory(
                    {
                        "member_code": str(ftp_account["tfx_member_code"]),
                        "last_processing_date": args.LAST_PROC_DATE,
                    }
                )

                LOGGER.info("Performing download of reconciliation and exchange files.")
                return_code = tfx.perform_reconciliation_and_exchange_files_download(
                    LOGGER, ftp_account, args, target_directory
                )
            elif args.operation == "validate":
                target_directory = tfx.get_target_directory(
                    {
                        "target_base_dir": args.destination_output_dir,
                        "member_code": str(ftp_account["tfx_member_code"]),
                        "last_processing_date": args.LAST_PROC_DATE,
                    }
                )

                if args.validation_email_list is None:
                    message = "Error: -validation_email_list argument must be provided"
                    LOGGER.error(message)

                    return_code = tfx.get_failure_return_code()
                else:
                    LOGGER.info(
                        "Performing validation of reconciliation and exchange files."
                    )

                    # Returns success when all checks complete without
                    # failures (ie. ftp can be logged onto and checked for
                    # file).
                    return_code = tfx.perform_reconciliation_and_exchange_files_validation(
                        LOGGER, ftp_account, args, target_directory, args.validation_email_list
                    )
    else:
        LOGGER.error(
            "No ftp credentials can be retrieved off the database name"
            + "(rde_dbi_name argument), '"
            + args.rde_dbi_name
            + "'"
        )
        return_code = tfx.update_return_code(return_code, tfx.get_failure_return_code())

    tfx.log_overall_status(LOGGER, return_code)
    sys.exit(return_code)


# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()