10  Appendix: Code recap

Our final code now looks like this:

10.0.1 my-container-infra.py

import os
import aws_cdk as cdk
from aws_cdk import (
    aws_cloudwatch as cw,
    aws_ec2 as ec2,
    aws_sns as sns,
    aws_sns_subscriptions as snssubs,
)
import cdk_monitoring_constructs as cdkmon
import containers
import monitoring

# --- Application, environment and stack ---
app = cdk.App()
# Account and region are read from the CDK_DEFAULT_* environment variables.
env = cdk.Environment(
    account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
)
stack = cdk.Stack(app, "my-container-infra", env=env)

# --- Networking and cluster ---
# Reuse an existing VPC when the "vpcname" context value is set; otherwise
# create a new VPC named "my-vpc".
vpcname = app.node.try_get_context("vpcname")
if vpcname:
    vpc = ec2.Vpc.from_lookup(stack, "vpc", vpc_name=vpcname)
else:
    vpc = ec2.Vpc(stack, "vpc", vpc_name="my-vpc", nat_gateways=1, max_azs=2)
config = containers.ClusterConfig(vpc=vpc)
cluster = containers.add_cluster(stack, "my-test-cluster", config)

# --- Task definition and service ---
taskconfig: containers.TaskConfig = {
    "cpu": 512,
    "memory_limit_mib": 1024,
    "family": "webapp",
}
containerconfig: containers.ContainerConfig = {
    "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
    "tcp_ports": [8000],
}
taskdef = containers.add_task_definition_with_container(
    stack, f"taskdef-{taskconfig['family']}", taskconfig, containerconfig
)

# Positional arguments: listener port 8000, desired count 2, public endpoint.
service = containers.add_service(
    stack, f"service-{taskconfig['family']}", cluster, taskdef, 8000, 2, True
)

containers.set_service_scaling(
    service=service.service,
    config=containers.ServiceScalingConfig(
        min_count=1,
        max_count=4,
        scale_cpu_target=containers.ScalingThreshold(percent=50),
        scale_memory_target=containers.ScalingThreshold(percent=70),
    ),
)

# --- Monitoring and alarms ---
alarm_topic = sns.Topic(stack, "alarm-topic", display_name="Alarm topic")

monitoring_config = monitoring.MonitoringConfig(
    dashboard_name="monitoring", default_alarm_topic=alarm_topic
)
mon = monitoring.init_monitoring(stack, monitoring_config)

mon["handler"].add_medium_header("Test App monitoring")
# Register the service with the monitoring facade exactly once. The earlier
# version called monitor_fargate_service twice for the same service (once
# without and once with the alarm), which registers the service a second time.
# NOTE(review): the alarm triggers below 2 running tasks while autoscaling
# above allows min_count=1 - confirm that alarming on a scale-in to one task
# is intended.
mon["handler"].monitor_fargate_service(
    fargate_service=service,
    human_readable_name="My test service",
    add_running_task_count_alarm={
        "alarm1": cdkmon.RunningTaskCountThreshold(
            max_running_tasks=2,
            comparison_operator_override=cw.ComparisonOperator.LESS_THAN_THRESHOLD,
            evaluation_periods=2,
            datapoints_to_alarm=2,
            period=cdk.Duration.minutes(5),
        )
    },
)

# Alarm notifications published to the topic are delivered to this address.
alarm_email = "hello@example.com"
alarm_topic.add_subscription(snssubs.EmailSubscription(alarm_email))

app.synth()

10.0.2 containers.py

from typing import Literal, TypedDict, List, NotRequired  # noqa
import constructs as cons
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_ecs_patterns as ecspat,
    aws_logs as logs,
)


class TaskConfig(TypedDict):
    """Settings used by add_task_definition_with_container for the task definition."""

    # CPU value for the task; restricted to the listed literal values.
    cpu: Literal[256, 512, 1024, 2048, 4096]
    # Passed on as memory_limit_mib of the Fargate task definition.
    memory_limit_mib: int
    # Family name of the task definition.
    family: str


class ContainerConfig(TypedDict):
    """Settings used by add_task_definition_with_container for the container."""

    # Image reference handed to ecs.ContainerImage.from_registry.
    image: str
    # Container ports that get a TCP port mapping.
    tcp_ports: List[int]


def add_task_definition_with_container(
    scope: cons.Construct,
    id: str,
    task_config: TaskConfig,
    container_config: ContainerConfig,
) -> ecs.FargateTaskDefinition:
    """Create a Fargate task definition that holds a single container.

    Args:
        scope: Construct the task definition is created in.
        id: Construct id of the task definition.
        task_config: cpu, memory limit and family of the task definition.
        container_config: Registry image and TCP ports of the container.

    Returns:
        The task definition with the container added. The container id is
        "container-" followed by the image name taken from the image
        reference; logging uses ecs.LogDrivers.aws_logs with the task family
        as stream prefix and a retention of one day.
    """
    image_ref = container_config["image"]

    task_definition = ecs.FargateTaskDefinition(
        scope,
        id,
        family=task_config["family"],
        cpu=task_config["cpu"],
        memory_limit_mib=task_config["memory_limit_mib"],
    )

    container = task_definition.add_container(
        f"container-{_extract_image_name(image_ref)}",
        image=ecs.ContainerImage.from_registry(image_ref),
        logging=ecs.LogDrivers.aws_logs(
            stream_prefix=task_definition.family,
            log_retention=logs.RetentionDays.ONE_DAY,
        ),
    )

    # One TCP mapping per configured port.
    for tcp_port in container_config["tcp_ports"]:
        mapping = ecs.PortMapping(container_port=tcp_port, protocol=ecs.Protocol.TCP)
        container.add_port_mappings(mapping)

    return task_definition


def add_service(
    scope: cons.Construct,
    id: str,
    cluster: ecs.Cluster,
    taskdef: ecs.FargateTaskDefinition,
    port: int,
    desired_count: int,
    use_public_endpoint: bool = True,
    service_name: str | None = None,
) -> ecspat.ApplicationLoadBalancedFargateService:
    """Create a load-balanced Fargate service for a task definition.

    Args:
        scope: Construct the service is created in.
        id: Construct id of the service.
        cluster: Cluster the service runs in.
        taskdef: Task definition the service runs.
        port: Listener port of the load balancer.
        desired_count: Desired number of tasks.
        use_public_endpoint: Passed on as public_load_balancer.
        service_name: Optional explicit service name.

    Returns:
        The created ApplicationLoadBalancedFargateService. Its deployment
        circuit breaker is configured with rollback enabled.
    """
    breaker = ecs.DeploymentCircuitBreaker(rollback=True)
    return ecspat.ApplicationLoadBalancedFargateService(
        scope,
        id,
        cluster=cluster,
        service_name=service_name,
        task_definition=taskdef,
        desired_count=desired_count,
        listener_port=port,
        public_load_balancer=use_public_endpoint,
        circuit_breaker=breaker,
    )

class ClusterConfig(TypedDict):
    """Settings used by add_cluster."""

    # VPC the cluster is placed in.
    vpc: ec2.IVpc
    # Optional; forwarded as container_insights when creating the cluster.
    enable_container_insights: NotRequired[bool]

def add_cluster(scope: cons.Construct, id: str, config: ClusterConfig) -> ecs.Cluster:
    """Create an ECS cluster in the VPC given by ``config``.

    The optional "enable_container_insights" setting is forwarded as
    ``container_insights``; when the key is absent, None is passed.
    """
    insights_setting = config.get("enable_container_insights")
    return ecs.Cluster(
        scope,
        id,
        vpc=config["vpc"],
        container_insights=insights_setting,
    )


def _extract_image_name(image_ref):
    name_with_tag = image_ref.split("/")[-1]
    name = name_with_tag.split(":")[0]
    return name


class ScalingThreshold(TypedDict):
    """Target utilization for one scaling dimension."""

    # Used as target_utilization_percent by set_service_scaling.
    percent: float


class ServiceScalingConfig(TypedDict):
    """Auto-scaling settings consumed by set_service_scaling."""

    # Minimum task count (min_capacity).
    min_count: int
    # Maximum task count (max_capacity).
    max_count: int
    # Target for scaling on CPU utilization.
    scale_cpu_target: ScalingThreshold
    # Target for scaling on memory utilization.
    scale_memory_target: ScalingThreshold


def set_service_scaling(service: ecs.FargateService, config: ServiceScalingConfig):
    """Enable task-count auto scaling on ``service``.

    The task count is bounded by ``config["min_count"]`` and
    ``config["max_count"]``. Two scaling rules are added: "CpuScaling" on CPU
    utilization and "MemoryScaling" on memory utilization, each using the
    ``percent`` of the matching target in ``config``.
    """
    task_count = service.auto_scale_task_count(
        max_capacity=config["max_count"],
        min_capacity=config["min_count"],
    )

    cpu_percent = config["scale_cpu_target"]["percent"]
    task_count.scale_on_cpu_utilization(
        "CpuScaling", target_utilization_percent=cpu_percent
    )

    memory_percent = config["scale_memory_target"]["percent"]
    task_count.scale_on_memory_utilization(
        "MemoryScaling", target_utilization_percent=memory_percent
    )

10.0.3 monitoring.py

from typing import NotRequired, TypedDict
from constructs import Construct
import aws_cdk as cdk
from aws_cdk import (
    aws_sns as sns,
)
import cdk_monitoring_constructs as cdkmon


class MonitoringConfig(TypedDict):
    """Input settings for init_monitoring."""

    # Id given to the monitoring facade; also the fallback alarm name prefix.
    dashboard_name: str
    # Optional SNS topic; when set, alarms use an SNS alarm action strategy.
    default_alarm_topic: NotRequired[sns.ITopic]
    # Optional alarm name prefix; init_monitoring falls back to dashboard_name.
    default_alarm_name_prefix: NotRequired[str]


class MonitoringContext(TypedDict):
    """Result of init_monitoring."""

    # Monitoring facade created by init_monitoring.
    handler: cdkmon.MonitoringFacade
    # Alarm topic taken from the config (init_monitoring stores None if unset).
    default_alarm_topic: NotRequired[sns.ITopic]
    # Alarm name prefix that was passed to the alarm factory defaults.
    default_alarm_name_prefix: NotRequired[str]



def init_monitoring(scope: Construct, config: MonitoringConfig) -> MonitoringContext:
    """Create the monitoring facade and return it with its alarm defaults.

    Args:
        scope: Construct the monitoring facade is created in.
        config: Dashboard name plus optional alarm topic and alarm name
            prefix.

    Returns:
        A MonitoringContext holding the facade ("handler"), the configured
        alarm topic (None when not configured) and the effective alarm name
        prefix. The prefix falls back to the dashboard name when the config
        has none.
    """
    alarm_topic = config.get("default_alarm_topic")
    # With a topic, alarms notify via SNS; without one, alarm actions are no-ops.
    if alarm_topic:
        alarm_action = cdkmon.SnsAlarmActionStrategy(on_alarm_topic=alarm_topic)
    else:
        alarm_action = cdkmon.NoopAlarmActionStrategy()

    name_prefix = config.get("default_alarm_name_prefix")
    if name_prefix is None:
        name_prefix = config["dashboard_name"]

    alarm_defaults = cdkmon.AlarmFactoryDefaults(
        actions_enabled=True,
        action=alarm_action,
        alarm_name_prefix=name_prefix,
    )
    facade = cdkmon.MonitoringFacade(
        scope,
        config["dashboard_name"],
        alarm_factory_defaults=alarm_defaults,
    )
    return MonitoringContext(
        handler=facade,
        default_alarm_topic=alarm_topic,
        default_alarm_name_prefix=name_prefix,
    )

10.0.4 containers_test.py

import pytest
import aws_cdk as cdk
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    assertions,
)
import containers


def test_ecs_cluster_defined_with_existing_vpc():
    """add_cluster yields exactly one ECS cluster that uses the given VPC."""
    stack = cdk.Stack()
    existing_vpc = ec2.Vpc(stack, "vpc")

    cluster = containers.add_cluster(
        stack, "my-test-cluster", containers.ClusterConfig(vpc=existing_vpc)
    )

    synthesized = assertions.Template.from_stack(stack)
    synthesized.resource_count_is("AWS::ECS::Cluster", 1)
    assert cluster.vpc is existing_vpc


def test_check_that_container_insights_become_enabled():
    """enable_container_insights=True shows up in the cluster settings."""
    stack = cdk.Stack()
    cluster_config = containers.ClusterConfig(
        vpc=ec2.Vpc(stack, "vpc"), enable_container_insights=True
    )
    containers.add_cluster(stack, "test-cluster", cluster_config)

    template = assertions.Template.from_stack(stack)

    insights_enabled = assertions.Match.object_equals(
        pattern={"Name": "containerInsights", "Value": "enabled"}
    )
    template.has_resource_properties(
        "AWS::ECS::Cluster",
        {"ClusterSettings": assertions.Match.array_with(pattern=[insights_enabled])},
    )

def test_ecs_fargate_task_definition_defined():
    """The task definition is Fargate compatible and carries cpu, memory and family."""
    stack = cdk.Stack()
    taskcfg: containers.TaskConfig = {
        "cpu": 512,
        "memory_limit_mib": 1024,
        "family": "test",
    }
    containercfg: containers.ContainerConfig = {
        "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
        "tcp_ports": [8000],
    }

    taskdef = containers.add_task_definition_with_container(
        stack, f"taskdef-{taskcfg['family']}", taskcfg, containercfg
    )

    assert taskdef.is_fargate_compatible
    assert taskdef in stack.node.children

    # The template renders cpu and memory as strings.
    expected_properties = {
        "RequiresCompatibilities": ["FARGATE"],
        "Cpu": str(taskcfg["cpu"]),
        "Memory": str(taskcfg["memory_limit_mib"]),
        "Family": taskcfg["family"],
    }
    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ECS::TaskDefinition", 1)
    template.has_resource_properties("AWS::ECS::TaskDefinition", expected_properties)


def test_container_definition_added_to_task_definition():
    """The task definition gets a default container that uses the configured image."""
    stack = cdk.Stack()
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    taskcfg: containers.TaskConfig = {
        "cpu": 512,
        "memory_limit_mib": 1024,
        "family": "test",
    }
    containercfg: containers.ContainerConfig = {
        "image": image_name,
        "tcp_ports": [8000],
    }

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )

    template = assertions.Template.from_stack(stack)
    containerdef: ecs.ContainerDefinition = taskdef.default_container  # type: ignore

    assert containerdef is not None
    assert containerdef.image_name == image_name

    container_with_image = assertions.Match.object_like({"Image": image_name})
    template.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {"ContainerDefinitions": assertions.Match.array_with([container_with_image])},
    )


@pytest.fixture
def service_test_input_data():
    """Provide a stack with a cluster and a task definition for the service tests.

    Returns a dict with the keys "stack", "cluster" and "task_definition".
    """
    stack = cdk.Stack()
    cluster = containers.add_cluster(
        stack, "test-cluster", containers.ClusterConfig(vpc=ec2.Vpc(stack, "vpc"))
    )

    taskcfg: containers.TaskConfig = {
        "cpu": 512,
        "memory_limit_mib": 1024,
        "family": "test",
    }
    containercfg: containers.ContainerConfig = {
        "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
        "tcp_ports": [8000],
    }
    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )

    return dict(stack=stack, cluster=cluster, task_definition=taskdef)


def test_fargate_service_created_with_only_mandatory_properties(
    service_test_input_data,
):
    """With default arguments the service sits behind an internet-facing ALB."""
    stack, cluster, taskdef = (
        service_test_input_data[key]
        for key in ("stack", "cluster", "task_definition")
    )
    port, desired_count = 80, 1

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count
    )

    sg_capture = assertions.Capture()
    template = assertions.Template.from_stack(stack)

    assert service.cluster == cluster
    assert service.task_definition == taskdef

    # The ECS service runs on Fargate without public IPs on the tasks.
    awsvpc_config = assertions.Match.object_like(
        {
            "AssignPublicIp": "DISABLED",
            "SecurityGroups": assertions.Match.array_with([sg_capture]),
        }
    )
    template.resource_count_is("AWS::ECS::Service", 1)
    template.has_resource_properties(
        "AWS::ECS::Service",
        {
            "DesiredCount": desired_count,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": assertions.Match.object_like(
                {"AwsvpcConfiguration": awsvpc_config}
            ),
        },
    )

    # Exactly one application load balancer, reachable from the internet.
    template.resource_count_is("AWS::ElasticLoadBalancingV2::LoadBalancer", 1)
    template.has_resource_properties(
        "AWS::ElasticLoadBalancingV2::LoadBalancer",
        {"Type": "application", "Scheme": "internet-facing"},
    )

    # A security group allows TCP traffic from anywhere on the listener port.
    open_ingress = assertions.Match.object_like(
        {"CidrIp": "0.0.0.0/0", "FromPort": port, "IpProtocol": "tcp"}
    )
    template.has_resource_properties(
        "AWS::EC2::SecurityGroup",
        {"SecurityGroupIngress": assertions.Match.array_with([open_ingress])},
    )


def test_fargate_service_created_without_public_access(service_test_input_data):
    """Passing False for the public endpoint yields an internal load balancer."""
    inputs = service_test_input_data
    port, desired_count = 80, 1

    containers.add_service(
        inputs["stack"],
        "test-service",
        inputs["cluster"],
        inputs["task_definition"],
        port,
        desired_count,
        False,
    )

    load_balancer_type = "AWS::ElasticLoadBalancingV2::LoadBalancer"
    template = assertions.Template.from_stack(inputs["stack"])
    template.resource_count_is(load_balancer_type, 1)
    template.has_resource_properties(
        load_balancer_type, {"Type": "application", "Scheme": "internal"}
    )


def test_scaling_settings_for_service(service_test_input_data):
    """set_service_scaling adds one scalable target and two target-tracking policies."""
    inputs = service_test_input_data
    stack = inputs["stack"]
    port, desired_count = 80, 2

    service = containers.add_service(
        stack,
        "test-service",
        inputs["cluster"],
        inputs["task_definition"],
        port,
        desired_count,
        False,
    )

    config = containers.ServiceScalingConfig(
        min_count=1,
        max_count=5,
        scale_cpu_target=containers.ScalingThreshold(percent=50),
        scale_memory_target=containers.ScalingThreshold(percent=50),
    )
    containers.set_service_scaling(service=service.service, config=config)

    scale_resource = assertions.Capture()
    template = assertions.Template.from_stack(stack)

    # The scalable target carries the configured task-count bounds.
    template.resource_count_is("AWS::ApplicationAutoScaling::ScalableTarget", 1)
    template.has_resource_properties(
        "AWS::ApplicationAutoScaling::ScalableTarget",
        {
            "MaxCapacity": config["max_count"],
            "MinCapacity": config["min_count"],
            "ResourceId": scale_resource,
            "ScalableDimension": "ecs:service:DesiredCount",
            "ServiceNamespace": "ecs",
        },
    )

    # One target-tracking policy per metric, CPU first and memory second.
    expected_policies = (
        ("ECSServiceAverageCPUUtilization", config["scale_cpu_target"]["percent"]),
        (
            "ECSServiceAverageMemoryUtilization",
            config["scale_memory_target"]["percent"],
        ),
    )
    template.resource_count_is("AWS::ApplicationAutoScaling::ScalingPolicy", 2)
    for metric_type, target_value in expected_policies:
        template.has_resource_properties(
            "AWS::ApplicationAutoScaling::ScalingPolicy",
            {
                "PolicyType": "TargetTrackingScaling",
                "TargetTrackingScalingPolicyConfiguration": assertions.Match.object_like(
                    {
                        "PredefinedMetricSpecification": assertions.Match.object_equals(
                            {"PredefinedMetricType": metric_type}
                        ),
                        "TargetValue": target_value,
                    }
                ),
            },
        )

10.0.5 monitoring_test.py

import pytest
import aws_cdk as cdk
from aws_cdk import (
    assertions,
    aws_ec2 as ec2,
    aws_sns as sns,
)
import monitoring as mon


def test_init_monitoring_of_stack_with_defaults():
    """init_monitoring with only a dashboard name creates one dashboard of that name."""
    stack = cdk.Stack()

    config = mon.MonitoringConfig(dashboard_name="test-monitoring")
    mon.init_monitoring(stack, config)

    # The leftover debug print of the template was removed; the assertions
    # below are what the test checks.
    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::CloudWatch::Dashboard", 1)
    template.has_resource_properties(
        "AWS::CloudWatch::Dashboard", {"DashboardName": config["dashboard_name"]}
    )

def test_init_monitoring_of_stack_with_sns_alarm_topic():
    """The returned context exposes the alarm topic and the default name prefix.

    Without an explicit prefix in the config, the alarm name prefix is
    expected to equal the dashboard name.
    """
    stack = cdk.Stack()
    # Only the topic is needed here; the VPC the test used to create was
    # never referenced by the code under test or by the assertions.
    alarm_topic = sns.Topic(stack, "alarm-topic")

    monitoring_config = mon.MonitoringConfig(
        dashboard_name="test-monitoring",
        default_alarm_topic=alarm_topic,  # type: ignore
    )

    monitoring = mon.init_monitoring(stack, config=monitoring_config)

    assert monitoring.get("default_alarm_topic") == monitoring_config.get(
        "default_alarm_topic"
    )
    assert monitoring.get("default_alarm_name_prefix") == monitoring_config.get(
        "dashboard_name"
    )