UDF类型
描述
UDF User Defined Scalar Function
用户自定义标量值函数。其输入与输出是一对一的关系 即读入一行数据 输出一个值。
UDTF User Defined Table Valued Function
自定义表值函数。用于解决调用一次函数输出多行数据的需求。UDTF是唯一能够返回多个字段的自定义函数。UDTF不等于UDT User Defined Type 。
UDAF User Defined Aggregation Function
自定义聚合函数。其输入与输出是多对一的关系 即将多条输入记录聚合成一个输出值。UDAF可以与SQL中的GROUP BY语句联用。具体语法请参见
。
二、UDF参数解析MaxCompute数据类型与Java数据类型的对应关系如下。
注意点
此处ARRAY类型对应的Java类型是List 而不是数组。VARCHAR,BINART,STRUCT一些数据类型是ODPS独有的Java中对应的数据类型以及返回值数据类型是对象 数据类型首字母需大写。MaxCompute Type
Java Type
TINYINT
java.lang.Byte
SMALLINT
java.lang.Short
INT
java.lang.Integer
BIGINT
java.lang.Long
FLOAT
java.lang.Float
DOUBLE
java.lang.Double
DECIMAL
java.math.BigDecimal
BOOLEAN
java.lang.Boolean
STRING
java.lang.String
VARCHAR
com.aliyun.odps.data.Varchar
BINARY
com.aliyun.odps.data.Binary
DATETIME
java.util.Date
TIMESTAMP
java.sql.Timestamp
ARRAY
java.util.List
MAP
java.util.Map
STRUCT
com.aliyun.odps.data.Struct
MaxCompute 2.0版本支持定义Java UDF时 使用Writable类型作为参数和返回值。MaxCompute数据类型和Java Writable类型的映射关系如下。
MaxCompute Type
Java Writable Type
TINYINT
ByteWritable
SMALLINT
ShortWritable
INT
IntWritable
BIGINT
LongWritable
FLOAT
FloatWritable
DOUBLE
DoubleWritable
DECIMAL
BigDecimalWritable
BOOLEAN
BooleanWritable
STRING
Text
VARCHAR
VarcharWritable
BINARY
BytesWritable
DATETIME
DatetimeWritable
TIMESTAMP
TimestampWritable
INTERVAL_YEAR_MONTH
IntervalYearMonthWritable
INTERVAL_DAY_TIME
IntervalDayTimeWritable
ARRAY
N/A
MAP
N/A
STRUCT
N/A
MaxCompute SQL Type
Python 2 Type
BIGINT
INT
STRING
STR
DOUBLE
FLOAT
BOOLEAN
BOOL
DATETIME
INT
FLOAT
FLOAT
CHAR
STR
VARCHAR
STR
BINARY
BYTEARRAY
DATE
INT
DECIMAL
DECIMAL.DECIMAL
ARRAY
LIST
MAP
DICT
STRUCT
COLLECTIONS.NAMEDTUPLE
MaxCompute SQL Type
Python 3 Type
BIGINT
INT
STRING
UNICODE
DOUBLE
FLOAT
BOOLEAN
BOOL
DATETIME
DATETIME.DATETIME
FLOAT
FLOAT
CHAR
UNICODE
VARCHAR
UNICODE
BINARY
BYTES
DATE
DATETIME.DATE
DECIMAL
DECIMAL.DECIMAL
ARRAY
LIST
MAP
DICT
STRUCT
COLLECTIONS.NAMEDTUPLE
三、UDF的使用方式UDF、UDTF、UDAT可进行参考文档
https://help.aliyun.com/document_detail/27867.html?spm a2c4g.11186623.6.762.463d7468xnFPHb
JAVA UDF
UDF的高级使用
3.1UDF中的变长参数java语言
package com.mrtest.cn; import com.aliyun.odps.udf.UDF; import com.aliyun.odps.udf.annotation.Resolve; import java.util.ArrayList; import java.util.List; Resolve({ *- array }) public class TestUDF extends UDF { public List evaluate(String ... s) { List list new ArrayList(); for (String name : s) { list.add(name); return list; }
Python语言
from odps.udf import annotate annotate( *- bigint ) class ParamFunc(object): def evaluate(self, *nums): sum 0 for num in nums: sum num sum return sum3.2UDF的重载
注意事项 对于List与List是不能解析对应的方法的 这种属于类型擦除
package com.aliyun.odps.examples.udf; import com.aliyun.odps.udf.UDF; public class UDFExample extends UDF { public String evaluate(String a) { return s2s: a; public String evaluate(String a, String b) { return ss2s: a , b; public String evaluate(String a, String b, String c) { return sss2s: a , b , c; }3.3UDF访问对应文件和表
java语言
package com.aliyun.odps.examples.udf; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDF; import com.aliyun.odps.udf.UDFException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Iterator; public class UDFResource extends UDF { ExecutionContext ctx; long fileResourceLineCount; long tableResource1RecordCount; long tableResource2RecordCount; Override public void setup(ExecutionContext ctx) throws UDFException { this.ctx ctx; try { InputStream in ctx.readResourceFileAsStream( file_resource.txt ); BufferedReader br new BufferedReader(new InputStreamReader(in)); String line; fileResourceLineCount 0; while ((line br.readLine()) ! null) { fileResourceLineCount ; br.close(); Iterator iterator ctx.readResourceTable( table_resource1 ).iterator(); tableResource1RecordCount 0; while (iterator.hasNext()) { tableResource1RecordCount ; iterator.next(); iterator ctx.readResourceTable( table_resource2 ).iterator(); tableResource2RecordCount 0; while (iterator.hasNext()) { tableResource2RecordCount ; iterator.next(); } catch (IOException e) { throw new UDFException(e); * project: example_project table: wc_in2 partitions: p2 1,p1 2 columns: colc,colb public String evaluate(String a, String b) { return ss2s: a , b |fileResourceLineCount fileResourceLineCount |tableResource1RecordCount tableResource1RecordCount |tableResource2RecordCount tableResource2RecordCount; }
python语言
#coding: utf-8 from odps.udf import annotate from odps.distcache import get_cache_file annotate( double - double ) class Compute(object): def __init__(self): import json #获取对应文本文件 cache_file get_cache_file( file.txt ) dataMat [] for line in cache_file : curLine line.strip().split( , ) #处理逻辑 cache_file.close() #获取对应的表文件 records list(get_cache_table( table_resource1 )) for record in records: self.my_dict[record[0]] [record[1]] #处理逻辑 def evaluate(self, input): #处理逻辑
? ? ??
3.4UDF访问外部网络(VPC、外部网络、专有网络https://help.aliyun.com/document_detail/187866.html
3.5UDF使用第三方包https://help.aliyun.com/document_detail/189752.html
#coding: utf-8 # explode.py from odps.udf import annotate from odps.distcache import get_cache_archive import datetime def include_package_path(res_name): import os, sys archive_files get_cache_archive(res_name) dir_names sorted ([os.path.dirname(os.path.normpath(f.name)) for f in archive_files if .dist_info not in f.name], key lambda v: len(v)) sys.path.append(os.path.dirname(dir_names[0])) annotate( string- boolean ) class is_workday_udf(object): def __init__(self): include_package_path( chinese-calendar-master.zip ) def evaluate(self, date_str): # try: import chinese_calendar date_strs date_str.split( - ) year_num int(date_strs[0]) month_num int(date_strs[1]) day_num int(date_strs[2]) date_num datetime.date(year year_num, month month_num, day day_num) result chinese_calendar.is_workday(date_num) return result # except: # return True
函数的注册
执行的select的的操作
set odsp.pypy.enabled false; set odps.isolation.session.enable true; select my_json( { info : 11 , desc : a|b , filename : 4b-2a-3c-4d-5b } ) as a;
java语言
CREATE TEMPORARY FUNCTION foo AS com.mypackage.Reverse USING #CODE ( lang JAVA ) package com.mypackage; import com.aliyun.odps.udf.UDF; public class Reverse extends UDF { public String evaluate(String input) { if (input null) return null; StringBuilder ret new StringBuilder(); for (int i input.toCharArray().length - 1; i 0; i--) { ret.append(input.toCharArray()[i]); return ret.toString(); #END CODE;
SELECT foo( abdc
嵌入式代码块可以置于USING后或脚本末尾 置于USING后的代码块作用域仅为CREATE TEMPORARY FUNCTION语句。CREATE TEMPORARY FUNCTION创建的函数为临时函数 仅在本次执行生效 不会存入MaxCompute的Meta系统。python语言
CREATE TEMPORARY FUNCTION foo AS embedded.UDFTest USING #CODE ( lang PYTHON , filename embedded ) from odps.udf import annotate annotate( bigint- bigint ) class UDFTest(object): def evaluate(self, a): return a * a #END CODE;
SELECT foo(4);
Python代码的缩进需要符合Python语言规范。由于注册Python UDF时AS后的类名需要包含Python源码的文件名 您可以通过’filename’ ’embedded’指定一个虚拟文件名。3.7使用SQL语言定义函数create sql function my_sum( a BIGINT, b BIGINT, c BIGINT) returns my_sum BIGINT as begin temp : a b; my_sum : temp c; end;
create sql function my_func( s STRING) AS if( s rlike git_(m|a) , 1, 0);
本文使用的方案 在其中使用的时候要注意几点
1.如果在使用企业版DataWorks的API测试时 出现权限错误 需要提工单开通DataWorks的API功能。
2.在使用时要注意计算的类型以及计算方式 将本文的实践代码略作修改 另外统计时也要关注各个字段的单位 以及数据类型方便最后的统计计算。
3.要仔细查看对应的文档介绍 熟悉其使用方法以及对应的参数设置。
欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
计算的下一步发展是什么,将如何影响组织的战略?专家预测了边缘计算在2021年的发...
本文转载自公众号读芯术(ID:AI_Discovery) 如果你即将要面临大型科技公司的技术...
开源 RPC 框架有哪些呢?一类是跟某种特定语言平台绑定的,另一类是与语言无关即...
来源 | 阿里飞天CIO学堂微信公众号 金融数字化转型过程中,市场的细微变化,客户...
与普通的IDC机房或服务器厂商相比,阿里云提供的云服务器ECS具有高可用性、安全...
为了使伸缩组自动加入的实例自动部署应用,您需要创建私有镜像,确保该镜像上有...
一、背景 ? 我们大部分人的编程习惯都是线性编程,所谓线性编程就是一个请求涉及...
最近,在为 Coco 优化分层架构之时,我陷入了各种决策困难之中。所以我通过不断...
一、数据中台是真的热 在2018年之前可能只有一少部分人在谈中台,从2018年下半年...
游戏市场的热度已经不言而喻,随着民众生活水平的提升,大家对于精神娱乐生活的...