当前位置:主页 > 查看内容

PyFlink Table API - Python 自定义函数

发布时间:2021-05-12 00:00| 位朋友查看

简介:DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())])) 说明: 需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate……
DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())]))

说明:

需要通过名字为 “ udaf ” 的装饰器,声明这是一个 aggregate function,需要分别通过装饰器中的 result_type 及 accumulator_type 参数,声明 aggregate function 的结果类型及 accumulator 类型;create_accumulator,get_value 和 accumulate 这 3 个方法必须要定义,retract 方法可以根据需要定义,详细信息可以参见 Flink 官方文档 [1];需要注意的是,由于必须定义 create_accumulator,get_value 和 accumulate 这 3 个方法,Python UDAF 只能通过继承AggregateFunction 的方式进行定义(Pandas UDAF 没有这方面的限制)。

定义完 Python UDAF 之后,可以在 Python Table API 中这样使用:

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")],
 ["value", "count", "name"])
t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))
Python UDTAF

Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用来针对一组数据进行聚合运算,比如同一个 window 下的多条数据、或者同一个 key 下的多条数据等,与 Python UDAF 不同的是,针对同一组输入数据,Python UDTAF 可以产生 0 条、1 条、甚至多条输出数据。

以下示例,定义了一个名字为 Top2 的 Python UDTAF:

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtaf, TableAggregateFunction
class Top2(TableAggregateFunction):
 def create_accumulator(self):
 # 存储当前最大的两个值
 return [None, None]
 def accumulate(self, accumulator, input_row):
 if input_row[0] is not None:
 # 新的输入值最大
 if accumulator[0] is None or input_row[0] accumulator[0]:
 accumulator[1] = accumulator[0]
 accumulator[0] = input_row[0]
 # 新的输入值次大
 elif accumulator[1] is None or input_row[0] accumulator[1]:
 accumulator[1] = input_row[0]
 def emit_value(self, accumulator): 
 yield Row(accumulator[0])
 if accumulator[1] is not None:
 yield Row(accumulator[1])
top2 = udtaf(f=Top2(),
 result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
 accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))

说明:

Python UDTAF 功能是 Flink 1.13 之后支持的新功能;create_accumulator,accumulate 和 emit_value 这 3 个方法必须定义,此外 TableAggregateFunction 中支持 retract、merge 等方法,可以根据需要选择是否定义,详细信息可以参见 Flink 官方文档[2]。

定义完 Python UDTAF 之后,可以在 Python Table API 中这样使用:

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t = t_env.from_elements([(1, 'Hi', 'Hello'),
 (3, 'Hi', 'hi'),
 (5, 'Hi2', 'hi'),
 (2, 'Hi', 'Hello'),
 (7, 'Hi', 'Hello')],
 ['a', 'b', 'c'])
t_env.execute_sql("""
 CREATE TABLE my_sink (
 word VARCHAR,
 `sum` BIGINT
 ) WITH (
 'connector' = 'print'
 """)
result = t.group_by(t.b).flat_aggregate(top2).select("b, a").execute_insert("my_sink")
# 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
# 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
result.wait()

当执行以上程序,可以看到类似如下输出:

11 +I[Hi, 7]
10 +I[Hi2, 5]
11 +I[Hi, 3]

说明:

Python UDTAF 只能用于 Table API,不能用于 SQL 语句中;flat_aggregate 的结果包含了原始的 grouping 列以及 UDTAF(top 2)的输出,因此,可以在 select 中访问列 “ b ”。Python 自定义函数进阶在纯 SQL 作业中使用 Python 自定义函数

Flink SQL 中的 CREATE FUNCTION 语句支持注册 Python 自定义函数,因此用户除了可以在 PyFlink Table API 作业中使用 Python 自定义函数之外,还可以在纯 SQL 作业中使用 Python 自定义函数。

CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON
CREATE TABLE source (
 a VARCHAR
) WITH (
 'connector' = 'datagen'
CREATE TABLE sink (
 a VARCHAR
) WITH (
 'connector' = 'print'
INSERT INTO sink
SELECT sub_string(a, 1, 3)
FROM source;
在 Java 作业中使用 Python 自定义函数

用户可以通过 DDL 的方式注册 Python 自定义函数,这意味着,用户也可以在 Java Table API 作业中使用 Python 自定义函数,比如:

TableEnvironment tEnv = TableEnvironment.create(
 EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON");
tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));
tEnv.executeSql("SELECT sub_string(a) FROM source").collect();

详细示例可以参见 PyFlink Playground [3]。

该功能的一个重要用处是将 Java 算子与 Python 算子混用。用户可以使用 Java 语言来开发绝大部分的作业逻辑,当作业逻辑中的某些部分必须使用 Python 语言来编写时,可以通过如上方式来调用使用 Python 语言编写的自定义函数。

如果是 DataStream 作业,可以先将 DataStream 转换成 Table,然后再通过上述方式,调用 Python 语言编写的自定义函数。

依赖管理

在 Python 自定义函数中访问第三方 Python 库是非常常见的需求,另外,在机器学习预测场景中,用户也可能需要在 Python 自定义函数中加载一个机器学习模型。当我们通过 local 模式执行 PyFlink 作业时,可以将第三方 Python 库安装在本地 Python 环境中,或者将机器学习模型下载到本地;然而当我们将 PyFlink 作业提交到远程执行的时候,这也可能会出现一些问题:

第三方 Python 库如何被 Python 自定义函数访问。不同的作业,对于 Python 库的版本要求是不一样的,将第三方 Python 库预安装到集群的 Python 环境中,只适用于安装一些公共的依赖,不能解决不同作业对于 Python 依赖个性化的需求;机器学习模型或者数据文件,如何分发到集群节点,并最终被 Python 自定义函数访问。

除此之外,依赖可能还包括 JAR 包等,PyFlink 中针对各种依赖提供了多种解决方案

依赖类型解决方案用途描述示例(flink run) flink run参数配置项API 作业入口文件-py / --python无无指定作业的入口文件,只能是.py文件-py file:///path/to/table_api_demo.py作业入口entry module-pym / --pyModule无无指定作业的entry module,功能和--python类似,可用于当作业的Python文件为zip包等情况,无法通过--python指定的时候,相比--python来说,更通用-pym table_api_demo-pyfs file:///path/to/table_api_demo.pyPython三方库文件-pyfs / --pyFilespython.filesadd_python_file指定一个到多个Python文件(.py/.zip/.whl等,逗号分割),这些Python文件在作业执行时,会放到Python进程的PYTHONPATH中,可以在Python自定义函数中直接访问-pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip存档文件-pyarch /--pyArchivespython.archivesadd_python_archive指定一个到多个存档文件(逗号分割),这些存档文件,在作业执行的时候,会被解压,并放到Python进程的工作目录,可以通过相对路径的方式进行访问-pyarchfile:///path/to/venv.zipPython解释器路径-pyexec / --pyExecutablepython.executableset_python_executable指定作业执行时,所使用的Python解释器路径-pyarchfile:///path/to/venv.zip-pyexec venv.zip/venv/bin/python3requirements文件-pyreq / --pyRequirementspython.requirementsset_python_requirements指定requirements文件,requirements文件中定义了作业的Python三方库依赖,作业执行时,会根据requirements的内容,通过pip安装相关依赖-pyreq requirements.txtJAR包无pipeline.classpaths,pipeline.jars没有专门的API,可以通过configuration的set_string方法设置指定作业依赖的JAR包,通常用于指定connector JAR包无

说明:

需要注意的是,Python UDF 的实现所在的文件,也需要在作业执行的时候,作为依赖文件上传;可以通过合用 “存档文件” 与 “ Python 解释器路径”,指定作业使用上传的 Python 虚拟环境来执行,比如:
table_env.add_python_archive("/path/to/py_env.zip")
# 指定使用py_env.zip包中带的python来执行用户自定义函数,必须通过相对路径来指定
table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
推荐用户使用 conda 来构建 Python 虚拟环境,conda 构建的 Python 虚拟环境包含了执行 Python 所需的绝大多数底层库,可以极大地避免当本地环境与集群环境不一样时,所构建的 Python 虚拟环境在集群执行时,缺少各种底层依赖库的问题。关于如何使用 conda 构建的 Python 虚拟环境,可以参考阿里云 VVP 文档中 “使用 Python 三方包” 章节的介绍 [4]

有些 Python 三方库需要安装才能使用,即并非 ”将其下载下来就可以直接放到 PYTHONPATH 中引用“,针对这种类型的 Python 三方库,有两种解决方案:

将其安装在 Python 虚拟环境之中,指定作业运行使用所构建的 Python 虚拟环境;找一台与集群环境相同的机器(或 docker),安装所需的 Python 三方库,然后将安装文件打包。该方式相对于 Python 虚拟环境来说,打包文件比较小。详情可以参考阿里云 VVP 文档中 “使用自定义的 Python 虚拟环境” 章节的介绍 [5]。调试

PyFlink 支持用户通过远程调试的方式,来调试 Python 自定义函数,具体方法可以参见文章 “如何从 0 到 1 开发 PyFlink API 作业” [6] 中 “远程调试” 章节的介绍。

另外,用户还可以在 Python 自定义函数中,通过 logging 的方式,打印日志。需要注意的是,日志输出需要在 TaskManager 的日志文件中查看,而不是当前 console。具体使用方式,请参见 “如何从 0 到 1 开发 PyFlink API 作业” [6] 中 “自定义日志” 章节的介绍。需要注意的是,当通过 local 方式运行作业的时候,TM 的日志位于 PyFlink 的安装目录,比如:

\ import pyflink

['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink']

调优

Python 自定义函数的性能在很大程度上取决于 Python 自定义函数自身的实现,如果遇到性能问题,您首先需要想办法尽可能优化 Python 自定义函数的实现。

除此之外,Python 自定义函数的性能也受以下参数取值的影响。

参数说明python.fn-execution.bundle.sizePython自定义函数的执行是异步的,在作业执行过程中,Java算子将数据异步发送给Python进程进行处理。Java算子在将数据发送给Python进程之前,会先将数据缓存起来,到达一定阈值之后,再发送给Python进程。python.fn-execution.bundle.size参数可用来控制可缓存的数据最大条数,默认值为100000。python.fn-execution.bundle.time用来控制数据的最大缓存时间。当缓存的数据条数到达python.fn-execution.bundle.size定义的阈值或缓存时间到达python.fn-execution.bundle.time定义的阈值时,会触发缓存数据的计算。默认值为1000,单位是毫秒。python.fn-execution.arrow.batch.size用来控制当使用Pandas UDF时,一个arrow batch可容纳的数据最大条数,默认值为10000。说明 python.fn-execution.arrow.batch.size参数值不能大于python.fn-execution.bundle.size参数值。

说明:

checkpoint 时,会触发缓存数据的计算,因此当上述参数配置的值过大时,可能会导致checkpoint 时需要处理过多的数据,从而导致 checkpoint 时间过长,甚至会导致 checkpoint 失败。当遇到作业的 checkpoint 时间比较长的问题时,可以尝试减少上述参数的取值。常见问题

1)Python 自定义函数的实际返回值类型与 result_type 中声明的类型不一致,该问题会导致 Java 算子在收到 Python 自定义函数的执行结果,进行反序列化时报错,错误堆栈类似:

Caused by: java.io.EOFException
 at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
 at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
 at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]

2)在 Python 自定义函数的 init 方法里实例化了一个不能被 cloudpickle 序列化的对象。

在提交作业的时候,PyFlink 会通过 cloudpickle 序列化 Python 自定义函数,若 Python 自定义函数包含不能被 cloudpickle 序列化的对象,则会遇到类似错误:TypeError: can't pickle xxx,可以将这种变量放在 open 方法里初始化。

3)在 Python 自定义函数的 init 方法里 load 一个非常大的数据文件。

由于在提交作业的时候,PyFlink 会通过 cloudpickle 序列化 Python 自定义函数,若在 init 方法里 load 一个非常大的数据文件,则整个数据文件都会被序列化并作为 Python 自定义函数实现的一部分,若数据文件非常大,可能会导致作业执行失败,可以将 load 数据文件的操作放在 open 方法里执行。

4)客户端 Python 环境与集群端 Python 环境不一致,比如 Python 版本不一致、PyFlink 版本不一致(大版本需要保持一致,比如都为 1.12.x)等。

总结

在这篇文章中,我们主要介绍了各种 Python 自定义函数的定义及使用方式,以及 Python 依赖管理、 Python 自定义函数调试及调优等方面的信息,希望可以帮助用户了解 Python 自定义函数。接下来,我们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景、最佳实践等。

另外,阿里云实时计算生态团队长期招聘优秀大数据人才(包括实习+社招),我们的工作包括:

实时机器学习:支持机器学习场景下实时特征工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的标准,推动例如搜索、推荐、广告、风控等场景的全面实时化;大数据 + AI 一体化:包括编程语言一体化 (PyFlink 相关工作),执行引擎集成化 (TF on Flink),工作流及管理一体化(Flink AI Flow)。

如果你对开源、大数据或者 AI 感兴趣,请发简历到:fudian.fd@alibaba-inc.com

此外,也欢迎大家加入 “PyFlink交流群”,交流 PyFlink 相关的问题。

pyflink群.png

引用链接

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#aggregate-functions

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/table/udfs/python_udfs/#table-aggregate-functions

[3] https://github.com/pyflink/playgrounds#7-python-udf-used-in-java-table-api-jobs

[4] https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.1fe76f50loCz96#title-09r-29j-9d7

[5] https://help.aliyun.com/document_detail/207351.html?spm=a2c4g.11186623.6.687.4b18419aCuhgmq#title-r01-50c-j82

[6] https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q

活动推荐一

image.png
报名链接:https://1712399719478.huodongxing.com/event/1594531547711

活动推荐二

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
99元试用实时计算 Flink 全托管版本(包年包月、10CU)即可得定制 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png


本文转自网络,原文链接:https://developer.aliyun.com/article/784004
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文

  • 周排行
  • 月排行
  • 总排行

随机推荐