Untitled
unknown
plain_text
a year ago
1.5 kB
5
Indexable
import os
import logging
import json
import boto3
from botocore import config
logger = logging.getLogger()
logger.setLevel(logging.INFO)
lmd_client = boto3.client(
"lambda",
config=config.Config(
read_timeout=900, connect_timeout=900, retries={"max_attempts": 0}
),
)
def send_mail(msg: str, msg_type: str = "error") -> None:
if msg_type == "inform":
logger.info(msg)
payload = {
"subject": "[ALERT] Securities Ingestion Process!!!",
"body": "Dear Team,\nThe securities delta file size is bigger than expected.\n\n"
+ msg
+ f"\n\nThanks,\n{os.environ['signature']}",
"to": os.environ["to_recipient"].split(",")
}
else:
logger.error(msg)
payload = {
"subject": "[ERROR] Securities Ingestion Process Failed",
"body": "Dear Team,\nThe securities ingestion process failed due to the following error.\n\n"
+ msg
+ f"\n\nThanks,\n{os.environ['signature']}",
"to": os.environ["to_recipient"].split(",")
}
response = json.loads(
lmd_client.invoke(
FunctionName=os.environ["send_mail_lmd"],
InvocationType="RequestResponse",
Payload=bytes(json.dumps(payload).encode()),
)["Payload"].read()
)
if not response.get("status") == 200:
raise ChildProcessError(response)Editor is loading...
Leave a Comment