Dataphin版本 2.9.2及以上
需开通OpenAPI模块 Dataphin-OpenAPI 运维
Dataphin平台提供了补数据功能 当需要补数据时 用户可手动对某个节点及其下游的节点的特定业务日期的补数据。在实际场景中 有些上游的数据到达的时间晚于预期时间 比如门店的数据延迟几天或者一个月的时间才收集上报 或者上游的数据错误需进行更正时 就需要进行补数据操作。此类操作重复度高 且由于补数据的时间较长带来较大的运维成本。通过Open API的运维模块就可以根据特定的业务场景开发适用于特定业务场景的个性化的补数据或运维工具。
以下的例子是创建批量补数据工具的API调用的基本步骤
根据需要补数据的节点的特点查询节点 ListNode选择需要补数据的节点的下游节点 可利用QueryDagFromPhysicalNode查询下游节点选择需要补数据的节点及其下游节点和需要补数据的业务日期 创建补数据工作流 CreatePhysicalNodeSupplement查询补数据流下的每个业务日期下对应的DagRun的运行状态 ListSupplementDagrun查询补数据工作流下的业务日期的补数据实例并获取实例状态 ListSupplementInstance请注意 批量补数据时 若不想影响线上正常的周期调度任务 您需要控制补数据实例的生成和运行的频次或避开高峰期以控制系统压力。
以下为样例代码
/* import 省略... */ * 搜索节点 对节点及下游补数据 并查看补数据实例 搜索时假设不确定是哪一个节点 * 即不指定node_id,对返回的结果取第一个节点补数据 为方便示例 假设该节点存在下游 * date 2021-05-17 17:22 public class SupplementTest { private static final String ENV PROD ; private static final Long testProjectId 1022928L; // 实际使用请通过AK/endpoint自行创建API client private DataphinAcsClient client LocalDataphinAcsClient.getTestEnvClient(); public void supplementTest() throws ClientException { //Step1: 根据需要补数据的节点的特点查询节点 NodeOverview nodeOverview getOneNode(); //Step2: 获取需要补数据的下游节点 此处假设获取下两层的下游节点 int downStreamDepth, List String downstreamNodes getDownNode(nodeOverview.getNodeId(), downStreamDepth); //Step3 选择需要补数据的节点及其下游节点和需要补数据的业务日期 创建补数据工作流 String flowId createSupplementFlow(nodeOverview, downstreamNodes, 2021-05-11 , 2021-05-15 ); //Step4: 查询补数据流下的每个业务日期下对应的DagRun的运行状态 List ScheduleDagrun dagRunList listSupplementDagrun(flowId); //Step5: 查询补数据工作流下的业务日期的补数据实例并获取实例状态 //查看 2021-05-11 的 补数据实例 List Instance instances Lists.newArrayList(); dagRunList.stream().filter(d - d.getBizDate().equals( 2021-05-11 )) .findFirst() .ifPresent(d - instances.addAll(listInstanceByDagRun(d.getDagrunId()))); // 查看实例的状态 instances.forEach(ins - System.out.println(ins.getStatus())); * 列出 dagRun下面的实例 * param dagRunId dagRunId * return dagRun下面的实例列表 private List Instance listInstanceByDagRun(String dagRunId) { ListSupplementInstanceRequest request new ListSupplementInstanceRequest(); request.setDagRunId(dagRunId); request.setEnv(ENV); try { ListSupplementInstanceResponse response client.getAcsResponse(request); if(null response.getData() || response.getData().size() 0) { throw new RuntimeException( not found any instance by dagrun id dagRunId); return response.getData(); } catch (ClientException e) { throw new RuntimeException(e.getMessage()); * 列出补数据的 dagrun 当前例子中 正常应该有5个 * param flowId 补数据的工作流的ID * return dagrun 列表 private List ScheduleDagrun listSupplementDagrun(String flowId) throws ClientException { ListSupplementDagrunRequest request new ListSupplementDagrunRequest(); request.setEnv(ENV); request.setFlowId(flowId); ListSupplementDagrunResponse response client.getAcsResponse(request); if(response.getData() null || response.getData().size() 0) { throw new RuntimeException( not found any dagrun by flow id flowId); return response.getData(); * 创建补数据工作流 * param nodeOverview 补数据起始节点 * param downStreamDepth 补数据的层级深度 * return 补数据的工作流ID private String createSupplementFlow(NodeOverview nodeOverview, List String downstreamNodes, String minPartition, String maxPartition) throws ClientException { CreatePhysicalNodeSupplementRequest createNodeSupplementRequest new CreatePhysicalNodeSupplementRequest(); NodeSupplementCommand nodeSupplementCommand new NodeSupplementCommand(); nodeSupplementCommand.setName( open_api_test_20210508_ System.currentTimeMillis()); nodeSupplementCommand.setProjectId(Long.toString(testProjectId)); nodeSupplementCommand.setMinPartition(minPartition); nodeSupplementCommand.setMaxPartition(maxPartition); nodeSupplementCommand.setParallelism(1); nodeSupplementCommand.setSupplementNodeId(nodeOverview.getNodeId()); //包含全部下游 或者这里可以选择部分下游 nodeSupplementCommand.setIncludedNodeIdList(downstreamNodes); createNodeSupplementRequest.setNodeSupplementCommand(nodeSupplementCommand); createNodeSupplementRequest.setEnv(ENV); CreatePhysicalNodeSupplementResponse createNodeSupplementResponse client.getAcsResponse( createNodeSupplementRequest); return createNodeSupplementResponse.getFlowId(); * 查询节点的下游 * param startNodeId 起始节点 * param downStreamDepth 下游层级深度 * return 下游节点的ID列表 private List String getDownNode(String startNodeId, int downStreamDepth) throws ClientException { QueryDagFromPhysicalNodeRequest queryNodeDagDownStreamRequest new QueryDagFromPhysicalNodeRequest(); NodeDagQueryCommand nodeDagQueryDownStreamCommand new NodeDagQueryCommand(); nodeDagQueryDownStreamCommand.setStartNodeId(startNodeId); //只搜索向下两层的节点 nodeDagQueryDownStreamCommand.setDownStreamDepth(downStreamDepth); nodeDagQueryDownStreamCommand.setUpStreamDepth(0); queryNodeDagDownStreamRequest.setNodeDagQueryCommand(nodeDagQueryDownStreamCommand); queryNodeDagDownStreamRequest.setEnv(ENV); QueryDagFromPhysicalNodeResponse queryNodeDagDownStreamResponse client.getAcsResponse(queryNodeDagDownStreamRequest); NodeDagInfo nodeDagDownStreamInfo queryNodeDagDownStreamResponse.getNodeDagInfo(); List LogicalNodeInfo downNodes nodeDagDownStreamInfo.getNodes(); if(null downNodes) { return Lists.newArrayListWithCapacity(0); return downNodes.stream().map(n - n.getBasicInfo().getNodeId().getId()).collect(Collectors.toList()); * 列出节点列表 private NodeOverview getOneNode() throws ClientException { ListNodesRequest.PageParam pageParam new PageParam(); pageParam.setPageNum(1); pageParam.setPageSize(20); ListNodesRequest listNodesRequest new ListNodesRequest(); ListNodesRequest.NodeQueryCommand nodeQueryCommand new NodeQueryCommand(); nodeQueryCommand.setNodeBizType( SCRIPT ); nodeQueryCommand.setNodeScheduleType( NORMAL ); listNodesRequest.setNodeQueryCommand(nodeQueryCommand); listNodesRequest.setPageParam(pageParam); listNodesRequest.setProjectId(testProjectId); listNodesRequest.setEnv(ENV); ListNodesResponse listNodesResponse client.getAcsResponse(listNodesRequest); PagedData overviewPagedData listNodesResponse.getPagedNodes(); List NodeOverview nodeOverviewList overviewPagedData.getData(); if(null nodeOverviewList || nodeOverviewList.size() 0) { throw new RuntimeException( not found any node ); return nodeOverviewList.get(0); }
点击这里查看Dataphin OpenAPI概览。
【51CTO.com快译】 数据可视化工具不断发展,提供更强大的功能,同时改善可访问...
Docker生成新镜像版本的两种方式 There are two ways Docker can generate new m...
信息化2.0时代提出开展智慧教育创新发展行动。2019年2月,中共中央、国务院印发...
从 10.0.0 版开始,异步迭代器就出现在 Node 中了,在本文中,我们将讨论异步迭...
在Python语言中有如下3种方法: 成员方法 类方法(classmethod) 静态方法(staticm...
摘要 元旦期间 订单业务线 告知 推送系统 无法正常收发消息,作为推送系统维护者...
2021年3月24日,主题为《数据的世界,世界的数据》的星环科技2021春季新品发布会...
前提条件 请您在购买前确保已完成注册和充值。详细操作请参见 如何注册公有云管...
本文整理自直播《Hologres 数据导入/导出实践-王华峰(继儒)》 视频链接: https:/...
建站 什么 虚拟主机 够用?这要看搭建的是什么类型的网站。比如个人博客类型的网...