8 More test driven development
In this chapter, we are going to expand on the ECS service we defined in the previous chapter and make it a load-balanced service that can automatically scale out and in based on load. We are going to use a test-driven development (TDD) approach to figure out what we want to do here.
8.1 A recap
First, a quick recap of what we have done so far:
- We are using AWS CDK to define an infrastructure, using Python
- This infrastructure uses an existing VPC (Virtual Private Cloud - network infrastructure)
- We run a container-based solution using AWS Elastic Container Service (ECS)
- We have an ECS task definition that defines the container properties to use
- We have an ECS service definition that uses this task definition, plus a desired state description, so ECS will make sure that our solution matches that desired state.
We can expose the container publicly and connect to the web application that we have used so far to test the infrastructure building blocks.
So far, we have worked with three code files:
- my-container-infra.py - the main program file
- containers.py - where most of our container handling logic lives
- containers_test.py - the test code for our container management logic
The code (in Python) up to this point is below:
my-container-infra.py
import os

import aws_cdk as cdk
from aws_cdk import (
    aws_ec2 as ec2,
)

import containers

app = cdk.App()
env = cdk.Environment(
    account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
)
stack = cdk.Stack(app, "my-container-infra", env=env)

vpc = ec2.Vpc.from_lookup(stack, "vpc", is_default=True)

cluster = containers.add_cluster(stack, "my-test-cluster", vpc)

taskconfig: containers.TaskConfig = {
    "cpu": 512,
    "memory_limit_mib": 1024,
    "family": "webapp",
}
containerconfig: containers.ContainerConfig = {
    "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
}
taskdef = containers.add_task_definition_with_container(
    stack, f"taskdef-{taskconfig['family']}", taskconfig, containerconfig
)

containers.add_service(
    stack, f"service-{taskconfig['family']}", cluster, taskdef, 8000, 0, True
)

app.synth()
containers.py
from typing import Literal, TypedDict  # noqa

import constructs as cons
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_logs as logs,
)


class TaskConfig(TypedDict):
    cpu: Literal[256, 512, 1024, 2048, 4096]
    memory_limit_mib: int
    family: str


class ContainerConfig(TypedDict):
    image: str


def add_task_definition_with_container(
    scope: cons.Construct,
    id: str,
    task_config: TaskConfig,
    container_config: ContainerConfig,
) -> ecs.FargateTaskDefinition:
    taskdef = ecs.FargateTaskDefinition(
        scope,
        id,
        cpu=task_config["cpu"],
        memory_limit_mib=task_config["memory_limit_mib"],
        family=task_config["family"],
    )
    logdriver = ecs.LogDrivers.aws_logs(
        stream_prefix=taskdef.family,
        log_retention=logs.RetentionDays.ONE_DAY,
    )
    image = ecs.ContainerImage.from_registry(container_config["image"])
    image_id = f"container-{_extract_image_name(container_config['image'])}"
    taskdef.add_container(image_id, image=image, logging=logdriver)
    return taskdef


def add_service(
    scope: cons.Construct,
    id: str,
    cluster: ecs.Cluster,
    taskdef: ecs.FargateTaskDefinition,
    port: int,
    desired_count: int,
    assign_public_ip: bool = False,
    service_name: str = None,
) -> ecs.FargateService:
    name = service_name if service_name else ""
    sg = ec2.SecurityGroup(
        scope,
        f"{id}-security-group",
        description=f"security group for service {name}",
        vpc=cluster.vpc,
    )
    sg.add_ingress_rule(ec2.Peer.any_ipv4(), ec2.Port.tcp(port))
    service = ecs.FargateService(
        scope,
        id,
        cluster=cluster,
        task_definition=taskdef,
        desired_count=desired_count,
        service_name=service_name,
        security_groups=[sg],
        circuit_breaker=ecs.DeploymentCircuitBreaker(
            rollback=True,
        ),
        assign_public_ip=assign_public_ip,
    )
    return service


def add_cluster(scope: cons.Construct, id: str, vpc: ec2.IVpc) -> ecs.Cluster:
    return ecs.Cluster(scope, id, vpc=vpc)


def _extract_image_name(image_ref):
    name_with_tag = image_ref.split("/")[-1]
    name = name_with_tag.split(":")[0]
    return name
containers_test.py
import aws_cdk as cdk
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    assertions,
)

import containers


def test_ecs_cluster_defined_with_existing_vpc():
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    cluster = containers.add_cluster(stack, "my-test-cluster", vpc=vpc)

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ECS::Cluster", 1)
    assert cluster.vpc is vpc


def test_ecs_fargate_task_definition_defined():
    stack = cdk.Stack()
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image}
    taskdef = containers.add_task_definition_with_container(
        stack, f"taskdef-{taskcfg['family']}", taskcfg, containercfg
    )

    assert taskdef.is_fargate_compatible
    assert taskdef in stack.node.children

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ECS::TaskDefinition", 1)
    template.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {
            "RequiresCompatibilities": ["FARGATE"],
            "Cpu": str(cpuval),
            "Memory": str(memval),
            "Family": familyval,
        },
    )


def test_container_definition_added_to_task_definition():
    stack = cdk.Stack()
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image_name}

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )

    template = assertions.Template.from_stack(stack)
    containerdef: ecs.ContainerDefinition = taskdef.default_container  # type: ignore

    assert containerdef is not None
    assert containerdef.image_name == image_name
    template.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {
            "ContainerDefinitions": assertions.Match.array_with(
                [assertions.Match.object_like({"Image": image_name})]
            )
        },
    )


def test_fargate_service_created_with_only_mandatory_properties():
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    cluster = containers.add_cluster(stack, "test-cluster", vpc=vpc)
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image_name}

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )

    port = 80
    desired_count = 1

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count
    )

    sg_capture = assertions.Capture()
    template = assertions.Template.from_stack(stack)

    assert service.cluster == cluster
    assert service.task_definition == taskdef
    template.resource_count_is("AWS::ECS::Service", 1)
    template.has_resource_properties(
        "AWS::ECS::Service",
        {
            "DesiredCount": desired_count,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": assertions.Match.object_like(
                {
                    "AwsvpcConfiguration": assertions.Match.object_like(
                        {
                            "AssignPublicIp": "DISABLED",
                            "SecurityGroups": assertions.Match.array_with([sg_capture]),
                        }
                    )
                }
            ),
        },
    )
    template.has_resource_properties(
        "AWS::EC2::SecurityGroup",
        {
            "SecurityGroupIngress": assertions.Match.array_with(
                [
                    assertions.Match.object_like(
                        {"CidrIp": "0.0.0.0/0", "FromPort": port, "IpProtocol": "tcp"}
                    )
                ]
            )
        },
    )
8.2 What we will do now
In the previous chapter, there were some points about our design that we should address, which include:
- Specifying a port number to access the service through add_service works right now for a single container instance only. For multiple containers (desired_count > 1), there needs to be a load balancer.
- The design allows traffic from everywhere, regardless of whether the service uses public or private IP addresses.
We should also add some mechanisms to scale the solution based on some load characteristics.
The points above tie in with each other. If we put the container service behind a load balancer, we want to restrict access so that it only goes via the load balancer. We also want to have a suitable number of containers running, based on the load on the service.
So we have a few tasks:
- Add a load balancer in front of the service when we set it up in ECS
- Make sure access to the service is restricted to go via the load balancer
- Add scaling configuration for the containers behind the load balancer
We can break these up into more detailed steps later.
8.3 The trouble with default VPCs
Until now, our infrastructure experiments have assumed that there is a VPC in place, and we have also for simplicity used the default VPC, which is something most AWS accounts will have.
A problem with the default VPC is that it only contains public subnets. My guess is that this is for backward compatibility with the old EC2-Classic days, when you did not need an explicit VPC to run your virtual machines in AWS.
Either way, it is a shortcoming of this VPC. To let us use a VPC with both private and public subnets, we will add a small code segment that optionally creates a VPC in the stack. You can also point to an existing VPC that has both public and private subnets.
vpcname = app.node.try_get_context("vpcname")
if vpcname:
    vpc = ec2.Vpc.from_lookup(stack, "vpc", vpc_name=vpcname)
else:
    vpc = ec2.Vpc(stack, "vpc", vpc_name="my-vpc", nat_gateways=1, max_azs=2)
Option one is to provide the name of an existing VPC, and in that case we use that one. If no VPC name is provided, we create a new VPC.
The created VPC will have both public and private subnets, will use two availability zones and have a shared NAT gateway for outbound traffic from private subnets. This is strictly to keep costs down.
If you have an existing VPC to use, you can provide the name of that VPC through a command line option:
cdk synth -c vpcname=thenameofmyvpc
This approach to sending contextual data to the AWS CDK App can be quite useful. The key takeaway for us here is that we can have a VPC with both private and public subnets.
8.4 Starting from the tests - TDD a load-balanced service
Let us start by looking at our tests. We currently have one test for the add_service function, see below. There is a fair amount of set-up work here.
def test_fargate_service_created_with_only_mandatory_properties():
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    cluster = containers.add_cluster(stack, "test-cluster", vpc=vpc)
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image_name}

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )

    port = 80
    desired_count = 1

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count
    )

    sg_capture = assertions.Capture()
    template = assertions.Template.from_stack(stack)

    assert service.cluster == cluster
    assert service.task_definition == taskdef
    template.resource_count_is("AWS::ECS::Service", 1)
    template.has_resource_properties(
        "AWS::ECS::Service",
        {
            "DesiredCount": desired_count,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": assertions.Match.object_like(
                {
                    "AwsvpcConfiguration": assertions.Match.object_like(
                        {
                            "AssignPublicIp": "DISABLED",
                            "SecurityGroups": assertions.Match.array_with([sg_capture]),
                        }
                    )
                }
            ),
        },
    )
    template.has_resource_properties(
        "AWS::EC2::SecurityGroup",
        {
            "SecurityGroupIngress": assertions.Match.array_with(
                [
                    assertions.Match.object_like(
                        {"CidrIp": "0.0.0.0/0", "FromPort": port, "IpProtocol": "tcp"}
                    )
                ]
            )
        },
    )
My suspicion at this point is that we may need multiple tests for the service part. Each test would have the same setup work to do. With that in mind, I think we could refactor the service test into a group of tests that all share the same setup. Later, if we add more service tests, most of the set-up work will already be in place. Luckily, the pytest test framework we use with our AWS CDK Python code has support for that, using fixtures. The refactored code looks like this:
@pytest.fixture
def service_test_input_data():
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    cluster = containers.add_cluster(stack, "test-cluster", vpc=vpc)
    cpuval = 512
    memval = 1024
    familyval = 'test'
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image_name}

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )
    return {"stack": stack, "cluster": cluster, "task_definition": taskdef}


def test_fargate_service_created_with_only_mandatory_properties(service_test_input_data):
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]

    port = 80
    desired_count = 1

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count
    )

    sg_capture = assertions.Capture()
    template = assertions.Template.from_stack(stack)

    assert service.cluster == cluster
    assert service.task_definition == taskdef
    template.resource_count_is("AWS::ECS::Service", 1)
    template.has_resource_properties(
        "AWS::ECS::Service",
        {
            "DesiredCount": desired_count,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": assertions.Match.object_like(
                {
                    "AwsvpcConfiguration": assertions.Match.object_like(
                        {
                            "AssignPublicIp": "DISABLED",
                            "SecurityGroups": assertions.Match.array_with([sg_capture]),
                        }
                    )
                }
            ),
        },
    )
    template.has_resource_properties(
        "AWS::EC2::SecurityGroup",
        {
            "SecurityGroupIngress": assertions.Match.array_with(
                [
                    assertions.Match.object_like(
                        {"CidrIp": "0.0.0.0/0", "FromPort": port, "IpProtocol": "tcp"}
                    )
                ]
            )
        },
    )
When you change the code, run uv run pytest to make sure that the tests still work.
Next, let us think about what we want to do. We want to add a load balancer in front of our container service, so that we can scale it out and set a meaningful desired count larger than 1.
So we are going to practice some test-driven development to update our add_service() function.
Since we want a load-balanced service, it makes sense to test that we get a load balancer in place. The underlying CloudFormation template would contain a resource of type AWS::ElasticLoadBalancingV2::LoadBalancer, so we can add a test that such a resource exists after we have called our add_service function.
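Written test-first, a minimal sketch of such a test could look like the one below. The test name and the use of the fixture are my own choices for illustration; in the full listing further down, the assertion is folded into the existing service test instead.
def test_fargate_service_is_load_balanced(service_test_input_data):
    # Sketch of a first version of the test: only check that a load balancer
    # shows up in the synthesized template after add_service has been called.
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]

    containers.add_service(stack, "test-service", cluster, taskdef, 80, 1)

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ElasticLoadBalancingV2::LoadBalancer", 1)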
How do we implement this? We are trying to keep things simple, and fortunately AWS CDK has a construct that fits what we need well, called ApplicationLoadBalancedFargateService. This should set up a load balancer and create the service behind it, which is just what we want. We already have a task definition for the container we want to run as a service and this construct can accept that as input, which is just great!
Another change we can make to our function concerns the optional parameter assign_public_ip. Assigning a public IP address is not relevant for a load balancer, but the underlying purpose of that parameter is to say whether the endpoint we expose should be public (available through the internet) or private (only reachable in our own network). So we can rename this parameter to better reflect that intent rather than the mechanism, for example to use_public_endpoint.
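With that construct and the renamed parameter, the core of the reworked add_service can be sketched roughly as below; this is a simplified version of the direction we take, and the complete updated containers.py appears in the full listing further down.
import constructs as cons
from aws_cdk import (
    aws_ecs as ecs,
    aws_ecs_patterns as ecspat,
)


def add_service(
    scope: cons.Construct,
    id: str,
    cluster: ecs.Cluster,
    taskdef: ecs.FargateTaskDefinition,
    port: int,
    desired_count: int,
    use_public_endpoint: bool = True,
    service_name: str | None = None,
) -> ecspat.ApplicationLoadBalancedFargateService:
    # The pattern construct creates the load balancer, listener, target group,
    # Fargate service and security groups for us.
    return ecspat.ApplicationLoadBalancedFargateService(
        scope,
        id,
        cluster=cluster,
        task_definition=taskdef,
        listener_port=port,
        desired_count=desired_count,
        service_name=service_name,
        public_load_balancer=use_public_endpoint,
    )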
So now we have a new test and some new code to support this, but if we run uv run pytest now, we get a failure! Our new construct creates security groups under the hood for us, both for the load balancer itself and for the targets it redirects traffic to. So our previous test for a single security group is no longer valid. We might not even care exactly how many security groups are created, as long as the functionality they are supposed to provide is in place (only allow specific network traffic to selected resources). We can also reasonably assume that those who built ApplicationLoadBalancedFargateService have tested that code well enough that we should not test their implementation. So we can drop the single-security-group check we had before.
So we adjust the test and we also adjust the underlying implementation of add_service, since we do not need to care about managing the security groups ourselves, at this point in time at least.
Now if we run uv run pytest, we see the test succeed! Is that good enough? Actually, we should also be explicit about which ports to map, so we should include some port mapping information; the load balancer port and the container port may not be the same. This information is suitable to include in the container configuration itself.
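As a sketch, that could mean extending ContainerConfig with a list of TCP ports and mapping each one when the container is added to the task definition. This matches what the updated containers.py below does, although the helper function here is just for illustration; in the real code the loop sits directly inside add_task_definition_with_container.
from typing import List, TypedDict

from aws_cdk import aws_ecs as ecs


class ContainerConfig(TypedDict):
    image: str
    tcp_ports: List[int]


def _add_port_mappings(containerdef: ecs.ContainerDefinition, config: ContainerConfig) -> None:
    # Illustrative helper: add one TCP port mapping per configured port.
    for port in config["tcp_ports"]:
        containerdef.add_port_mappings(
            ecs.PortMapping(container_port=port, protocol=ecs.Protocol.TCP)
        )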
Finally, let us also make sure we have a test for the use_public_endpoint parameter. One relatively simple check is that the CloudFormation load balancer resource has its Scheme set to internal when use_public_endpoint is false, and to internet-facing when it is true.
Now we have a new set of code, see below:
containers_test.py
import pytest

import aws_cdk as cdk
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    assertions,
)

import containers


def test_ecs_cluster_defined_with_existing_vpc():
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    cluster = containers.add_cluster(stack, "my-test-cluster", vpc=vpc)

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ECS::Cluster", 1)
    assert cluster.vpc is vpc


def test_ecs_fargate_task_definition_defined():
    stack = cdk.Stack()
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image, "tcp_ports": [8080]}
    taskdef = containers.add_task_definition_with_container(
        stack, f"taskdef-{taskcfg['family']}", taskcfg, containercfg
    )

    assert taskdef.is_fargate_compatible
    assert taskdef in stack.node.children

    template = assertions.Template.from_stack(stack)
    template.resource_count_is("AWS::ECS::TaskDefinition", 1)
    template.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {
            "RequiresCompatibilities": ["FARGATE"],
            "Cpu": str(cpuval),
            "Memory": str(memval),
            "Family": familyval,
        },
    )


def test_container_definition_added_to_task_definition():
    stack = cdk.Stack()
    cpuval = 512
    memval = 1024
    familyval = "test"
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image_name, "tcp_ports": [8080]}

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )

    template = assertions.Template.from_stack(stack)
    containerdef: ecs.ContainerDefinition = taskdef.default_container  # type: ignore

    assert containerdef is not None
    assert containerdef.image_name == image_name
    template.has_resource_properties(
        "AWS::ECS::TaskDefinition",
        {
            "ContainerDefinitions": assertions.Match.array_with(
                [assertions.Match.object_like({"Image": image_name})]
            )
        },
    )


@pytest.fixture
def service_test_input_data():
    stack = cdk.Stack()
    vpc = ec2.Vpc(stack, "vpc")
    cluster = containers.add_cluster(stack, "test-cluster", vpc=vpc)
    cpuval = 512
    memval = 1024
    familyval = 'test'
    taskcfg: containers.TaskConfig = {
        "cpu": cpuval,
        "memory_limit_mib": memval,
        "family": familyval,
    }
    image_name = "public.ecr.aws/aws-containers/hello-app-runner:latest"
    containercfg: containers.ContainerConfig = {"image": image_name, "tcp_ports": [8080]}

    taskdef = containers.add_task_definition_with_container(
        stack, "test-taskdef", taskcfg, containercfg
    )
    return {"stack": stack, "cluster": cluster, "task_definition": taskdef}


def test_fargate_service_created_with_only_mandatory_properties(service_test_input_data):
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]

    port = 80
    desired_count = 1

    service = containers.add_service(
        stack, "test-service", cluster, taskdef, port, desired_count
    )

    sg_capture = assertions.Capture()
    template = assertions.Template.from_stack(stack)

    assert service.cluster == cluster
    assert service.task_definition == taskdef
    template.resource_count_is("AWS::ECS::Service", 1)
    template.has_resource_properties(
        "AWS::ECS::Service",
        {
            "DesiredCount": desired_count,
            "LaunchType": "FARGATE",
            "NetworkConfiguration": assertions.Match.object_like(
                {
                    "AwsvpcConfiguration": assertions.Match.object_like(
                        {
                            "AssignPublicIp": "DISABLED",
                            "SecurityGroups": assertions.Match.array_with([sg_capture]),
                        }
                    )
                }
            ),
        },
    )
    template.resource_count_is('AWS::ElasticLoadBalancingV2::LoadBalancer', 1)
    template.has_resource_properties('AWS::ElasticLoadBalancingV2::LoadBalancer', {
        'Type': 'application',
        'Scheme': 'internet-facing'
    })
    template.has_resource_properties(
        "AWS::EC2::SecurityGroup",
        {
            "SecurityGroupIngress": assertions.Match.array_with(
                [
                    assertions.Match.object_like(
                        {"CidrIp": "0.0.0.0/0", "FromPort": port, "IpProtocol": "tcp"}
                    )
                ]
            )
        },
    )


def test_fargate_service_created_without_public_access(service_test_input_data):
    stack = service_test_input_data["stack"]
    cluster = service_test_input_data["cluster"]
    taskdef = service_test_input_data["task_definition"]

    port = 80
    desired_count = 1
    containers.add_service(stack, 'test-service', cluster, taskdef, port, desired_count, False)

    template = assertions.Template.from_stack(stack)
    template.resource_count_is('AWS::ElasticLoadBalancingV2::LoadBalancer', 1)
    template.has_resource_properties('AWS::ElasticLoadBalancingV2::LoadBalancer', {
        'Type': 'application',
        'Scheme': 'internal'
    })
containers.py
from typing import Literal, TypedDict, List  # noqa

import constructs as cons
from aws_cdk import (
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_ecs_patterns as ecspat,
    aws_logs as logs,
)


class TaskConfig(TypedDict):
    cpu: Literal[256, 512, 1024, 2048, 4096]
    memory_limit_mib: int
    family: str


class ContainerConfig(TypedDict):
    image: str
    tcp_ports: List[int]


def add_task_definition_with_container(
    scope: cons.Construct,
    id: str,
    task_config: TaskConfig,
    container_config: ContainerConfig,
) -> ecs.FargateTaskDefinition:
    taskdef = ecs.FargateTaskDefinition(
        scope,
        id,
        cpu=task_config["cpu"],
        memory_limit_mib=task_config["memory_limit_mib"],
        family=task_config["family"],
    )
    logdriver = ecs.LogDrivers.aws_logs(
        stream_prefix=taskdef.family,
        log_retention=logs.RetentionDays.ONE_DAY,
    )
    image = ecs.ContainerImage.from_registry(container_config["image"])
    image_id = f"container-{_extract_image_name(container_config['image'])}"
    containerdef = taskdef.add_container(image_id, image=image, logging=logdriver)

    for port in container_config["tcp_ports"]:
        containerdef.add_port_mappings(ecs.PortMapping(container_port=port, protocol=ecs.Protocol.TCP))

    return taskdef


def add_service(
    scope: cons.Construct,
    id: str,
    cluster: ecs.Cluster,
    taskdef: ecs.FargateTaskDefinition,
    port: int,
    desired_count: int,
    use_public_endpoint: bool = True,
    service_name: str | None = None,
) -> ecspat.ApplicationLoadBalancedFargateService:
    service = ecspat.ApplicationLoadBalancedFargateService(
        scope,
        id,
        cluster=cluster,
        task_definition=taskdef,
        listener_port=port,
        desired_count=desired_count,
        service_name=service_name,
        circuit_breaker=ecs.DeploymentCircuitBreaker(
            rollback=True,
        ),
        public_load_balancer=use_public_endpoint,
    )
    return service


def add_cluster(scope: cons.Construct, id: str, vpc: ec2.IVpc) -> ecs.Cluster:
    return ecs.Cluster(scope, id, vpc=vpc)


def _extract_image_name(image_ref):
    name_with_tag = image_ref.split("/")[-1]
    name = name_with_tag.split(":")[0]
    return name
my-container-infra.py
import os

import aws_cdk as cdk
from aws_cdk import (
    aws_ec2 as ec2,
)

import containers

app = cdk.App()
env = cdk.Environment(
    account=os.getenv("CDK_DEFAULT_ACCOUNT"), region=os.getenv("CDK_DEFAULT_REGION")
)
stack = cdk.Stack(app, "my-container-infra", env=env)

vpcname = app.node.try_get_context("vpcname")
if vpcname:
    vpc = ec2.Vpc.from_lookup(stack, "vpc", vpc_name=vpcname)
else:
    vpc = ec2.Vpc(stack, "vpc", vpc_name="my-vpc", nat_gateways=1, max_azs=2)

cluster = containers.add_cluster(stack, "my-test-cluster", vpc)

taskconfig: containers.TaskConfig = {
    "cpu": 512,
    "memory_limit_mib": 1024,
    "family": "webapp",
}
containerconfig: containers.ContainerConfig = {
    "image": "public.ecr.aws/aws-containers/hello-app-runner:latest",
    "tcp_ports": [8000],
}
taskdef = containers.add_task_definition_with_container(
    stack, f"taskdef-{taskconfig['family']}", taskconfig, containerconfig
)

containers.add_service(
    stack, f"service-{taskconfig['family']}", cluster, taskdef, 8000, 2, True
)

app.synth()
Run uv run pytest and make sure all the tests pass. You can also deploy the solution now, using cdk deploy. Expect the deployment to take some time, especially if you deploy everything from scratch and have it set up to create a new VPC.
You will notice that when the deployment completes, there are two outputs representing the load balancer endpoint. This comes from using ApplicationLoadBalancedFargateService, which adds these outputs to the stack by default.
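If you prefer an explicit output of your own instead of relying only on the generated ones, a small sketch of what that could look like in my-container-infra.py follows; the output name here is just an example.
# Capture the service and expose the load balancer DNS name as a stack output.
service = containers.add_service(
    stack, f"service-{taskconfig['family']}", cluster, taskdef, 8000, 2, True
)
cdk.CfnOutput(
    stack,
    "webapp-endpoint",  # example output name
    value=service.load_balancer.load_balancer_dns_name,
)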
8.5 A bit of auto-scaling
If we have a load balancer in front of our service to distribute the load among multiple containers, it would make sense to scale the number of containers handling that load up and down. Fortunately, after some investigation, we can see that the FargateService class has a method called auto_scale_task_count, which allows us to set the minimum and maximum number of tasks. The scaling object it returns also allows us to specify target thresholds for CPU and memory utilization, which determine when the service should scale the number of running tasks up or down.
There is no corresponding method on ApplicationLoadBalancedFargateService, but we can access the underlying FargateService through the service property on ApplicationLoadBalancedFargateService.
Unfortunately, we cannot read these scaling settings back from the FargateService once we have set them, so to verify that they have been set we have to resort to checking CloudFormation resources. In this case, these are the AWS::ApplicationAutoScaling::ScalableTarget and AWS::ApplicationAutoScaling::ScalingPolicy resources that CloudFormation uses.
Frankly, I am hesitant about the value of some of these tests, since they come close to testing the CDK implementation rather than our own logic. I kept them here mostly as a learning experience. Even with these tests, we would want some actual integration tests to verify that the real behaviour once deployed is what we expect. There is some experimental work in AWS CDK for building integration tests, but that is out of scope here.
For now, we can at least know that there are some settings in place for the auto-scaling, but more work would be needed to test its actual behaviour.
Extract from containers_test.py
def test_scaling_settings_for_service(service_test_input_data):
    stack = service_test_input_data['stack']
    cluster = service_test_input_data['cluster']
    taskdef = service_test_input_data['task_definition']
    port = 80
    desired_count = 2

    service = containers.add_service(stack, 'test-service', cluster, taskdef, port, desired_count, False)

    config = containers.ServiceScalingConfig(
        min_count=1,
        max_count=5,
        scale_cpu_target=containers.ScalingThreshold(percent=50),
        scale_memory_target=containers.ScalingThreshold(percent=50))
    containers.set_service_scaling(service=service.service, config=config)

    scale_resource = assertions.Capture()
    template = assertions.Template.from_stack(stack)
    template.resource_count_is('AWS::ApplicationAutoScaling::ScalableTarget', 1)
    template.has_resource_properties('AWS::ApplicationAutoScaling::ScalableTarget', {
        'MaxCapacity': config["max_count"],
        'MinCapacity': config["min_count"],
        'ResourceId': scale_resource,
        'ScalableDimension': 'ecs:service:DesiredCount',
        'ServiceNamespace': 'ecs'
    })
    template.resource_count_is('AWS::ApplicationAutoScaling::ScalingPolicy', 2)
    template.has_resource_properties('AWS::ApplicationAutoScaling::ScalingPolicy', {
        'PolicyType': 'TargetTrackingScaling',
        'TargetTrackingScalingPolicyConfiguration': assertions.Match.object_like({
            'PredefinedMetricSpecification': assertions.Match.object_equals({
                'PredefinedMetricType': 'ECSServiceAverageCPUUtilization'
            }),
            'TargetValue': config["scale_cpu_target"]["percent"]
        })
    })
    template.has_resource_properties('AWS::ApplicationAutoScaling::ScalingPolicy', {
        'PolicyType': 'TargetTrackingScaling',
        'TargetTrackingScalingPolicyConfiguration': assertions.Match.object_like({
            'PredefinedMetricSpecification': assertions.Match.object_equals({
                'PredefinedMetricType': 'ECSServiceAverageMemoryUtilization'
            }),
            'TargetValue': config["scale_memory_target"]["percent"]
        })
    })
Extract from containers.py
class ScalingThreshold(TypedDict):
    percent: float


class ServiceScalingConfig(TypedDict):
    min_count: int
    max_count: int
    scale_cpu_target: ScalingThreshold
    scale_memory_target: ScalingThreshold


def set_service_scaling(service: ecs.FargateService, config: ServiceScalingConfig):
    scaling = service.auto_scale_task_count(max_capacity=config["max_count"], min_capacity=config["min_count"])
    scaling.scale_on_cpu_utilization('CpuScaling', target_utilization_percent=config["scale_cpu_target"]["percent"])
    scaling.scale_on_memory_utilization('MemoryScaling', target_utilization_percent=config["scale_memory_target"]["percent"])
Extract from my-container-infra.py
You can run uv run pytest to make sure all tests pass, and run cdk deploy to see that it also deploys ok.
Congratulations, you have now set up something that is fairly similar to what we initially set up with AppRunner!
As you can see, there were definitely more details to handle, even though we had a fair amount of help from AWS CDK itself.