当前位置:主页 > 查看内容

如何从 0 到 1 开发 PyFlink API 作业

发布时间:2021-04-27 00:00| 位朋友查看

简介:if __name__ == '__main__': data_stream_api_demo() 作业提交 Flink 提供了多种作业部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作业部署方式,请参考 Flink 官方文档 [3],了解更多详细信息。 local 说明:使用该方式执行作业时,……
if __name__ == '__main__': data_stream_api_demo()作业提交

Flink 提供了多种作业部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作业部署方式,请参考 Flink 官方文档 [3],了解更多详细信息。

local

说明:使用该方式执行作业时,会启动一个 minicluster,作业会提交到minicluster 中执行,该方式适合作开发阶段。

示例:python3 table_api_demo.py

standalone

说明:使用该方式执行作业时,作业会提交到一个远端的 standalone 集群。

示例:

./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py

YARN Per-Job

说明:使用该方式执行作业时,作业会提交到一个远端的 YARN 集群。

示例:

./bin/flink run --target yarn-per-job --python table_api_demo.py

K8s application mode

说明:使用该方式执行作业时,作业会提交到 K8s 集群,以 application mode 的方式执行。

示例:

./bin/flink run-application \

--target kubernetes-application \
--parallelism 8 \
-Dkubernetes.cluster-id**=** ClusterId \
-Dtaskmanager.memory.process.size**=**4096m \
-Dkubernetes.taskmanager.cpu**=**2 \
-Dtaskmanager.numberOfTaskSlots**=**4 \
-Dkubernetes.container.image**=** PyFlinkImageName \

--pyModule table_api_demo \

--pyFiles file:///path/to/table_api_demo.py
参数说明

除了上面提到的参数之外,通过 flink run 提交的时候,还有其它一些和 PyFlink 作业相关的参数。

参数名用途描述示例-py / --python指定作业的入口文件-py file:///path/to/table_api_demo.py-pym / --pyModule指定作业的 entry module,功能和--python类似,可用于当作业的 Python 文件为 zip 包,无法通过--python 指定时,相比--python 来说,更通用-pym table_api_demo -pyfs file:///path/to/table_api_demo.py-pyfs / --pyFiles指定一个到多个 Python 文件(.py/.zip等,逗号分割),这些 Python 文件在作业执行的时候,会放到 Python 进程的 PYTHONPATH 中,可以在 Python 自定义函数中访问到-pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip-pyarch / --pyArchives指定一个到多个存档文件(逗号分割),这些存档文件,在作业执行的时候,会被解压之后,放到 Python 进程的 workspace 目录,可以通过相对路径的方式进行访问-pyarch file:///path/to/venv.zip-pyexec / --pyExecutable指定作业执行的时候,Python 进程的路径-pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3-pyreq / --pyRequirements指定 requirements 文件,requirements 文件中定义了作业的依赖-pyreq requirements.txt问题排查

当我们刚刚上手 PyFlink 作业开发的时候,难免会遇到各种各样的问题,学会如何排查问题是非常重要的。接下来,我们介绍一些常见的问题排查手段。

client 端异常输出

PyFlink 作业也遵循 Flink 作业的提交方式,作业首先会在 client 端编译成 JobGraph,然后提交到 Flink 集群执行。如果作业编译有问题,会导致在 client 端提交作业的时候就抛出异常,此时可以在 client 端看到类似这样的输出:

Traceback (most recent call last):
 File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in module 
 data_stream_api_demo()
 File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demo
 table_result = table.execute_insert("my_")
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insert
 return TableResult(self._j_table.executeInsert(table_path, overwrite))
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
 return_value = get_return_value(
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in deco
 raise java_exception
pyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not exists
 at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)
 at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
 at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
 at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 1

比如上述报错说明作业中使用的名字为"my_"的表不存在。

TaskManager 日志文件

有些错误直到作业运行的过程中才会发生,比如脏数据或者 Python 自定义函数的实现问题等,针对这种错误,通常需要查看 TaskManager 的日志文件,比如以下错误反映用户在 Python 自定义函数中访问的 opencv 库不存在。

Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
 response = task()
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in lambda 
 lambda: self.create_worker().do_instruction(request), request)
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
 return getattr(self, request_type)(
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
 bundle_processor.process_bundle(instruction_id))
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundle
 input_op_by_transform_id[element.transform_id].process_encoded(
 File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
 self.output(decoded_value)
 File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
 File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
 File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
 File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
 File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
 File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_stream
 File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in split
 import cv2
ModuleNotFoundError: No module named 'cv2'
 at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
 at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
 at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more

说明:

local 模式下,TaskManager 的 log 位于 PyFlink 的安装目录下:site-packages/pyflink/log/,也可以通过如下命令找到:

\ import pyflink

\ print(pyflink.__path__)
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],则log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目录下

自定义日志

有时候,异常日志的内容并不足以帮助我们定位问题,此时可以考虑在 Python 自定义函数中打印一些日志信息。PyFlink 支持用户在 Python 自定义函数中通过 logging 的方式输出 log,比如:

def split(s):
 import logging
 logging.info("s: " + str(s))
 splits = s[1].split("|")
 for sp in splits:
 yield s[0], sp

通过上述方式,split 函数的输入参数,会打印到 TaskManager 的日志文件中。

远程调试

PyFlink 作业,在运行过程中,会启动一个独立的 Python 进程执行 Python 自定义函数,所以如果需要调试 Python 自定义函数,需要通过远程调试的方式进行,可以参见[4],了解如何在 Pycharm 中进行 Python 远程调试。

1)在 Python 环境中安装 pydevd-pycharm:

pip install pydevd-pycharm~=203.7717.65

2)在 Python 自定义函数中设置远程调试参数:

def split(s):
 import pydevd_pycharm
 pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)
 splits = s[1].split("|")
 for sp in splits:
 yield s[0], sp

3)按照 Pycharm 中远程调试的步骤,进行操作即可,可以参见[4],也可以参考博客[5]中“代码调试”部分的介绍。

说明:Python 远程调试功能只在 Pycharm 的 professional 版才支持。

社区用户邮件列表

如果通过以上步骤之后,问题还未解决,也可以订阅 Flink 用户邮件列表 [6],将问题发送到 Flink 用户邮件列表。需要注意的是,将问题发送到邮件列表时,尽量将问题描述清楚,最好有可复现的代码及数据,可以参考一下这个邮件[7]。

钉钉群

此外,也欢迎大家加入“PyFlink交流群”,交流 PyFlink 相关的问题。

image.png

总结

在这篇文章中,我们主要介绍了 PyFlink API 作业的环境准备、作业开发、作业提交、问题排查等方面的信息,希望可以帮助用户使用 Python 语言快速构建一个 Flink 作业,希望对大家有所帮助。接下来,我们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景、最佳实践等。

为此我们推出一个调查问卷,希望大家积极参与这个问卷,帮助我们更好的去整理 PyFlink 相关学习资料。填完问卷后即可参与抽奖,Flink 定制款 Polo 衫送送送!4月30日中午12:00准时开奖哦 ~

image.png

引用链接

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/

[2] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.0/flink-sql-connector-kafka_2.11-1.12.0.jar

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs

[4] https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config

[5] https://mp.weixin.qq.com/s?__biz=MzIzMDMwNTg3MA== mid=2247485386 idx=1 sn=da24e5200d72e0627717494c22d0372e chksm=e8b43eebdfc3b7fdbd10b49e6749cb761b7aa5f8ddc90b34eb3170119a8bbb3ddd7327acb712 scene=178 cur_album_id=1386152464113811456#rd

[6] https://flink.apache.org/community.html#mailing-lists

[7] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~

image.png

活动推荐:

仅需99元即可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方链接了解活动详情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506

image.png


本文转自网络,原文链接:https://developer.aliyun.com/article/783823
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文

  • 周排行
  • 月排行
  • 总排行

随机推荐