本文参考了flink committer tison的文章,基于flink 1.13版本源码改动实现。
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/
Flink官方实现了大量的REST API接口,有用于Flink UI展示数据、也用于各自监控面板。这些REST API的webserver作为JobManager
的一部分在运行。默认端口是8081,可以通过flink-conf.yaml
的rest.port
参数进行配置。
在有多个JobManager
的情况下(HA场景下),每个JobManager
将运行自己的REST API实例,而由被选为leader的JobManager
实例提供有关已完成和正在运行的作业的信息。
REST API 位于flink-runtime
项目下,核心实现org.apache.flink.runtime.webmonitor.WebMonitorEndpoint
(因为Flink早期REST API都是用于监控,所以命名是WebMonitorEndpoint。现在其工作职能还包含一些任务启停等非监控场景),其主要是负责server实现和请求路由。
(主要:2个pierre package是笔者下面自定义REST API的地方)
当然Flink REST API实现是基于Netty
和Netty Router
,因为实现比较轻量,所以性能还是比较好的。
而完整的REST API则需要这四大模块:
向http链接 http://${jobmaster-host}:8081/pierre/foo
发起get请求,返回一个json串{"response":"bar"}
当我们要新增加一个REST API的时候,我们至少需要:
MessageHeaders
,作为新请求的接口ResponseBody
,作为返回结果的BodyAbstractRestHandler
,根据添加的MessageHeaders类处理请求org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers()
MessageHeaders
package org.apache.flink.runtime.rest.messages.pierre; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; public class FooHeaders implements MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters> { // 单实例模式 private static final FooHeaders INSTANCE = new FooHeaders(); public static FooHeaders getInstance() { return INSTANCE; } @Override public Class<BarResponseBody> getResponseClass() { return BarResponseBody.class; } @Override public HttpResponseStatus HttpResponseStatus() { return HttpResponseStatus.OK; } @Override public String getDescription() { return "pierre foobar service"; } @Override public Class<EmptyRequestBody> getRequestClass() { return EmptyRequestBody.class; } // 解析url里面的参数 @Override public EmptyMessageParameters getUnresolvedMessageParameters() { return EmptyMessageParameters.getInstance(); } @Override public HttpMethodWrapper getHttpMethod() { return HttpMethodWrapper.GET; } // URL路由信息 @Override public String getTargetRestEndpointURL() { return "/pierre/foo"; } }
这里注意:
HttpResponseStatus
、getResponseClass
等均不能return null,否则会有NullPointerException
ResponseBody
package org.apache.flink.runtime.rest.messages.pierre; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; public class BarResponseBody implements ResponseBody { private static final String FIELD_BAR = "response"; @JsonProperty(FIELD_BAR) public final String response = "bar"; private static final BarResponseBody INSTANCE = new BarResponseBody(); public static BarResponseBody getInstance() { return INSTANCE; } }
这里使用到了 jackson注解,需要import FLINK shaded的版本,避免冲突。
package org.apache.flink.runtime.rest.handler.pierre; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.pierre.BarResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import javax.annotation.Nonnull; import java.util.Map; import java.util.concurrent.CompletableFuture; public class FooHandler extends AbstractRestHandler< RestfulGateway, EmptyRequestBody, BarResponseBody, EmptyMessageParameters> { public FooHandler( GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters> messageHeaders) { super(leaderRetriever, timeout, responseHeaders, messageHeaders); } @Override protected CompletableFuture<BarResponseBody> handleRequest( @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { return CompletableFuture.completedFuture(BarResponseBody.getInstance()); } }
// 自己的handler final FooHandler fooHandler = new FooHandler(leaderRetriever, timeout, responseHeaders, FooHeaders.getInstance()); …… handlers.add(Tuple2.of(fooHandler.getMessageHeaders(), fooHandler));
初次改造Flink代码,不是特别熟悉,列了一下步骤供大家参考:
maven-checkstyle-plugin
的failOnViolation
设置为false
,因为我们的一些小改动不完全符合flink的代码工程规范。当然如果是要给Flink正式贡献代码,肯定还是要符合规范的。mvn spotless:apply
会自动进行代码格式化的工作mvn clean package -DskipTests
进入漫长的package中预计十分钟:flink-dist/target
目录下即可生成最新的可执行文件
./bin/start-cluster.sh
http://${jobmaster-host}:8081/pierre/foo
大功告成,完美第一步!
更多精彩:https://github.com/pierre94/flink-notes
基本介绍 给定 n 个权值作为 n 个叶子节点,构造一颗二叉树,若该树的带权路径长...
溢价 域名 的续费价格如何?通常来说,因为溢价域名的价值高于普通域名,所以溢...
TIOBE 公布了 2021 年 3 月的编程语言排行榜。 本月 TIOBE 指数没有什么有趣的变...
本文转载自微信公众号「bugstack虫洞栈」,作者小傅哥 。转载本文请联系bugstack...
本文转载自公众号读芯术(ID:AI_Discovery)。 这一刻你正在应对什么挑战?这位前...
在Python开发过程中,我们难免会遇到多重条件判断的情况的情况,此时除了用很多...
前言 统计科学家使用交互式的统计工具(比如R)来回答数据中的问题,获得全景的认...
想了解更多内容,请访问: 51CTO和华为官方战略合作共建的鸿蒙技术社区 https://...
背景 我们知道 如果在Kubernetes中支持GPU设备调度 需要做如下的工作 节点上安装...
近几年,互联网行业蓬勃发展,在互联网浪潮的冲击下,互联网创业已成为一种比较...