基于 Serverless 工作流高并发批量解冻 OSS 文件

本文涉及的产品
简介:

前言

Serverless 工作流(Serverless Workflow,原函数工作流)是一个用来协调多个分布式任务执行的全托管 Serverless 云服务,致力于简化开发和运行业务流程所需要的任务协调、状态管理以及错误处理等繁琐工作,让用户聚焦业务逻辑开发。用户可以用顺序、分支、并行等方式来编排分布式任务,服务会按照设定好的顺序可靠地协调任务执行,跟踪每个任务的状态转换,并在必要时执行用户定义的重试逻辑,以确保工作流顺利完成。

函数计算 FC 是事件驱动的全托管计算服务,无需采购服务器和运维,只需上传代码就能实现高可用、高并发、弹性伸缩的后端服务。

本文介绍如何使用 Serverless 工作流来高并发大批量的解冻 oss 归档存储文件,使用工作流的优势:

  • 高并发
  • 错误自动重试,高可靠性
  • 每个步骤都有输入输出记录以及实时执行状态,高可观测性

应用中心一键部署

  1. 前往 Serverless 工作流应用中心 创建并部署 OSS Restore 应用。
    image
  2. 部署完成后执行流程 {stackName}-mainRestoreFlow-{suffix},输入:

    {
      "endpoint": "",
      "bucketName": "",
      "prefix": "",
      "marker": "",
      "maxKeys": 100,
      "pollInterval": 10,
      "workers": 10,
      "groupSize": 1
    }

    执行参数说明:

    • endpoint
      OSS endpoint
    • bucketName:
      OSS bucket 名称
    • prefix:
      OSS bucket 文件过滤前缀
    • maxKeys:
      OSS ListObjects 返回的最大文件数量 (这里不要超过 foreach 的并发限制,默认是 100)
    • pollInterval:
      轮询 OSS 文件 restore 状态的时间间隔(秒)
    • groupSize:
      一个任务步骤(对应一个函数)批量处理的文件数
    • workers:
      多个文件在一个函数中处理时,设定的处理线程池大小
    • marker:
      OSS ListObjects 的开始 marker

    详细信息可参考 ListObjects API

流程执行完毕后,会自动解冻所有符合过滤条件的归档存储文件。

原理

解冻文件主流程

主流程 mainRestoreFlow 主要做以下事情:

  1. 任务步骤 listArchiveFiles 调用 FC 函数 listArchiveFilesmarker 开始执行 ListObjects 列举指定 maxKeys 数量,前缀为 prefix 的 OSS 文件。
  2. 调用子流程 restoreFlow 对获取的文件列表进行解冻,并返回下一次列举的起点 maker
  3. 选择步骤 checkEnd 检测是否已完成列举 bucket 中所有的文件,若没有,跳转到 listArchiveFiles 中从下一个 marker 开始继续执行,否则结束。

主流程定义,请参考 流程定义语言

version: v1
type: flow
steps:
- type: pass
  name: init
  outputMappings:
    - source: $input.endpoint
      target: endpoint
    - source: $input.bucketName
      target: bucketName
    - source: $input.prefix
      target: prefix
    - source: $input.maxKeys
      target: maxKeys
    - source: $input.pollInterval
      target: pollInterval
    - source: $input.marker
      target: marker
    - source: $input.workers
      target: workers
    - source: $input.groupSize
      target: groupSize

# List archive files from marker
- type: task
  name: listArchiveFiles
  resourceArn: acs:fc:::services/<serviceName>/functions/listArchiveFiles
  outputMappings:
    - source: $local.bucketName
      target: bucketName
    - source: $local.filesGroup
      target: filesGroup
    - source: $local.marker
      target: marker
    - source: $local.end
      target: end
    - source: $local.empty
      target: empty
    - source: $local.archiveFilesCount
      target: archiveFilesCount

# Check whether file restore success, if not, retry check
- type: choice
  name: checkEmpty
  choices:
    # If list archive files not empty
    - condition: $.empty == "false"
      steps:
        # Invoke subflow restore to restore listed files
        - type: task
          name: invokeRestoreFlow
          resourceArn: acs:fnf:::flow/<restoreFlow>
          pattern: sync
          serviceParams:
            Input: $
  default:
    goto: checkEnd

# Check list files ended
- type: choice
  name: checkEnd
  choices:
    - condition: $.end == "true"
      goto: success
  default:
    goto: listArchiveFiles

# success
- type: pass
  name: success

解冻文件子流程

子流程 restoreFlow 主要做以下事情:

  1. 并行循环步骤 foreach,并行的对主流程中传入的文件列表生成多个解冻任务。
  2. 解冻任务 restoreTask 调用 FC 函数 restore 对文件列表进行解冻。
  3. 所有解冻任务提交完成后,循环执行:

    • 等待步骤 Wait 等待一段时间
    • 任务步骤 GetJobStatus 获取所有文件的解冻状态
    • 选择步骤 CheckJobComplete 判断是否全部解冻完成,若完成执行结束,否则跳转到 wait 步骤继续循环检测。

子流程定义如下:

version: v1
type: flow
steps:
  - type: foreach
    name: retoreForeach
    iterationMapping:
      collection: $.filesGroup
      item: files
    steps:
      # Invoke restore function
      - type: task
        name: restoreTask
        resourceArn: !Ref OSSRestoreService/restoreFunction
        retry:
          - errors:
              - FC.ResourceThrottled
              - FC.ResourceExhausted
              - FC.InternalServerError
              - FC.Unknown
              - FnF.TaskTimeout
            intervalSeconds: 1
            maxAttempts: 10
            multiplier: 1.5
            maxIntervalSeconds: 10

  # Wait interval for poll files restore status
  - type: wait
    name: Wait
    duration: $.pollInterval

  # Get file restore status
  - type: task
    name: GetJobStatus
    resourceArn: !Ref OSSRestoreService/restoreStatusFunction

  # Check whether file restore success, if not, retry check
  - type: choice
    name: CheckJobComplete
    inputMappings:
      - target: status
        source: $local.status
    choices:
      - condition: $.status == "success"
        goto: JobSucceeded
      - condition: $.status == "running"
        goto: Wait

  - type: succeed
    name: JobSucceeded
    outputMappings:
      - target: filesGroup
        source: $input.filesGroup
      - target: marker
        source: $input.marker

以上应用代码可参考 oss-restore

总结

使用 Serverless 工作流能极大减少重复的流程控制开发,让错误重试更加容易,以及提供实时的执行进度查询。这里提供的解冻 oss 应用,完全可以照搬到其它的需求上,以实现高并发高可靠性的应用。

欢迎加钉钉群 23116481 交流:
image

相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
2月前
|
监控 Serverless 测试技术
Serverless 应用引擎常见问题之生成的图片的oss地址配成自定义的域名如何解决
Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
25 0
|
2月前
|
弹性计算 前端开发 小程序
微信小程序上传文件至阿里云OSS直传(java后端签名+前端直传)
当前的通用文件上传方式是通过前端上传到服务器,再由服务器转存至对象存储。这种方式在处理小文件时效率尚可,但大文件上传因受限于服务器带宽,速度较慢。例如,一个100MB的文件在5Mbps带宽的阿里云ECS上上传至服务器需160秒。为解决此问题,可以采用后端签名的方式,使微信小程序直接上传文件到阿里云OSS,绕过服务器中转。具体操作包括在JAVA后端引入相关依赖,生成签名,并在微信小程序前端使用这个签名进行文件上传,注意设置正确的请求头和formData参数。这样能提高大文件上传的速度。
|
2月前
|
监控 关系型数据库 Serverless
Serverless 应用引擎常见问题之工作流这执行输出通过jsonpath过滤如何解决
Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
392 3
|
5天前
|
分布式计算 大数据 MaxCompute
MaxCompute产品使用合集之使用pyodps读取OSS(阿里云对象存储)中的文件的步骤是什么
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5天前
|
分布式计算 Java 大数据
MaxCompute产品使用合集之大数据计算MaxCompute外部表映射了oss中的csv文件,看到"\N"被解析为"N",是什么原因
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6天前
|
运维 文字识别 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,需要处理的文件大于100MB如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
124 5
|
6天前
|
运维 Dubbo Java
Serverless 应用引擎产品使用之在 Serverless 应用引擎中,查看镜像文件中的 JAR 文件如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
134 2
|
6天前
|
缓存 运维 Serverless
Serverless 应用引擎产品使用之阿里函数计算中。将本地电脑上的项目文件部署到阿里云函数计算(FC)上并实现对外提供API和WebUI如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
24 1
|
6天前
|
运维 JavaScript Java
Serverless 应用引擎产品使用之阿里云Serverless函数计算中,在Node.js环境中执行jar文件如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
14 0
|
6天前
|
运维 Java Serverless
Serverless 应用引擎产品使用之数据文件(例如sdxl)超过了OSS(对象存储服务)的单个上传大小限制(5GB)如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
16 1

热门文章

最新文章

相关产品

  • 函数计算