Python实现MaxCompute UDF/UDAF/UDTF

简介: 参数与返回值类型 参数与返回值通过如下方式指定: @odps.udf.annotate(signature) Python UDF目前支持ODPS SQL数据类型有:bigint, string, double, boolean和datetime。

参数与返回值通过如下方式指定:

@odps.udf.annotate(signature)

Python UDF目前支持ODPS SQL数据类型有:bigint, string, double, boolean和datetime。SQL语句在执行之前,所有函数的参数类型和返回值类型必须确定。因此对于Python这一动态类型语言,需要通过对UDF类加decorator的方式指定函数签名。

函数签名signature通过字符串指定,语法如下:

arg_type_list '->' type_list

 

arg_type_list: type_list | '*' | ''

 

type_list: [type_list ','] type

 

type: 'bigint' | 'string' | 'double' | 'boolean' | 'datetime'

·         箭头左边表示参数类型,右边表示返回值类型。

·         只有UDTF的返回值可以是多列, UDF和UDAF只能返回一列。

·         ‘*’代表变长参数,使用变长参数,UDF/UDTF/UDAF可以匹配任意输入参数。

下面是合法的signature的例子:

'bigint,double->string'            # 参数为bigintdouble,返回值为string

 

'bigint,boolean->string,datetime'  # UDTF参数为bigintboolean,返回值为string,datetime

 

'*->string'                        # 变长参数,输入参数任意,返回值为string

 

'->double'                         # 参数为空,返回值为double

Query语义解析阶段会将检查到不符合函数签名的用法,抛出错误禁止执行。执行期,UDF函数的参数会以函数签名指定的类型传给用户。用户的返回值类型也要与函数签名指定的类型一致,否则检查到类型不匹配时也会报错。ODPS SQL数据类型对应Python类型如下:

ODPS SQL Type

Bigint

String

Double

Boolean

Datetime

Python Type

int

str

float

bool

int

注解:

·         Datetime类型是以int的形式传给用户代码的,值为epoch utc time起始至今的毫秒数。用户可以通过Python标准库中的datetime模块处理日期时间类型。

·         NULL值对应Python里的None。

此外,odps.udf.int(value[, silent=True])的参数也做了调整。增加了参数 silent 。当 silent 为 True 时,如果 value 无法转为 int ,不会抛出异常,而是返回 None 。

UDF

实现Python UDF非常简单,只需要定义一个new-style class,并实现 evaluate 方法。下面是一个例子:

from odps.udf import annotate

 

@annotate("bigint,bigint->bigint")

class MyPlus(object):

 

   def evaluate(self, arg0, arg1):

       if None in (arg0, arg1):

           return None

       return arg0 + arg1

注解:Python UDF必须通过annotate指定函数签名。

·         class odps.udf.BaseUDAF:继承此类实现Python UDAF。

·         BaseUDAF.new_buffer():实现此方法返回聚合函数的中间值的buffer。buffer必须是mutable object(比如list, dict),并且buffer的大小不应该随数据量递增,在极限情况下,buffer marshal过后的大小不应该超过2Mb。

·         BaseUDAF.iterate(buffer[, args, ...]):实现此方法将args聚合到中间值buffer中。

·         BaseUDAF.merge(buffer, pbuffer):实现此方法将两个中间值buffer聚合到一起,即将pbuffer merge到buffer中。

·         BaseUDAF.terminate(buffer):实现此方法将中间值buffer转换为ODPS SQL基本类型。

下面是一个UDAF求平均值的例子。

#coding:utf-8

from odps.udf import annotate

from odps.udf import BaseUDAF

 

@annotate('double->double')

class Average(BaseUDAF):

 

    def new_buffer(self):

        return [0, 0]

 

    def iterate(self, buffer, number):

        if number is not None:

            buffer[0] += number

            buffer[1] += 1

 

    def merge(self, buffer, pbuffer):

        buffer[0] += pbuffer[0]

        buffer[1] += pbuffer[1]

 

    def terminate(self, buffer):

        if buffer[1] == 0:

            return 0.0

        return buffer[0] / buffer[1]

·         class odps.udf.BaseUDTF:Python UDTF的基类,用户继承此类,并实现 process , close 等方法。

·         BaseUDTF.init():初始化方法,继承类如果实现这个方法,则必须在一开始调用基类的初始化方法 super(BaseUDTF, self).init() 。init 方法在整个UDTF生命周期中只会被调用一次,即在处理第一条记录之前。当UDTF需要保存内部状态时,可以在这个方法中初始化所有状态。

·         BaseUDTF.process([args, ...]):这个方法由ODPS SQL框架调用,SQL中每一条记录都会对应调用一次 process , process 的参数为SQL语句中指定的UDTF输入参数。

·         BaseUDTF.forward([args, ...]):UDTF的输出方法,此方法由用户代码调用。每调用一次 forward ,就会输出一条记录。 forward 的参数为SQL语句中指定的UDTF的输出参数。

·         BaseUDTF.close():UDTF的结束方法,此方法由ODPS SQL框架调用,并且只会被调用一次,即在处理完最后一条记录之后。

下面是一个UDTF的例子。

#coding:utf-8

# explode.py

 

from odps.udf import annotate

from odps.udf import BaseUDTF

 

 

@annotate('string -> string')

class Explode(BaseUDTF):

   """string按逗号分隔输出成多条记录

   """

 

   def process(self, arg):

       props = arg.split(',')

       for p in props:

           self.forward(p)

注解:Python UDTF也可以不加annotate指定参数类型和返回值类型。这样,函数在SQL中使用时可以匹配任意输入参数,但返回值类型无法推导,所有输出参数都将认为是string类型。因此在调用 forward 时,就必须将所有输出值转成 str 类型。

Python UDF可以通过 odps.distcache 模块引用资源文件,目前支持引用文件资源和表资源。

·         odps.distcache.get_cache_file(resource_name)

o    返回指定名字的资源内容。 resource_name 为 str 类型,对应当前Project中已存在的资源名。如果资源名非法或者没有相应的资源,会抛出异常。

o    返回值为 file-like object ,在使用完这个object后,调用者有义务调用 close 方法释放打开的资源文件。

下面是使用 get_cache_file 的例子:

from odps.udf import annotate

from odps.distcache import get_cache_file

 

@annotate('bigint->string')

class DistCacheExample(object):

 

def __init__(self):

    cache_file = get_cache_file('test_distcache.txt')

    kv = {}

    for line in cache_file:

        line = line.strip()

        if not line:

            continue

        k, v = line.split()

        kv[int(k)] = v

    cache_file.close()

    self.kv = kv

 

def evaluate(self, arg):

    return self.kv.get(arg)

·         

odps.distcache.get_cache_table(resource_name)

o    返回指定资源表的内容。 resource_name 为 str 类型,对应当前Project中已存在的资源表名。如果资源名非法或者没有相应的资源,会抛出异常。

o    返回值为 generator 类型,调用者通过遍历获取表的内容,每次遍历得到的是以 tuple 形式存在的表中的一条记录。

下面是使用 get_cache_table 的例子:

from odps.udf import annotate

from odps.distcache import get_cache_table

 

@annotate('->string')

class DistCacheTableExample(object):

    def __init__(self):

        self.records = list(get_cache_table('udf_test'))

        self.counter = 0

        self.ln = len(self.records)

 

    def evaluate(self):

        if self.counter > self.ln - 1:

            return None

        ret = self.records[self.counter]

        self.counter += 1

        return str(ret)

 


FAQ:

Q:pyudf 如何像 java udf一样 初始化 加载数据?

A:写在 init 里。


相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
7天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
17天前
|
机器学习/深度学习 分布式计算 数据挖掘
阿里云 MaxCompute MaxFrame 开启免费邀测,统一 Python 开发生态
阿里云 MaxCompute MaxFrame 正式开启邀测,统一 Python 开发生态,打破大数据及 AI 开发使用边界。
196 1
|
21天前
|
机器学习/深度学习 人工智能 数据可视化
基于Python的数据可视化技术在大数据分析中的应用
传统的大数据分析往往注重数据处理和计算,然而数据可视化作为一种重要的技术手段,在大数据分析中扮演着至关重要的角色。本文将介绍如何利用Python语言中丰富的数据可视化工具,结合大数据分析,实现更直观、高效的数据展示与分析。
|
1月前
|
算法 大数据 数据挖掘
python数据分析——大数据伦理风险分析
大数据伦理风险分析在当前数字化快速发展的背景下显得尤为重要。随着大数据技术的广泛应用,企业、政府以及个人都在不断地产生、收集和分析海量数据。然而,这些数据的利用也带来了诸多伦理风险,如隐私泄露、数据滥用、算法偏见等。因此,对大数据伦理风险进行深入分析,并采取相应的防范措施,对于保障数据安全、维护社会公平正义具有重要意义。
47 0
|
1月前
|
存储 大数据 数据挖掘
python数据分析——大数据和云计算
大数据和云计算作为当代信息技术的两大核心驱动力,正在以前所未有的速度改变着我们的生活、工作和思维方式。它们不仅为各行各业的创新提供了强大的技术支持,更是推动了整个社会的数字化转型。 从大数据的角度来看,它的核心价值在于通过对海量数据的收集、存储、分析和挖掘,发现其中的关联性和趋势,从而为决策提供更为科学、精准的依据。无论是商业领域的市场预测、消费者行为分析,还是公共服务领域的城市规划、交通管理,大数据都发挥着不可或缺的作用。同时,随着物联网、传感器等技术的普及,大数据的来源和种类也在不断扩展,这使得我们能够更全面地认识世界,把握规律。
47 0
|
2月前
|
分布式计算 DataWorks 大数据
maxcompute函数问题之udaf函数传到线上报错如何解决
MaxCompute函数包括内置函数和自定义函数(UDF),它们用于在MaxCompute平台上执行数据处理和分析任务;本合集将介绍MaxCompute函数的使用方法、函数编写和优化技巧,以及常见的函数错误和解决途径。
|
2月前
|
分布式计算 并行计算 大数据
Python多进程在数据处理和大数据分析中的应用
Python多进程在数据处理和大数据分析中的应用
|
2月前
|
数据采集 数据可视化 大数据
处理大数据:Python 与数据库的结合
在处理大数据的领域中,Python 凭借其强大的数据处理和分析能力,成为了与数据库结合的理想选择。Python 提供了丰富的数据库接口和工具,可以与各种主流的关系型数据库和 NoSQL 数据库进行高效交互。本文将探讨 Python 在处理大数据方面与数据库结合的一些关键技术和应用。
|
2月前
|
大数据 Java 编译器
关于 Python 在 for 循环里处理大数据的一些推荐方法
关于 Python 在 for 循环里处理大数据的一些推荐方法
38 0
|
4月前
|
存储 数据采集 数据可视化
【大数据实训】python石油大数据可视化(八)
【大数据实训】python石油大数据可视化(八)
46 1

相关产品

  • 云原生大数据计算服务 MaxCompute