如何使用函数计算实现流式处理大文件 — AWS S3 Select

本文涉及的产品
简介: 函数计算实现流式处理大文件

目前AWS 推出了S3 Select 预览版,对于带分隔符的文本文件(如csv)和 JSON数据,可使用简单的 SQL 表达式轻松检索对象内容中较小且具有针对性的数据集。

在某些场景中,用户需要对oss上的某个大文件抽取出某些信息。本教程展示如何结合oss和函数计算实现类似AWS S3 Select功能,以oss上的一个大小为400M,内含大约105万条记录的csv文件为例,讲解如果利用函数计算流式处理, 查找出包含'Lilly'的记录。

本demo做了什么?

1, 从一个包含100多万行记录的大csv文件中迅速(1s左右)查找到符合条件的记录,以查找出包含'Lilly'的记录为例,返回结果如下:

[
   "Lilly Patton,School,Weyerhauser Company,26",
   "Lilly Davidson,School,Bristol-Myers Squibb Company,50",
   "Lilly Drake,School,Federal Express Corp.,38",
   "Lilly Carlson,School,Pentair Inc,62",
   "Lilly Fisher,School,Progressive Corporation,42",
   "Lilly Morales,School,Mercury General Corporation,27",
   "Lilly Lewis,School,The Black & Decker Corporation,35",
   "Walter Rivera,School,Eli Lilly and Company,24",
   "Lilly Mills,School,Owens & Minor Inc.,60",
   "Lilly Jimenez,School,AOL Time Warner Inc.,42",
   ... 记录条目比较多,就不都展示了
]

2, 单线程分块读取处理与多线程分治处理方案效率比较,分别调用函数100次,耗时比较如下:

耗时 单线程分块读取处理 多线程分治处理
最小时间 6.69807887077 0.690281152725
最大时间 7.1105670929 0.946150064468
平均时间 6.85662093163 0.748612663746

具体步骤:

注:下面步骤中函数和oss的region是一致的,本例中都是华东2

1, 将例子的csv文件上传到oss中, 在本例子中,该csv文件上传到一个在华东2region中名叫demo-oss的oss bucket中

image

2, 创建好service,并且配置service的角色,该角色具有oss的读写权限和fc invoke权限
  • 2.1 用控制台创建service ApiFc, 控制台操作可以参考函数计算入门示例 - hello world
  • 2.2 用控制台创建一个具有oss读写权限和fc invoke权限的角色 fc-demo,并把角色赋予service ApiFc

2.2.1 用控制台创建角色并把角色赋予service ApiFc 的过程可以参考函数模板使用 中步骤6,7

  • 角色fc-demo,拥有两个policy,一个是对oss的读写,一个是函数调用
    image
  • 角色fc-demo 赋予 service ApiFc
    image
3, 在service ApiFc 下创建函数, 执行
3.1 单线程demo setup

从本文最后下载附件,解压以后可以看到三个zip文件, 用sing.zip创建函数single.

  • 创建函数
    image
  • 函数设置
    image
  • 创建成功后,进入代码编辑页面,配置一下event的参数,保存,直接按执行就可以执行函数了
    image
3.2 多线程demo setup
  • 3.2.1 参考single函数的创建方法,用query.zipquery_part.zipApiFc的service下面创建两个函数query和query_part
  • 3.2.2 主函数query会调用子函数query_part,如果你的service名字和子函数名不是'ApiFc'和'query_part',就需要修改query中相应的代码:
    image
  • 3.2.3 创建完成后,点击执行就行,如下图所示:
    image

具体代码实现:

以下是具体的两种解法:

  • 单线程分块读取处理 -> 速度比较慢, 适合小文件查询处理
  • 多线程分治处理 -> 速度快,适合大文件查询处理
一,单线程分块读取处理

配置event如下:

{
    "file_name": "sampleusers_10m_2.csv",
    "oss_bucket": "demo-oss",
    "mode": ".*Lilly.*"
}

注:目前mode的含义很简单,就是一条正则表达式,该例子就是寻找包含Lilly的记录

代码如下:

# coding=utf-8
import re
import json
import oss2
import logging
import time
import csv

CHUNK_SIZE = 1024*8
  
 # 函数服务主函数
def handler(event, context):
    start = time.time()
    evt = json.loads(event)
    endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
    oss_bucket, file_name, mode = evt['oss_bucket'], evt['file_name'],evt['mode']
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, oss_bucket)
    start = time.time()
    r = bucket.get_object(file_name)
    last_str = ''
    result = []
    pattern = re.compile(mode, re.DOTALL)
    while 1:
    # 函数需要的内存跟CHUNK_SIZE有关,这边8k实验速度最快,当oss默认的chunk速度不合适的时候,可以尝试换下chunk大小
        data = r.read(CHUNK_SIZE) 
        if not data:
            break
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            # 本数据块剩下的字符串,和下一个数据块首部一部分是完整的一行
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''
            
        for row in rows:
            if pattern.match(row):
                result.append(row)
        # 如果使用csv库中可以转为可读性更好的dict的处理,可以支持更强的查询,但是性能很差,40s+
        # 如果真的需要强查询,这边可以结合'Lilly' in row,再csv.DictReader,能取个中间效果
        # 或者这边可以用正则来处理一些复杂查询
        # 或是基于querycsv进行一些改造,https://pythonhosted.org/querycsv/
        #f_csv = csv.DictReader(rows)
        #for row in f_csv:
        #    if row['name'] == 'Micheal Swanson':
        #        result.append(row)
        
     # 将查询结果写回oss
    csv_str = 'name,school,company,age\n' + '\n'.join(result)
    bucket.put_object('output.csv' , csv_str)
   
    # response这边显示100行
    if len(result) > 100:
        result = result[0:100]
        tips = "to many records, only show 100, you can see in oss output.csv"
        result = (tips, result)
    
    cost_time = time.time() - start
    print cost_time
    return result

上面这个例子调用100次数据,消耗时间如下:

avg: 6.85662093163

min: 6.69807887077

max: 7.1105670929

二,分治法

将大数据分割,使用尽量等分的规则将数据分片分别交给子函数query_part处理,然后将结果汇总

主函数query

配置event如下:

{
    "file_name": "sampleusers_10m_2.csv",
    "oss_bucket": "demo-oss",
    "mode": ".*Lilly.*",
    "part_num":16,
    "account_id":123456
}

注:上面的account_id替换成自己的阿里云account id,目前mode的含义很简单,就是一条正则表达式,该例子就是寻找包含Lilly的记录

代码如下:

# -*- coding: utf-8 -*-
import logging
import fc
import json
import oss2
import math
import threading
import time

result = []

# 文件一行最大为可能512bytes,这是一个尝试值
MAX_PER_ROW_BYTES = 512

class MyThread(threading.Thread):
    def run(self):
        # change your service, sub function
        r = self.client.invoke_function('ApiFc', 'query_part', self.payload) 
        global result
        result.extend(json.loads(r))

    def set_init_data(self, client, payload):
        self.client = client
        self.payload = payload
   
def handler(event, context):
    evt = json.loads(event)
    oss_endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
    oss_bucket, file_name, mode = evt['oss_bucket'], evt['file_name'],evt['mode']
    part_num = int(evt['part_num'])
    creds = context.credentials
    fc_endpoint = '{0}.{1}-internal.fc.aliyuncs.com'.format(evt['account_id'], context.region)
    client = fc.Client(
            endpoint= fc_endpoint,
            accessKeyID=creds.accessKeyId,
            accessKeySecret=creds.accessKeySecret,
            securityToken=creds.securityToken
        )
    
    # get bucket length
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
    r = bucket.get_object(file_name)
    bytes_length = int(r.stream.fileobj.response.headers['Content-Length'])
    
    # multiThread process
    global result 
    result = []
    threads = []
    PART_SIZE = int( math.ceil( float(bytes_length)/part_num ) )

    begin = 0
    end = PART_SIZE-1
    
    for i in xrange(part_num):
        # 这边分块一定要科学,即分出来的块是full lines
        off_set = -1
        if end < bytes_length - 1:
            for i in range(10000):
                if off_set == -1:
                    # 每次以MAX_PER_ROW_BYTES大小去找到本数据块的最后一个'\n'
                    r_s = bucket.get_object(file_name, byte_range=(end+MAX_PER_ROW_BYTES*i, end+MAX_PER_ROW_BYTES*(i+1)))
                    data = r_s.read()
                    off_set = data.find('\n')
                
                if off_set != -1:
                    break
                
            end = end + off_set
        
        payload={'begin':begin, 'end':end, 'oss_bucket':oss_bucket,'file_name':file_name, 'mode':mode}   
        t = MyThread()
        t.set_init_data(client, json.dumps(payload))
        t.start()
        threads.append(t)
        
        begin = end + 1
        end = end + PART_SIZE - 1
        if end >= bytes_length:
            end = bytes_length-1
        
        if begin >= bytes_length:
            break
        
    for t in threads:
        t.join()
        
    # 将查询结果写回oss
    csv_str = 'name,school,company,age\n' + '\n'.join(result)
    bucket.put_object('output.csv' , csv_str)
   
    # response这边显示100行
    if len(result) > 100:
        result = result[0:100]
        tips = "to many records, only show 100, you can see in oss output.csv"
        result = (tips, result)
     
    return result

具体的查询每个数据块的函数query_part代码如下:

# coding=utf-8
import re
import urllib
import json
import datetime
import oss2
import time
import csv

CHUNK_SIZE = 1024*8
  
 # 函数服务子函数
def handler(event, context):
    start = time.time()
    evt = json.loads(event)
    begin, end = evt['begin'], evt['end']
    file_name, oss_bucket = evt['file_name'], evt['oss_bucket']
    mode = evt['mode']
    oss_endpoint = "oss-{}-internal.aliyuncs.com".format(context.region)
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket)
    pattern = re.compile(mode, re.DOTALL)
    r = bucket.get_object(file_name, byte_range=(begin, end))
    last_str = ''
    result = []
    while 1:
        data = r.read(CHUNK_SIZE)
        if not data:
            break    
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''
            
        for row in rows:
           if pattern.match(row):
                result.append(row)
    print "query_part #### ",  evt, time.time() - start, "last_str = ", last_str 
    return result

上面这个例子调用100次数据,分块的数目设置为16的时候,消耗时间如下:

avg: 0.748612663746

min: 0.690281152725

max: 0.946150064468

本地评估invoke的时间的代码如下:

注意:记得修改query主函数的返回值是函数执行时间

#coding=utf-8
import os

# account_id 改成自己的account_id
command = '''fcli function invoke -f query  -s ApiFc --event-str '{"file_name":  "sampleusers_10m_2.csv", "part_num": 16, "account_id": 12345, "oss_bucket": "demo-oss", "mode": ".*Lilly.*"}' '''

sum = 0
NUM = 100
time_lst = []
min_t = 10000000
max_t = 0

for i in xrange(NUM):
    r = os.popen(command) 
    info = r.readlines()  
    for line in info:  
        line = line.strip('\r\n')
        t = float(line)
        if t > max_t:
            max_t = t
        if t < min_t:
            min_t = t
        sum += t
        time_lst.append(t)

print "avg: ", sum/NUM
print "min: ", min_t
print "max: ", max_t
print "all: ", time_lst

总结:

从上面的方法和测试结果可以看出,利用分治的方法可以做到1秒左右从百万级别记录中查询到满足条件的记录。

相关实践学习
基于函数计算一键部署掌上游戏机
本场景介绍如何使用阿里云计算服务命令快速搭建一个掌上游戏机。
建立 Serverless 思维
本课程包括: Serverless 应用引擎的概念, 为开发者带来的实际价值, 以及让您了解常见的 Serverless 架构模式
目录
相关文章
|
3月前
|
Serverless Shell PHP
Serverless 应用引擎报错问题之下载文件报错如何解决
在进行Serverless应用开发和部署时,开发者可能会遇到不同类型的报错信息;本合集着重收录了Serverless环境中常见的报错问题及其解决策略,以助于开发者迅速诊断和解决问题,保证服务的连续性和可用性。
92 1
|
5月前
|
存储 Serverless
可以在函数计算FC中使用这些挂载目录来存储和访问你的文件和数据
可以在函数计算FC中使用这些挂载目录来存储和访问你的文件和数据
49 1
|
9月前
|
存储 人工智能 Serverless
将Stable Diffusion模型文件转存到FC环境的NAS
本文将会指导你开通基于NAS的Stable Diffusion 函数计算FC环境,并且可以将SD模型库的模型转存下载到FC应用下的NAS存储空间
1138 1
将Stable Diffusion模型文件转存到FC环境的NAS
|
22天前
|
存储 Cloud Native Serverless
云原生最佳实践系列 7:基于 OSS Object FC 实现非结构化文件实时处理
阿里云OSS对象存储方案利用函数计算FC,在不同终端请求时实时处理OSS中的原图,减少衍生图存储,降低成本。
|
1月前
|
缓存 监控 NoSQL
函数计算常见问题之提示文件错误如何解决
函数计算(Function Compute, FC)是阿里云提供的无服务器计算服务,它允许用户在无需管理服务器的情况下运行代码,但在配置和执行过程中可能遇到报错,本合集致力于梳理FC服务中的常见报错和配置问题,并提供解决方案,帮助用户优化函数执行环境。
57 0
函数计算常见问题之提示文件错误如何解决
|
3月前
|
Serverless 开发工具 git
Serverless 应用引擎报错问题之下载文件报错如何解决
Serverless部署是指将应用程序部署到无服务器架构中,该架构允许开发者专注于代码而无需关心底层服务器的运行和维护;针对Serverless部署过程中可能遇到的挑战,本合集提供全面的指南和最佳实践,帮助开发者顺利实现应用的无服务器化部署。
140 0
|
7月前
|
存储 弹性计算 Serverless
使用函数计算打包下载OSS文件
本场景介绍如何使用函数计算将对象存储OSS上多个文件(Object)打包下载到本地。
430 0
|
8月前
|
存储 JSON JavaScript
AWS Serverless培训分享
AWS Serverless培训分享
91 0
|
10月前
|
Serverless Go
阿里函数计算 go mod tidy 的时候提示文件只读
阿里函数计算 go mod tidy 的时候提示文件只读
94 1
|
存储 Serverless 对象存储
函数计算批量处理海量 OSS 文件
函数计算批量处理海量 OSS 文件自制脑图
97 0
函数计算批量处理海量 OSS 文件

热门文章

最新文章

相关产品

  • 函数计算