当前位置:主页 > 查看内容

如何通过API构建自动补数据工具

发布时间:2021-07-17 00:00| 位朋友查看

简介:Dataphin版本 2.9.2及以上 需开通OpenAPI模块 Dataphin-OpenAPI 运维 Dataphin平台提供了补数据功能 当需要补数据时 用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中 有些上游的数据到达的时间晚于预期时间 比如门店的数据延迟几天……

Dataphin版本 2.9.2及以上

需开通OpenAPI模块 Dataphin-OpenAPI 运维


Dataphin平台提供了补数据功能 当需要补数据时 用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中 有些上游的数据到达的时间晚于预期时间 比如门店的数据延迟几天或者一个月的时间才收集上报 或者上游的数据错误需进行更正时 就需要进行补数据操作。此类操作重复度高 且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。


以下的例子是创建批量补数据工具的API调用的基本步骤

根据需要补数据的节点的特点查询节点 ListNode选择需要补数据的节点的下游节点 可利用QueryDagFromPhysicalNode查询下游节点选择需要补数据的节点及其下游节点和需要补数据的业务日期 创建补数据工作流 CreatePhysicalNodeSupplement查询补数据流下的每个业务日期下对应的DagRun的运行状态 ListSupplementDagrun查询补数据工作流下的业务日期的补数据实例并获取实例状态 ListSupplementInstance


请注意 批量补数据时 若不想影响线上正常的周期调度任务 您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。

以下为样例代码

/* import 省略... */
 * 搜索节点 对节点及下游补数据 并查看补数据实例 搜索时假设不确定是哪一个节点 
 * 即不指定node_id,对返回的结果取第一个节点补数据 为方便示例 假设该节点存在下游 
 * date 2021-05-17 17:22
public class SupplementTest {
 private static final String ENV PROD ;
 private static final Long testProjectId 1022928L;
 // 实际使用请通过AK/endpoint自行创建API client
 private DataphinAcsClient client LocalDataphinAcsClient.getTestEnvClient();
 public void supplementTest() throws ClientException {
 //Step1: 根据需要补数据的节点的特点查询节点
 NodeOverview nodeOverview getOneNode();
 //Step2: 获取需要补数据的下游节点 此处假设获取下两层的下游节点
 int downStreamDepth,
 List String downstreamNodes getDownNode(nodeOverview.getNodeId(), downStreamDepth);
 //Step3 选择需要补数据的节点及其下游节点和需要补数据的业务日期 创建补数据工作流
 String flowId createSupplementFlow(nodeOverview, downstreamNodes, 2021-05-11 , 2021-05-15 );
 //Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态
 List ScheduleDagrun dagRunList listSupplementDagrun(flowId);
 //Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态
 //查看 2021-05-11 的 补数据实例
 List Instance instances Lists.newArrayList();
 dagRunList.stream().filter(d - d.getBizDate().equals( 2021-05-11 ))
 .findFirst()
 .ifPresent(d - instances.addAll(listInstanceByDagRun(d.getDagrunId())));
 // 查看实例的状态
 instances.forEach(ins - System.out.println(ins.getStatus()));
 * 列出 dagRun下面的实例
 * param dagRunId dagRunId
 * return dagRun下面的实例列表
 private List Instance listInstanceByDagRun(String dagRunId) {
 ListSupplementInstanceRequest request new ListSupplementInstanceRequest();
 request.setDagRunId(dagRunId);
 request.setEnv(ENV);
 try {
 ListSupplementInstanceResponse response client.getAcsResponse(request);
 if(null response.getData() || response.getData().size() 0) {
 throw new RuntimeException( not found any instance by dagrun id dagRunId);
 return response.getData();
 } catch (ClientException e) {
 throw new RuntimeException(e.getMessage());
 * 列出补数据的 dagrun 当前例子中 正常应该有5个
 * param flowId 补数据的工作流的ID
 * return dagrun 列表
 private List ScheduleDagrun listSupplementDagrun(String flowId) throws ClientException {
 ListSupplementDagrunRequest request new ListSupplementDagrunRequest();
 request.setEnv(ENV);
 request.setFlowId(flowId);
 ListSupplementDagrunResponse response client.getAcsResponse(request);
 if(response.getData() null || response.getData().size() 0) {
 throw new RuntimeException( not found any dagrun by flow id flowId);
 return response.getData();
 * 创建补数据工作流
 * param nodeOverview 补数据起始节点
 * param downStreamDepth 补数据的层级深度
 * return 补数据的工作流ID
 private String createSupplementFlow(NodeOverview nodeOverview, List String downstreamNodes, String minPartition, String maxPartition) throws ClientException {
 CreatePhysicalNodeSupplementRequest createNodeSupplementRequest new CreatePhysicalNodeSupplementRequest();
 NodeSupplementCommand nodeSupplementCommand new NodeSupplementCommand();
 nodeSupplementCommand.setName( open_api_test_20210508_ System.currentTimeMillis());
 nodeSupplementCommand.setProjectId(Long.toString(testProjectId));
 nodeSupplementCommand.setMinPartition(minPartition);
 nodeSupplementCommand.setMaxPartition(maxPartition);
 nodeSupplementCommand.setParallelism(1);
 nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId());
 //包含全部下游 或者这里可以选择部分下游
 nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes);
 createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand);
 createNodeSupplementRequest.setEnv(ENV);
 CreatePhysicalNodeSupplementResponse createNodeSupplementResponse client.getAcsResponse(
 createNodeSupplementRequest);
 return createNodeSupplementResponse.getFlowId();
 * 查询节点的下游
 * param startNodeId 起始节点
 * param downStreamDepth 下游层级深度
 * return 下游节点的ID列表
 private List String getDownNode(String startNodeId, int downStreamDepth) throws ClientException {
 QueryDagFromPhysicalNodeRequest queryNodeDagDownStreamRequest new QueryDagFromPhysicalNodeRequest();
 NodeDagQueryCommand nodeDagQueryDownStreamCommand new NodeDagQueryCommand();
 nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId);
 //只搜索向下两层的节点
 nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth);
 nodeDagQueryDownStreamCommand.setUpStreamDepth(0);
 queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand);
 queryNodeDagDownStreamRequest.setEnv(ENV);
 QueryDagFromPhysicalNodeResponse queryNodeDagDownStreamResponse 
 client.getAcsResponse(queryNodeDagDownStreamRequest);
 NodeDagInfo nodeDagDownStreamInfo queryNodeDagDownStreamResponse.getNodeDagInfo();
 List LogicalNodeInfo downNodes nodeDagDownStreamInfo.getNodes();
 if(null downNodes) {
 return Lists.newArrayListWithCapacity(0);
 return downNodes.stream().map(n - n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList());
 * 列出节点列表
 private NodeOverview getOneNode() throws ClientException {
 ListNodesRequest.PageParam pageParam new PageParam();
 pageParam.setPageNum(1);
 pageParam.setPageSize(20);
 ListNodesRequest listNodesRequest new ListNodesRequest();
 ListNodesRequest.NodeQueryCommand nodeQueryCommand new NodeQueryCommand();
 nodeQueryCommand.setNodeBizType( SCRIPT );
 nodeQueryCommand.setNodeScheduleType( NORMAL );
 listNodesRequest.setNodeQueryCommand(nodeQueryCommand);
 listNodesRequest.setPageParam(pageParam);
 listNodesRequest.setProjectId(testProjectId);
 listNodesRequest.setEnv(ENV);
 ListNodesResponse listNodesResponse client.getAcsResponse(listNodesRequest);
 PagedData overviewPagedData listNodesResponse.getPagedNodes();
 List NodeOverview nodeOverviewList overviewPagedData.getData();
 if(null nodeOverviewList || nodeOverviewList.size() 0) {
 throw new RuntimeException( not found any node );
 return nodeOverviewList.get(0);
}


点击这里查看Dataphin OpenAPI概览。


本文转自网络,原文链接:https://developer.aliyun.com/article/785348
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!

推荐图文

  • 周排行
  • 月排行
  • 总排行

随机推荐