Untitled

 avatar
unknown
python
a year ago
14 kB
38
Indexable
import logging
import os
import time

import boto3
from boto3 import Session
from botocore.client import BaseClient
from botocore.exceptions import ClientError, WaiterError

# Module-level logger used by everything in this file.
logger: logging.Logger = logging.getLogger(__name__)

# `logging.basicConfig` does nothing once the root logger already has a handler
# (the Lambda runtime installs one that writes to stderr), so in that case the
# level has to be set on the root logger directly.
_root_logger = logging.getLogger()
if not _root_logger.handlers:
    logging.basicConfig(level=logging.INFO)
else:
    _root_logger.setLevel(logging.INFO)


# Network layout created by `handler`. The subnet CIDRs must lie inside the VPC CIDR.
_VPC_CIDR = '10.0.0.0/16'
_PUBLIC_SUBNET_CIDR = '10.0.1.0/24'
_PRIVATE_SUBNET_CIDR = '10.0.2.0/24'
_ANY_IPV4_CIDR = '0.0.0.0/0'

# Interface endpoints created in the private subnet: log label -> service-name suffix.
_INTERFACE_ENDPOINT_SERVICES = {
    'Glue': 'glue',
    'Secrets Manager': 'secretsmanager',
    'KMS': 'kms',
    'CloudWatch Logs': 'logs',
    'Step Functions': 'states',
}

# Polling configuration for the resources that have no built-in boto3 waiter here.
_POLL_INTERVAL_SECONDS = 5
_ROUTE_TABLE_TIMEOUT_SECONDS = 60
_VPC_ENDPOINT_TIMEOUT_SECONDS = 600

# Endpoint states (lower-cased) from which 'available' can no longer be reached.
_FAILED_ENDPOINT_STATES = frozenset({'failed', 'rejected', 'expired', 'deleting', 'deleted'})


def _create_flow_log(ec2: "BaseClient", resource_id: str, resource_type: str,
                     log_group_name: str, role_arn: str) -> None:
    """Create an ALL-traffic flow log for one resource and log any failure the API reports."""
    response = ec2.create_flow_logs(
        ResourceIds=[resource_id],
        ResourceType=resource_type,
        TrafficType='ALL',
        LogGroupName=log_group_name,
        DeliverLogsPermissionArn=role_arn,
    )
    # `create_flow_logs` can succeed as a call while reporting per-resource
    # failures in `Unsuccessful`; surface them instead of dropping the response.
    for failure in response.get('Unsuccessful', []):
        logger.error(f'Flow log creation failed for {resource_id}: {failure}')


def _wait_for_route_table(ec2: "BaseClient", route_table_id: str) -> None:
    """Poll until the route table is returned by `describe_route_tables`, or raise TimeoutError."""
    deadline = time.monotonic() + _ROUTE_TABLE_TIMEOUT_SECONDS
    while True:
        try:
            route_tables = ec2.describe_route_tables(RouteTableIds=[route_table_id])['RouteTables']
        except ClientError as error:
            # A table that is not visible yet is reported as an error rather
            # than as an empty list; treat only that error code as "keep waiting".
            if error.response['Error']['Code'] != 'InvalidRouteTableID.NotFound':
                raise
            route_tables = []

        if route_tables:
            logger.info(f'Route Table is available: {route_table_id}')
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f'Route Table {route_table_id} did not become available '
                               f'within {_ROUTE_TABLE_TIMEOUT_SECONDS} seconds')
        logger.info(f'Waiting for Route Table to become available: {route_table_id}')
        time.sleep(_POLL_INTERVAL_SECONDS)


def _wait_for_vpc_endpoints(ec2: "BaseClient", endpoint_ids: list) -> None:
    """Poll until every listed VPC endpoint is available.

    Raises RuntimeError as soon as an endpoint reaches a state it cannot
    recover from, and TimeoutError when the deadline passes first.
    """
    deadline = time.monotonic() + _VPC_ENDPOINT_TIMEOUT_SECONDS
    while True:
        response = ec2.describe_vpc_endpoints(VpcEndpointIds=endpoint_ids)
        # Compare case-insensitively so the check does not depend on the casing of the state value.
        states = {endpoint['VpcEndpointId']: endpoint['State'].lower()
                  for endpoint in response['VpcEndpoints']}

        failed = {endpoint_id: state for endpoint_id, state in states.items()
                  if state in _FAILED_ENDPOINT_STATES}
        if failed:
            raise RuntimeError(f'VPC endpoints failed to become available: {failed}')

        all_reported = set(endpoint_ids) <= states.keys()
        if all_reported and all(state == 'available' for state in states.values()):
            logger.info('VPC endpoints are available')
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f'VPC endpoints did not become available within '
                               f'{_VPC_ENDPOINT_TIMEOUT_SECONDS} seconds: {states}')
        logger.info('Waiting for VPC endpoints to become available')
        time.sleep(_POLL_INTERVAL_SECONDS)


def handler(event: dict, context: object):
    """Lambda entry point: build a VPC and attach it to an EMR Serverless application.

    Creates, in order: a VPC with DNS support/hostnames and a flow log, an
    Internet Gateway, a public and a private subnet, a public route table
    (default route -> Internet Gateway), a security group, an Elastic IP and a
    NAT gateway in the public subnet, a private route table (default route ->
    NAT gateway), a security group plus S3/Glue/Secrets Manager/KMS/CloudWatch
    Logs/Step Functions VPC endpoints, and finally updates the EMR Serverless
    application's network configuration.

    Environment variables:
        APPLICATION_ID: ID of the EMR Serverless application to update.
        IAM_ROLE_ARN: role passed to `create_flow_logs` as the delivery role.

    Args:
        event: Lambda event payload (not used).
        context: Lambda context object (not used).

    Raises:
        KeyError: a required environment variable is missing.
        ClientError: an AWS API call failed (logged, then re-raised).
        WaiterError: the NAT gateway did not become available.
        TimeoutError: a route table or the VPC endpoints did not become available in time.
        RuntimeError: a VPC endpoint entered a failed state.

    Note:
        Nothing is rolled back on failure; resources created before the error remain.
    """
    # Read required configuration first so a missing variable fails before any
    # AWS resource has been created.
    application_id: str = os.environ['APPLICATION_ID']
    role_arn: str = os.environ['IAM_ROLE_ARN']

    session: Session = boto3.session.Session()
    region = session.region_name

    logger.info(f"Session region: {region}")
    logger.info(f"Current role ARN: {role_arn}")

    ec2: BaseClient = boto3.client('ec2')
    ec2_resource = boto3.resource('ec2')  # resource API, used only for Vpc.modify_attribute
    emr_serverless: BaseClient = boto3.client('emr-serverless')

    try:
        # Log what exists before anything is created.
        for security_group in ec2.describe_security_groups()['SecurityGroups']:
            logger.info(f"Before Security Group ID: {security_group['GroupId']}")
        for subnet in ec2.describe_subnets()['Subnets']:
            logger.info(f"Before Subnet ID: {subnet['SubnetId']}")

        # --- VPC ---------------------------------------------------------
        vpc_id = ec2.create_vpc(CidrBlock=_VPC_CIDR)['Vpc']['VpcId']
        logger.info(f'Created VPC: {vpc_id}')

        ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])
        logger.info(f'VPC is available: {vpc_id}')

        # The two DNS attributes are set in separate calls, as in the original flow.
        vpc = ec2_resource.Vpc(vpc_id)
        vpc.modify_attribute(EnableDnsSupport={'Value': True})
        vpc.modify_attribute(EnableDnsHostnames={'Value': True})

        # NOTE(review): "Appication" looks like a typo, but the name is kept
        # as-is because an existing log group may already use it - confirm.
        _create_flow_log(ec2, vpc_id, 'VPC', 'MyAppicationVpcFlowLogs', role_arn)

        # --- Internet Gateway --------------------------------------------
        internet_gateway = ec2.create_internet_gateway()
        internet_gateway_id = internet_gateway['InternetGateway']['InternetGatewayId']
        logger.info(f'Created Internet Gateway: {internet_gateway_id}')

        ec2.attach_internet_gateway(InternetGatewayId=internet_gateway_id, VpcId=vpc_id)
        logger.info(f'Attached Internet Gateway to VPC: {vpc_id}')

        # --- Subnets -----------------------------------------------------
        public_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock=_PUBLIC_SUBNET_CIDR)
        public_subnet_id = public_subnet['Subnet']['SubnetId']
        logger.info(f'Created public subnet: {public_subnet_id}')

        private_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock=_PRIVATE_SUBNET_CIDR)
        private_subnet_id = private_subnet['Subnet']['SubnetId']
        logger.info(f'Created private subnet: {private_subnet_id}')

        # Wait for BOTH subnets: the public one is modified and hosts the NAT
        # gateway below, so it must be available too.
        ec2.get_waiter('subnet_available').wait(SubnetIds=[public_subnet_id, private_subnet_id])
        logger.info(f'Subnets are available: {public_subnet_id}, {private_subnet_id}')

        ec2.modify_subnet_attribute(
            SubnetId=public_subnet_id,
            MapPublicIpOnLaunch={'Value': True},
        )
        logger.info(f'Enabled auto-assign public IP for subnet: {public_subnet_id}')

        _create_flow_log(ec2, private_subnet_id, 'Subnet', 'MyApplicationSubnetFlowLogs', role_arn)

        # --- Public routing ----------------------------------------------
        # Without a default route to the Internet Gateway the "public" subnet
        # has no internet path, and neither does the NAT gateway placed in it.
        public_route_table = ec2.create_route_table(VpcId=vpc_id)
        public_route_table_id = public_route_table['RouteTable']['RouteTableId']
        logger.info(f'Created public Route Table: {public_route_table_id}')
        _wait_for_route_table(ec2, public_route_table_id)

        ec2.create_route(
            RouteTableId=public_route_table_id,
            DestinationCidrBlock=_ANY_IPV4_CIDR,
            GatewayId=internet_gateway_id,
        )
        logger.info(f"Added route to Internet Gateway {internet_gateway_id} "
                    f"in Route Table {public_route_table_id}")

        ec2.associate_route_table(RouteTableId=public_route_table_id, SubnetId=public_subnet_id)
        logger.info(f"Associated Route Table {public_route_table_id} "
                    f"with public subnet {public_subnet_id}")

        # --- Application security group ----------------------------------
        sg = ec2.create_security_group(GroupName='my_application_security_group',
                                       Description='My Application VPC security group',
                                       VpcId=vpc_id)
        sg_id = sg['GroupId']
        try:
            # Allow all outbound traffic.
            ec2.authorize_security_group_egress(
                GroupId=sg_id,
                IpPermissions=[{'IpProtocol': '-1', 'IpRanges': [{'CidrIp': _ANY_IPV4_CIDR}]}],
            )
        except ClientError as error:
            # An identical rule already being present is not a failure.
            if error.response['Error']['Code'] != 'InvalidPermission.Duplicate':
                raise
            logger.info("The rule already exists, skipping...")
        logger.info(f'Created security group: {sg_id}')

        # --- NAT gateway -------------------------------------------------
        allocation_id = ec2.allocate_address(Domain='vpc')['AllocationId']
        logger.info(f'Created Elastic IP: {allocation_id}')

        nat_gateway = ec2.create_nat_gateway(SubnetId=public_subnet_id, AllocationId=allocation_id)
        nat_gateway_id = nat_gateway["NatGateway"]["NatGatewayId"]

        try:
            ec2.get_waiter('nat_gateway_available').wait(NatGatewayIds=[nat_gateway_id])
        except WaiterError:
            # Log the gateway's current description to help diagnose the failure.
            logger.exception(ec2.describe_nat_gateways(NatGatewayIds=[nat_gateway_id]))
            raise
        logger.info('NAT Gateway is available')

        # --- Private routing ---------------------------------------------
        route_table = ec2.create_route_table(VpcId=vpc_id)
        route_table_id = route_table['RouteTable']['RouteTableId']
        logger.info(f'Created Route Table: {route_table_id}')
        _wait_for_route_table(ec2, route_table_id)

        ec2.create_route(
            RouteTableId=route_table_id,
            DestinationCidrBlock=_ANY_IPV4_CIDR,
            NatGatewayId=nat_gateway_id,
        )
        logger.info(f"Added route to NAT Gateway {nat_gateway_id} in Route Table {route_table_id}")

        ec2.associate_route_table(RouteTableId=route_table_id, SubnetId=private_subnet_id)
        logger.info(f"Associated Route Table {route_table_id} with private subnet {private_subnet_id}")

        # --- VPC endpoints -----------------------------------------------
        endpoint_sg = ec2.create_security_group(GroupName='VPC_Endpoint_SG',
                                                Description='Security group for VPC endpoint',
                                                VpcId=vpc_id)
        vpc_endpoint_security_group_id = endpoint_sg['GroupId']

        # Allow all traffic to and from addresses inside the VPC.
        vpc_only_permissions = [{'IpProtocol': '-1', 'IpRanges': [{'CidrIp': _VPC_CIDR}]}]
        ec2.authorize_security_group_ingress(
            GroupId=vpc_endpoint_security_group_id,
            IpPermissions=vpc_only_permissions,
        )
        ec2.authorize_security_group_egress(
            GroupId=vpc_endpoint_security_group_id,
            IpPermissions=vpc_only_permissions,
        )

        # S3 uses a Gateway endpoint attached to the private route table.
        s3_endpoint = ec2.create_vpc_endpoint(
            VpcId=vpc_id,
            ServiceName=f'com.amazonaws.{region}.s3',
            RouteTableIds=[route_table_id],
            VpcEndpointType='Gateway',
            PrivateDnsEnabled=False,
        )
        endpoint_ids = [s3_endpoint["VpcEndpoint"]["VpcEndpointId"]]
        logger.info(f'Created S3 VPC endpoint: {endpoint_ids[0]}')

        # The remaining services use identically configured Interface endpoints.
        for label, service in _INTERFACE_ENDPOINT_SERVICES.items():
            endpoint = ec2.create_vpc_endpoint(
                VpcId=vpc_id,
                ServiceName=f'com.amazonaws.{region}.{service}',
                SubnetIds=[private_subnet_id],
                VpcEndpointType='Interface',
                PrivateDnsEnabled=True,
                SecurityGroupIds=[vpc_endpoint_security_group_id],
            )
            endpoint_id = endpoint["VpcEndpoint"]["VpcEndpointId"]
            endpoint_ids.append(endpoint_id)
            logger.info(f'Created {label} VPC endpoint: {endpoint_id}')

        _wait_for_vpc_endpoints(ec2, endpoint_ids)

        # --- EMR Serverless ----------------------------------------------
        # NOTE(review): both subnets are passed, as before; confirm the public
        # subnet is really wanted for the application's workers.
        response = emr_serverless.update_application(
            applicationId=application_id,
            networkConfiguration={
                'subnetIds': [public_subnet_id, private_subnet_id],
                'securityGroupIds': [sg_id],
            },
        )
        logger.info(f'Updated EMR Serverless application: {response}')

    except ClientError:
        # `logger.exception` records the active traceback; a bare `raise` keeps it intact.
        logger.exception("Failure with botocore")
        raise
Editor is loading...
Leave a Comment