10 Appendix: Code re-cap
Our final code will now look like this:
10.0.1 my-container-infra.py
import os

import aws_cdk as cdk
from aws_cdk import (
    aws_cloudwatch as cw,
    aws_ec2 as ec2,
    aws_sns as sns,
    aws_sns_subscriptions as snssubs,
)
import cdk_monitoring_constructs as cdkmon

# Project-local helper modules (listed further down in this appendix).
import containers
import monitoring
# --- App, environment and stack -------------------------------------------
app = cdk.App()
# Account and region are read from the environment variables set by the CDK CLI.
env = cdk.Environment(
    account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
)
stack = cdk.Stack(app, "my-container-infra", env=env)

# --- Networking -------------------------------------------------------------
# If a "vpcname" context value is supplied, look up that VPC by name;
# otherwise create a new VPC for this stack.
vpcname = app.node.try_get_context("vpcname")
if vpcname:
    vpc = ec2.Vpc.from_lookup(stack, "vpc", vpc_name=vpcname)
else:
    vpc = ec2.Vpc(stack, "vpc", vpc_name="my-vpc", nat_gateways=1, max_azs=2)

# --- ECS cluster, task definition and service ------------------------------
config = containers.ClusterConfig(vpc=vpc)
cluster = containers.add_cluster(stack, "my-test-cluster", config)

taskconfig: containers.TaskConfig = {
    "cpu": 512,
    "memory_limit_mib": 1024,
    "family": "webapp",
}
containerconfig: containers.ContainerConfig = {
    "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
    "tcp_ports": [8000],
}
taskdef = containers.add_task_definition_with_container(
    stack, f"taskdef-{taskconfig['family']}", taskconfig, containerconfig
)

# Positional arguments after taskdef: port=8000, desired_count=2,
# use_public_endpoint=True.
service = containers.add_service(
    stack, f"service-{taskconfig['family']}", cluster, taskdef, 8000, 2, True
)

# --- Auto scaling -----------------------------------------------------------
containers.set_service_scaling(
    service=service.service,
    config=containers.ServiceScalingConfig(
        min_count=1,
        max_count=4,
        scale_cpu_target=containers.ScalingThreshold(percent=50),
        scale_memory_target=containers.ScalingThreshold(percent=70),
    ),
)

# --- Monitoring and alarms --------------------------------------------------
alarm_topic = sns.Topic(stack, 'alarm-topic', display_name='Alarm topic')

monitoring_config = monitoring.MonitoringConfig(
    dashboard_name='monitoring', default_alarm_topic=alarm_topic
)
mon = monitoring.init_monitoring(stack, monitoring_config)

mon["handler"].add_medium_header("Test App monitoring")
mon["handler"].monitor_fargate_service(
    fargate_service=service,
    human_readable_name="My test service",
)

# NOTE(review): the same service is registered a second time, now with a
# running-task-count alarm attached — confirm both registrations are intended.
mon["handler"].monitor_fargate_service(
    fargate_service=service,
    human_readable_name='My test service',
    add_running_task_count_alarm={
        'alarm1': cdkmon.RunningTaskCountThreshold(
            max_running_tasks=2,
            comparison_operator_override=cw.ComparisonOperator.LESS_THAN_THRESHOLD,
            evaluation_periods=2,
            datapoints_to_alarm=2,
            period=cdk.Duration.minutes(5),
        )
    })

# Subscribe an e-mail address to the alarm topic.
alarm_email = 'hello@example.com'
alarm_topic.add_subscription(snssubs.EmailSubscription(alarm_email))

app.synth()
10.0.2 containers.py
from typing import Literal, TypedDict, List, NotRequired  # noqa

import constructs as cons
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_ecs_patterns as ecspat,
    aws_logs as logs,
)
class TaskConfig(TypedDict):
    """Settings forwarded to ``ecs.FargateTaskDefinition`` by
    ``add_task_definition_with_container``."""

    # Passed as ``cpu``; only the listed values are accepted by the type.
    cpu: Literal[256, 512, 1024, 2048, 4096]
    # Passed as ``memory_limit_mib``.
    memory_limit_mib: int
    # Passed as ``family``; also used as the log stream prefix.
    family: str
class ContainerConfig(TypedDict):
    """Container settings consumed by ``add_task_definition_with_container``."""

    # Image reference handed to ``ecs.ContainerImage.from_registry``.
    image: str
    # Container ports; one TCP port mapping is added per entry.
    tcp_ports: List[int]
def add_task_definition_with_container(
    scope: cons.Construct,
    id: str,
    task_config: TaskConfig,
    container_config: ContainerConfig,
) -> ecs.FargateTaskDefinition:
    """Create a Fargate task definition with a single container attached.

    Args:
        scope: Construct the task definition is created in.
        id: Construct id of the task definition.
        task_config: CPU, memory and family for the task definition.
        container_config: Image reference and TCP ports for the container.

    Returns:
        The new task definition, with the container and its port mappings added.
    """
    taskdef = ecs.FargateTaskDefinition(
        scope,
        id,
        cpu=task_config["cpu"],
        memory_limit_mib=task_config["memory_limit_mib"],
        family=task_config["family"],
    )

    # Container logs use the awslogs driver, prefixed with the task family
    # and kept for one day.
    logdriver = ecs.LogDrivers.aws_logs(
        stream_prefix=taskdef.family,
        log_retention=logs.RetentionDays.ONE_DAY,
    )
    image = ecs.ContainerImage.from_registry(container_config["image"])
    # The container's construct id is derived from the bare image name.
    image_id = f"container-{_extract_image_name(container_config['image'])}"
    containerdef = taskdef.add_container(image_id, image=image, logging=logdriver)

    for port in container_config["tcp_ports"]:
        containerdef.add_port_mappings(
            ecs.PortMapping(container_port=port, protocol=ecs.Protocol.TCP)
        )

    return taskdef
def add_service(
    scope: cons.Construct,
    id: str,
    cluster: ecs.Cluster,
    taskdef: ecs.FargateTaskDefinition,
    port: int,
    desired_count: int,
    use_public_endpoint: bool = True,
    service_name: str | None = None,
) -> ecspat.ApplicationLoadBalancedFargateService:
    """Create a load-balanced Fargate service for a task definition.

    Args:
        scope: Construct the service is created in.
        id: Construct id of the service.
        cluster: ECS cluster to run the service in.
        taskdef: Task definition the service runs.
        port: Listener port of the load balancer.
        desired_count: Desired number of tasks.
        use_public_endpoint: Passed as ``public_load_balancer``.
        service_name: Optional explicit service name.

    Returns:
        The ``ApplicationLoadBalancedFargateService`` construct.
    """
    service = ecspat.ApplicationLoadBalancedFargateService(
        scope,
        id,
        cluster=cluster,
        task_definition=taskdef,
        listener_port=port,
        desired_count=desired_count,
        service_name=service_name,
        # Deployment circuit breaker with rollback enabled.
        circuit_breaker=ecs.DeploymentCircuitBreaker(
            rollback=True,
        ),
        public_load_balancer=use_public_endpoint,
    )
    return service
class ClusterConfig(TypedDict):
    """Settings consumed by ``add_cluster``."""

    # VPC the cluster is placed in.
    vpc: ec2.IVpc
    # Optional; forwarded as ``container_insights`` (None when omitted).
    enable_container_insights: NotRequired[bool]
def add_cluster(scope: cons.Construct, id: str, config: ClusterConfig) -> ecs.Cluster:
    """Create an ECS cluster in the VPC given by ``config``.

    ``enable_container_insights`` is optional; when absent, ``None`` is passed
    as ``container_insights``.
    """
    return ecs.Cluster(
        scope,
        id,
        vpc=config["vpc"],
        container_insights=config.get("enable_container_insights", None),
    )
def _extract_image_name(image_ref):
= image_ref.split("/")[-1]
name_with_tag = name_with_tag.split(":")[0]
name return name
class ScalingThreshold(TypedDict):
    """A utilization target, passed as ``target_utilization_percent``."""

    percent: float
class ServiceScalingConfig(TypedDict):
    """Auto-scaling settings consumed by ``set_service_scaling``."""

    # Passed as ``min_capacity`` / ``max_capacity`` of the task count.
    min_count: int
    max_count: int
    # Target utilization for the CPU and memory scaling policies.
    scale_cpu_target: ScalingThreshold
    scale_memory_target: ScalingThreshold
def set_service_scaling(service: ecs.FargateService, config: ServiceScalingConfig):
    """Enable task-count auto scaling on ``service``.

    Sets the minimum/maximum task count from ``config`` and adds two scaling
    policies: one on CPU utilization and one on memory utilization.
    """
    scaling = service.auto_scale_task_count(
        max_capacity=config["max_count"], min_capacity=config["min_count"]
    )
    scaling.scale_on_cpu_utilization(
        "CpuScaling", target_utilization_percent=config["scale_cpu_target"]["percent"]
    )
    scaling.scale_on_memory_utilization(
        "MemoryScaling",
        target_utilization_percent=config["scale_memory_target"]["percent"],
    )
10.0.3 monitoring.py
from typing import NotRequired, TypedDict

from constructs import Construct
import aws_cdk as cdk
from aws_cdk import (
    aws_sns as sns,
)
import cdk_monitoring_constructs as cdkmon
class MonitoringConfig(TypedDict):
    """Input settings for ``init_monitoring``."""

    # Name given to the monitoring facade / dashboard.
    dashboard_name: str
    # Optional SNS topic; when set, alarms use an SNS action strategy.
    default_alarm_topic: NotRequired[sns.ITopic]
    # Optional; ``init_monitoring`` falls back to ``dashboard_name`` when absent.
    default_alarm_name_prefix: NotRequired[str]
class MonitoringContext(TypedDict):
    """Result of ``init_monitoring``: the facade plus the defaults in effect."""

    # The monitoring facade that monitors and alarms are added to.
    handler: cdkmon.MonitoringFacade
    default_alarm_topic: NotRequired[sns.ITopic]
    default_alarm_name_prefix: NotRequired[str]
def init_monitoring(scope: Construct, config: MonitoringConfig) -> MonitoringContext:
    """Create a monitoring facade with default alarm settings.

    Args:
        scope: Construct the facade is created in.
        config: Dashboard name plus optional alarm topic and alarm name prefix.

    Returns:
        A ``MonitoringContext`` holding the facade, the alarm topic (``None``
        when not configured) and the alarm name prefix in effect.
    """
    # Without a configured topic, alarms get a no-op action strategy.
    sns_alarm_strategy = cdkmon.NoopAlarmActionStrategy()
    if config.get("default_alarm_topic"):
        sns_alarm_strategy = cdkmon.SnsAlarmActionStrategy(
            on_alarm_topic=config.get("default_alarm_topic")
        )

    # The alarm name prefix defaults to the dashboard name.
    default_alarm_name_prefix = config.get("default_alarm_name_prefix")
    if default_alarm_name_prefix is None:
        default_alarm_name_prefix = config["dashboard_name"]

    return MonitoringContext(
        handler=cdkmon.MonitoringFacade(
            scope,
            config["dashboard_name"],
            alarm_factory_defaults=cdkmon.AlarmFactoryDefaults(
                actions_enabled=True,
                action=sns_alarm_strategy,
                alarm_name_prefix=default_alarm_name_prefix,
            ),
        ),
        default_alarm_topic=config.get("default_alarm_topic"),
        default_alarm_name_prefix=default_alarm_name_prefix,
    )
10.0.4 containers_test.py
import pytest
import aws_cdk as cdk
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    assertions,
)

import containers
def test_ecs_cluster_defined_with_existing_vpc():
    """add_cluster creates exactly one ECS cluster bound to the given VPC."""
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    config = containers.ClusterConfig(vpc=vpc)
    cluster = containers.add_cluster(stack, "my-test-cluster", config)

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ECS::Cluster", 1)
    assert cluster.vpc is vpc
def test_check_that_container_insights_become_enabled():
    """enable_container_insights=True yields the containerInsights cluster setting."""
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    config = containers.ClusterConfig(vpc=vpc, enable_container_insights=True)
    containers.add_cluster(stack, "test-cluster", config)

    template = assertions.Template.from_stack(stack)

    template.has_resource_properties('AWS::ECS::Cluster', {
        'ClusterSettings': assertions.Match.array_with(
            pattern=[
                assertions.Match.object_equals(pattern={
                    'Name': 'containerInsights',
                    'Value': 'enabled'
                })
            ]
        )
    })
def test_ecs_fargate_task_definition_defined():
    """The task definition is Fargate-compatible and carries the configured values."""
    stack = cdk.Stack()
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image, "tcp_ports": [8000]}
    taskdef = containers.add_task_definition_with_container(
        stack, f"taskdef-{taskcfg['family']}", taskcfg, containercfg
    )

    assert taskdef.is_fargate_compatible
    assert taskdef in stack.node.children

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ECS::TaskDefinition", 1)
    # The template is matched against the string form of cpu and memory.
    template.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {
            "RequiresCompatibilities": ["FARGATE"],
            "Cpu": str(cpuval),
            "Memory": str(memval),
            "Family": familyval,
        },
    )
def test_container_definition_added_to_task_definition():
    """The configured image appears as a container in the task definition."""
    stack = cdk.Stack()
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {
        "image": image_name,
        "tcp_ports": [8000],
    }

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )

    template = assertions.Template.from_stack(stack)
    containerdef: ecs.ContainerDefinition = taskdef.default_container  # type: ignore

    assert containerdef is not None
    assert containerdef.image_name == image_name
    template.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {
            "ContainerDefinitions": assertions.Match.array_with(
                [assertions.Match.object_like({"Image": image_name})]
            )
        },
    )
@pytest.fixture
def service_test_input_data():
    """Provide a stack with a VPC, a cluster and a task definition.

    Returns a dict with the keys ``stack``, ``cluster`` and ``task_definition``.
    """
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    config = containers.ClusterConfig(vpc=vpc)
    cluster = containers.add_cluster(stack, "test-cluster", config)
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {
        "image": image_name,
        "tcp_ports": [8000],
    }

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )
    return {"stack": stack, "cluster": cluster, "task_definition": taskdef}
def test_fargate_service_created_with_only_mandatory_properties(
    service_test_input_data,
):
    """With default arguments the service sits behind an internet-facing ALB."""
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]

    port = 80
    desired_count = 1

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count
    )

    sg_capture = assertions.Capture()
    template = assertions.Template.from_stack(stack)

    assert service.cluster == cluster
    assert service.task_definition == taskdef

    template.resource_count_is("AWS::ECS::Service", 1)
    template.has_resource_properties(
        "AWS::ECS::Service",
        {
            "DesiredCount": desired_count,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": assertions.Match.object_like(
                {
                    "AwsvpcConfiguration": assertions.Match.object_like(
                        {
                            "AssignPublicIp": "DISABLED",
                            "SecurityGroups": assertions.Match.array_with([sg_capture]),
                        }
                    )
                }
            ),
        },
    )

    template.resource_count_is("AWS::ElasticLoadBalancingV2::LoadBalancer", 1)
    template.has_resource_properties(
        "AWS::ElasticLoadBalancingV2::LoadBalancer",
        {"Type": "application", "Scheme": "internet-facing"},
    )
    # A security group must allow TCP ingress from anywhere on the listener port.
    template.has_resource_properties(
        "AWS::EC2::SecurityGroup",
        {
            "SecurityGroupIngress": assertions.Match.array_with(
                [
                    assertions.Match.object_like(
                        {"CidrIp": "0.0.0.0/0", "FromPort": port, "IpProtocol": "tcp"}
                    )
                ]
            )
        },
    )
def test_fargate_service_created_without_public_access(service_test_input_data):
    """use_public_endpoint=False produces an internal load balancer."""
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]

    port = 80
    desired_count = 1
    containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count, False
    )

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ElasticLoadBalancingV2::LoadBalancer", 1)
    template.has_resource_properties(
        "AWS::ElasticLoadBalancingV2::LoadBalancer",
        {"Type": "application", "Scheme": "internal"},
    )
def test_scaling_settings_for_service(service_test_input_data):
    """set_service_scaling adds one scalable target and CPU + memory policies."""
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]
    port = 80
    desired_count = 2

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count, False
    )

    config = containers.ServiceScalingConfig(
        min_count=1,
        max_count=5,
        scale_cpu_target=containers.ScalingThreshold(percent=50),
        scale_memory_target=containers.ScalingThreshold(percent=50),
    )
    containers.set_service_scaling(service=service.service, config=config)

    scale_resource = assertions.Capture()
    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ApplicationAutoScaling::ScalableTarget", 1)
    template.has_resource_properties(
        "AWS::ApplicationAutoScaling::ScalableTarget",
        {
            "MaxCapacity": config["max_count"],
            "MinCapacity": config["min_count"],
            "ResourceId": scale_resource,
            "ScalableDimension": "ecs:service:DesiredCount",
            "ServiceNamespace": "ecs",
        },
    )

    template.resource_count_is("AWS::ApplicationAutoScaling::ScalingPolicy", 2)
    template.has_resource_properties(
        "AWS::ApplicationAutoScaling::ScalingPolicy",
        {
            "PolicyType": "TargetTrackingScaling",
            "TargetTrackingScalingPolicyConfiguration": assertions.Match.object_like(
                {
                    "PredefinedMetricSpecification": assertions.Match.object_equals(
                        {"PredefinedMetricType": "ECSServiceAverageCPUUtilization"}
                    ),
                    "TargetValue": config["scale_cpu_target"]["percent"],
                }
            ),
        },
    )
    template.has_resource_properties(
        "AWS::ApplicationAutoScaling::ScalingPolicy",
        {
            "PolicyType": "TargetTrackingScaling",
            "TargetTrackingScalingPolicyConfiguration": assertions.Match.object_like(
                {
                    "PredefinedMetricSpecification": assertions.Match.object_equals(
                        {"PredefinedMetricType": "ECSServiceAverageMemoryUtilization"}
                    ),
                    "TargetValue": config["scale_memory_target"]["percent"],
                }
            ),
        },
    )
10.0.5 monitoring_test.py
import pytest
import aws_cdk as cdk
from aws_cdk import (
    assertions,
    aws_ec2 as ec2,
    aws_sns as sns,
)

import monitoring as mon
def test_init_monitoring_of_stack_with_defaults():
    """init_monitoring with only a dashboard name creates one named dashboard."""
    stack = cdk.Stack()

    config = mon.MonitoringConfig(dashboard_name="test-monitoring")

    mon.init_monitoring(stack, config)
    template = assertions.Template.from_stack(stack)
    print(template)
    template.resource_count_is("AWS::CloudWatch::Dashboard", 1)
    template.has_resource_properties(
        "AWS::CloudWatch::Dashboard", {"DashboardName": config["dashboard_name"]}
    )
def test_init_monitoring_of_stack_with_sns_alarm_topic():
    """The returned context echoes the alarm topic and defaults the name prefix."""
    stack = cdk.Stack()
    ec2.Vpc(stack, 'vpc')
    alarm_topic = sns.Topic(stack, 'alarm-topic')

    monitoring_config = mon.MonitoringConfig(
        dashboard_name='test-monitoring',
        default_alarm_topic=alarm_topic  # type: ignore
    )

    monitoring = mon.init_monitoring(stack, config=monitoring_config)
    assert(monitoring.get("default_alarm_topic") == monitoring_config.get("default_alarm_topic"))
    # No prefix was configured, so the dashboard name is expected as the prefix.
    assert(monitoring.get("default_alarm_name_prefix") == monitoring_config.get("dashboard_name"))