当前位置:Java -> 在 Kubernetes 上自动扩展 DynamoDB 流应用
这篇博客文章演示了如何在Kubernetes上自动扩展您的DynamoDB Streams消费者应用程序。您将使用一个使用DynamoDB Streams Kinesis适配器库来消费DynamoDB表中的变更数据事件的Java应用程序。它将部署到Amazon EKS集群,并将使用KEDA自动进行扩展。
该应用程序包括一个实现了com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
的代码,它负责从DynamoDB流中处理数据并将其复制到另一个(目标)DynamoDB表-这只是一个示例。我们将使用AWS CLI来向DynamoDB流生成数据,并观察应用程序的扩展情况。
代码可在GitHub存储库中获取。
Amazon DynamoDB是一个完全托管的数据库服务,提供快速和可预测的性能以及无缝的可伸缩性。使用DynamoDB Streams,您可以利用变更数据捕获(CDC)实时获取DynamoDB表数据变更的通知。这使得可以轻松构建应用程序以对基础数据库的更改做出反应,而无需复杂的轮询或查询。
DynamoDB为变更数据捕获提供了两种流模型:
使用Kinesis Data Streams,可以捕获任何DynamoDB表中的项目级修改,并将其复制到Kinesis数据流。而DynamoDB Streams捕获了任何DynamoDB表中项目级修改的时间顺序序列,并将此信息存储在日志中,持续时间最长可达24小时。
我们将利用原生DynamoDB Streams功能。即使使用了DynamoDB Streams,当涉及到消费更改数据事件时,有多种选项可供选择:
我们的应用程序将利用DynamoDB Streams以及Kinesis客户端库(KCL)适配器库1.x来消费DynamoDB表的变更数据事件。
Kinesis客户端库确保每个分片都有一个记录处理器在运行和处理该分片数据。KCL帮助处理了许多与分布式计算和可伸缩性相关的复杂任务。它连接到数据流,枚举数据流中的分片,并使用租约来将分片关联到其消费者应用程序。
对每个它管理的分片实例化一个记录处理器。KCL从数据流中获取数据记录,将记录推送到相应的记录处理器,并对已处理的记录进行检查点处理。更重要的是,当工作实例数发生变化或数据流重新分片时(分片被拆分或合并),它会平衡分片-工作器关联(租约)。这意味着只需添加更多实例,就能自动平衡分片到各个实例,从而能够扩展您的DynamoDB Streams应用程序。
但是,当负载增加时,仍然需要一种方式来扩展应用程序。当然,您可以手动操作或构建自定义解决方案来完成此操作。
这就是KEDA发挥作用的地方。
KEDA
是一个基于Kubernetes的事件驱动自动缩放组件,可以监视DynamoDB Streams等事件源,并根据需要处理的事件数量来扩展底层的Deployment
(和Pod
)。它构建在诸如Horizontal Pod Autoscaler等原生Kubernetes原语之上,可以添加到任何Kubernetes集群中。以下是其主要组件的高级概述(您可以参考KEDA文档进行深入了解):
KEDA
中的keda-operator-metrics-apiserver
组件充当Kubernetes指标服务器,为Horizontal Pod Autoscaler公开度量标准。keda-operator
组件的作用是激活和注销Deployment
,即从零缩放。你将看到< a rel =“noopener noreferrer” target=“_ blank”> DynamoDB Streams scaler正在动作,它是根据DynamoDB Stream的分片数量而扩展的。
现在让我们继续这个教程的实际部分。
除了AWS账户外,您还需要安装 < a rel =“noopener noreferrer” target=“_ blank”> AWS CLI ,< a rel =“noopener noreferrer” target=“_ blank”> kubectl 和< a> Docker 。
有多种方法可以创建< a rel =“noopener noreferrer” target=“_ blank”> Amazon EKS集群 。我更喜欢使用< a> eksctl CLI,因为它提供的便利。使用< code> eksctl 创建EKS集群可能就像这样简单:
eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>
< br />
有关详细信息,请参阅< a rel =“noopener noreferrer” target=“_ blank”>开始使用Amazon EKS - eksctl。
创建启用了流的DynamoDB表来持久保存应用程序数据并访问更改数据源。您可以使用AWS CLI使用以下命令创建表:
aws dynamodb create-table \
--table-name users \
--attribute-definitions AttributeName=email,AttributeType=S \
--key-schema AttributeName=email,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
< br />
我们需要创建另一个表,它将用作第一个表的副本。
aws dynamodb create-table \
--table-name users_replica \
--attribute-definitions AttributeName=email,AttributeType=S \
--key-schema AttributeName=email,KeyType=HASH \
--billing-mode PAY_PER_REQUEST
< br />
克隆此GitHub存储库并将其更改为正确的目录:
git clone https://github.com/abhirockzz/dynamodb-streams-keda-autoscale
cd dynamodb-streams-keda-autoscale
< br />
好的,让我们开始吧!
本教程中,您将使用< a rel =“noopener noreferrer” target=“_ blank”>YAML文件来部署 KEDA
,但您也可以使用< a rel =“noopener noreferrer” target=“_ blank”>Helm图表。
安装 KEDA
:
# update version 2.8.2 if required
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
< br />
验证安装:
# check Custom Resource Definitions
kubectl get crd
# check KEDA Deployments
kubectl get deployment -n keda
# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda
< br />
KEDA运算符以及DynamoDB流消费者应用程序需要调用AWS API。由于两者将作为EKS中的< code> Deployment 运行,因此我们将使用< a rel =“noopener noreferrer” target=“_ blank”>服务帐户的IAM角色(IRSA)提供所需的权限。
在我们特定的情况下:
将您的AWS帐户ID和OIDC身份提供者设置为环境变量:
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
#update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1
OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
< br />
创建包含角色的受信任实体的< code> JSON 文件:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
"${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
}
}
}
]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust_keda.json
< br />
现在,创建IAM角色并附加策略(查看< code> policy_dynamodb_streams_keda.json 文件以获取详细信息):
export ROLE_NAME=keda-operator-dynamodb-streams-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for DynamoDB streams KEDA scaler on EKS"
aws iam create-policy --policy-name keda-dynamodb-streams-policy --policy-document file://policy_dynamodb_streams_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-dynamodb-streams-policy
< br />
关联IAM角色和服务帐户:
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/keda-operator -n keda
< br />
您需要重新启动< code> KEDA 运算符< code> Deployment 才能使其生效:
kubectl rollout restart deployment.apps/keda-operator -n keda
# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"
# expected output
AWS_STS_REGIONAL_ENDPOINTS: regional
AWS_DEFAULT_REGION: us-east-1
AWS_REGION: us-east-1
AWS_ROLE_ARN: arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-dynamodb-streams-role
AWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
< br />
首先创建一个Kubernetes服务帐户:
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
name: dynamodb-streams-consumer-app-sa
EOF
< br />
创建包含角色的受信任实体的< code> JSON 文件:
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
"${OIDC_PROVIDER}:sub": "system:serviceaccount:default:dynamodb-streams-consumer-app-sa"
}
}
}
]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust.json
< br />
现在,创建IAM角色并附加策略。更新< code> policy.json 文件,并输入区域和AWS帐户详细信息。
export ROLE_NAME=dynamodb-streams-consumer-app-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for DynamoDB Streams consumer app on EKS"
aws iam create-policy --policy-name dynamodb-streams-consumer-app-policy --policy-document file://policy.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/dynamodb-streams-consumer-app-policy
< br />
关联IAM角色和服务帐户:
kubectl annotate serviceaccount -n default dynamodb-streams-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}
# verify the annotation
kubectl describe serviceaccount/dynamodb-streams-consumer-app-sa
< br />
核心基础设施现在准备就绪。让我们准备并部署消费者应用程序。
我们首先需要构建Docker镜像并将其推送到ECR(您可以参考< code> Dockerfile 以获取详细信息)。
# create runnable JAR file
mvn clean compile assembly\:single
# build docker image
docker build -t dynamodb-streams-consumer-app .
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
# create a private ECR repo
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
aws ecr create-repository --repository-name dynamodb-streams-consumer-app --region us-east-1
# tag and push the image
docker tag dynamodb-streams-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
< br />
更新< code> consumer.yaml 以包括您刚刚推送到ECR的Docker镜像和源表的DynamoDB流的ARN。清单的其余部分保持不变。
要检索流的ARN,请运行以下命令:
aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn'
< br />
consumer.yaml
Deployment
清单如下:
apiVersion: apps/v1
kind: Deployment
metadata:
name: dynamodb-streams-kcl-consumer-app
spec:
replicas: 1
selector:
matchLabels:
app: dynamodb-streams-kcl-consumer-app
template:
metadata:
labels:
app: dynamodb-streams-kcl-consumer-app
spec:
serviceAccountName: dynamodb-streams-kcl-consumer-app-sa
containers:
- name: dynamodb-streams-kcl-consumer-app
image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-kcl-consumer-app:latest
imagePullPolicy: Always
env:
- name: TARGET_TABLE_NAME
value: users_replica
- name: APPLICATION_NAME
value: dynamodb-streams-kcl-app-demo
- name: SOURCE_TABLE_STREAM_ARN
value: <enter ARN>
- name: AWS_REGION
value: us-east-1
- name: INSTANCE_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
< br />
创建 Deployment
:
kubectl apply -f consumer.yaml
# verify Pod transition to Running state
kubectl get pods -w
< br />
现在您已经部署了消费者应用程序,KCL适配器库应该开始工作。它将首先在DynamoDB中创建一个“控制表” - 它应与应用程序的名称相同(在本例中为< code> dynamodb-streams-kcl-app-demo )。
初始协调和表的创建可能需要几分钟的时间。您可以查看消费者应用程序的日志以查看进展。
kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})
完成租约分配后,请检查表并注意leaseOwner
属性:
aws dynamodb describe-table --table-name dynamodb-streams-kcl-app-demo
现在,您已部署消费者应用程序,让我们向源DynamoDB表(users
)添加数据。
您可以使用producer.sh
脚本。
export export TABLE_NAME=users
./producer.sh
检查消费者日志以查看消息的处理情况:
kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})
检查目标表(users_replica
)以确认DynamoDB流消费者应用程序实际上已经复制了数据。
aws dynamodb scan --table-name users_replica
注意processed_by
属性的值?它与消费者应用程序Pod
相同。这将使我们更容易验证端到端的自动扩展过程。
使用定标器定义:
kubectl apply -f keda-dynamodb-streams-scaler.yaml
这是ScaledObject
定义。请注意,它针对dynamodb-streams-kcl-consumer-app
Deployment
(我们刚刚创建的)进行定位,shardCount
设置为2
:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: aws-dynamodb-streams-scaledobject
spec:
scaleTargetRef:
name: dynamodb-streams-kcl-consumer-app
triggers:
- type: aws-dynamodb-streams
metadata:
awsRegion: us-east-1
tableName: users
shardCount: "2"
identityOwner: "operator"
shardCount
属性的说明:我们使用了2
的shardCount
值。非常重要的一点是我们使用了支持"每个shard最多同时支持2个消费者"的DynamoDB Streams Kinesis适配器库,它使用支持KCL 1.x。这意味着您不能有超过两个消费者应用程序实例处理相同的DynamoDB流shard。
但是,这个KEDA定标器配置将确保每两个shards就有一个Pod
。因此,例如,如果有四个shards,该应用程序将扩展到两个Pods
。如果有六个shards,将有三个Pods
,依此类推。当然,您可以选择每个shard一个Pod
,只需将shardCount
设置为1
。
要跟踪DynamoDB流中的shard数量,可以运行以下命令:
aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards | length'
我还使用了一个叫做jq
的实用程序。如果您想要shards的详细信息:
aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards'
我们从一个应用程序的Pod
开始。但是,由于KEDA
,我们应该看到额外的Pod
自动出现以满足消费者应用程序的处理要求。
为了确认,请检查Pods
的数量:
kubectl get pods -l=app=dynamodb-streams-kcl-consumer-app-consumer
很可能您会看到DynamoDB流中有四个shards和两个Pod
。根据将数据产生到DynamoDB表的速率,这可能会发生变化(增加/减少)。
与以前一样,验证DynamoDB目标表(users_replica
)中的数据,并注意processed_by
属性。因为我们已经扩展到额外的Pods
,每个Pod
都将处理DynamoDB更改流的一部分消息,所以每条记录的值都应该不同。
另外,确保检查DynamoDB
中dynamodb-streams-kcl-app-demo
控制表。您应该看到leaseOwner
的更新,反映现在有两个Pods
从DynamoDB流中消费。
验证端到端解决方案后,您可以清理资源以避免产生额外的费用。
删除EKS集群和DynamoDB表。
eksctl delete cluster --name <enter cluster name>
aws dynamodb delete-table --table-name users
aws dynamodb delete-table --table-name users_replica
您应该尝试的用例:
Pod
的数量会发生什么变化?在本文中,我们演示了如何使用KEDA
和DynamoDB流,结合了两种强大的技术(变更数据捕获和自动扩展),来构建可伸缩的、事件驱动的系统,以根据应用程序的数据处理需求进行动态调整。
推荐阅读: 百度面经(21)
本文链接: 在 Kubernetes 上自动扩展 DynamoDB 流应用