Untitled
unknown
python
a year ago
14 kB
43
Indexable
import logging
import os
import time
import boto3
from boto3 import Session
from botocore.client import BaseClient
from botocore.exceptions import ClientError, WaiterError
logger: logging.Logger = logging.getLogger(__name__)
if len(logging.getLogger().handlers) > 0:
# The Lambda environment pre-configures a handler logging to stderr. If a handler is already configured,
# `.basicConfig` does not execute. Thus, we set the level directly.
logging.getLogger().setLevel(logging.INFO)
else:
logging.basicConfig(level=logging.INFO)
def handler(event: dict, context: any):
session: Session = boto3.session.Session()
region = session.region_name
logger.info(f"Session region: {region}")
ec2: BaseClient = boto3.client('ec2')
ec2_resource: BaseClient = boto3.resource('ec2')
sts: BaseClient = boto3.client('sts')
emr_serverless: BaseClient = boto3.client('emr-serverless')
try:
# Call the describe_security_groups method
response = ec2.describe_security_groups()
# Print the security groups
for security_group in response['SecurityGroups']:
logger.info(f"Before Security Group ID: {security_group['GroupId']}")
# Call the describe_subnets method
response = ec2.describe_subnets()
# Print the subnets
for subnet in response['Subnets']:
logger.info(f"Before Subnet ID: {subnet['SubnetId']}")
application_id: str = os.environ['APPLICATION_ID']
# Create a VPC
vpc_response: dict = ec2.create_vpc(CidrBlock='10.0.0.0/16')
vpc_id = vpc_response['Vpc']['VpcId']
logger.info(f'Created VPC: {vpc_id}')
# Wait until the VPC is available
waiter = ec2.get_waiter('vpc_available')
waiter.wait(VpcIds=[vpc_id])
logger.info(f'VPC is available: {vpc_id}')
# Enable DNS support
vpc = ec2_resource.Vpc(vpc_id)
vpc.modify_attribute(
EnableDnsSupport={'Value': True}
)
# Enable DNS hostnames
vpc.modify_attribute(
EnableDnsHostnames={'Value': True}
)
# Create VPC flow logs
role_arn = os.environ['IAM_ROLE_ARN']
logger.info(f"Current role ARN: {role_arn}")
print(f"Current role ARN: {role_arn}")
vpc_flow_log = ec2.create_flow_logs(
ResourceIds=[vpc_id], # replace with your VPC ID
ResourceType='VPC',
TrafficType='ALL',
LogGroupName=‘MyAppicationVpcFlowLogs',
DeliverLogsPermissionArn=role_arn
)
# Create an Internet Gateway
internet_gateway = ec2.create_internet_gateway()
internet_gateway_id = internet_gateway['InternetGateway']['InternetGatewayId']
logger.info(f'Created Internet Gateway: {internet_gateway_id}')
# Attach the Internet Gateway to the VPC
ec2.attach_internet_gateway(InternetGatewayId=internet_gateway_id, VpcId=vpc_id)
logger.info(f'Attached Internet Gateway to VPC: {vpc_id}')
# Get a public subnet from the existing EMR Serverless application
# response = emr_serverless.get_application(applicationId=application_id)
#
# logger.info(f"EMR Serverless subnets: {response['application']['networkConfiguration']['subnetIds']}")
# logger.info(
# f"EMR Serverless security groups: {response['application']['networkConfiguration']['securityGroupIds']}")
# public_subnet_id = response['application']['networkConfiguration']['subnetIds'][0]
# Create a public subnet
public_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.1.0/24')
public_subnet_id = public_subnet['Subnet']['SubnetId']
logger.info(f'Created public subnet: {public_subnet_id}')
# Modify the public subnet to auto-assign public IP to instances
ec2.modify_subnet_attribute(
SubnetId=public_subnet_id,
MapPublicIpOnLaunch={'Value': True}
)
logger.info(f'Enabled auto-assign public IP for subnet: {public_subnet_id}')
# Create a private subnet
private_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.2.0/24')
private_subnet_id = private_subnet['Subnet']['SubnetId']
logger.info(f'Created private subnet: {private_subnet_id}')
# Wait until the subnets are available
waiter = ec2.get_waiter('subnet_available')
waiter.wait(SubnetIds=[
# public_subnet_id,
private_subnet_id])
logger.info(f'Subnets are available: '
# f'{public_subnet_id}, '
f'{private_subnet_id}')
# Create subnet flow logs
subnet_flow_log = ec2.create_flow_logs(
ResourceIds=[private_subnet_id], # replace with your subnet ID
ResourceType='Subnet',
TrafficType='ALL',
LogGroupName=‘MyApplicationSubnetFlowLogs',
DeliverLogsPermissionArn=role_arn
)
# Create a security group
sg = ec2.create_security_group(GroupName=‘my_application_security_group',
Description=‘My Application VPC security group',
VpcId=vpc_id)
sg_id = sg['GroupId']
# Allow all outbound traffic
try:
ec2.authorize_security_group_egress(
GroupId=sg_id,
IpPermissions=[
{
'IpProtocol': '-1',
'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
}
]
)
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidPermission.Duplicate':
print("The rule already exists, skipping...")
else:
raise # re-raise the exception if it's a different error
logger.info(f'Created security group: {sg_id}')
# Create elastic IPs
allocation = ec2.allocate_address(Domain='vpc')
allocation_id = allocation['AllocationId']
logger.info(f'Created Elastic IP: {allocation_id}')
# Create NAT Gateway
# Create a NAT Gateway and specify the Elastic IP allocation ID
# Replace 'subnet_id' with the ID of the subnet in which you want to create the NAT Gateway
nat_gateway = ec2.create_nat_gateway(SubnetId=public_subnet_id, AllocationId=allocation_id)
nat_gateway_id = nat_gateway["NatGateway"]["NatGatewayId"]
# Wait until the NAT Gateway is available
nat_gateway_waiter = ec2.get_waiter('nat_gateway_available')
try:
nat_gateway_waiter.wait(NatGatewayIds=[nat_gateway_id])
logger.info('NAT Gateway is available')
except WaiterError as e:
response = ec2.describe_nat_gateways(NatGatewayIds=[nat_gateway_id])
logger.exception(response)
raise e
# Create a route table
route_table = ec2.create_route_table(VpcId=vpc_id)
route_table_id = route_table['RouteTable']['RouteTableId']
logger.info(f'Created Route Table: {route_table_id}')
# Wait until the route table is created
while True:
response = ec2.describe_route_tables(RouteTableIds=[route_table_id])
if response['RouteTables']:
logger.info(f'Route Table is available: {route_table_id}')
break
else:
logger.info(f'Waiting for Route Table to become available: {route_table_id}')
time.sleep(5) # wait for 5 seconds before checking again
response = ec2.create_route(
RouteTableId=route_table['RouteTable']['RouteTableId'],
DestinationCidrBlock='0.0.0.0/0',
NatGatewayId=nat_gateway_id,
# GatewayId=internet_gateway_id
)
logger.info(f"Added route to Internet Gateway {internet_gateway_id} in Route Table {route_table_id}")
# Associate the route table with the private subnet
ec2.associate_route_table(
RouteTableId=route_table_id,
SubnetId=private_subnet_id
)
logger.info(f"Associated Route Table {route_table_id} with private subnet {public_subnet_id}")
# Create VPC endpoints
# Create a security group for the VPC endpoints
response = ec2.create_security_group(GroupName='VPC_Endpoint_SG',
Description='Security group for VPC endpoint',
VpcId=vpc_id)
vpc_endpoint_security_group_id = response['GroupId']
# Allow all inbound traffic within the VPC
ec2.authorize_security_group_ingress(
GroupId=vpc_endpoint_security_group_id,
IpPermissions=[
{'IpProtocol': '-1',
'IpRanges': [{'CidrIp': '10.0.0.0/16'}]} # Replace with your VPC CIDR
]
)
# Allow outbound traffic to the VPC CIDR (which includes the VPC endpoint)
ec2.authorize_security_group_egress(
GroupId=vpc_endpoint_security_group_id,
IpPermissions=[
{'IpProtocol': '-1',
'IpRanges': [{'CidrIp': '10.0.0.0/16'}]} # Replace with your VPC CIDR
]
)
# Create a VPC endpoint for S3
s3_endpoint = ec2.create_vpc_endpoint(
VpcId=vpc_id,
ServiceName=f'com.amazonaws.{region}.s3',
RouteTableIds=[route_table_id],
VpcEndpointType='Gateway',
PrivateDnsEnabled=False
)
logger.info(f'Created S3 VPC endpoint: {s3_endpoint["VpcEndpoint"]["VpcEndpointId"]}')
# Create a VPC endpoint for Glue
glue_endpoint = ec2.create_vpc_endpoint(
VpcId=vpc_id,
ServiceName=f'com.amazonaws.{region}.glue',
SubnetIds=[private_subnet_id],
VpcEndpointType='Interface',
PrivateDnsEnabled=True,
SecurityGroupIds=[vpc_endpoint_security_group_id]
)
logger.info(f'Created Glue VPC endpoint: {glue_endpoint["VpcEndpoint"]["VpcEndpointId"]}')
# Create a VPC endpoint for Secrets Manager
secrets_manager_endpoint = ec2.create_vpc_endpoint(
VpcId=vpc_id,
ServiceName=f'com.amazonaws.{region}.secretsmanager',
SubnetIds=[private_subnet_id],
VpcEndpointType='Interface',
PrivateDnsEnabled=True,
SecurityGroupIds=[vpc_endpoint_security_group_id]
)
logger.info(f'Created Secrets Manager VPC endpoint: {secrets_manager_endpoint["VpcEndpoint"]["VpcEndpointId"]}')
# Create a VPC endpoint for KMS
kms_endpoint = ec2.create_vpc_endpoint(
VpcId=vpc_id,
ServiceName=f'com.amazonaws.{region}.kms',
SubnetIds=[private_subnet_id],
VpcEndpointType='Interface',
PrivateDnsEnabled=True,
SecurityGroupIds=[vpc_endpoint_security_group_id]
)
logger.info(f'Created KMS VPC endpoint: {kms_endpoint["VpcEndpoint"]["VpcEndpointId"]}')
# Create a VPC endpoint for CloudWatch Logs
cloudwatch_logs_endpoint = ec2.create_vpc_endpoint(
VpcId=vpc_id,
ServiceName=f'com.amazonaws.{region}.logs',
SubnetIds=[private_subnet_id],
VpcEndpointType='Interface',
PrivateDnsEnabled=True,
SecurityGroupIds=[vpc_endpoint_security_group_id]
)
logger.info(f'Created CloudWatch Logs VPC endpoint: {cloudwatch_logs_endpoint["VpcEndpoint"]["VpcEndpointId"]}')
# Create a VPC endpoint for Step Functions
step_functions_endpoint = ec2.create_vpc_endpoint(
VpcId=vpc_id,
ServiceName=f'com.amazonaws.{region}.states',
SubnetIds=[private_subnet_id],
VpcEndpointType='Interface',
PrivateDnsEnabled=True,
SecurityGroupIds=[vpc_endpoint_security_group_id]
)
logger.info(f'Created Step Functions VPC endpoint: {step_functions_endpoint["VpcEndpoint"]["VpcEndpointId"]}')
# Wait until the VPC endpoints are available
while True:
response = ec2.describe_vpc_endpoints(VpcEndpointIds=[s3_endpoint["VpcEndpoint"]["VpcEndpointId"],
glue_endpoint["VpcEndpoint"]["VpcEndpointId"],
secrets_manager_endpoint["VpcEndpoint"][
"VpcEndpointId"],
kms_endpoint["VpcEndpoint"]["VpcEndpointId"],
cloudwatch_logs_endpoint["VpcEndpoint"][
"VpcEndpointId"],
step_functions_endpoint["VpcEndpoint"][
"VpcEndpointId"]])
if all(endpoint['State'] == 'available' for endpoint in response['VpcEndpoints']):
logger.info('VPC endpoints are available')
break
else:
logger.info('Waiting for VPC endpoints to become available')
time.sleep(5) # wait for 5 seconds before checking again
# Attach the VPC configuration to the EMR Serverless application
# Replace 'EmrApplicationId' with the ID of your EMR Serverless application
response = emr_serverless.update_application(
applicationId=application_id,
networkConfiguration={
'subnetIds': [public_subnet_id, private_subnet_id],
'securityGroupIds': [sg_id]
}
)
logger.info(f'Updated EMR Serverless application: {response}')
except ClientError as e:
logger.exception("Failure with botocore", exc_info=e)
raise eEditor is loading...
Leave a Comment