利用函数计算流式 gz 打包 ECS 上的单个 超大文件

本文涉及的产品
简介: 在某些业务场景下,生成超大的日志文件或者其他文件, 这些文件需要及时移出并 gz 压缩保存到 OSS,但是压缩文件可能会大于 3G 超出函数计算执行环境的最大内存限制, 本文提供流式解决这个问题的方案

背景

在某些业务场景下,生成超大的日志文件或者其他文件, 这些文件需要及时移出并 gz 压缩保存到 OSS,但是压缩文件可能会大于 3G 超出函数计算执行环境的最大内存限制, 本文提供流式解决这个问题的方案

  • 函数计算配置VPC, 内网打通ecs
  • OSS 和 函数计算在相同 region, 内网传输

示例代码

依赖使用第三方库 paramiko, 但是默认的库在传输大文件上有传输速率限制, 需要做如下改造, 同时构造 paramiko.SFTPClient 的时候需要设置好 window_size 和 max_packet_size 这两个参数

image

import paramiko
import gzip
import oss2
import logging
import os
import time

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.22.3"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

def handler(event, context):
    start = time.time()
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 28
    max_packet_size = 2 ** 26
    sftp=paramiko.SFTPClient.from_transport(scp,window_size=window_size, max_packet_size=max_packet_size)
    
    pos = 0
    CHUNK_SIZE = 1024*1024
    APPEND_SIZE = 128
     
    LOG_FILE = "test-1.log"
    DST_OBJ = 'dst/' + LOG_FILE + '.gz'
    
    data_out = []
    with sftp.open("/root/" + LOG_FILE, "r") as f:
        while 1:
            data=f.read(CHUNK_SIZE)      
            if not data:
                if len(data_out) > 0 and len(data_out) < APPEND_SIZE:
                  result = bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                break    
            data_out.append(gzip.compress(data))
            if len(data_out) == APPEND_SIZE:
                upload_data = b"".join(data_out)
                result = bucket.append_object(DST_OBJ, pos, b"".join(data_out))
                pos += len(upload_data)
                data_out = []
    
    print("total time = ", time.time() - start)
    return "OK"

这个方案, 虽然解决了函数计算内存限制, 但是对于某些超大文件, 比如15G 以上的文件, 10分钟的时间限制又是一个limit

ImproveMent

OSS 支持分片上传功能,那基于分片上传,配合 FC 的弹性伸缩功能, 可以有如下方案:

image

示例代码:

master:

# -*- coding: utf-8 -*-
import logging
import fc2
import oss2
import paramiko
import math
import json
import time
from threading import Thread

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

SRC_LOG_FILE =  "/root/huge.log"
DST_FILE = "jiahe/push_log_50000_5G.gz"
BUCKET_NAME = "oss-demo"

ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

SUB_LOG = 5*1024*1024*1024  # 交给每个work函数处理的日志大小
PART_SIZE = 16 * 1024 * 1024  # work 函数上传分片的大小, 和work函数大小必须相同

SERVICE_NMAE = "fc_demo"    # work函数所在的service, 最好在同一个service
SUB_FUNCTION_NAME = "worker"  # work函数的名字

parts = []
def sub_gz(fcClient, subevent):
    content=fcClient.invoke_function(SERVICE_NMAE, SUB_FUNCTION_NAME, json.dumps(subevent)).data
    global parts
    #print(content)
    parts.extend(json.loads(content))

def handler(event, context):
    #start = time.time()
    global parts
    parts = []
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    sftp=paramiko.SFTPClient.from_transport(scp)
    info = sftp.stat(SRC_LOG_FILE)
    total_size = info.st_size
    print(total_size)
    sftp.close()

    endpoint = "http://{0}.{1}-internal.fc.aliyuncs.com".format(context.account_id, context.region)
    fcClient = fc2.Client(endpoint=endpoint,accessKeyID=creds.accessKeyId,accessKeySecret=creds.accessKeySecret, securityToken=creds.securityToken,Timeout=900)

    threadNum = int(math.ceil(float(total_size)/SUB_LOG))
    key = DST_FILE
    upload_id = bucket.init_multipart_upload(key).upload_id

    part_step = int(math.ceil(float(SUB_LOG)/PART_SIZE))
    ts = []
    left_size = total_size
    for i in range(threadNum):
        part_start = part_step * i + 1
        size = SUB_LOG if SUB_LOG < left_size else left_size
        subEvt = {
          "src": SRC_LOG_FILE,
          "dst": DST_FILE,
          "offset": i * SUB_LOG,
          "size": size,
          "part_number":  part_start, 
          "upload_id" : upload_id,
        }
        print(i, subEvt)
        t = Thread(target=sub_gz, args=(fcClient, subEvt,))
        left_size = left_size - SUB_LOG
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

    parts.sort(key=lambda k: (k.get('part_number', 0)))
    #print(parts)
    part_objs = []
    for part in parts:
        part_objs.append(oss2.models.PartInfo(part["part_number"], part["etag"], size = part["size"], part_crc = part["part_crc"]))
    bucket.complete_multipart_upload(key, upload_id, part_objs)
    #print(time.time() - start)
    return "ok"

worker:

import paramiko
import gzip
import oss2
import logging
import os
import time
import json

logging.getLogger("oss2.api").setLevel(logging.ERROR)
logging.getLogger("oss2.auth").setLevel(logging.ERROR)

# config
BUCKET_NAME = "oss-demo"
ECS_INNER_IP = "192.168.2.12"
SSH_PORT = 22
USR_NAME = 'xiaoming'
USR_PWD = '123456'

CHUNK_SIZE = 1024*1024
PART_SIZE = 16 * CHUNK_SIZE

def handler(event, context):
    region = context.region
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, 'oss-' + region + '-internal.aliyuncs.com', BUCKET_NAME)

    scp=paramiko.Transport((ECS_INNER_IP, SSH_PORT))
    scp.connect(username=USR_NAME, password=USR_PWD)
    window_size = 2 ** 30
    max_packet_size = 2 ** 28
    sftp=paramiko.SFTPClient.from_transport(scp,window_size=window_size, max_packet_size=max_packet_size)

    evt = json.loads(event)
    src_log_file = evt['src']
    offset = evt['offset']
    size = evt['size']
    dst_gz_file = evt['dst']
    part_number = int(evt['part_number'])
    upload_id = evt['upload_id']

    key =  dst_gz_file
    data_out = []
    partsInfo = []

    with sftp.open(src_log_file, "r") as f:
        f.seek(offset)
        while 1:
            data=f.read(CHUNK_SIZE)
            cur = f.tell()
            if not data or (cur > offset + size):
                if data_out:
                    upload_data = b"".join(data_out)
                    size_to_upload = len(upload_data)
                    # 这里有可能出现分片不足100K的情况, 比如你的文件是 15G+1k, 这个时候出现1K漏单的情况或者即使大于100K但是压缩之后小于100K的情况
                    # 对于日志文件, 可以考虑填充点无效的字符在后面
                    if size_to_upload < 100 * 1024:
                        # fill_data=b"\n\n\n\n\n\n\n\n\n\nAliyunFCFill" + os.urandom(102400)
                        fill_data=b"\t\t\t\t\t\t\t\t\t\t\t\n"*1024*1024*5
                        upload_data += gzip.compress(fill_data) # fill_data 压缩后的结果为>100K
                        size_to_upload = len(upload_data)
                    result = bucket.upload_part(key, upload_id, part_number, upload_data)
                    partsInfo.append({
                        "part_number":part_number,
                        "etag":result.etag,
                        "size":size_to_upload,
                        "part_crc":result.crc})
                break
            data_out.append(gzip.compress(data))
            # 16M gz压缩的结果生成一个分片, oss要求一个分片最小为100K(102400), 通常16M压缩后的文件应该大于100K
            if (cur - offset) % PART_SIZE == 0: 
                upload_data = b"".join(data_out)
                size_to_upload = len(upload_data)
                result = bucket.upload_part(key, upload_id, part_number, upload_data)
                partsInfo.append({
                      "part_number":part_number,
                      "etag":result.etag,
                      "size":size_to_upload,
                      "part_crc":result.crc})
                part_number += 1
                data_out = []
    
    return json.dumps(partsInfo)
相关实践学习
一小时快速掌握 SQL 语法
本实验带您学习SQL的基础语法,快速入门SQL。
7天玩转云服务器
云服务器ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,可降低 IT 成本,提升运维效率。本课程手把手带你了解ECS、掌握基本操作、动手实操快照管理、镜像管理等。了解产品详情:&nbsp;https://www.aliyun.com/product/ecs
目录
相关文章
|
27天前
|
存储 弹性计算 数据可视化
要将ECS中的文件直接传输到阿里云网盘与相册(
【2月更文挑战第31天】要将ECS中的文件直接传输到阿里云网盘与相册(
413 4
|
2月前
|
数据可视化 Shell Linux
shell+crontab+gitlab实现ecs服务器文件的web展示
本文通过把ecs服务器上的文件定时上传至gitlab,实现文件的页面可视化和修改历史。技术点:shell、crontab、gitlab。
50 3
|
23天前
|
监控 Serverless
函数计算(FC)作为一种无服务器的计算服务,在使用过程中可能会遇到各种问题
函数计算(FC)作为一种无服务器的计算服务,在使用过程中可能会遇到各种问题
20 4
|
3月前
|
Serverless Shell PHP
Serverless 应用引擎报错问题之下载文件报错如何解决
在进行Serverless应用开发和部署时,开发者可能会遇到不同类型的报错信息;本合集着重收录了Serverless环境中常见的报错问题及其解决策略,以助于开发者迅速诊断和解决问题,保证服务的连续性和可用性。
92 1
|
1月前
|
Linux 网络安全 Python
如何在服务器上运行python文件
如何在服务器上运行python文件
|
16天前
|
存储 Cloud Native Serverless
云原生最佳实践系列 7:基于 OSS Object FC 实现非结构化文件实时处理
阿里云OSS对象存储方案利用函数计算FC,在不同终端请求时实时处理OSS中的原图,减少衍生图存储,降低成本。
|
24天前
|
安全 数据处理 C#
C# Post数据或文件到指定的服务器进行接收
C# Post数据或文件到指定的服务器进行接收
|
1月前
|
缓存 监控 NoSQL
函数计算常见问题之提示文件错误如何解决
函数计算(Function Compute, FC)是阿里云提供的无服务器计算服务,它允许用户在无需管理服务器的情况下运行代码,但在配置和执行过程中可能遇到报错,本合集致力于梳理FC服务中的常见报错和配置问题,并提供解决方案,帮助用户优化函数执行环境。
55 0
函数计算常见问题之提示文件错误如何解决
|
2月前
|
Java
java上传、下载、预览、删除ftp服务器上的文件
java上传、下载、预览、删除ftp服务器上的文件
|
2月前
|
Linux
本地下载使用证书登陆的linux服务器的文件的命令
本地下载使用证书登陆的linux服务器的文件的命令

热门文章

最新文章

相关产品

  • 函数计算