Untitled
unknown
plain_text
5 months ago
1.5 kB
2
Indexable
import os import logging import json import boto3 from botocore import config logger = logging.getLogger() logger.setLevel(logging.INFO) lmd_client = boto3.client( "lambda", config=config.Config( read_timeout=900, connect_timeout=900, retries={"max_attempts": 0} ), ) def send_mail(msg: str, msg_type: str = "error") -> None: if msg_type == "inform": logger.info(msg) payload = { "subject": "[ALERT] Securities Ingestion Process!!!", "body": "Dear Team,\nThe securities delta file size is bigger than expected.\n\n" + msg + f"\n\nThanks,\n{os.environ['signature']}", "to": os.environ["to_recipient"].split(",") } else: logger.error(msg) payload = { "subject": "[ERROR] Securities Ingestion Process Failed", "body": "Dear Team,\nThe securities ingestion process failed due to the following error.\n\n" + msg + f"\n\nThanks,\n{os.environ['signature']}", "to": os.environ["to_recipient"].split(",") } response = json.loads( lmd_client.invoke( FunctionName=os.environ["send_mail_lmd"], InvocationType="RequestResponse", Payload=bytes(json.dumps(payload).encode()), )["Payload"].read() ) if not response.get("status") == 200: raise ChildProcessError(response)
Editor is loading...
Leave a Comment