Untitled
unknown
python
a year ago
14 kB
38
Indexable
import logging import os import time import boto3 from boto3 import Session from botocore.client import BaseClient from botocore.exceptions import ClientError, WaiterError logger: logging.Logger = logging.getLogger(__name__) if len(logging.getLogger().handlers) > 0: # The Lambda environment pre-configures a handler logging to stderr. If a handler is already configured, # `.basicConfig` does not execute. Thus, we set the level directly. logging.getLogger().setLevel(logging.INFO) else: logging.basicConfig(level=logging.INFO) def handler(event: dict, context: any): session: Session = boto3.session.Session() region = session.region_name logger.info(f"Session region: {region}") ec2: BaseClient = boto3.client('ec2') ec2_resource: BaseClient = boto3.resource('ec2') sts: BaseClient = boto3.client('sts') emr_serverless: BaseClient = boto3.client('emr-serverless') try: # Call the describe_security_groups method response = ec2.describe_security_groups() # Print the security groups for security_group in response['SecurityGroups']: logger.info(f"Before Security Group ID: {security_group['GroupId']}") # Call the describe_subnets method response = ec2.describe_subnets() # Print the subnets for subnet in response['Subnets']: logger.info(f"Before Subnet ID: {subnet['SubnetId']}") application_id: str = os.environ['APPLICATION_ID'] # Create a VPC vpc_response: dict = ec2.create_vpc(CidrBlock='10.0.0.0/16') vpc_id = vpc_response['Vpc']['VpcId'] logger.info(f'Created VPC: {vpc_id}') # Wait until the VPC is available waiter = ec2.get_waiter('vpc_available') waiter.wait(VpcIds=[vpc_id]) logger.info(f'VPC is available: {vpc_id}') # Enable DNS support vpc = ec2_resource.Vpc(vpc_id) vpc.modify_attribute( EnableDnsSupport={'Value': True} ) # Enable DNS hostnames vpc.modify_attribute( EnableDnsHostnames={'Value': True} ) # Create VPC flow logs role_arn = os.environ['IAM_ROLE_ARN'] logger.info(f"Current role ARN: {role_arn}") print(f"Current role ARN: {role_arn}") vpc_flow_log = ec2.create_flow_logs( ResourceIds=[vpc_id], # replace with your VPC ID ResourceType='VPC', TrafficType='ALL', LogGroupName=‘MyAppicationVpcFlowLogs', DeliverLogsPermissionArn=role_arn ) # Create an Internet Gateway internet_gateway = ec2.create_internet_gateway() internet_gateway_id = internet_gateway['InternetGateway']['InternetGatewayId'] logger.info(f'Created Internet Gateway: {internet_gateway_id}') # Attach the Internet Gateway to the VPC ec2.attach_internet_gateway(InternetGatewayId=internet_gateway_id, VpcId=vpc_id) logger.info(f'Attached Internet Gateway to VPC: {vpc_id}') # Get a public subnet from the existing EMR Serverless application # response = emr_serverless.get_application(applicationId=application_id) # # logger.info(f"EMR Serverless subnets: {response['application']['networkConfiguration']['subnetIds']}") # logger.info( # f"EMR Serverless security groups: {response['application']['networkConfiguration']['securityGroupIds']}") # public_subnet_id = response['application']['networkConfiguration']['subnetIds'][0] # Create a public subnet public_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.1.0/24') public_subnet_id = public_subnet['Subnet']['SubnetId'] logger.info(f'Created public subnet: {public_subnet_id}') # Modify the public subnet to auto-assign public IP to instances ec2.modify_subnet_attribute( SubnetId=public_subnet_id, MapPublicIpOnLaunch={'Value': True} ) logger.info(f'Enabled auto-assign public IP for subnet: {public_subnet_id}') # Create a private subnet private_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.2.0/24') private_subnet_id = private_subnet['Subnet']['SubnetId'] logger.info(f'Created private subnet: {private_subnet_id}') # Wait until the subnets are available waiter = ec2.get_waiter('subnet_available') waiter.wait(SubnetIds=[ # public_subnet_id, private_subnet_id]) logger.info(f'Subnets are available: ' # f'{public_subnet_id}, ' f'{private_subnet_id}') # Create subnet flow logs subnet_flow_log = ec2.create_flow_logs( ResourceIds=[private_subnet_id], # replace with your subnet ID ResourceType='Subnet', TrafficType='ALL', LogGroupName=‘MyApplicationSubnetFlowLogs', DeliverLogsPermissionArn=role_arn ) # Create a security group sg = ec2.create_security_group(GroupName=‘my_application_security_group', Description=‘My Application VPC security group', VpcId=vpc_id) sg_id = sg['GroupId'] # Allow all outbound traffic try: ec2.authorize_security_group_egress( GroupId=sg_id, IpPermissions=[ { 'IpProtocol': '-1', 'IpRanges': [{'CidrIp': '0.0.0.0/0'}] } ] ) except ClientError as e: if e.response['Error']['Code'] == 'InvalidPermission.Duplicate': print("The rule already exists, skipping...") else: raise # re-raise the exception if it's a different error logger.info(f'Created security group: {sg_id}') # Create elastic IPs allocation = ec2.allocate_address(Domain='vpc') allocation_id = allocation['AllocationId'] logger.info(f'Created Elastic IP: {allocation_id}') # Create NAT Gateway # Create a NAT Gateway and specify the Elastic IP allocation ID # Replace 'subnet_id' with the ID of the subnet in which you want to create the NAT Gateway nat_gateway = ec2.create_nat_gateway(SubnetId=public_subnet_id, AllocationId=allocation_id) nat_gateway_id = nat_gateway["NatGateway"]["NatGatewayId"] # Wait until the NAT Gateway is available nat_gateway_waiter = ec2.get_waiter('nat_gateway_available') try: nat_gateway_waiter.wait(NatGatewayIds=[nat_gateway_id]) logger.info('NAT Gateway is available') except WaiterError as e: response = ec2.describe_nat_gateways(NatGatewayIds=[nat_gateway_id]) logger.exception(response) raise e # Create a route table route_table = ec2.create_route_table(VpcId=vpc_id) route_table_id = route_table['RouteTable']['RouteTableId'] logger.info(f'Created Route Table: {route_table_id}') # Wait until the route table is created while True: response = ec2.describe_route_tables(RouteTableIds=[route_table_id]) if response['RouteTables']: logger.info(f'Route Table is available: {route_table_id}') break else: logger.info(f'Waiting for Route Table to become available: {route_table_id}') time.sleep(5) # wait for 5 seconds before checking again response = ec2.create_route( RouteTableId=route_table['RouteTable']['RouteTableId'], DestinationCidrBlock='0.0.0.0/0', NatGatewayId=nat_gateway_id, # GatewayId=internet_gateway_id ) logger.info(f"Added route to Internet Gateway {internet_gateway_id} in Route Table {route_table_id}") # Associate the route table with the private subnet ec2.associate_route_table( RouteTableId=route_table_id, SubnetId=private_subnet_id ) logger.info(f"Associated Route Table {route_table_id} with private subnet {public_subnet_id}") # Create VPC endpoints # Create a security group for the VPC endpoints response = ec2.create_security_group(GroupName='VPC_Endpoint_SG', Description='Security group for VPC endpoint', VpcId=vpc_id) vpc_endpoint_security_group_id = response['GroupId'] # Allow all inbound traffic within the VPC ec2.authorize_security_group_ingress( GroupId=vpc_endpoint_security_group_id, IpPermissions=[ {'IpProtocol': '-1', 'IpRanges': [{'CidrIp': '10.0.0.0/16'}]} # Replace with your VPC CIDR ] ) # Allow outbound traffic to the VPC CIDR (which includes the VPC endpoint) ec2.authorize_security_group_egress( GroupId=vpc_endpoint_security_group_id, IpPermissions=[ {'IpProtocol': '-1', 'IpRanges': [{'CidrIp': '10.0.0.0/16'}]} # Replace with your VPC CIDR ] ) # Create a VPC endpoint for S3 s3_endpoint = ec2.create_vpc_endpoint( VpcId=vpc_id, ServiceName=f'com.amazonaws.{region}.s3', RouteTableIds=[route_table_id], VpcEndpointType='Gateway', PrivateDnsEnabled=False ) logger.info(f'Created S3 VPC endpoint: {s3_endpoint["VpcEndpoint"]["VpcEndpointId"]}') # Create a VPC endpoint for Glue glue_endpoint = ec2.create_vpc_endpoint( VpcId=vpc_id, ServiceName=f'com.amazonaws.{region}.glue', SubnetIds=[private_subnet_id], VpcEndpointType='Interface', PrivateDnsEnabled=True, SecurityGroupIds=[vpc_endpoint_security_group_id] ) logger.info(f'Created Glue VPC endpoint: {glue_endpoint["VpcEndpoint"]["VpcEndpointId"]}') # Create a VPC endpoint for Secrets Manager secrets_manager_endpoint = ec2.create_vpc_endpoint( VpcId=vpc_id, ServiceName=f'com.amazonaws.{region}.secretsmanager', SubnetIds=[private_subnet_id], VpcEndpointType='Interface', PrivateDnsEnabled=True, SecurityGroupIds=[vpc_endpoint_security_group_id] ) logger.info(f'Created Secrets Manager VPC endpoint: {secrets_manager_endpoint["VpcEndpoint"]["VpcEndpointId"]}') # Create a VPC endpoint for KMS kms_endpoint = ec2.create_vpc_endpoint( VpcId=vpc_id, ServiceName=f'com.amazonaws.{region}.kms', SubnetIds=[private_subnet_id], VpcEndpointType='Interface', PrivateDnsEnabled=True, SecurityGroupIds=[vpc_endpoint_security_group_id] ) logger.info(f'Created KMS VPC endpoint: {kms_endpoint["VpcEndpoint"]["VpcEndpointId"]}') # Create a VPC endpoint for CloudWatch Logs cloudwatch_logs_endpoint = ec2.create_vpc_endpoint( VpcId=vpc_id, ServiceName=f'com.amazonaws.{region}.logs', SubnetIds=[private_subnet_id], VpcEndpointType='Interface', PrivateDnsEnabled=True, SecurityGroupIds=[vpc_endpoint_security_group_id] ) logger.info(f'Created CloudWatch Logs VPC endpoint: {cloudwatch_logs_endpoint["VpcEndpoint"]["VpcEndpointId"]}') # Create a VPC endpoint for Step Functions step_functions_endpoint = ec2.create_vpc_endpoint( VpcId=vpc_id, ServiceName=f'com.amazonaws.{region}.states', SubnetIds=[private_subnet_id], VpcEndpointType='Interface', PrivateDnsEnabled=True, SecurityGroupIds=[vpc_endpoint_security_group_id] ) logger.info(f'Created Step Functions VPC endpoint: {step_functions_endpoint["VpcEndpoint"]["VpcEndpointId"]}') # Wait until the VPC endpoints are available while True: response = ec2.describe_vpc_endpoints(VpcEndpointIds=[s3_endpoint["VpcEndpoint"]["VpcEndpointId"], glue_endpoint["VpcEndpoint"]["VpcEndpointId"], secrets_manager_endpoint["VpcEndpoint"][ "VpcEndpointId"], kms_endpoint["VpcEndpoint"]["VpcEndpointId"], cloudwatch_logs_endpoint["VpcEndpoint"][ "VpcEndpointId"], step_functions_endpoint["VpcEndpoint"][ "VpcEndpointId"]]) if all(endpoint['State'] == 'available' for endpoint in response['VpcEndpoints']): logger.info('VPC endpoints are available') break else: logger.info('Waiting for VPC endpoints to become available') time.sleep(5) # wait for 5 seconds before checking again # Attach the VPC configuration to the EMR Serverless application # Replace 'EmrApplicationId' with the ID of your EMR Serverless application response = emr_serverless.update_application( applicationId=application_id, networkConfiguration={ 'subnetIds': [public_subnet_id, private_subnet_id], 'securityGroupIds': [sg_id] } ) logger.info(f'Updated EMR Serverless application: {response}') except ClientError as e: logger.exception("Failure with botocore", exc_info=e) raise e
Editor is loading...
Leave a Comment