Untitled
unknown
python
10 months ago
14 kB
33
Indexable
import logging
import os
import time
from typing import Any

import boto3
from boto3 import Session
from botocore.client import BaseClient
from botocore.exceptions import ClientError, WaiterError

logger: logging.Logger = logging.getLogger(__name__)
if logging.getLogger().handlers:
    # The Lambda environment pre-configures a handler logging to stderr. If a handler is already configured,
    # `.basicConfig` does not execute. Thus, we set the level directly.
    logging.getLogger().setLevel(logging.INFO)
else:
    logging.basicConfig(level=logging.INFO)

# Address plan for the VPC created by `handler`.
_VPC_CIDR = '10.0.0.0/16'
_PUBLIC_SUBNET_CIDR = '10.0.1.0/24'
_PRIVATE_SUBNET_CIDR = '10.0.2.0/24'

# Polling cadence and upper bound for the hand-rolled wait loops (no boto3 waiter is used for these).
_POLL_INTERVAL_SECONDS = 5
_POLL_TIMEOUT_SECONDS = 600

# (human-readable label used in log messages, service suffix of the endpoint service name)
_INTERFACE_ENDPOINT_SERVICES = (
    ('Glue', 'glue'),
    ('Secrets Manager', 'secretsmanager'),
    ('KMS', 'kms'),
    ('CloudWatch Logs', 'logs'),
    ('Step Functions', 'states'),
)

# Endpoint states from which 'available' can no longer be reached; polling on them would never finish.
_ENDPOINT_TERMINAL_STATES = frozenset({'failed', 'rejected', 'expired', 'deleting', 'deleted'})


def _create_flow_logs(ec2: BaseClient, resource_id: str, resource_type: str, log_group_name: str,
                      role_arn: str) -> None:
    """Enable ALL-traffic flow logs for one resource and log any per-resource failure."""
    response = ec2.create_flow_logs(
        ResourceIds=[resource_id],
        ResourceType=resource_type,
        TrafficType='ALL',
        LogGroupName=log_group_name,
        DeliverLogsPermissionArn=role_arn,
    )
    # create_flow_logs can succeed as a call while reporting individual resources as unsuccessful.
    for failure in response.get('Unsuccessful', []):
        logger.error(f'Flow log creation was unsuccessful for {resource_type} {resource_id}: {failure}')


def _wait_for_route_table(ec2: BaseClient, route_table_id: str) -> None:
    """Poll until the route table is visible to describe_route_tables, or raise TimeoutError."""
    deadline = time.monotonic() + _POLL_TIMEOUT_SECONDS
    while True:
        try:
            route_tables = ec2.describe_route_tables(RouteTableIds=[route_table_id])['RouteTables']
        except ClientError as e:
            # A just-created table may not be visible yet; any other error is a real failure.
            if e.response['Error']['Code'] != 'InvalidRouteTableID.NotFound':
                raise
            route_tables = []
        if route_tables:
            logger.info(f'Route Table is available: {route_table_id}')
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f'Route Table {route_table_id} did not become available '
                               f'within {_POLL_TIMEOUT_SECONDS} seconds')
        logger.info(f'Waiting for Route Table to become available: {route_table_id}')
        time.sleep(_POLL_INTERVAL_SECONDS)


def _create_interface_endpoint(ec2: BaseClient, vpc_id: str, region: str, service: str, subnet_id: str,
                               security_group_id: str) -> str:
    """Create an Interface VPC endpoint with private DNS for `service` and return its endpoint ID."""
    response = ec2.create_vpc_endpoint(
        VpcId=vpc_id,
        ServiceName=f'com.amazonaws.{region}.{service}',
        SubnetIds=[subnet_id],
        VpcEndpointType='Interface',
        PrivateDnsEnabled=True,
        SecurityGroupIds=[security_group_id],
    )
    return response['VpcEndpoint']['VpcEndpointId']


def _wait_for_vpc_endpoints(ec2: BaseClient, endpoint_ids: list) -> None:
    """Poll until every endpoint is 'available'; raise on a terminal state or on timeout."""
    deadline = time.monotonic() + _POLL_TIMEOUT_SECONDS
    while True:
        response = ec2.describe_vpc_endpoints(VpcEndpointIds=endpoint_ids)
        # Compare case-insensitively so the check does not depend on the casing of the returned state.
        states = {endpoint['VpcEndpointId']: endpoint['State'].lower() for endpoint in response['VpcEndpoints']}
        broken = {endpoint_id: state for endpoint_id, state in states.items()
                  if state in _ENDPOINT_TERMINAL_STATES}
        if broken:
            raise RuntimeError(f'VPC endpoints cannot become available: {broken}')
        if len(states) == len(endpoint_ids) and all(state == 'available' for state in states.values()):
            logger.info('VPC endpoints are available')
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f'VPC endpoints did not become available '
                               f'within {_POLL_TIMEOUT_SECONDS} seconds: {states}')
        logger.info('Waiting for VPC endpoints to become available')
        time.sleep(_POLL_INTERVAL_SECONDS)


def handler(event: dict, context: Any) -> None:
    """Lambda entry point: build a VPC and attach it to an EMR Serverless application.

    Creates, in order: a VPC (DNS support and hostnames enabled) with flow logs, an Internet
    Gateway, a public and a private subnet, a security group, an Elastic IP and NAT Gateway in
    the public subnet, a route table for the private subnet with a default route via the NAT
    Gateway, a security group plus S3/Glue/Secrets Manager/KMS/CloudWatch Logs/Step Functions
    VPC endpoints, and finally updates the EMR Serverless application's network configuration.

    Environment variables:
        APPLICATION_ID: ID of the EMR Serverless application to update.
        IAM_ROLE_ARN: role ARN passed as DeliverLogsPermissionArn when creating flow logs.

    Args:
        event: Lambda invocation payload; not read by this handler.
        context: Lambda context object; not read by this handler.

    Raises:
        KeyError: a required environment variable is missing (raised before anything is created).
        ClientError: an AWS API call failed (logged, then re-raised).
        WaiterError: a boto3 waiter gave up (VPC, subnets or NAT Gateway).
        TimeoutError, RuntimeError: the route table or VPC endpoints never became available.

    Resources created before a failure are not rolled back by this handler.
    """
    # Read configuration first so a missing variable cannot leave half-built infrastructure behind.
    application_id: str = os.environ['APPLICATION_ID']
    role_arn: str = os.environ['IAM_ROLE_ARN']

    session: Session = boto3.session.Session()
    region = session.region_name
    logger.info(f"Session region: {region}")

    ec2: BaseClient = boto3.client('ec2')
    ec2_resource = boto3.resource('ec2')  # resource object (not a client); used only for Vpc.modify_attribute
    emr_serverless: BaseClient = boto3.client('emr-serverless')

    try:
        # Log what exists before anything is created
        for security_group in ec2.describe_security_groups()['SecurityGroups']:
            logger.info(f"Before Security Group ID: {security_group['GroupId']}")
        for subnet in ec2.describe_subnets()['Subnets']:
            logger.info(f"Before Subnet ID: {subnet['SubnetId']}")

        # Create a VPC
        vpc_response: dict = ec2.create_vpc(CidrBlock=_VPC_CIDR)
        vpc_id = vpc_response['Vpc']['VpcId']
        logger.info(f'Created VPC: {vpc_id}')

        # Wait until the VPC is available
        ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])
        logger.info(f'VPC is available: {vpc_id}')

        # Enable DNS support and DNS hostnames (one attribute per call)
        vpc = ec2_resource.Vpc(vpc_id)
        vpc.modify_attribute(EnableDnsSupport={'Value': True})
        vpc.modify_attribute(EnableDnsHostnames={'Value': True})

        # Create VPC flow logs
        logger.info(f"Current role ARN: {role_arn}")
        # NOTE(review): 'MyAppicationVpcFlowLogs' looks like a misspelling of 'MyApplication...';
        # kept as-is because renaming it could point at a different log group - confirm before fixing.
        _create_flow_logs(ec2, vpc_id, 'VPC', 'MyAppicationVpcFlowLogs', role_arn)

        # Create an Internet Gateway and attach it to the VPC
        internet_gateway = ec2.create_internet_gateway()
        internet_gateway_id = internet_gateway['InternetGateway']['InternetGatewayId']
        logger.info(f'Created Internet Gateway: {internet_gateway_id}')
        ec2.attach_internet_gateway(InternetGatewayId=internet_gateway_id, VpcId=vpc_id)
        logger.info(f'Attached Internet Gateway to VPC: {vpc_id}')
        # NOTE(review): nothing in this handler adds a route from the public subnet to this Internet
        # Gateway, so the NAT Gateway below may have no path to the internet - confirm intent.

        # Create the public and private subnets
        public_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock=_PUBLIC_SUBNET_CIDR)
        public_subnet_id = public_subnet['Subnet']['SubnetId']
        logger.info(f'Created public subnet: {public_subnet_id}')

        private_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock=_PRIVATE_SUBNET_CIDR)
        private_subnet_id = private_subnet['Subnet']['SubnetId']
        logger.info(f'Created private subnet: {private_subnet_id}')

        # Wait for BOTH subnets: the public one hosts the NAT Gateway, so it must be ready too.
        ec2.get_waiter('subnet_available').wait(SubnetIds=[public_subnet_id, private_subnet_id])
        logger.info(f'Subnets are available: {public_subnet_id}, {private_subnet_id}')

        # Modify the public subnet to auto-assign public IP to instances
        ec2.modify_subnet_attribute(SubnetId=public_subnet_id, MapPublicIpOnLaunch={'Value': True})
        logger.info(f'Enabled auto-assign public IP for subnet: {public_subnet_id}')

        # Create subnet flow logs
        _create_flow_logs(ec2, private_subnet_id, 'Subnet', 'MyApplicationSubnetFlowLogs', role_arn)

        # Create a security group
        sg = ec2.create_security_group(GroupName='my_application_security_group',
                                       Description='My Application VPC security group',
                                       VpcId=vpc_id)
        sg_id = sg['GroupId']

        # Allow all outbound traffic
        try:
            ec2.authorize_security_group_egress(
                GroupId=sg_id,
                IpPermissions=[{'IpProtocol': '-1', 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}],
            )
        except ClientError as e:
            if e.response['Error']['Code'] != 'InvalidPermission.Duplicate':
                raise  # re-raise the exception if it's a different error
            logger.info("The rule already exists, skipping...")
        logger.info(f'Created security group: {sg_id}')

        # Create the Elastic IP used by the NAT Gateway
        allocation = ec2.allocate_address(Domain='vpc')
        allocation_id = allocation['AllocationId']
        logger.info(f'Created Elastic IP: {allocation_id}')

        # Create a NAT Gateway in the public subnet and bind the Elastic IP to it
        nat_gateway = ec2.create_nat_gateway(SubnetId=public_subnet_id, AllocationId=allocation_id)
        nat_gateway_id = nat_gateway["NatGateway"]["NatGatewayId"]

        # Wait until the NAT Gateway is available
        try:
            ec2.get_waiter('nat_gateway_available').wait(NatGatewayIds=[nat_gateway_id])
            logger.info('NAT Gateway is available')
        except WaiterError:
            # Capture the gateway's current description to make the failure diagnosable
            response = ec2.describe_nat_gateways(NatGatewayIds=[nat_gateway_id])
            logger.exception(f'NAT Gateway {nat_gateway_id} did not become available: {response}')
            raise

        # Create a route table for the private subnet
        route_table = ec2.create_route_table(VpcId=vpc_id)
        route_table_id = route_table['RouteTable']['RouteTableId']
        logger.info(f'Created Route Table: {route_table_id}')
        _wait_for_route_table(ec2, route_table_id)

        # Default route of the private subnet goes through the NAT Gateway
        ec2.create_route(
            RouteTableId=route_table_id,
            DestinationCidrBlock='0.0.0.0/0',
            NatGatewayId=nat_gateway_id,
        )
        logger.info(f"Added default route via NAT Gateway {nat_gateway_id} in Route Table {route_table_id}")

        # Associate the route table with the private subnet
        ec2.associate_route_table(RouteTableId=route_table_id, SubnetId=private_subnet_id)
        logger.info(f"Associated Route Table {route_table_id} with private subnet {private_subnet_id}")

        # Create a security group for the VPC endpoints
        response = ec2.create_security_group(GroupName='VPC_Endpoint_SG',
                                             Description='Security group for VPC endpoint',
                                             VpcId=vpc_id)
        vpc_endpoint_security_group_id = response['GroupId']

        # Allow all traffic to and from the VPC CIDR (which includes the VPC endpoints)
        vpc_only_permissions = [{'IpProtocol': '-1', 'IpRanges': [{'CidrIp': _VPC_CIDR}]}]
        ec2.authorize_security_group_ingress(GroupId=vpc_endpoint_security_group_id,
                                             IpPermissions=vpc_only_permissions)
        ec2.authorize_security_group_egress(GroupId=vpc_endpoint_security_group_id,
                                            IpPermissions=vpc_only_permissions)

        # Create a Gateway VPC endpoint for S3, attached to the private route table
        s3_endpoint = ec2.create_vpc_endpoint(
            VpcId=vpc_id,
            ServiceName=f'com.amazonaws.{region}.s3',
            RouteTableIds=[route_table_id],
            VpcEndpointType='Gateway',
            PrivateDnsEnabled=False,
        )
        s3_endpoint_id = s3_endpoint['VpcEndpoint']['VpcEndpointId']
        logger.info(f'Created S3 VPC endpoint: {s3_endpoint_id}')
        endpoint_ids = [s3_endpoint_id]

        # Create the Interface VPC endpoints in the private subnet
        for label, service in _INTERFACE_ENDPOINT_SERVICES:
            endpoint_id = _create_interface_endpoint(ec2, vpc_id, region, service, private_subnet_id,
                                                     vpc_endpoint_security_group_id)
            logger.info(f'Created {label} VPC endpoint: {endpoint_id}')
            endpoint_ids.append(endpoint_id)

        # Wait until the VPC endpoints are available
        _wait_for_vpc_endpoints(ec2, endpoint_ids)

        # Attach the VPC configuration to the EMR Serverless application
        response = emr_serverless.update_application(
            applicationId=application_id,
            networkConfiguration={
                'subnetIds': [public_subnet_id, private_subnet_id],
                'securityGroupIds': [sg_id],
            },
        )
        logger.info(f'Updated EMR Serverless application: {response}')
    except ClientError:
        logger.exception("Failure with botocore")
        raise
Editor is loading...
Leave a Comment