在上面的例子中,我们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为 “cnt_state” 的 ValueState,用于记录每一个 key 出现的次数。
说明:
除了 ValueState 之外,Python DataStream API 还支持 ListState、MapState、ReducingState,以及 AggregatingState;定义 state 的 StateDescriptor 时,需要声明 state 中所存储的数据的类型(TypeInformation)。另外需要注意的是,当前 TypeInformation 字段并未被使用,默认使用 pickle 进行序列化,因此建议将 TypeInformation 字段定义为 Types.PICKLED_BYTE_ARRAY() 类型,与实际所使用的序列化器相匹配。这样的话,当后续版本支持使用 TypeInformation 之后,可以保持后向兼容性;state 除了可以在 KeyedStream 的 map 操作中使用,还可以在其它操作中使用;除此之外,还可以在连接流中使用 state,比如:ds1 = ... # type DataStream ds2 = ... # type DataStream ds1.connect(ds2) \ .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \ .map(MyCoMapFunction()) # 可以在MyCoMapFunction中使用state
可以使用 state 的 API 列表如下:
操作自定义函数KeyedStreammapMapFunctionflat_mapFlatMapFunction reduceReduceFunction filterFilterFunction processKeyedProcessFunction ConnectedStreamsmapCoMapFunctionflat_mapCoFlatMapFunction processKeyedCoProcessFunction WindowedStreamapplyWindowFunction processProcessWindowFunctionstate 工作原理上图是 PyFlink 中,state 工作原理的架构图。从图中我们可以看出,Python 自定义函数运行在 Python worker 进程中,而 state backend 运行在 JVM 进程中(由 Java 算子来管理)。当 Python 自定义函数需要访问 state 时,会通过远程调用的方式,访问 state backend。
我们知道,远程调用的开销是非常大的,为了提升 state 读写的性能,PyFlink 针对 state 读写做了以下几个方面的优化工作:
Lazy Read:对于包含多个 entry 的 state,比如 MapState,当遍历 state 时,state 数据并不会一次性全部读取到 Python worker 中,只有当真正需要访问时,才从 state backend 读取。
Async Write:当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样做可以避免每次 state 更新操作都访问远端的 state backend;同时,针对同一个 key 的多次更新操作,可以合并执行,尽量避免无效的 state 更新。
LRU cache:在 Python worker 进程中维护了 state 读写的 cache。当读取某个 key 时,会先查看其是否已经被加载到读 cache 中;当更新某个 key 时,会先将其存放到写 cache 中。针对频繁读写的 key,LRU cache 可以避免每次读写操作,都访问远端的 state backend,对于有热点 key 的场景,可以极大提升 state 读写性能。
Flush on Checkpoint:为了保证 checkpoint 语义的正确性,当 Java 算子需要执行 checkpoint时,会将 Python worker中的写 cache 都 flush 回 state backend。
其中 LRU cache 可以细分为二级,如下图所示:
说明:
二级 cache 为 global cache,二级 cache 中的读 cache 中存储着当前 Python worker 进程中所有缓存的原始 state 数据(未反序列化);二级 cache 中的写 cache 中存储着当前 Python worker 进程中所有创建的 state 对象。一级 cache 位于每一个 state 对象内,在 state 对象中缓存着该 state 对象已经从远端的 state backend 读取的 state 数据以及待更新回远端的 state backend 的 state 数据。工作流程:
当在 Python UDF 中,创建一个 state 对象时,首先会查看当前 key 所对应的 state 对象是否已经存在(在二级 cache 中的 “Global Write Cache” 中查找),如果存在,则返回对应的 state 对象;如果不存在,则创建新的 state 对象,并存入 “Global Write Cache”;state 读取:当在 Python UDF 中,读取 state 对象时,如果待读取的 state 数据已经存在(一级 cache),比如对于 MapState,待读取的 map key/map value 已经存在,则直接返回对应的 map key/map value;否则,访问二级 cache,如果二级 cache 中也不存在待读取的 state 数据,则从远端的 state backend 读取;state 写入:当在 Python UDF 中,更新 state 对象时,先写到 state 对象内部的写 cache 中(一级 cache);当 state 对象中待写回 state backend 的 state 数据的大小超过指定阈值或者当遇到 checkpoint 时,将待写回的 state 数据写回远端的 state backend。state 性能调优通过前一节的介绍,我们知道 PyFlink 使用了多种优化手段,用于提升 state 读写的性能,这些优化行为可以通过以下参数配置:
配置说明python.state.cache-sizePython worker 中读 cache 以及写 cache 的大小。(二级 cache)需要注意的是:读 cache、写 cache是独立的,当前不支持分别配置读 cache 以及写 cache 的大小。python.map-state.iterate-response-batch-size当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。python.map-state.read-cache-size一个 MapState 的读 cache 中最大允许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会通过 LRU 策略从读 cache 中删除最近最少访问过的 entry。python.map-state.write-cache-size一个 MapState 的写 cache 中最大允许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下所有待更新 state 数据写回远端的 state backend。需要注意的是,state 读写的性能不仅取决于以上参数,还受其它因素的影响,比如:
输入数据中 key 的分布:输入数据的 key 越分散,读 cache 命中的概率越低,则性能越差。
Python UDF 中 state 读写次数:state 读写可能涉及到读写远端的 state backend,应该尽量优化 Python UDF 的实现,减少不必要的 state 读写。
checkpoint interval:为了保证 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将所有缓存的待更新 state 数据,写回 state backend。如果配置的 checkpoint interval 过小,则可能并不能有效减少 Python worker 写回 state backend 的数据量。
bundle size / bundle time:当前 Python 算子会将输入数据划分成多个批次,发送给 Python worker 执行。当一个批次的数据处理完之后,会强制将 Python worker 进程中的待更新 state 写回 state backend。与 checkpoint interval 类似,该行为也可能会影响 state 写性能。批次的大小可以通过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数控制。
三、timer 功能介绍timer 使用示例除了 state 之外,用户还可以在 Python DataStream API 中使用定时器 timer。
import datetime from pyflink.common import Row, WatermarkStrategy from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import TimestampAssigner from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext from pyflink.datastream.state import ValueStateDescriptor from pyflink.table import StreamTableEnvironment
本文转自网络,原文链接:https://developer.aliyun.com/article/784551
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!
据国际数据公司(IDC)近日发布的《中国云运营服务市场(2020上半年)跟踪》报告显示...
阿里云与西奥电梯联合共同打造西奥可信电梯物联网平台,通过工业互联网的规则引...
作者:朱永生 什么是企业搜索 企业搜索,顾名思义,就是企业使用的搜索服务或者...
private Map Character, Set Character constructGraph(String[] words) { Map C...
11月30日,由工业和信息化部、北京市人民政府共同主办的2020年中国网络安全产业...
ssd 云服务器 是什么?就是存储模式选择为ssd超高速 云盘 的 云服务器 。ssd超高...
1.爱上一个人跟拉屎一样简单,忘记一个人跟吃屎一样难。 2.大姨妈是吐血鬼,卫...
作者 阿里云技术运营望宸 技术实践的门槛不仅在于应用上线后各类问题的排查难度 ...
今天,猿妹要和大家介绍Python程序员在2021年最不应该错过的优秀VS Code扩展: 1...
办网站必须要备案吗?是的,使用中国大陆境内的服务器开办网站,必须先办理 网站...