批处理系统:Batch批量计算与云原生ServerlessArgo...

科技梦想在奔跑 2024-10-18 18:45:35

随着自动驾驶、科学计算等领域对技术需求的不断深化,以及Kubernetes生态系统日趋丰富,容器化已成为批处理任务执行的主流模式。面对这一趋势,市场提供了两大类解决方案:一类是以Batch批量计算为代表的云服务商自主研发的封闭式平台,另一类则是围绕开源项目Argo Workflows构建的开放兼容平台。

对于企业研发团队而言,明智地选择符合自身业务需求的批处理任务平台至关重要,这直接关系到开发效率、成本控制及未来技术的可扩展性。本文将以一个典型的数据处理应用场景为案例,深入对比Batch批量计算与Argo Workflows的核心特性及适用场景,辅助技术决策者做出更加贴切的选择。

一、案例

如下图所示是一个典型的数据处理任务,第一步使用64个Pod进行数据处理,将128个文件合并成成64个文件,第二步使用32个Pod进行第二步的数据处理,将64个文件合并成32个文件,最后一步启动一个Pod进行最终结果的计算并输出到对象存储中。

架构图:

二、通过Batch批量计算实现

1. 原理

Batch批量计算通常是一项完全托管服务,可让您以任何规模运行批处理式工作负载。以下流程描述 Batch批量计算如何运行每个作业。

1. 创建作业定义,其指定如何运行作业,同时提供权限、内存和CPU要求以及其他配置选项。

2. 将作业提交到托管的Batch批量计算作业队列,作业将一直驻留在该队列中,直到被安排在计算环境中进行处理。

3. Batch批量计算计算队列中每个作业的CPU、内存和GPU要求,并在计算环境中调度计算资源以处理作业。

4. Batch批量计算调度程序将作业放入相应的Batch计算环境进行处理。

5. 作业成功或失败退出,将输出结果写入用户定义的存储空间。

2. 创建任务定义

第一步任务定义,构建process-data、merge-data等任务定义,需要准备镜像,启动参数,所需资源等。Batch批量计算服务通常提供友好的控制台交互,为易于编程和避免频繁的控制台演示,我们直接使用Json的方式定义任务。

a. process-data

{ "type": "container", "containerProperties": { # 执行命令 "command": [ "python", "process.py" ], "image": "python:3.11-amd", # 镜像地址 "resourceRequirements": [ # 资源需要 { "type": "VCPU", "value": "1.0" }, { "type": "MEMORY", "value": "2048" } ], "runtimePlatform": { "cpuArchitecture": "X86_64", # cpu架构 "operatingSystemFamily": "LINUX" }, "networkConfiguration": { "assignPublicIp": "DISABLED" }, "executionRoleArn": "role::xxxxxxx", # 权限 }, "platformCapabilities": [ # 后端资源:服务器或者Serverless Container "Serverless Container" ], "jobDefinitionName": "process-data" # Job名称}

b. merge-data:

{ "type": "container", "containerProperties": { # 执行命令,merge.py "command": [ "python", "merge.py" ], "image": "python:3.11-amd", # 镜像 "resourceRequirements": [ # 资源需要 { "type": "VCPU", "value": "1.0" }, { "type": "MEMORY", "value": "2048" } ], "runtimePlatform": { "cpuArchitecture": "X86_64", "operatingSystemFamily": "LINUX" }, "networkConfiguration": { "assignPublicIp": "ENABLED" }, "executionRoleArn": "role::xxxx", # 权限 "repositoryCredentials": {}, }, "platformCapabilities": [ # 后端资源:服务器或者Serverless Container "Serverless Container" ], "jobDefinitionName": "merge-data" # Job名称}

3. 提交任务并构建依赖关系

a. 定义并提交process-data-l1 Job

Job定义:

{ "jobName": "process-data-l1", "jobDefinition": "arn::xxxx:job-definition/process-data:1", # Job使用的定义 "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列 "dependsOn": [], "arrayProperties": { # 启动任务数 "size": 64 }, "retryStrategy": {}, "timeout": {}, "parameters": {}, "containerOverrides": { "resourceRequirements": [], "environment": [] }}

提交获取Job id:

# batch submit process-data-l1 | get job-idjob-id: b617f1a3-6eeb-4118-8142-1f855053b347

b. 提交process-data-l2 Job

该Job依赖process-data-l1 Job。

{ "jobName": "process-data-l2", "jobDefinition": "arn::xxxx:job-definition/process-data:2", # Job使用的定义 "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Job使用的队列 "dependsOn": [ { "jobId": "b617f1a3-6eeb-4118-8142-1f855053b347" # process-data-l1的job Id } ], "arrayProperties": { # 启动任务数 "size": 32 }, "retryStrategy": {}, "timeout": {}, "parameters": {}, "containerOverrides": { "resourceRequirements": [], "environment": [] }}

提交获取Job id:

# batch submit process-data-l2 | get job-idjob-id: 6df68b3e-4962-4e4f-a71a-189be25b189c

c. 提交merge-data Job

该Job依赖process-data-l2 Job。

{ "jobName": "merge-data", "jobDefinition": "arn::xxxx:job-definition/merge-data:1", # Job使用的定义 "jobQueue": "arn::xxxx:job-queue/process-data-queue", # Merge Job使用的队列 "dependsOn": [ { "jobId": "6df68b3e-4962-4e4f-a71a-189be25b189c" # process-data-l2的job Id } ], "arrayProperties": {}, "retryStrategy": {}, "timeout": {}, "parameters": {}, "containerOverrides": { "resourceRequirements": [], "environment": [] }}

提交Job:

batch submit merge-data

4. 观察任务运行

所有任务均按序正常运行。

三、通过Argo Workflows实现

1. 原理

Serverless Argo Workflows是阿里云的一项全托管服务,基于开源Argo Workflow项目构建,完全符合开源工作流标准,可以让您在Kubernetes上运行任何规模的批处理负载,采用无服务器模式,使用阿里云弹性容器实例ECI运行工作流,通过优化Kubernetes集群参数,实现大规模工作流的高效弹性调度,同时配合抢占式ECI实例,优化成本。下面流程介绍Serverless Argo如何运行每个作业:

1)作业定义,指定如何运行作业,包括每个作业的CPU、内存、镜像、执行命令等。作业依赖关系定义,包括串行、并行循环、重试等。

2)提交Workflow到Serverless Argo集群。

3)Serverless Argo评估每个作业的资源,并调度弹性实例运行作业。

2. 创建任务定义并构建依赖关系

在构建Workflow时,定义Job和其依赖关系在同一个文件中定义。

第一步,定义process-data、和merge-data任务Template,描述每个任务的的镜像和启动参数。

第二步,定义Step/DAG Template,描述任务的并、串行执行关系。

第三步,将Template、依赖关系,存储、输入参数等整合成Workflow。

构建Workflow通常有两种方式,一种是通过Yaml方式构建,另一种是通过Python SDK构建,下边分别展示这两种方式的构建方法。

a. 通过Yaml方式构建

apiVersion: argoproj.io/v1alpha1kind: Workflowmetadata: generateName: process-data- # 数据处理工作流spec: entrypoint: main volumes: # 对象存储挂载 - name: workdir persistentVolumeClaim: claimName: pvc-oss arguments: parameters: - name: numbers value: "64" templates: - name: main steps: - - name: process-data-l1 # 第一级处理,启动64个Pods,Merge 128 个files template: process-data arguments: parameters: - name: file_number value: "{{item}}" - name: level value: "1" withSequence: count: "{{workflow.parameters.numbers}}" - - name: process-data-l2 # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动 template: process-data arguments: parameters: - name: file_number value: "{{item}}" - name: level value: "2" withSequence: count: "{{=asInt(workflow.parameters.numbers)/2}}" - - name: merge-data # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动 template: merge-data arguments: parameters: - name: number value: "{{=asInt(workflow.parameters.numbers)/2}}" - name: process-data # process-data 任务定义 inputs: parameters: - name: file_number - name: level container: image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd imagePullPolicy: Always command: [python3] # command args: ["process.py", "{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"]# 接收输入的参数,启动多少个pod进行处理 volumeMounts: - name: workdir mountPath: /mnt/vol - name: merge-data # merge-data 任务定义 inputs: parameters: - name: number container: image: argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd imagePullPolicy: Always command: [python3] args: ["merge.py", "0", "{{inputs.parameters.number}}"] # 接收输入的参数,处理多少个文件。 volumeMounts: - name: workdir mountPath: /mnt/vol

提交Workflow:

argo submit process-data.yaml

b. 通过Python SDK构建

from hera.workflows import Container, Parameter, Steps, Workflow, Volumeimport urllib3urllib3.disable_warnings()# 配置访问地址和tokenglobal_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"global_config.token = "abcdefgxxxxxx" # 填入集群的tokenglobal_config.verify_ssl = ""with Workflow( generate_name="process-data-", # 数据处理工作流 entrypoint="main", volumes=[m.Volume(name="workdir", persistent_volume_claim={"claim_name": "pvc-oss"})], # 对象存储挂载 arguments=[Parameter(name="numbers", value="64")]) as w: process-data = Container( # process-data 任务定义 name="process-data", inputs=[Parameter(name="file_number"), Parameter(name="level")], image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd", command=["python3"], args=["process.py","{{inputs.parameters.file_number}}", "{{inputs.parameters.level}}"], volume_mounts=[ m.VolumeMount(name="workdir", mount_path="/mnt/vol"), ], ) merge-data = Container( # merge-data 任务定义 name="merge-data", inputs=[Parameter(name="number")], image="argo-workflows-registry.cn-hangzhou.cr.aliyuncs.com/argo-workflows-demo/python:3.11-amd", command=["python3"], args=["merge.py", "0", "{{inputs.parameters.number}}"], volume_mounts=[ m.VolumeMount(name="workdir", mount_path="/mnt/vol"), ], ) with Steps(name="main") as s: process-data( name="process-data-l1", arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="1")], ) # 第一级处理,启动64个Pods,Merge 128 个files process-data( name="process-data-l2", arguments=[Parameter(name="file_number", value="{{item}}"), Parameter(name="level", value="2")], ) # 第二级处理,启动32个Pods,Merge 64 个files, 上一步处理完后启动 merge-data( name="merge-data", arguments=[Parameter(name="number", value="{{=asInt(workflow.parameters.numbers)/2}}")], ) # 最后一级处理,启动一个Pod,Merge 32 files, 上一步处理完后启动# 创建workfloww.create()

提交任务:

python process.py

3. 提交并观察任务运行

通过Yaml或者Python SDK方式构建并提交后,即可在Argo Server控制台查看工作流运行状态。

工作流按序正常执行成功。

四、对比

可以看出Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持。尽管它们的核心目标相似,但在任务定义、使用场景、灵活性以及资源管理等方面存在一些关键差异。以下是一个简要的对比。

五、总结

Serverless Argo Workflows和Batch批量计算对容器批处理都有非常完善的支持,选择Argo Workflows还是Batch批量计算主要取决于您的的技术栈、对云供应商的依赖程度、工作流的复杂性和对控制权的需求。如果您的团队熟悉Kubernetes并且需要高度定制化的工作流,Argo Workflows可能是更好的选择。相反,如果你在云厂商生态系统内运作,寻求简单易用且能与云厂商其他服务紧密集成的解决方案,Batch批量计算可能更适合您。

ACK One Serverles Argo团队是国内最早使用和维护Argo Workflows的团队之一,在Argo Workflows使用方面积累众多的最佳实践,如果您需要使用Argo Workflows调度大规模工作流,欢迎加入钉钉群号一同交流:35688562。

参考:

全托管Serverless Argo工作流:

https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/overview-12

Argo Workflows:

https://github.com/argoproj/argo-workflows

创建工作流集群:

https://www.alibabacloud.com/help/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster

PythonSDK助力大规模Argo Workflows构建:

https://mp.weixin.qq.com/s/_2Glhuy6YJEM4ZRMDwE7JA

/ END /

0 阅读:0

科技梦想在奔跑

简介:感谢大家的关注