1. 前言
给大学生讲解SPARK时,说spark相比其它的大数据框架,其运行速度更快,是其显著的特点之一。之所以运行速度快,其原因之一因其使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎。SPARK提供了名为RDD(弹性分布式数据集(Resilient Distributed Dataset)的简称)抽象的数据集。DAG引擎用来保证RDD数据集之间依赖的有序性、可靠性。
不理解DAG具体为何物以及其底层原理,并不妨碍使用SPARK,使用者只需要调用其提供的API,用于分析处理不同领域的数据便可。但是,如果能理解DAG的底层结构,对理解和学习SPARK将会有质的提升。
2.DAG
2.1 ?基本概念
什么是DAG?
DAG是图结构中的一种,称为有向无环图。有向说明图中节点之间是有方向的,无环指图中没有环(回路),意味着从任一顶点出发都不可能回到顶点本身。如下图:
DAG往往用来描述事物之间的依赖关系或工作流中子流程之间的顺序,所以DAG中一定存在入度为0和出度为0的节点。入度为0的节点表示流程的开始,出度为0的节点表示流程的结束。根据工作流的特点,入度为0和出度为0的节点可能不只有一个。
如上图,可以理解为对于整个工作流而言,只有当编号为1的子流程完成后,才可以开始2号和3号子流程,当2号完成后,才能是4号,3号完成后才能5,4、5号完成后才能是6号。最终可以用线性结构描述出来。
这个过程称为DAG的线性化过程,也称为DAG的拓扑排序,这里的排序并不是指大小上的有序,而是指时间上的有序。因有可能子流程间不存在时间上的依赖性,如上图的2和3以及4和5节点,不存在相互的依赖,所以DAG的拓扑排序并不只有一种可能。如下图中的所有线性化都认为是合法。
一旦有了工作流的DAG结构图,在设计工作流进程时,则可以引入并行(并发)方案。如上图的2->4和3->5进程可以使用多线程或多进程方案,加快工作流的执行速度,这也是SPARk的DAG引擎能加快处理速度的底层原理。
因是描述工作流中子流程的顺序,显然整个工作流中不能出现环,环的出现,标志着循坏依赖。如下图,2号工作流依赖1号工作流的完成,4号依赖2号工作流的完成,从传递性上讲,4号也依赖1。从结构图中可以看得出1号又依赖4号 ,这便形成了一个引用循环链,从现实角度和实现角度都是违背常规认知和基本逻辑的。
Tips: 环意味着存在循环依赖,会导致系统死锁。
所以,在对DAG线性化之前,务必先要检查图中是否存在环。
2.2 环的检查
SPARk为了保证RDD的有序性,在进程初始时也需要检查其中是否存在环。下面讲解几种环的检查算法思想。
2.2.1 入度和出度
先检查图中节点之间的连通性,在一个连通分量上,如果边的数量大于或等于节点数,存在至少一个入度和一个出度的所有节点必然会构成一个环。下图左边的结构符合每一个节点都有一个入度和出度;右图中的1-2-4-6中的6号节点有2个入度,一个出度,其它节点都至少有一个入度和出度。如果一个节点只能有一个度,要么是入度,要么是出度。
连通性的检查可能使用并查集或者Floyd算法,或者直接使用DFS、BFS搜索算法。这里就不过多解释。入度和出度的检查也很简单,只需要构建图时记录一下节点的度数。
2.2.2 检查回边
所谓回边,指从一个节点出发,然后又能回到此节点的边。如下图,从1号节点开始搜索,经过如下图中的3->1和6->1又回到1号节点,称3->1和6->1为回边。
如果能证明回边的存在,则可以证明图结构中有环。回边的检查可以直接使用DFS搜索算法,其间有两个小技巧性。
搜索某一个节点时,检查节点的祖先节点是否和某一个子节点重合。如上图中,从1号节点(祖先节点)开始搜索,当搜索到6号节点时,发现1号子节点即是6号节点祖先节点又是子节点,显然6->1就是回边。
实现逻辑较简单,标记每一个访问过的节点,当从一个节点访问其子节点时,如果子节点已经被访问且不是直接父节点,可以断定回边的存在。
#include?
using?namespace?std;
//图
int?graph[100][100];
//是否访问过
int?vis[100];
//节点的父节点
int?parent[100];
int?INF=999;
//节点数、边数
int?n,m;
//初始化图,自己和自己的距离为0,和其它节点距离为?INF
void?init()?{
for(int?i=1;?i
for(int?j=1;?j
if(i==j)graph[i][j]=0;
else?graph[i][j]=INF;
}
vis[i]=0;
parent[i]=0;
}
}
//交互式得到节点之间关系
void?read()?{
int?f,t,w;
for(int?i=1;?i
cin>>f>>t>>w;
graph[f][t]=w;
}
}
/*
*有向无环图中找环
*?s:节点编号
*?f:父节点编号
*/
int??findCircle(int?s,int?f)?{
//标记为已经访问
vis[s]=true;
parent[s]=f;
//查找其子节点
for(int?i=1;?i
if(?graph[s][i]!=INF?&&?graph[s][i]!=0??)?{
if(?vis[i]==1?&&??i!=f?)?{?//找到回边
parent[i]=s;
return?i;
}
return?findCircle(?i,s?);
}
}
return?-1;
}
/*
*
*找出环上的所有点
*/
void?findCircle(int?s)?{
int?p=parent[s];
while(p!=s)?{
cout
p=parent[p];
}
}
int?main(int?argc,?char**?argv)?{
cin>>n>>m;
init();
read();
int?res=?findCircle(1,0);
findCircle(res);
cout
return?0;
}
//测试数据
6?6
1?2?1
6?1?1
2?4?1
4?6?1
3?5?1
5?6?1
测试结果:
另一个技巧就是为每一个节点设置一个开关变量,访问时(入栈)设置为true、访问结束(出栈)后设置为false。如果在还没有结束(出栈)时又重新访问到了此节点,可说明此节点有回边。
以下图为例。根据出栈入栈顺序做标记。
绿色虚线表示DFS时的递进线,递进时设置节点为访问状态(用 1 表示)。黄色虚线表示DFS时的回溯线,回溯时,设置节点访问结束状态(用 0 表示)。节点1的特殊在于会被两次标记为1。也就在第二次标记为1,表示它曾经被访问。
为什么要在回溯时设置节点为0,恢复原始状态。有可能出现如下图的情况。如果仅通过节点2是否被访问过确定此处有回边,是不正确的。
编码实现:
/*
*有向无环图中找环
*?s:节点编号
*/
int?findCircle(int?s,int?f)?{
if(vis[s])?{
parent[s]=f;
//如果进入栈时标记为?1,说明已经被访问过,有环存在
return?s;
}
//入栈时标记为已经访问
vis[s]=true;
parent[s]=f;
//查找其子节点
for(int?i=1;?i
if(?graph[s][i]!=INF?&&?graph[s][i]!=0??)?{
return?findCircle(?i,s?);
}
}
//出栈时恢复状态
vis[s]=false;
return?-1;
}
/*
*
*找出环上的所有点
*/
void?findCircle(int?s)?{
int?p=parent[s];
while(p!=s)?{
cout
p=parent[p];
}
}
int?main(int?argc,?char**?argv)?{
cin>>n>>m;
init();
read();
int?res=?findCircle(1,0);
findCircle(res);
cout
return?0;
}
是否有环检查后,便可进入拓扑排序过程。
2.3 拓扑排序
拓扑排序过程即为检查节点之间的依赖性的过程(通俗而言,就是谁依赖谁的问题)。
设计一个工作流时,往往会把整个工作流分解成几个子工作流,有些子工作流是可以同时进行的,有些子工作流需要等其它子工作流完毕后才能工作(一个子工作流的开始条件是另一个工作流的结束结果)。从多线程(进程)的角度而言,即存在并发时刻也存在互斥时刻。通过把子工作流建模成DAG结构,借助拓扑排序算法,能帮助建立稳定、健全、快速的工作流系统。
拓扑排序算法的两种实现。
广度搜索
遍历图结构,从入度为0的节点开始搜索,找到后删除与相邻节点之间的出度。重复这个过程,至到最后一个节点。如下图:
找到入度为0的节点1。入度为0的节点从工作流而言,表示不存在对其它任何子工作流的依赖,自然是要先执行的。遍历出来,并删除与其邻接的2号和3号节点相连接的边,表示2和3的所依赖的1号目标已经完成。
此时2号和3号节点入度变为0,均可以遍历出来。至于先遍历那一个,可以随机选择。也说明这两个节点表示的子工作流可以并行运行,同时删除与相邻节点的边。依次重复直到遍历出所有节点。
编码实现:
#include?
using?namespace?std;
//图
int?graph[100][100];
//是否访问过
int?vis[100];
//节点的父节点
int?parent[100];
int?INF=999;
//节点数、边数
int?n,m;
//栈,存储拓扑排序结果
queue?myq;
//计数器
int?count=0;
//初始化图,自己和自己的距离为0,和其它节点距离为?INF
void?init()?{
for(int?i=1;?i
for(int?j=1;?j
graph[i][j]=0;
}
vis[i]=0;
parent[i]=0;
}
}
//交互式得到节点之间关系
void?read()?{
int?f,t,w;
for(int?i=1;?i
cin>>f>>t>>w;
graph[f][t]=w;
}
}
//查找入度为?0?的节点且删除与之相邻的出边
int?findNode(int?i)?{
bool?is=true;
for(int?j=1;?j
if(?graph[j][i]!=0)?{
is=false;
break;
}
}
if(is)?{
for(int?j=1;?j
graph[i][j]=0;
}
}
return?is;
}
//找到入度为0?的节点压入队列
void?pushQueue()?{
for(int?i=1;?i
if(?findNode(i)?&&?!vis[i]?)?{
//找到,入队列
vis[i]=true;
myq.push(i);
}
}
}
/*
*拓扑排序
*/
void?tp()?{
//初始化入度为?0?的节点入队列
pushQueue();
while(?!myq.empty()?)?{
int?t=??myq.front();
count++;
cout
myq.pop();
pushQueue();
}
//如果出队列的节点数量和原节点数量不相同,说明有环
if(?count!=n?)cout
}
int?main(int?argc,?char**?argv)?{
cin>>n>>m;
init();
read();
tp();
return?0;
}
深度搜索
把DAG看成有向树,在后序遍历位置遍历节点,最后就能得到DAG的拓扑排序。如下图,表示对一棵二叉树后序遍历后的结果。
观察可知,把后序遍历的结果再逆输出,就能得到拓扑排序的结果1、3、7、9、8、6、2、5、4。
#include?
using?namespace?std;
//图
int?graph[100][100];
//是否访问过
int?vis[100];
//节点的父节点
int?parent[100];
int?INF=999;
//节点数、边数
int?n,m;
//是否有环
int?isCircle=0;
//栈,存储拓扑排序结果
stack?stk;
//自己和自己的距离为0
void?init()?{
for(int?i=1;?i
for(int?j=1;?j
if(i==j)graph[i][j]=0;
else?graph[i][j]=INF;
}
vis[i]=0;
parent[i]=0;
}
}
//交互式得到节点之间关系
void?read()?{
int?f,t,w;
for(int?i=1;?i
cin>>f>>t>>w;
graph[f][t]=w;
}
}
/*
*有向无环图中找环
*?s:节点编号
*/
void?findCircle(int?s,int?f)?{
if(vis[s])?{
parent[s]=f;
isCircle=1;
//如果进入栈时标记为?1,说明已经被访问过
return;
}
//入栈时标记为已经访问
vis[s]=true;
parent[s]=f;
//查找其子节点
for(int?i=1;?i
if(?graph[s][i]!=INF?&&?graph[s][i]!=0??)?{
findCircle(?i,s?);
}
}
//出栈时恢复状态
vis[s]=false;
//存储后序遍历结果
stk.push(s);
}
/*
*拓扑排序
*/
void?tp(int?s,int?f)?{
findCircle(s,f);
if(isCircle)?{
return;
}
cout
while(!stk.empty())?{
cout
stk.pop();
}
}
int?main(int?argc,?char**?argv)?{
cin>>n>>m;
init();
read();
tp(1,0);
return?0;
}
//测试数据
6?6
1?2?1
1?3?1
2?4?1
3?5?1
4?6?1
5?6?1
3. 总结
如果你不懂得DAG的底层结构以及拓扑排序算法相关知识,并不妨碍你去使用SPARK。如果你没有用过SPARk,也不会影响你学习DAG。但是如果你懂得了DAG,又学会使用了SPARK,对高级应用和低级算法之间的关系会有更高层面的感悟。有一天,SPARk会死,但底层结构和算法思想却会永存。
领取专属 10元无门槛券
私享最新 技术干货