python-common=mayer
DaFileName:'/home/windmill/adm-471307/projects/python-common/tests/test_utils.py'
"""Unit tests for utils.__init__ module."""
import os
from datetime import datetime
import freezegun
import mock
from parameterized import parameterized
from python_common import settings, utils
class TestUtils:
    """Unit tests for classes and functions in utils module."""

    @staticmethod
    def datetime_to_date(datetime_obj):
        """Format a date/datetime object as a ``YYYY-MM-DD`` string."""
        # Call strftime on the instance so the helper does not depend on the
        # exact class bound to the module-level ``datetime`` name.
        return datetime_obj.strftime("%Y-%m-%d")

    @parameterized.expand(
        [
            (-1, None, "2021-10-11"),
            (-1, datetime(2021, 10, 18), "2021-10-15"),
            (-2, None, "2021-10-08"),
            (0, None, "2021-10-12"),
        ]
    )
    def test_get_business_day(self, offset, other_time, expected_date):
        """Tests get_business_day function.

        Each case supplies an offset, an optional reference datetime
        (``None`` means "use the frozen current time") and the expected
        date string.
        """
        # Freeze time to a Tuesday.
        with freezegun.freeze_time("2021-10-12 10:00:00"):
            business_day = utils.get_business_day(offset, other_time)
            assert self.datetime_to_date(business_day) == expected_date

    @parameterized.expand(
        [(-1, "-1"), ("", ""), (" 39.0", "39"), (39.123, "39.123"), (3999.0, "3999")]
    )
    def test_safe_strip(self, text_or_number, expected_result):
        """Tests safe_strip function.

        Numbers and padded strings are expected back as trimmed strings,
        with a trailing ``.0`` dropped.
        """
        assert utils.safe_strip(text_or_number) == expected_result

    @parameterized.expand(
        [
            (None, ""),
            ([], ""),
            (["foo", "bar", 123], "'foo','bar','123'"),
        ]
    )
    def test_add_quotes_to_list(self, string_list, expected_result):
        """Tests add_quotes_to_list function."""
        assert utils.add_quotes_to_list(string_list) == expected_result

    @parameterized.expand([(None, 0), ("-2.0", -2.0), ("foo", 0), (12.12, 12.12)])
    def test_safe_float(self, test_data, expected_result):
        """Tests safe_float function.

        ``None`` and non-numeric text are expected to yield 0.
        """
        assert utils.safe_float(test_data) == expected_result

    def test_get_mics_mapping_db_schema(self):
        """Tests get_mics_mapping_db_schema function."""
        # Please note: before running this test, assume following has been run:
        # $ . /fcs/global/`hostname`/common
        #
        prod_envs = {"MICS_DEFAULT": "PROD"}
        with mock.patch.dict(os.environ, prod_envs):
            assert utils.get_mics_mapping_db_schema("MH") == "VP01MHFIL"

        uat_envs = {"MICS_DEFAULT": "ACCEPT"}
        with mock.patch.dict(os.environ, uat_envs):
            assert utils.get_mics_mapping_db_schema("MH") == "VA01MHFIL"

    def test_get_mics_host_name(self):
        """Tests get_mics_host_name function for PROD and ACCEPT environments."""
        prod_envs = {"MICS_DEFAULT": "PROD"}
        with mock.patch.dict(os.environ, prod_envs):
            assert utils.get_mics_host_name() == "mics"

        uat_envs = {"MICS_DEFAULT": "ACCEPT"}
        with mock.patch.dict(os.environ, uat_envs):
            assert utils.get_mics_host_name() == "mics-uat"

    def test_convert_list_to_dict(self):
        """Tests convert_list_to_dict: a flat key/value list becomes a dict."""
        flat_pairs = ["foo", 1, "bar", 2]
        assert utils.convert_list_to_dict(flat_pairs) == {"foo": 1, "bar": 2}

    def test_get_all_templates(self):
        """Tests get_all_templates against the templates dir under SOURCE_ROOT."""
        tmpl_dir = os.path.join(settings.SOURCE_ROOT, "templates")
        tmpl_files = utils.get_all_templates(tmpl_dir)
        assert tmpl_files == {"done": "done.yaml", "error": "error.yaml"}

    def test_return_value(self):
        """Tests Response.sys_return_value for known and unknown statuses."""
        response = utils.Response(utils.Status.NOT_AVAILABLE)
        assert response.sys_return_value() == settings.DEPENDS_RETRY

        response.status = utils.Status.OK
        assert response.sys_return_value() == settings.DEPENDS_SUCCESS

        # A status with no dedicated mapping is expected back unchanged.
        response.status = "any"
        assert response.sys_return_value() == "any"
dividends-uploader: mayer
DaFileName:'/home/windmill/adm-471307/projects/dividends-uploader/dividends_uploader/utils/sql_collection.py'
import os
import yaml
# Name of the YAML file that SqlCollection loads from this module's directory.
SQL_YAML = "sql.yaml"
class DictWrapper(dict):
    """Converts multi layered dict to dot accessible object.

    ``wrapper.key`` is equivalent to ``wrapper.get("key")``; nested dict
    values are wrapped on access so lookups can be chained
    (``wrapper.a.b.c``). A key that is absent yields ``None``.
    """

    def __getattr__(self, name):
        """Return the value stored under key ``name``.

        Only invoked when normal attribute lookup fails.

        Args:
            name: Attribute name, used as the dictionary key.

        Returns:
            A ``DictWrapper`` when the stored value is a dict, otherwise the
            stored value itself, or ``None`` when the key is absent.

        Raises:
            AttributeError: For dunder names (``__x__``). Protocol probes
                from ``copy``/``pickle`` and similar machinery must see a
                missing attribute rather than ``None``.
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        value = self.get(name)
        if isinstance(value, dict):
            return DictWrapper(value)
        return value
class SqlCollection:
    """Loads the SQL_YAML file located next to this module.

    Attributes:
        yaml_content: The object returned by ``yaml.safe_load`` for the file.
        collection: ``DictWrapper`` over ``yaml_content`` for dot access.
    """

    def __init__(self):
        """Read and parse the YAML file, then wrap it for dot access."""
        sql_yaml = os.path.join(os.path.dirname(__file__), SQL_YAML)
        # Context manager so the file handle is closed as soon as parsing ends.
        with open(sql_yaml) as yaml_file:
            self.yaml_content = yaml.safe_load(yaml_file)
        # safe_load returns None for an empty document; wrap an empty mapping
        # in that case instead of failing inside dict().
        self.collection = DictWrapper(self.yaml_content or {})


# Wrapped YAML content, loaded once when this module is imported.
collection = SqlCollection().collection
Mifid - mayer
DaFileName:'/home/windmill/adm-471307/projects/python-common/tests/test_utils.py'
"""Unit tests for utils.__init__ module."""
import os
from datetime import datetime
import freezegun
import mock
from parameterized import parameterized
from python_common import settings, utils
class TestUtils:
    """Unit tests for classes and functions in utils module."""

    @staticmethod
    def datetime_to_date(datetime_obj):
        """Format a date/datetime object as a ``YYYY-MM-DD`` string."""
        # Call strftime on the instance so the helper does not depend on the
        # exact class bound to the module-level ``datetime`` name.
        return datetime_obj.strftime("%Y-%m-%d")

    @parameterized.expand(
        [
            (-1, None, "2021-10-11"),
            (-1, datetime(2021, 10, 18), "2021-10-15"),
            (-2, None, "2021-10-08"),
            (0, None, "2021-10-12"),
        ]
    )
    def test_get_business_day(self, offset, other_time, expected_date):
        """Tests get_business_day function.

        Each case supplies an offset, an optional reference datetime
        (``None`` means "use the frozen current time") and the expected
        date string.
        """
        # Freeze time to a Tuesday.
        with freezegun.freeze_time("2021-10-12 10:00:00"):
            business_day = utils.get_business_day(offset, other_time)
            assert self.datetime_to_date(business_day) == expected_date

    @parameterized.expand(
        [(-1, "-1"), ("", ""), (" 39.0", "39"), (39.123, "39.123"), (3999.0, "3999")]
    )
    def test_safe_strip(self, text_or_number, expected_result):
        """Tests safe_strip function.

        Numbers and padded strings are expected back as trimmed strings,
        with a trailing ``.0`` dropped.
        """
        assert utils.safe_strip(text_or_number) == expected_result

    @parameterized.expand(
        [
            (None, ""),
            ([], ""),
            (["foo", "bar", 123], "'foo','bar','123'"),
        ]
    )
    def test_add_quotes_to_list(self, string_list, expected_result):
        """Tests add_quotes_to_list function."""
        assert utils.add_quotes_to_list(string_list) == expected_result

    @parameterized.expand([(None, 0), ("-2.0", -2.0), ("foo", 0), (12.12, 12.12)])
    def test_safe_float(self, test_data, expected_result):
        """Tests safe_float function.

        ``None`` and non-numeric text are expected to yield 0.
        """
        assert utils.safe_float(test_data) == expected_result

    def test_get_mics_mapping_db_schema(self):
        """Tests get_mics_mapping_db_schema function."""
        # Please note: before running this test, assume following has been run:
        # $ . /fcs/global/`hostname`/common
        #
        prod_envs = {"MICS_DEFAULT": "PROD"}
        with mock.patch.dict(os.environ, prod_envs):
            assert utils.get_mics_mapping_db_schema("MH") == "VP01MHFIL"

        uat_envs = {"MICS_DEFAULT": "ACCEPT"}
        with mock.patch.dict(os.environ, uat_envs):
            assert utils.get_mics_mapping_db_schema("MH") == "VA01MHFIL"

    def test_get_mics_host_name(self):
        """Tests get_mics_host_name function for PROD and ACCEPT environments."""
        prod_envs = {"MICS_DEFAULT": "PROD"}
        with mock.patch.dict(os.environ, prod_envs):
            assert utils.get_mics_host_name() == "mics"

        uat_envs = {"MICS_DEFAULT": "ACCEPT"}
        with mock.patch.dict(os.environ, uat_envs):
            assert utils.get_mics_host_name() == "mics-uat"

    def test_convert_list_to_dict(self):
        """Tests convert_list_to_dict: a flat key/value list becomes a dict."""
        flat_pairs = ["foo", 1, "bar", 2]
        assert utils.convert_list_to_dict(flat_pairs) == {"foo": 1, "bar": 2}

    def test_get_all_templates(self):
        """Tests get_all_templates against the templates dir under SOURCE_ROOT."""
        tmpl_dir = os.path.join(settings.SOURCE_ROOT, "templates")
        tmpl_files = utils.get_all_templates(tmpl_dir)
        assert tmpl_files == {"done": "done.yaml", "error": "error.yaml"}

    def test_return_value(self):
        """Tests Response.sys_return_value for known and unknown statuses."""
        response = utils.Response(utils.Status.NOT_AVAILABLE)
        assert response.sys_return_value() == settings.DEPENDS_RETRY

        response.status = utils.Status.OK
        assert response.sys_return_value() == settings.DEPENDS_SUCCESS

        # A status with no dedicated mapping is expected back unchanged.
        response.status = "any"
        assert response.sys_return_value() == "any"
tfx - mayer
DaFileName:'/home/windmill/adm-471307/projects/tfx/tfx/reconciliation_and_exchange_files.py'
import argparse
import os
import sys
from pathlib import Path
# from our local libraries
from python_common import password as password
from python_common import log as LOGGER
import tfx
# TFX 36
# Flag passed unchanged to tfx.get_ftp_accounts() by main().
# NOTE(review): presumably selects the ABN FTP profile when True - confirm
# against tfx.get_ftp_accounts.
is_abn_ftp = False
def __manually_delete_file__(LOGGER, filename):
    """Log the deletion of a temporary file and then remove it.

    Args:
        LOGGER: Object exposing ``info(message)``; receives the log line.
        filename: Path of the file to delete, as ``str`` or any
            ``os.PathLike`` object.

    Raises:
        OSError: Propagated from ``os.remove`` (for example when the file
            does not exist).
    """
    # f-string instead of "+" so path-like objects (pathlib.Path) are accepted
    # too; the message is identical for plain strings.
    message = f"Deleting temporary file, {filename}"
    LOGGER.info(message)
    os.remove(filename)
def acquire_arguments(argv=None):
    """Define and parse the command-line arguments of this script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        argparse.Namespace holding the parsed options. ``-operation`` is
        converted by ``tfx.validate_operation`` and ``-LAST_PROC_DATE`` by
        ``tfx.validate_date``.
    """
    parser = argparse.ArgumentParser(
        description="Trade, Cash & Position reconciliation vs exchange files"
    )

    # What to do and for which processing date.
    parser.add_argument(
        "-operation",
        required=True,
        type=tfx.validate_operation,
        help=(
            "Operation: 'download' or 'validate'. When \"-operation download\""
            " is provided the process will "
            " download the reconciliation and exchange files."
            " \nWhen \"-operation validate\", the"
            " process will do a post validation of the reconciliation and"
            " exchange files to ensure they have been downloaded to the expected"
            " paths."
        ),
    )
    parser.add_argument(
        "-t",
        action="store_true",
        help=(
            "Test mode. Only applies to operation=download. It will download"
            " the margin reference file, report if the zip file is corrupted."
            " No file copy to occur to the target host."
        ),
    )
    parser.add_argument(
        "-LAST_PROC_DATE",
        required=True,
        type=tfx.validate_date,
        help="Last processing date in format of YYYYMMDD",
    )

    # Source FTP server and RDE database connection details.
    parser.add_argument(
        "-source_host",
        required=True,
        help="FTP ip or hostname of the source server to get the files from",
    )
    parser.add_argument("-rde_dbi_name", required=True, help="RDE DBI Name")
    parser.add_argument(
        "-rde_schema", default="reference_data", help="RDE DBI Schema Name"
    )
    parser.add_argument("-rde_port", default="5444", help="RDE DBI port")

    # Destination of the downloaded files.
    parser.add_argument(
        "-destination_host",
        type=str,
        required=True,
        help="destination server's ip address or hostname. This is where the files will be stored in ultimately",
    )
    parser.add_argument(
        "-destination_host_user_id",
        type=str,
        default="jpauto",
        help=(
            "Output server's user id to use when transferring the output"
            " files over"
        ),
    )
    parser.add_argument(
        "-destination_output_dir",
        type=str,
        default="/tmp",
        help="Output directory to store the downloaded files in. This would be the directory of the destination server",
    )

    # Destination of the margin file. The help texts below previously ended
    # with a stray " files over" fragment copied from
    # -destination_host_user_id.
    parser.add_argument(
        "-margin_file_host",
        type=str,
        help="The server hostname or IP for the margin file to be copied to",
    )
    parser.add_argument(
        "-margin_file_host_user_id",
        type=str,
        default="jpauto",
        help="The user name to be used to copy the margin file to the margin file host",
    )
    parser.add_argument(
        "-margin_file_output_dir",
        type=str,
        help="The directory path for the margin file to be copied to",
    )

    # The operation is spelled "validate" (see -operation above).
    parser.add_argument(
        "-validation_email_list",
        type=str,
        help=(
            'When a validation (ie. -operation "validate") is to be done, an email'
            " list is required so that failures can be emailed to"
        ),
    )
    return parser.parse_args(argv)
def main():
    """Run the requested operation for every FTP account and exit.

    Steps:
      1. Initialise logging and parse the command line.
      2. Look up the RDE database credentials for ``-rde_dbi_name`` through
         ``password.get_password``.
      3. Fetch the FTP accounts with ``tfx.get_ftp_accounts`` and, for each
         one, either download the reconciliation and exchange files
         (``-operation download``) or validate them (``-operation validate``,
         which requires ``-validation_email_list``).
      4. Log the overall status and terminate via ``sys.exit`` with the
         resulting return code.
    """
    return_code = None
    LOGGER.init_logger(__file__)
    args = acquire_arguments()
    # Test mode is controlled solely by the -t flag.
    if args.t:
        LOGGER.info("RUNNING ON TEST MODE.")

    # Connect to the database using command line arguments for -rde_dbi_name,
    # -rde_schema, -rde_port and database credentials.
    # Get the database credentials by querying the pgp module with the
    # value of -rde_dbi_name. Credentials must never be hard-coded here.
    credential = password.get_password(args.rde_dbi_name)
    if tfx.credentials_defined(credential) == 1:
        args.rde_db_user = credential[0]
        args.rde_db_password = credential[1]
        ftp_accounts = tfx.get_ftp_accounts(LOGGER, args, is_abn_ftp)
        for ftp_account in ftp_accounts:
            member_code = str(ftp_account["tfx_member_code"])
            # Log a non-secret identifier only: the account record also
            # carries the FTP password and must not be dumped to the output.
            LOGGER.info(
                "Currently working with ftp profile of member code " + member_code
            )

            # fit in the credentials for the source ftp server
            args.source_path = ""
            args.source_user_id = ftp_account["ftp_user_id"]
            args.source_user_password = ftp_account["ftp_password"]
            args.LAST_PROC_DATE = str(args.LAST_PROC_DATE)

            # NOTE(review): return_code is overwritten on every iteration, so
            # only the last account's result reaches sys.exit(). Confirm
            # whether results should be merged with tfx.update_return_code.
            if args.operation == "download":
                target_directory = tfx.get_target_directory(
                    {
                        "member_code": member_code,
                        "last_processing_date": args.LAST_PROC_DATE,
                    }
                )
                LOGGER.info("Performing download of reconciliation and exchange files.")
                return_code = tfx.perform_reconciliation_and_exchange_files_download(
                    LOGGER, ftp_account, args, target_directory
                )
            elif args.operation == "validate":
                target_directory = tfx.get_target_directory(
                    {
                        "target_base_dir": args.destination_output_dir,
                        "member_code": member_code,
                        "last_processing_date": args.LAST_PROC_DATE,
                    }
                )
                if args.validation_email_list is None:
                    message = "Error: -validation_email_list argument must be provided"
                    LOGGER.error(message)
                    return_code = tfx.get_failure_return_code()
                else:
                    LOGGER.info(
                        "Performing validation of reconciliation and exchange files."
                    )
                    # return success - if all checks done without failures
                    # (ie. ftp can be logged onto and checked for file)
                    return_code = tfx.perform_reconciliation_and_exchange_files_validation(
                        LOGGER,
                        ftp_account,
                        args,
                        target_directory,
                        args.validation_email_list,
                    )
    else:
        LOGGER.error(
            "No ftp credentials can be retrieved off the database name"
            + "(rde_dbi_name argument), '"
            + args.rde_dbi_name
            + "'"
        )
        return_code = tfx.update_return_code(return_code, tfx.get_failure_return_code())

    tfx.log_overall_status(LOGGER, return_code)
    sys.exit(return_code)


if __name__ == "__main__":
    main()