当前位置:Java -> 在 Kubernetes 上自动扩展 DynamoDB 流应用

在 Kubernetes 上自动扩展 DynamoDB 流应用

这篇博客文章演示了如何在Kubernetes上自动扩展您的DynamoDB Streams消费者应用程序。您将使用一个使用DynamoDB Streams Kinesis适配器库来消费DynamoDB表中的变更数据事件的Java应用程序。它将部署到Amazon EKS集群,并将使用KEDA自动进行扩展。

该应用程序包括一个实现了com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor的代码,它负责从DynamoDB流中处理数据并将其复制到另一个(目标)DynamoDB表-这只是一个示例。我们将使用AWS CLI来向DynamoDB流生成数据,并观察应用程序的扩展情况。

代码可在GitHub存储库中获取。

使用AWS CLI向DynamoDB流生成数据

涵盖的内容

  • 介绍
    • 使用Kinesis客户端库的水平扩展
  • KEDA是什么?
  • 先决条件
  • 在EKS上设置和配置KEDA
  • 配置IAM角色
  • 将DynamoDB Streams消费者应用程序部署到EKS
  • DynamoDB Streams消费者应用程序通过KEDA实现自动扩展
  • 删除资源
  • 结论

介绍

Amazon DynamoDB是一个完全托管的数据库服务,提供快速和可预测的性能以及无缝的可伸缩性。使用DynamoDB Streams,您可以利用变更数据捕获(CDC)实时获取DynamoDB表数据变更的通知。这使得可以轻松构建应用程序以对基础数据库的更改做出反应,而无需复杂的轮询或查询。

DynamoDB为变更数据捕获提供了两种流模型:

  • DynamoDB的Kinesis数据流
  • DynamoDB Streams

使用Kinesis Data Streams,可以捕获任何DynamoDB表中的项目级修改,并将其复制到Kinesis数据流。而DynamoDB Streams捕获了任何DynamoDB表中项目级修改的时间顺序序列,并将此信息存储在日志中,持续时间最长可达24小时。

我们将利用原生DynamoDB Streams功能。即使使用了DynamoDB Streams,当涉及到消费更改数据事件时,有多种选项可供选择:

我们的应用程序将利用DynamoDB Streams以及Kinesis客户端库(KCL)适配器库1.x来消费DynamoDB表的变更数据事件。

使用Kinesis客户端库进行水平扩展

Kinesis客户端库确保每个分片都有一个记录处理器在运行和处理该分片数据。KCL帮助处理了许多与分布式计算和可伸缩性相关的复杂任务。它连接到数据流,枚举数据流中的分片,并使用租约来将分片关联到其消费者应用程序。

对每个它管理的分片实例化一个记录处理器。KCL从数据流中获取数据记录,将记录推送到相应的记录处理器,并对已处理的记录进行检查点处理。更重要的是,当工作实例数发生变化或数据流重新分片时(分片被拆分或合并),它会平衡分片-工作器关联(租约)。这意味着只需添加更多实例,就能自动平衡分片到各个实例,从而能够扩展您的DynamoDB Streams应用程序。

但是,当负载增加时,仍然需要一种方式来扩展应用程序。当然,您可以手动操作或构建自定义解决方案来完成此操作。

这就是KEDA发挥作用的地方。

KEDA是什么?

KEDA是一个基于Kubernetes的事件驱动自动缩放组件,可以监视DynamoDB Streams等事件源,并根据需要处理的事件数量来扩展底层的Deployment(和Pod)。它构建在诸如Horizontal Pod Autoscaler等原生Kubernetes原语之上,可以添加到任何Kubernetes集群中。以下是其主要组件的高级概述(您可以参考KEDA文档进行深入了解):

KEDA主要组件的高级概览来自 KEDA Concepts文档

  1. KEDA中的keda-operator-metrics-apiserver组件充当Kubernetes指标服务器,为Horizontal Pod Autoscaler公开度量标准。
  2. KEDA Scaler集成到外部系统(例如Redis)中获取这些度量标准(例如列表长度),以便根据需要处理的事件数量自动缩放Kubernetes中的任何容器。
  3. keda-operator组件的作用是激活注销Deployment,即从零缩放。

你将看到< a rel =“noopener noreferrer” target=“_ blank”> DynamoDB Streams scaler正在动作,它是根据DynamoDB Stream的分片数量而扩展的。

现在让我们继续这个教程的实际部分。

必备条件

除了AWS账户外,您还需要安装 < a rel =“noopener noreferrer” target=“_ blank”> AWS CLI ,< a rel =“noopener noreferrer” target=“_ blank”> kubectl 和< a> Docker 。

设置EKS集群并创建DynamoDB表

有多种方法可以创建< a rel =“noopener noreferrer” target=“_ blank”> Amazon EKS集群 。我更喜欢使用< a> eksctl CLI,因为它提供的便利。使用< code> eksctl 创建EKS集群可能就像这样简单:

eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>

< br />

有关详细信息,请参阅< a rel =“noopener noreferrer” target=“_ blank”>开始使用Amazon EKS - eksctl。

创建启用了流的DynamoDB表来持久保存应用程序数据并访问更改数据源。您可以使用AWS CLI使用以下命令创建表:

aws dynamodb create-table \
    --table-name users \
    --attribute-definitions AttributeName=email,AttributeType=S \
    --key-schema AttributeName=email,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST \
    --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

< br />

我们需要创建另一个表,它将用作第一个表的副本。

aws dynamodb create-table \
    --table-name users_replica \
    --attribute-definitions AttributeName=email,AttributeType=S \
    --key-schema AttributeName=email,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST

< br />

克隆此GitHub存储库并将其更改为正确的目录:

git clone https://github.com/abhirockzz/dynamodb-streams-keda-autoscale
cd dynamodb-streams-keda-autoscale

< br />

好的,让我们开始吧!

在EKS上设置和配置KEDA

本教程中,您将使用< a rel =“noopener noreferrer” target=“_ blank”>YAML文件来部署 KEDA ,但您也可以使用< a rel =“noopener noreferrer” target=“_ blank”>Helm图表。

安装 KEDA

# update version 2.8.2 if required

kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml

< br />

验证安装:

# check Custom Resource Definitions
kubectl get crd

# check KEDA Deployments
kubectl get deployment -n keda

# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda

< br />

配置IAM角色

KEDA运算符以及DynamoDB流消费者应用程序需要调用AWS API。由于两者将作为EKS中的< code> Deployment 运行,因此我们将使用< a rel =“noopener noreferrer” target=“_ blank”>服务帐户的IAM角色(IRSA)提供所需的权限。

在我们特定的情况下:

  • < code> KEDA 运算符需要能够获取有关DynamoDB表和流的信息
  • 应用程序(特指KCL 1.x库)需要与Kinesis和DynamoDB交互 - 它需要一些< a rel =“noopener noreferrer” target=“_ blank”> IAM权限这样做。

为KEDA Operator配置IRSA

将您的AWS帐户ID和OIDC身份提供者设置为环境变量:

ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)

#update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1

OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")

< br />

创建包含角色的受信任实体的< code> JSON 文件:

read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
          "${OIDC_PROVIDER}:sub": "system:serviceaccount:keda:keda-operator"
        }
      }
    }
  ]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust_keda.json

< br />

现在,创建IAM角色并附加策略(查看< code> policy_dynamodb_streams_keda.json 文件以获取详细信息):

export ROLE_NAME=keda-operator-dynamodb-streams-role

aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for DynamoDB streams KEDA scaler on EKS"

aws iam create-policy --policy-name keda-dynamodb-streams-policy --policy-document file://policy_dynamodb_streams_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/keda-dynamodb-streams-policy

< br />

关联IAM角色和服务帐户:

kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}

# verify the annotation 
kubectl describe serviceaccount/keda-operator -n keda

< br />

您需要重新启动< code> KEDA 运算符< code> Deployment 才能使其生效:

kubectl rollout restart deployment.apps/keda-operator -n keda

# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath={.items..metadata.name}) | grep "^\s*AWS_"

# expected output

AWS_STS_REGIONAL_ENDPOINTS:   regional
AWS_DEFAULT_REGION:           us-east-1
AWS_REGION:                   us-east-1
AWS_ROLE_ARN:                 arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-dynamodb-streams-role
AWS_WEB_IDENTITY_TOKEN_FILE:  /var/run/secrets/eks.amazonaws.com/serviceaccount/token

< br />

为DynamoDB流消费应用程序配置IRSA

首先创建一个Kubernetes服务帐户:

kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: dynamodb-streams-consumer-app-sa
EOF

< br />

创建包含角色的受信任实体的< code> JSON 文件:

read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::${ACCOUNT_ID}:oidc-provider/${OIDC_PROVIDER}"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "${OIDC_PROVIDER}:aud": "sts.amazonaws.com",
          "${OIDC_PROVIDER}:sub": "system:serviceaccount:default:dynamodb-streams-consumer-app-sa"
        }
      }
    }
  ]
}
EOF
echo "${TRUST_RELATIONSHIP}" > trust.json

< br />

现在,创建IAM角色并附加策略。更新< code> policy.json 文件,并输入区域和AWS帐户详细信息。

export ROLE_NAME=dynamodb-streams-consumer-app-role

aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for DynamoDB Streams consumer app on EKS"

aws iam create-policy --policy-name dynamodb-streams-consumer-app-policy --policy-document file://policy.json

aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::${ACCOUNT_ID}:policy/dynamodb-streams-consumer-app-policy

< br />

关联IAM角色和服务帐户:

kubectl annotate serviceaccount -n default dynamodb-streams-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}

# verify the annotation
kubectl describe serviceaccount/dynamodb-streams-consumer-app-sa

< br />

核心基础设施现在准备就绪。让我们准备并部署消费者应用程序。

将DynamoDB流消费应用程序部署到EKS

我们首先需要构建Docker镜像并将其推送到ECR(您可以参考< code> Dockerfile 以获取详细信息)。

构建并推送Docker镜像至ECR

# create runnable JAR file
mvn clean compile assembly\:single

# build docker image
docker build -t dynamodb-streams-consumer-app .

AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)

# create a private ECR repo
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com

aws ecr create-repository --repository-name dynamodb-streams-consumer-app --region us-east-1

# tag and push the image
docker tag dynamodb-streams-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-consumer-app:latest

< br />

部署消费者应用程序

更新< code> consumer.yaml 以包括您刚刚推送到ECR的Docker镜像和源表的DynamoDB流的ARN。清单的其余部分保持不变。

要检索流的ARN,请运行以下命令:

aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn'

< br />

consumer.yaml Deployment 清单如下:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: dynamodb-streams-kcl-consumer-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dynamodb-streams-kcl-consumer-app
  template:
    metadata:
      labels:
        app: dynamodb-streams-kcl-consumer-app
    spec:
      serviceAccountName: dynamodb-streams-kcl-consumer-app-sa
      containers:
        - name: dynamodb-streams-kcl-consumer-app
          image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/dynamodb-streams-kcl-consumer-app:latest
          imagePullPolicy: Always
          env:
            - name: TARGET_TABLE_NAME
              value: users_replica
            - name: APPLICATION_NAME
              value: dynamodb-streams-kcl-app-demo
            - name: SOURCE_TABLE_STREAM_ARN
              value: <enter ARN>
            - name: AWS_REGION
              value: us-east-1
            - name: INSTANCE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name

< br />

创建 Deployment

kubectl apply -f consumer.yaml

# verify Pod transition to Running state
kubectl get pods -w

< br />

DynamoDB Streams消费者应用程序自动扩展与KEDA

现在您已经部署了消费者应用程序,KCL适配器库应该开始工作。它将首先在DynamoDB中创建一个“控制表” - 它应与应用程序的名称相同(在本例中为< code> dynamodb-streams-kcl-app-demo )。

初始协调和表的创建可能需要几分钟的时间。您可以查看消费者应用程序的日志以查看进展。

kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})


完成租约分配后,请检查表并注意leaseOwner属性:

aws dynamodb describe-table --table-name dynamodb-streams-kcl-app-demo


dynamodb-streams-kcl-app-demo Items returned

向DynamoDB表添加数据

现在,您已部署消费者应用程序,让我们向源DynamoDB表(users)添加数据。

您可以使用producer.sh脚本。

export export TABLE_NAME=users
./producer.sh


检查消费者日志以查看消息的处理情况:

kubectl logs -f $(kubectl get po -l=app=dynamodb-streams-kcl-consumer-app --output=jsonpath={.items..metadata.name})


检查目标表(users_replica)以确认DynamoDB流消费者应用程序实际上已经复制了数据。

aws dynamodb scan --table-name users_replica


注意processed_by属性的值?它与消费者应用程序Pod相同。这将使我们更容易验证端到端的自动扩展过程。

创建KEDA定标器

使用定标器定义:

kubectl apply -f keda-dynamodb-streams-scaler.yaml


这是ScaledObject定义。请注意,它针对dynamodb-streams-kcl-consumer-appDeployment(我们刚刚创建的)进行定位,shardCount设置为2

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name:  aws-dynamodb-streams-scaledobject
spec:
  scaleTargetRef:
    name: dynamodb-streams-kcl-consumer-app
  triggers:
  - type: aws-dynamodb-streams
    metadata:
      awsRegion: us-east-1
      tableName: users
      shardCount: "2"
      identityOwner: "operator"


关于shardCount属性的说明:

我们使用了2shardCount值。非常重要的一点是我们使用了支持"每个shard最多同时支持2个消费者"的DynamoDB Streams Kinesis适配器库,它使用支持KCL 1.x。这意味着您不能有超过两个消费者应用程序实例处理相同的DynamoDB流shard。

但是,这个KEDA定标器配置将确保每两个shards就有一个Pod。因此,例如,如果有四个shards,该应用程序将扩展到两个Pods。如果有六个shards,将有三个Pods,依此类推。当然,您可以选择每个shard一个Pod,只需将shardCount设置为1

要跟踪DynamoDB流中的shard数量,可以运行以下命令:

aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards | length'


我还使用了一个叫做jq的实用程序。如果您想要shards的详细信息:

aws dynamodbstreams describe-stream --stream-arn $(aws dynamodb describe-table --table-name users | jq -r '.Table.LatestStreamArn') | jq -r '.StreamDescription.Shards'


验证DynamoDB流消费者应用程序自动扩展

我们从一个应用程序的Pod开始。但是,由于KEDA,我们应该看到额外的Pod自动出现以满足消费者应用程序的处理要求。

为了确认,请检查Pods的数量:

kubectl get pods -l=app=dynamodb-streams-kcl-consumer-app-consumer


很可能您会看到DynamoDB流中有四个shards和两个Pod。根据将数据产生到DynamoDB表的速率,这可能会发生变化(增加/减少)

与以前一样,验证DynamoDB目标表(users_replica)中的数据,并注意processed_by属性。因为我们已经扩展到额外的Pods,每个Pod都将处理DynamoDB更改流的一部分消息,所以每条记录的值都应该不同。

另外,确保检查DynamoDBdynamodb-streams-kcl-app-demo控制表。您应该看到leaseOwner的更新,反映现在有两个Pods从DynamoDB流中消费。

Updated items returned showing two pods consuming from DynamoDB stream

验证端到端解决方案后,您可以清理资源以避免产生额外的费用。

删除资源

删除EKS集群和DynamoDB表。

eksctl delete cluster --name <enter cluster name>
aws dynamodb delete-table --table-name users
aws dynamodb delete-table --table-name users_replica


结论

您应该尝试的用例:

  • 进一步扩展 - 如何让DynamoDB流增加其分片数?当分片数增加时,消费者实例Pod的数量会发生什么变化?
  • 缩小规模 - 当DynamoDB流的分片容量减少时会发生什么情况?

在本文中,我们演示了如何使用KEDA和DynamoDB流,结合了两种强大的技术(变更数据捕获和自动扩展),来构建可伸缩的、事件驱动的系统,以根据应用程序的数据处理需求进行动态调整。

推荐阅读: 百度面经(21)

本文链接: 在 Kubernetes 上自动扩展 DynamoDB 流应用