# Untitled
#
# avatar
# unknown
# python
# 10 months ago
# 14 kB
# 33
# Indexable
import logging
import os
import time

import boto3
from boto3 import Session
from botocore.client import BaseClient
from botocore.exceptions import ClientError, WaiterError

# Module-level logger used by everything in this file.
logger: logging.Logger = logging.getLogger(__name__)

# `logging.basicConfig` is a no-op once the root logger has a handler, which is
# the case inside the Lambda runtime (it pre-installs a stderr handler). So we
# only call it when nothing is configured yet, and otherwise raise the root
# level to INFO directly.
if not logging.getLogger().handlers:
    logging.basicConfig(level=logging.INFO)
else:
    logging.getLogger().setLevel(logging.INFO)


def _create_flow_logs(ec2: "BaseClient", resource_id: str, resource_type: str,
                      log_group_name: str, role_arn: str) -> dict:
    """Create ALL-traffic flow logs for one resource and surface partial failures.

    `create_flow_logs` does not raise when an individual resource cannot be
    processed; it reports such items in the `Unsuccessful` list of the response,
    so that list is logged as a warning instead of being silently dropped.
    """
    response = ec2.create_flow_logs(
        ResourceIds=[resource_id],
        ResourceType=resource_type,
        TrafficType='ALL',
        LogGroupName=log_group_name,
        DeliverLogsPermissionArn=role_arn
    )
    unsuccessful = response.get('Unsuccessful')
    if unsuccessful:
        logger.warning(f'Flow log creation reported failures for {resource_id}: {unsuccessful}')
    return response


def _wait_for_route_table(ec2: "BaseClient", route_table_id: str, poll_seconds: int = 5) -> None:
    """Poll until the given route table is returned by `describe_route_tables`.

    A freshly created route table may briefly be reported as not found, which
    the API signals with an `InvalidRouteTableID.NotFound` error rather than an
    empty result; that specific error is treated as "not visible yet".
    """
    while True:
        try:
            response = ec2.describe_route_tables(RouteTableIds=[route_table_id])
        except ClientError as error:
            if error.response['Error']['Code'] != 'InvalidRouteTableID.NotFound':
                raise
            response = {'RouteTables': []}

        if response['RouteTables']:
            logger.info(f'Route Table is available: {route_table_id}')
            return

        logger.info(f'Waiting for Route Table to become available: {route_table_id}')
        time.sleep(poll_seconds)


def _create_interface_endpoint(ec2: "BaseClient", vpc_id: str, region: str, service: str,
                               subnet_id: str, security_group_id: str) -> str:
    """Create an Interface VPC endpoint for `com.amazonaws.<region>.<service>` and return its ID."""
    response = ec2.create_vpc_endpoint(
        VpcId=vpc_id,
        ServiceName=f'com.amazonaws.{region}.{service}',
        SubnetIds=[subnet_id],
        VpcEndpointType='Interface',
        PrivateDnsEnabled=True,
        SecurityGroupIds=[security_group_id]
    )
    return response['VpcEndpoint']['VpcEndpointId']


def _wait_for_vpc_endpoints(ec2: "BaseClient", endpoint_ids: list, poll_seconds: int = 5) -> None:
    """Poll until every listed VPC endpoint is available.

    Raises:
        RuntimeError: if any endpoint reaches a state it cannot recover from,
            instead of polling forever.
    """
    # States are compared lower-cased so that either casing of the enum matches.
    terminal_states = {'failed', 'rejected', 'deleting', 'deleted', 'expired'}
    while True:
        response = ec2.describe_vpc_endpoints(VpcEndpointIds=endpoint_ids)
        states = {
            endpoint['VpcEndpointId']: endpoint['State'].lower()
            for endpoint in response['VpcEndpoints']
        }

        broken = {endpoint_id: state for endpoint_id, state in states.items() if state in terminal_states}
        if broken:
            raise RuntimeError(f'VPC endpoints entered a terminal state: {broken}')

        if all(state == 'available' for state in states.values()):
            logger.info('VPC endpoints are available')
            return

        logger.info('Waiting for VPC endpoints to become available')
        time.sleep(poll_seconds)


def handler(event: dict, context: object):
    """Lambda entry point: build a VPC and attach it to an EMR Serverless application.

    Steps, in order: log the existing security groups and subnets, create a VPC
    (10.0.0.0/16) with DNS support/hostnames and flow logs, attach an Internet
    Gateway, create a public (10.0.1.0/24) and a private (10.0.2.0/24) subnet,
    create a security group, allocate an Elastic IP and a NAT Gateway in the
    public subnet, route the private subnet's default traffic through the NAT
    Gateway, create S3/Glue/Secrets Manager/KMS/CloudWatch Logs/Step Functions
    VPC endpoints, and finally update the EMR Serverless application's network
    configuration.

    Environment variables:
        APPLICATION_ID: ID of the EMR Serverless application to update.
        IAM_ROLE_ARN: role passed as the flow-log delivery permission.

    Args:
        event: Lambda invocation payload (not read by this function).
        context: Lambda context object (not read by this function).

    Raises:
        KeyError: if a required environment variable is missing.
        ClientError: on any AWS API failure (logged, then re-raised).
        WaiterError: if the NAT Gateway does not become available.
        RuntimeError: if a VPC endpoint reaches a terminal failure state.

    NOTE(review): resources created before a failure are not rolled back.
    NOTE(review): no route to the Internet Gateway is added for the public
    subnet, so the NAT Gateway placed there presumably has no path to the
    internet - confirm whether a public route table is created elsewhere.
    """
    # Read configuration first so a missing variable fails before any AWS
    # resource has been created.
    application_id: str = os.environ['APPLICATION_ID']
    role_arn: str = os.environ['IAM_ROLE_ARN']

    session: Session = boto3.session.Session()
    region = session.region_name

    logger.info(f"Session region: {region}")

    ec2: BaseClient = boto3.client('ec2')
    ec2_resource = boto3.resource('ec2')
    emr_serverless: BaseClient = boto3.client('emr-serverless')

    try:
        # Log the security groups and subnets that exist before any change.
        for security_group in ec2.describe_security_groups()['SecurityGroups']:
            logger.info(f"Before Security Group ID: {security_group['GroupId']}")

        for subnet in ec2.describe_subnets()['Subnets']:
            logger.info(f"Before Subnet ID: {subnet['SubnetId']}")

        # Create the VPC and wait until it is available.
        vpc_response: dict = ec2.create_vpc(CidrBlock='10.0.0.0/16')
        vpc_id = vpc_response['Vpc']['VpcId']
        logger.info(f'Created VPC: {vpc_id}')

        ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])
        logger.info(f'VPC is available: {vpc_id}')

        # DNS support and DNS hostnames are set in two separate calls, as in
        # the original flow.
        vpc = ec2_resource.Vpc(vpc_id)
        vpc.modify_attribute(EnableDnsSupport={'Value': True})
        vpc.modify_attribute(EnableDnsHostnames={'Value': True})

        # VPC flow logs.
        logger.info(f"Current role ARN: {role_arn}")
        _create_flow_logs(ec2, vpc_id, 'VPC', 'MyAppicationVpcFlowLogs', role_arn)

        # Internet Gateway, attached to the VPC.
        internet_gateway = ec2.create_internet_gateway()
        internet_gateway_id = internet_gateway['InternetGateway']['InternetGatewayId']
        logger.info(f'Created Internet Gateway: {internet_gateway_id}')

        ec2.attach_internet_gateway(InternetGatewayId=internet_gateway_id, VpcId=vpc_id)
        logger.info(f'Attached Internet Gateway to VPC: {vpc_id}')

        # Public subnet with auto-assigned public IPs.
        public_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.1.0/24')
        public_subnet_id = public_subnet['Subnet']['SubnetId']
        logger.info(f'Created public subnet: {public_subnet_id}')

        ec2.modify_subnet_attribute(
            SubnetId=public_subnet_id,
            MapPublicIpOnLaunch={'Value': True}
        )
        logger.info(f'Enabled auto-assign public IP for subnet: {public_subnet_id}')

        # Private subnet.
        private_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.2.0/24')
        private_subnet_id = private_subnet['Subnet']['SubnetId']
        logger.info(f'Created private subnet: {private_subnet_id}')

        # Wait for BOTH subnets: the NAT Gateway below is created in the
        # public one, so it must be available too.
        ec2.get_waiter('subnet_available').wait(SubnetIds=[public_subnet_id, private_subnet_id])
        logger.info(f'Subnets are available: {public_subnet_id}, {private_subnet_id}')

        # Subnet flow logs.
        _create_flow_logs(ec2, private_subnet_id, 'Subnet', 'MyApplicationSubnetFlowLogs', role_arn)

        # Application security group.
        sg = ec2.create_security_group(GroupName='my_application_security_group',
                                       Description='My Application VPC security group',
                                       VpcId=vpc_id)
        sg_id = sg['GroupId']

        # Allow all outbound traffic; an already-present identical rule is not
        # an error.
        try:
            ec2.authorize_security_group_egress(
                GroupId=sg_id,
                IpPermissions=[
                    {
                        'IpProtocol': '-1',
                        'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                    }
                ]
            )
        except ClientError as e:
            if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
                raise
            logger.info("The rule already exists, skipping...")
        logger.info(f'Created security group: {sg_id}')

        # Elastic IP for the NAT Gateway.
        allocation = ec2.allocate_address(Domain='vpc')
        allocation_id = allocation['AllocationId']
        logger.info(f'Created Elastic IP: {allocation_id}')

        # NAT Gateway in the public subnet, bound to the Elastic IP.
        nat_gateway = ec2.create_nat_gateway(SubnetId=public_subnet_id, AllocationId=allocation_id)
        nat_gateway_id = nat_gateway["NatGateway"]["NatGatewayId"]

        try:
            ec2.get_waiter('nat_gateway_available').wait(NatGatewayIds=[nat_gateway_id])
            logger.info('NAT Gateway is available')
        except WaiterError:
            # Log the gateway's current description to help diagnose the failure.
            response = ec2.describe_nat_gateways(NatGatewayIds=[nat_gateway_id])
            logger.exception(response)
            raise

        # Route table for the private subnet.
        route_table = ec2.create_route_table(VpcId=vpc_id)
        route_table_id = route_table['RouteTable']['RouteTableId']
        logger.info(f'Created Route Table: {route_table_id}')

        _wait_for_route_table(ec2, route_table_id)

        # Default route of the private subnet goes through the NAT Gateway.
        ec2.create_route(
            RouteTableId=route_table_id,
            DestinationCidrBlock='0.0.0.0/0',
            NatGatewayId=nat_gateway_id,
        )
        logger.info(f"Added route to NAT Gateway {nat_gateway_id} in Route Table {route_table_id}")

        ec2.associate_route_table(
            RouteTableId=route_table_id,
            SubnetId=private_subnet_id
        )
        logger.info(f"Associated Route Table {route_table_id} with private subnet {private_subnet_id}")

        # Security group shared by the interface VPC endpoints.
        response = ec2.create_security_group(GroupName='VPC_Endpoint_SG',
                                             Description='Security group for VPC endpoint',
                                             VpcId=vpc_id)
        vpc_endpoint_security_group_id = response['GroupId']

        # Allow all traffic to and from the VPC CIDR on the endpoint group.
        vpc_cidr_permissions = [
            {'IpProtocol': '-1',
             'IpRanges': [{'CidrIp': '10.0.0.0/16'}]}
        ]
        ec2.authorize_security_group_ingress(
            GroupId=vpc_endpoint_security_group_id,
            IpPermissions=vpc_cidr_permissions
        )
        ec2.authorize_security_group_egress(
            GroupId=vpc_endpoint_security_group_id,
            IpPermissions=vpc_cidr_permissions
        )

        # Gateway endpoint for S3, attached to the private route table.
        s3_endpoint = ec2.create_vpc_endpoint(
            VpcId=vpc_id,
            ServiceName=f'com.amazonaws.{region}.s3',
            RouteTableIds=[route_table_id],
            VpcEndpointType='Gateway',
            PrivateDnsEnabled=False
        )
        s3_endpoint_id = s3_endpoint["VpcEndpoint"]["VpcEndpointId"]
        logger.info(f'Created S3 VPC endpoint: {s3_endpoint_id}')

        # Interface endpoints: (label used in the log line, service-name suffix).
        interface_services = (
            ('Glue', 'glue'),
            ('Secrets Manager', 'secretsmanager'),
            ('KMS', 'kms'),
            ('CloudWatch Logs', 'logs'),
            ('Step Functions', 'states'),
        )
        endpoint_ids = [s3_endpoint_id]
        for label, service in interface_services:
            endpoint_id = _create_interface_endpoint(
                ec2, vpc_id, region, service, private_subnet_id, vpc_endpoint_security_group_id
            )
            logger.info(f'Created {label} VPC endpoint: {endpoint_id}')
            endpoint_ids.append(endpoint_id)

        _wait_for_vpc_endpoints(ec2, endpoint_ids)

        # Attach the new network configuration to the EMR Serverless application.
        response = emr_serverless.update_application(
            applicationId=application_id,
            networkConfiguration={
                'subnetIds': [public_subnet_id, private_subnet_id],
                'securityGroupIds': [sg_id]
            }
        )
        logger.info(f'Updated EMR Serverless application: {response}')

    except ClientError as e:
        logger.exception("Failure with botocore", exc_info=e)
        raise
# Editor is loading...
# Leave a Comment