Serverless 工作流实现分布式定时调度

本文涉及的产品
简介:

Serverless 工作流 是一个高可用的任务编排服务,提供选择、并行、循环等流程控制,可视化的执行,异常捕捉和自动重试。极大简化复杂系统的开发和调试,让开发人员只需编写业务逻辑,免去流程控制和异常处理的重复性代码。

前言

对很多业务来说定时调度是最常见的需求,比如实现一个集群多台机器的定时状态检查。传统的通过 crond 服务来实现作业定时执行存在以下问题:

  1. 单机不稳定,如果 crond 服务停止或机器故障都会导致业务中断。
  2. 配置多台 worker 机器,分别启动 crond 服务来执行。worker 之间没法做到统一的调度,总的作业很难不重复的分配到各个 worker 上执行。
  3. 单个 worker 的执行从启动、执行、返回都是黑盒,无法可视化。基于所有 worker 总的作业数据也很难搜集。

这里的 worker 是指在某台机器上执行作业的程序。

使用 Serverless 工作流定时调度功能,可以非常简单的解决上述问题,主要有以下优势:

  1. 云端统一的定时调度,可靠性不受单个 worker 所在机器的影响。
  2. 定制化的 worker 输入,多个 worker 的输出自动聚合。
  3. 整个执行流程中每一步都是可视化的,并且可对单个 worker 执行异常做自动重试。

使用流程

可先参考帮助文档 使用 MNS 服务集成及回调编排任意任务类型 的单 worker 实现。以下主要介绍实现多 worker 以及定时调度。

创建 MNS 队列

前往 MNS 控制台 创建用于存放 worker 要执行的任务队列,比如命名为 workers

创建执行 worker 的工作流 flow

前往 Serverless 工作流控制台 使用以下定义创建工作流,比如命名为 demo-schedule-workers

version: v1
type: flow
steps:
  - type: foreach  # 并行循环步骤,并行的下发任务消息到 MNS 队列。
    name: workersForeach
    iterationMapping:
      collection: $.payload.workers
      item: workerName
    steps:
      - type: task
        name: workerTask
        resourceArn: acs:mns:::/queues/workers/messages  # 表示该任务(Task)步骤会向同区域, 同账号下的 MNS 队列 fnf-demo 发送消息。
        pattern: waitForCallback  # 表示该任务步骤在发送 MNS 消息成功后会暂停,直到收到回调。
        inputMappings:
            - target: task_token
              source: $context.task.token  # 从 context 对象中获取标识该任务的令牌 (task token)。
            - target: worker_name
              source: $input.workerName
        serviceParams:  # 服务集成参数。
            MessageBody: $  # 用映射后的 input 作为要发送消息的内容。
            Priority: 1  # 消息队列的优先级。

该流程主要做以下事情:

  1. 读取输入的 workers 任务列表。
  2. 通过 foreach 并行循环步骤遍历任务列表,并行执行 task 步骤 workerTask 将任务消息和系统自动生成的任务 taskToken 下发到 MNS 队列中。
  3. 流程阻塞,等待所有 task 执行完毕和上报状态。

taskToken 为 task 步骤中系统自动生成的 token,用于任务的 worker 报告状态。

编写 worker 脚本

worker 循环读取任务队列,执行任务(可以是任意类型的作业),作业完成后上报状态到工作流中。
一个简单的示例 worker.py 如下:

def main():
    region = os.environ['REGION']
    account_id = os.environ['ACCOUNT_ID']
    ak_id = os.environ['AK_ID']
    ak_secret = os.environ['AK_SECRET']

    queue_name = 'workers'
    fnf_client = AcsClient(
        ak_id,
        ak_secret,
        region,
        debug=False
    )

    mns_endpoint = 'https://{}.mns.{}.aliyuncs.com'.format(account_id, region)
    my_account = Account(mns_endpoint, ak_id, ak_secret)
    my_queue = my_account.get_queue(queue_name)
    my_queue.set_encoding(False)
    wait_seconds = 30

    try:
        while True:
            try:
                # Read message from mns queue
                print('Receiving messages...')
                recv_msg = my_queue.receive_message(wait_seconds)
                print('Received message: {}, body: {}'.format(recv_msg.message_id, recv_msg.message_body))

                # Parse message
                body = json.loads(recv_msg.message_body)
                task_token = body['task_token']
                worker_name = body['worker_name']

                # TODO here to implement your own worker logic
                worker()

                # After worker execution completed, report status to workflow
                output = {
                    worker_name: 'success'
                }
                output_str = json.dumps(output)
                request = ReportTaskSucceededRequest.ReportTaskSucceededRequest()
                request.set_Output(output_str)
                request.set_TaskToken(task_token)
                resp = fnf_client.do_action_with_exception(request)
                print('Report worker: {}, response: {}'.format(worker_name, resp))

                # Delete mns message in queue
                my_queue.delete_message(recv_msg.receipt_handle)
                print('Deleted message: {}'.format(recv_msg.message_id))
            except MNSExceptionBase as e:
                print(e)
            except ServerException as e:
                print(e)
                if e.error_code == 'TaskAlreadyCompleted':
                    my_queue.delete_message(recv_msg.receipt_handle)
                    print('Task already completed, deleted message: {}'.format(recv_msg.message_id))
    except ServerException as e:
        print(e)


def worker():
    print('Hello Serverless Workflow')


if __name__ == '__main__':
    main()
    

为工作流配置定时调度

可参考文档 创建定时调度,设置触发消息:

{"workers": ["worker1", "worker2", "worker3"]}

等待一段时间后,可看到工作流被定时执行,并处于等待 worker 执行状态:
image

启动本地 worker

在本地执行 python worker.py 启动 worker,可一台机器启动多个 worker 或在不同的机器上分别启动。
worker 启动后,可以看到工作流成功执行:
image

总结

通过 Serverless 工作流无需配置任何的服务器,就能实现一个分布式的定时调度系统。欢迎加入我们。

工作流官网客户群:
image

相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
1月前
|
监控 关系型数据库 Serverless
Serverless 应用引擎常见问题之工作流这执行输出通过jsonpath过滤如何解决
Serverless 应用引擎(Serverless Application Engine, SAE)是一种完全托管的应用平台,它允许开发者无需管理服务器即可构建和部署应用。以下是Serverless 应用引擎使用过程中的一些常见问题及其答案的汇总:
391 3
|
4月前
|
运维 Serverless API
四大软件架构:掌握单体、分布式、微服务、Serverless 的精髓
如果一个软件开发人员,不了解软件架构的演进,会制约技术的选型和开发人员的生存、晋升空间。这里我列举了目前主要的四种软件架构以及他们的优缺点,希望能够帮助软件开发人员拓展知识面。
|
6月前
|
运维 Java Serverless
深度解析四大主流软件架构模型:单体架构、分布式应用、微服务与Serverless的优缺点及场景应用
深度解析四大主流软件架构模型:单体架构、分布式应用、微服务与Serverless的优缺点及场景应用
427 0
|
3月前
|
存储 Kubernetes Cloud Native
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
云原生离线工作流编排利器 -- 分布式工作流 Argo 集群
105121 2
|
4月前
|
数据可视化 Linux 调度
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
DolphinScheduler【部署 01】分布式可视化工作流任务调度工具DolphinScheduler部署使用实例分享(一篇入门学会使用DolphinScheduler)
187 0
|
SQL 负载均衡 Java
分布式定时调度:xxl-job 万字详解
分布式定时调度:xxl-job 万字详解
|
数据采集 Serverless
《Serverless 开发实战-快速开发一个分布式Puppeteer 网页截图服务》电子版地址
Serverless 开发实战-快速开发一个分布式Puppeteer 网页截图服务
《Serverless 开发实战-快速开发一个分布式Puppeteer 网页截图服务》电子版地址
|
Serverless
《Serverless 工作流适用场景及最佳实践》电子版地址
Serverless 工作流适用场景及最佳实践
119 0
《Serverless 工作流适用场景及最佳实践》电子版地址
|
Serverless
如何在云平台创建一个 FC——用 Serverless 协调工作流
如何在云平台创建一个 FC——用 Serverless 协调工作流自制脑图
135 0
如何在云平台创建一个 FC——用 Serverless 协调工作流
|
Serverless
Serverless 工作流
Serverless 工作流自制脑图
76 0
Serverless 工作流

相关产品

  • 函数计算