前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Storm】Storm之what

【Storm】Storm之what

作者头像
章鱼carl
发布2022-03-31 10:55:37
6900
发布2022-03-31 10:55:37
举报
文章被收录于专栏:章鱼carl的专栏章鱼carl的专栏

定义

基于:消息推模式(驱动方式)、分布式(物理结构)、(逻辑结构)、实时(性能特点)的计算引擎(本质属性)。

本质:消息队列 + 分布式进程。

抽象

(1) Nimbus:任务管理、监视。

(2) Supervisor:启动/关闭工作进程Worker,并监听任务。

(3) Topology:封装一个业务逻辑。是更高层次的抽象,节点表示一个Spout或Bolt,边表示Bolt订阅了哪些流(Stream)。(想起了布尔逻辑的或与非,类似CPU计算逻辑单元,能够从与或非逻辑角度刻画现实的大部分问题,微观内核逻辑会对宏观表象逻辑产生深远影响)

(4) Stream:消息元组流,是一个没有边界的tuple序列。流(Stream)可以理解为消息的渠道,每种类型的消息可以用一个流来表示。

(5) Tuple:消息元组,Topology处理的最小消息单位是Tuple(元组),它是一个Object的数组。数组中的每个value对象都有一个field,并且该value是可序列化的。

(6) Spout:是高频数据流的源头,负责发出原始Tuple。

(7) Bolt:可以随意订阅某个Spout或Bolt发出的Tuple,只要将这个流导向该Bolt。Spout和Bolt都统称为component(组件)。

(8) Worker:一个Worker就是一个JVM进程。Worker之间通过Netty传送数据,原来为ZMQ。

(9) Executor:一个Executor就是一个线程。Executor只对应单独的某个Spout或Bolt。它会生成若干该Spout或Bolt的实例,被称为Task。

(10) Task:每一个Spout和Bolt都会被当做很多task在整个集群里面运行,每一个task就是一个Spout或Bolt的实例,且对应到一个Executor线程。Topology的Task数量是固定的(由程序一开始的并行度设置决定的),但是可以动态调整线程数量,然后通过负载均衡,多的Task就分配到了空闲线程上。并且通过负载均衡,Storm尽可能的将任务平均分配到进程、线程中去。

(11) Stream groupings:消息分发策略,定义一个Stream应该如何分配给Bolt们。

(12) _acker:每个worker进程都会有一个_acker线程。

再次奉上此图:

Spark

Storm

节点

worker

supervisor、nimbus

进程

executor

worker

线程

core

executor

任务

task(对应spout/bolt实例)

task(对应spout/bolt实例)

结构


组件结构

(1) 集群组件分布结构

(2) 节点内部组件关系

Nimbus(发布序列化任务实例) - Supervisor(启动Worker) - Worker(特定Topology进程) - Executor(特定Task线程) - Task(Spout/Bolt实例)

不会出现一个worker进程为多个topology服务。因此,一个运行中的topology就是由集群中若干台物理机上的多个worker进程组成的。

executor是worker进程启动的一个单独线程。每个executor只会运行1个topology的某个特定的spout或bolt的若干个实例,称作task。注意,storm默认是1个spout或bolt只能生成1个task,executor线程会在每次循环里顺序调用所有它包含的task实例。

task最终运行的是spout或bolt中代码的执行单元,一个task即为spout或bolt的一个实例,executor线程在执行期间会调用该task的nextTuple或excute方法。topology启动后,一个spout或bolt的task数目是不变的,但该spout或bolt使用的executor线程数是可以动态调整的。

默认情况下,一个supervisor节点最多可以启动4个worker进程,每一个topology默认占用1个worker进程,每个spout或者bolt任务会占用1个executor线程,每个executor启动1个task。

并行度

根据业务调整并行度

图中是一个包含有两个worker进程的拓扑。其中,蓝色的 BlueSpout 有两个executor,每个 executor 中有一个 task,并行度为 2;绿色的 GreenBolt 有两个 executor,每个 executor 有两个 task,并行度也为2;而黄色的YellowBolt 有 6 个 executor,每个 executor 中有一个 task,并行度为 6,因此,这个拓扑的总并行度就是 2 + 2 + 6 = 10。具体分配到每个 worker 就有 10 / 2 = 5个executor。

Storm可以随时增加或者减少worker或者executor的数量,而不需要重启集群或者拓扑。具体方式有:CLI、Storm UI,修改后会注销掉topology,并rebalance所有任务。

不建议为每个拓扑在每台机器上分配超过一个worker。而应该改为在一台机器上分配多个线程,而不是在一台分配多个进程来提高并行度。

深刻理解Storm的线程进程

在一个worker中的线程是运行在这个worker的JVM上的,所以生成的静态变量、class对象等是同一个。

例如,在Topology中定义一个静态变量,初始化一个对象。那么,在不同worker中打印这个对象的hashCode,是不同的;但是,在同一个worker中的executor打印这个对象的hashCode是相同的。这就要求我们慎用状态,因为Storm本来就是无状态编程范式,即使使用也要考虑清楚,是否需要worker级别的全局唯一,是加在组件的初始化方法里还是prepare、open里。

消息分发

(1) Shuffle Grouping:随机分组。随机派发stream里的tuple,保证bolt中的每个任务接收到的tuple数目基本均衡(能较好的实现负载均衡)

(2) Fields Grouping:按字段分组。比如按userid来分组,具有同样userid的tuple会被分到同一个任务,而不同userid的tuple会被分到不同的任务。

(3) All Grouping:广播发送。对于每一个tuple,bolt中的所有任务都会收到

(4) Global Grouping:全局分组。这个tuple被分配到storm中的一个bolt的其中一个task,在具体一点就是分配给id值最低的那个task,收集全部bolt的中间计算结果,最后进行聚合时用

两个逻辑

(1)

supervisor(Host)à

Worker(Process)à

Executor(Thread)à

Task(Spout/Bolt实例)

(2)

Stream à Tuple à List<field,value>

流程

总体流程

ZK集群内部有自己的通信机制,Storm借助其通讯机制,例如,任务下发等。在执行一个任务的时候,storm会把任务及相关执行的代码经过序列化之后发送到ZK节点供supervisor去下载,然后才会各自执行自己部分的代码或者任务。每个ZK节点收到的任务是一样的,而supervisor只需要下载属于自己的任务即可。

关于spout/bolt的生命周期,一般来说spout/bolt的生命周期如下:

(1) 在提交了一个topology之后(在nimbus所在的机器),创建spout/bolt实例并进行序列化;

(2) 将序列化的component发送给所有的任务所在的机器;

(3) 在每一个任务上反序列化component;

(4) 在开始执行任务之前,先执行component的初始化方法(spout是open,bolt是prepare);

(5) 因此component的初始化操作应该在prepare/open方法中进行,而不是在实例化component的时候进行。

元数据存储结构

用ZooKeeper来存储组件之间共享的元数据,这些模块在重启之后,可以通过对应的元数据进行恢复。因此Storm的模块是无状态的,这是保证其可靠性及伸缩性的基础。

树中的每一个节点代表ZooKeeper中的一个节点(znode),每一个叶子节点是Storm真正存储数据的地方。从根节点到叶子节点的全路径代表了该数据在ZooKeeper中的存储路径,该路径可被用来写入或获取数据。Storm zookeeper目录树含义:

(1) /storm/workerbeats/<topology-id>/node-port:(小组项目汇报书、工人工作汇报书)

它存储由node-port指定的Worker的运行状态和一些统计信息,主要包括Worker上所有Executor的统计信息(如发送/接收的消息数)、Worker的启动时间以及最后一次更新这些信息的时间。它的内容在运行过程中不断更新。

(2) /storm/storms/<topology-id>:(项目计划书)

存储Topology本身的信息,包括名字、启动时间、运行状态、要使用的Worker数目以及每个组件的并行度设置。它的内容在运行过程中不变。

(3) /storm/assignments/<topology-id>:(项目任务分配书)

存储Nimbus为每个Topology分配的任务信息,包括该Topology在Nimbus机器本地的存储目录、被分配到的Supervisor机器到主机名的映射关系、每个Executor运行在哪个Worker上以及每个Executor的启动时间。该节点的数据在运行过程中会被更新。

(4) /storm/supervisors/<supervisor-id>:(部门的人员架构图)

它存储Supervisor机器本身的运行统计信息,主要包括最近一次更新时间、主机名、supervisor-id、已经使用的端口列表、所有的端口列表以及运行时间。该节点的数据在运行过程中也会被更新。

(5) /storm/errors/<topology-id>/<component-id>/e<sequential-id>:

它存储运行过程中每个组件上发生的错误信息。<sequential-id>是一个递增的序列号,每一个组件最多只会保留最近的10条错误信息。它的内容在运行过程中是不变的(但是有可能被删除)。

算法流程

1. Nimbus

箭头1表示由Nimbus创建的路径:

(1) /storm/workerbeats/<topology-id>

(2) /storm/storms/<topology-id>

(3) /storm/assigments/<topology-id>

其中对于路径a,Nimbus只会创建路径,不会设置数据,数据是由Worker设置的。对于路径b和c,Nimbus在创建它们的时候就会设置数据。a和b只有在提交新Topology的时候才会创建,且b中的数据设置好后就不再变化,c则在第一次为该Topology进行任务分配的时候创建,若任务分配计划有变,Nimbus就会更新它的内容。

箭头2表示Nimbus需要获取数据的路径:

(1) /storm/workerbeats/<topology-id>/node-port

(2) /storm/supervisors/<supervisor-id>

(3) /storm/errors/<topology-id>/<component-id>/e<sequential-id>

Nimbus需要从路径a读取当前已被分配的Worker的运行状态。根据该信息,Nimbus可以得知哪些Worker状态正常,哪些需要被重新调度,同时还会获取到该Worker所有Executor统计信息,这些信息会通过UI呈现给用户。从路径b可以获取当前集群中所有Supervisor的状态,通过这些信息可以得知哪些Supervisor上还有空闲的资源可用,哪些Supervisor则已经不再活跃,需要将分配到它的任务分配到其他节点上。从路径c上可以获取当前所有的错误信息并通过UI呈现给用户。

集群可动态增减机器,这会引起ZooKeeper中元数据的变化,Nimbus通过不断获取这些元数据信息来调整任务分配,故Storm具有良好的可伸缩性。当Nimbus死掉时,其他节点是可以继续工作的,但是不能提交新的Topology,也不能重新进行任务分配和负载调整,因此目前Nimbus还是存在单点的问题。随后Storm可配多个Nimbus,就不存在单节点问题了。

2. Supervisor

同Nimbus类似,Supervisor也要通过ZooKeeper来创建和获取元数据。除此之外,Supervisor还通过监控指定的本地文件来检测由它启动的所有Worker的运行状态。

箭头3表示Supervisor在ZooKeeper中创建的路径是/storm/supervisors/<supervisor-id>。新节点加人时,会在该路径下创建一个节点。值得注意的是,该节点是一个临时节点(创建ZooKeeper节点的一种模式),即只要Supervisor与ZooKeeper的连接稳定存在,该节点就一直存在;一旦连接断开,该节点则会被自动删除。该目录下的节点列表代表了目前活跃的机器。这保证了Nimbus能及时得知当前集群中机器的状态,这是Nimbus可以进行任务分配的基础,也是Storm具有容错性以及可伸缩性的基础。

箭头4表示Supervisor需要获取数据的路径是/storm/assigments/<topology-id>。我们知道它是Nimbus对Topology的任务分配信息,Supervisor从该路径可以获取到Nimbus分配给它的所有任务。Supervisor在本地保存上次的分配信息,对比这两部分信息可以得知分配信息是否有变化。若发生变化,则需要关闭被移除任务所对应的Worker,并启动新的Worker执行新分配的任务。Nimbus会尽量保持任务分配的稳定性。

箭头9表示Supervisor会从LocalState中获取由它启动的所有Worker的心跳信息。Supervisor会每隔一段时间检査一次这些心跳信息,如果发现某个Worker在这段时间内没有更新心跳信息,表明该Worker当前的运行状态出了问题。这时Supervisor就会杀掉这个Worker,原本分配给这个Worker的任务也会被Nimbus重新分配。

3. Worker

Worker也需要利用ZooKeeper来创建和获取元数据,同时它还需要利用本地的文件来记录自己的心跳信息。

箭头5表示Worker在ZooKeeper中创建的路径是/storm/workerbeats/<topology-id>/node-port。在Worker启动时,将创建一个与其对应的节点,相当于对自身进行注册。需要注意的是,Nimbus在Topology被提交时只会创建路径/storm/workerbeats/<topology-id>,而不会设置数据,数据则留到Worker启动之后由Worker创建。这样安排的目的之一是为了避免多个Worker同时创建路径时所导致的冲突。

箭头6表示Worker需要获取数据的路径是/storm/assignments/<topology-id>,Worker会从这些任务分配信息中取出分配给它的任务并执行。

箭头8表示Worker在LocalState中保存心跳信息。LocalState实际上将这些信息保存在本地文件中,Worker用这些信息跟Supervisor保持心跳,每隔几秒钟需要更新一次心跳信息。Worker与Supervisor属于不同的进程,因而Storm采用本地文件的方式来传递心跳。(进程间通信:共享文件)

4. Executor

箭头7表示Executor在ZooKeeper中创建的路径是/storm/errors/<topology-id>/<component-id>/e<sequential-id>。每个Executor会在运行过程中记录发生的错误。

5. 总结

(1) Nimbus感知Supervisor:通过/storm/supervisors/<supervisor-id>路径对应的数据进行心跳保持。Supervisor创建这个路径时釆用的是临时节点模式,所以只要Supervisor死掉,对应路径的数据就会被删掉,Nimbus就会将原本分配给该Supervisor的任务重新分配。

(2) Nimbus感知Worker:之间通过/storm/workerbeats/<topology-id>/node-port中的数据进行心跳保持。Nimbus会每隔一定时间获取该路径下的数据,同时Nimbus还会在它的内存中保存上一次的信息。如果发现某个Worker的心跳信息有一段时间没更新,就认为该Worker已经死掉了,Nimbus会对任务进行重新分配,将分配至该Worker的任务分配给其他Worker。

(3) Supervisor感知Worker:之间通过本地文件(基于LocalState ) 进行心跳保持。

特点

(1) 性能:并行程度高,基于消息,很少磁盘读写,处理速度快。

(2) 可靠性:依靠组件无状态,状态信息(元信息)保存在zookeeper上这种机制来保证。

(3) 可伸缩性:线性增加资源来提高性能(分布式系统的特点)。Storm的模块是无状态的,这是保证其可靠性及可伸缩性的基础。

(4) 快速失败,无状态:Storm的两种组件Nimbus和Supervisor都是快速失败的,没有状态。任务状态和心跳信息等都保存在Zookeeper。计算单元的依赖的数据全部在接收的消息中可以找到。

(5) 可扩展性:并行编程框架,思路清晰,业务代码容易扩展。消息分组方式是可扩展性的基础。

(6) 基本性质:Storm是一种计算引擎,Hadoop是一种大数据平台,包含计算引擎和存储系统。

(7) 数据来源:Hadoop处理的是HDFS上TB级别的数据(历史数据);Storm处理的是实时新增的某一笔数据(实时数据);

(8) 处理过程:Hadoop是批处理,分Map阶段到Reduce阶段;Storm是用户定义的流处理,流程中每个步骤可以是数据源(Spout)或处理逻辑(Bolt);

(9) 是否结束:Hadoop的Job执行完毕后结束;Storm的Topology没有结束状态。

(10) 无数据丢失:Storm创新性提出的ACK消息追踪框架。

本文参与?腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-12-06,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 章鱼沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体同步曝光计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
负载均衡
负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com