介绍了Flink的程序结构
任何程序都是需要有输入、处理、输出。 那么Flink同样也是,Flink专业术语对应Source,map,Sink。而在进行这些操作前,需要根据需求初始化运行环境
Flink 执行模式分为两种,一个是流处理、另一个是批处理。再选择好执行模式后,为了开始编写Flink程序,需要根据需求创建一个执行环境。Flink目前支持三种环境的创建方式:
getExecutionEnvironment()
。 它会根据你的环境来选择。 如果你在IDE中的本地环境中执行,那么它将启动本地执行环境。 否则,如果正在执行JAR,则Flink集群管理器将以分布式方式执行该程序。
流处理程序部分代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
批处理程序部分代码:
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(
"Who's she?","Alice")
Flink的source到底是什么?为了更好地理解,我们这里给出下面一个简单典型的wordcount程序。
//初始化流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定数据源
val text = env.socketTextStream(host, port, '\n')
//对数据源传入的数据进行处理
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
//输出结果
windowCounts.print()
在上述代码中val text = env.socketTextStream(host, port, '\n')
就是指定数据源。Flink的source多种多样,例如我们可以根据不同的需求来自定义source。
基于Socket
基于文件
自定义
基于文件
基于Collection
通用
在读取数据源以后就开始了数据的处理
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
flatMap 、map 、keyBy、timeWindow、sum那么这些就是对数据的处理。更多算子信息:
在上述代码中windowCounts.print()
也就是改程序的保存数据
这里输出可以说是非常简单的。而sink当然跟source一样也是可以自定义的。 因为Flink数据要保存到myslq,是不能直接保存的,所以需要自定义一个sink。不定义sink可以吗?可以的,那就是自己在写一遍,每次调用都复制一遍,这样造成大量的重复,所以我们需要自定义sink。 那么常见的sink有哪些?如下: flink在批处理中常见的sink 1.基于本地集合的sink(Collection-based-sink) 2.基于文件的sink(File-based-sink)
参考