1.在官网下载Flink 并解压到?/opt/software/flink-text/
tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
2.解压成功后
?local模式不需要添加额外配置
./bin/start-cluster.sh
3.验证是否正常启动
输入jps 验证进程是否启动
输入网址节点IP加端口号8081
flink单节点安装已经完成。
在大数据处理领域 批处理任务与流处理任务一般被认为是两种不同的任务 一个大数据框架一般会被设计为只能处理其中一种任务。
例如Storm只支持流处理任务 而MapReduce、Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统 看似是一个特例 其实并不是——Spark Streaming采用了一种micro-batch的架构 即把输入的数据流切分成细粒度的batch 并为每一个batch数据提交一个批处理的Spark任务 所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理 和Storm等完全流式的数据处理方式完全不同。Flink通过灵活的执行引擎 能够同时支持批处理任务与流处理任务在执行引擎这一层 流处理系统与批处理系统最大不同在于节点间的数据传输方式。对于一个流处理系统 其节点间数据传输的标准模型是 当一条数据被处理完成后 序列化到缓存中 然后立刻通过网络传输到下一个节点 由下一个节点继续处理而对于一个批处理系统 其节点间数据传输的标准模型是 当一条数据被处理完成后 序列化到缓存中 并不会立刻通过网络传输到下一个节点 当缓存写满 就持久化到本地硬盘上 当所有数据都被处理完成后 才开始将处理后的数据通过网络传输到下一个节点这两种数据传输模式是两个极端 对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求Flink的执行引擎采用了一种十分灵活的方式 同时支持了这两种数据传输模型Flink以固定的缓存块为单位进行网络数据传输 用户可以通过设置缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0 则Flink的数据传输方式类似上文所提到流处理系统的标准模型 此时系统可以获得最低的处理延迟如果缓存块的超时值为无限大 则Flink的数据传输方式类似上文所提到批处理系统的标准模型 此时系统可以获得最高的吞吐量同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小 则Flink流处理执行引擎的数据处理延迟越低 但吞吐量也会降低 反之亦然。通过调整缓存块的超时阈值 用户可根据需求灵活地权衡系统延迟和吞吐量3、Flink应用场景分析1.优化电商网站的实时搜索结果阿里巴巴的所有基础设施团队使用flink实时更新产品细节和库存信息(Blink)针对数据分析团队提供实时流处理服务
通过flink数据分析平台提供实时数据分析服务 及时发现问题网络/传感器检测和错误检测
Bouygues电信公司 是法国最大的电信供应商之一 使用flink监控其有线和无线网络 实现快速故障响应商业智能分析ETL
Zalando使用flink转换数据以便于加载到数据仓库 将复杂的转换操作转化为相对简单的并确保分析终端用户可以更快的访问数据(实时ETL2.Flink vs Storm vs SparkStreamingFlink在吞吐量上要优于strom 在延时上要强于spark流处理
小型项目低延迟建议用strom轻量级方标使用。
大型项目并且秒级别的实时处理可以满足需求的话 建议使用sparkStreaming。
要求消息投递语义为 Exactly Once 的场景 数据量较大 要求高吞吐低延迟的场景 需要进行状态管理或窗口统计的场景 建议使用flink。
4、Flink入门案例-wordCount需求分析
手工通过socket实时产生一些单词 使用flink实时接收数据 对指定时间窗口内(例如 2秒)的数据进行聚合统计 并且把时间窗口内计算的结果打印出来代码编写步骤如下
1 获得一个执行环境
2 加载/创建 初始化数据
3 指定操作数据的transaction算子
4 指定把计算好的数据放在哪
5 调用execute()触发执行程序
注意 Flink程序是延迟计算的 只有最后调用execute()方法的时候才会真正触发执行程序。延迟计算好处 你可以开发复杂的程序 但是Flink可以将复杂的程序转成一个Plan 将Plan作为一个整体单元执行测试执行
在自己的虚拟机上执行?nc -l 9000 然后输入字母
就会在控制台现在单词数量结果如下
public class SocketWindowWordCountJava { public static void main(String[] args) throws Exception{ //获取需要的端口号 int port; try { ParameterTool parameterTool ParameterTool.fromArgs(args); port parameterTool.getInt( port ); }catch (Exception e){ System.err.println( No port set. use default port 9000--java ); port 9000; //获取flink的运行环境 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); String hostname 192.168.78.130 ; String delimiter \n ; //连接socket获取输入的数据 DataStreamSource String text env.socketTextStream(hostname, port, delimiter); // a a c // a 1 // a 1 // c 1 DataStream WordWithCount windowCounts text.flatMap(new FlatMapFunction String, WordWithCount () { public void flatMap(String value, Collector WordWithCount out) throws Exception { String[] splits value.split( \\s ); for (String word : splits) { out.collect(new WordWithCount(word, 1L)); }).keyBy( word ) .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒 指定时间间隔为1秒 .sum( count );//在这里使用sum或者reduce都可以 /*.reduce(new ReduceFunction WordWithCount () { public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word,a.count b.count); })*/ //把数据打印到控制台并且设置并行度 windowCounts.print().setParallelism(1); //这一行代码一定要实现 否则程序不执行 env.execute( Socket window count ); public static class WordWithCount{ public String word; public long count; public WordWithCount(){} public WordWithCount(String word,long count){ this.word word; this.count count; Override public String toString() { return WordWithCount{ word word \ , count count5、DataStream API之Data Sources
source是程序的数据源输入 你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。
flink提供了大量的已经实现好的source方法 你也可以自定义source
通过实现sourceFunction接口来自定义无并行度的source或者你也可以通过实现ParallelSourceFunction?接口 or 继承RichParallelSourceFunction?来自定义有并行度的source
已经实现好的source
基于文件
readTextFile(path)读取文本文件 文件遵循TextInputFormat 读取规则 逐行读取并返回。 不常用基于socket
socketTextStream基于集合
fromCollection(Collection)通过java 的collection集合创建一个数据流 集合中的所有元素必须是相同类型的。 常用自己测试自定义source的实现
没有并行度的数据源public class MyNoParalleSource implements SourceFunction Long { private long count 1L; private boolean isRunning true; * 主要的方法 * 启动一个source * 大部分情况下 都需要在这个run方法中实现一个循环 这样就可以循环产生数据了 * param ctx * throws Exception Override public void run(SourceContext Long ctx) throws Exception { while(isRunning){ ctx.collect(count); count ; //每秒产生一条数据 Thread.sleep(1000); * 取消一个cancel的时候会调用的方法 Override public void cancel() { isRunning false; }
测试程序
public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource Long text env.addSource(new MyNoParalleSource()).setParallelism(1);//注意 针对此source 并行度只能设置为1 DataStream Long num text.map(new MapFunction Long, Long () { Override public Long map(Long value) throws Exception { System.out.println( 接收到数据 value); return value; //每2秒钟处理一次数据 DataStream Long sum num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果 sum.print().setParallelism(1); String jobName StreamingDemoWithMyNoPralalleSource.class.getSimpleName(); env.execute(jobName); }
测试结果
有并行度的数据public class MyParalleSource implements ParallelSourceFunction Long { private long count 1L; private boolean isRunning true; * 主要的方法 * 启动一个source * 大部分情况下 都需要在这个run方法中实现一个循环 这样就可以循环产生数据了 * param ctx * throws Exception Override public void run(SourceContext Long ctx) throws Exception { while(isRunning){ ctx.collect(count); count ; //每秒产生一条数据 Thread.sleep(1000); * 取消一个cancel的时候会调用的方法 Override public void cancel() { isRunning false;
测试代码
public class StreamingDemoWithMyPralalleSource { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源 DataStreamSource Long text env.addSource(new MyParalleSource()).setParallelism(2);//主要是这里的不同 这里设置的并行度是2 首先数据源是一个并行的数据源 然后在设置你用几个平行去接这个数据源 DataStream Long num text.map(new MapFunction Long, Long () { Override public Long map(Long value) throws Exception { System.out.println( 接收到数据 value); return value; //每2秒钟处理一次数据 DataStream Long sum num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果 sum.print().setParallelism(1); String jobName StreamingDemoWithMyPralalleSource.class.getSimpleName(); env.execute(jobName); }
测试结果
高级有并行的实现/** * 自定义实现一个支持并行度的source * RichParallelSourceFunction 会额外提供open和close方法 * 针对source中如果需要获取其他链接资源 那么可以在open方法中获取资源链接 在close中关闭资源链接 * Created by xuwei.tech on 2018/10/23. public class MyRichParalleSource extends RichParallelSourceFunction Long { private long count 1L; private boolean isRunning true; * 主要的方法 * 启动一个source * 大部分情况下 都需要在这个run方法中实现一个循环 这样就可以循环产生数据了 * param ctx * throws Exception Override public void run(SourceContext Long ctx) throws Exception { while(isRunning){ ctx.collect(count); count ; //每秒产生一条数据 Thread.sleep(1000); * 取消一个cancel的时候会调用的方法 Override public void cancel() { isRunning false;
本文转自网络,原文链接:https://developer.aliyun.com/article/784957
本站部分内容转载于网络,版权归原作者所有,转载之目的在于传播更多优秀技术内容,如有侵权请联系QQ/微信:153890879删除,谢谢!
在 Go 语言中文网微信群里有人问了这么一个问题:(要加群记得在公众号回复消息入...
TOP云 (west.cn)3月30日讯,昨天是TOP云第一街.xyz精品保留 域名拍卖 会的倒数...
1. 接口描述 接口请求域名: cvm.tencentcloudapi.com 。 本接口 (RebootInstanc...
当前,制造业的转型升级是适应国际国内双循环的内在要求,也是推动碳达峰、碳中...
步骤 : ? 1)环境安装 node.js (下载安装包,下一步.....就行了) 这个是我的版本 I...
作者 缪可延 IBM 副总裁,大中华区云计算与认知软件事业部总经理 企业正处在一个...
一、getProperties()方法 1.System类提供一个getProperties()方法用来获取当前系...
本文转载自微信公众号「小麦大叔」,作者菜刀和小麦。转载本文请联系小麦大叔公...
吴雪军(东锤) 一、2020数据已经成为生产要素 数据中台为什么成为企业增长的刚需...
批量查询 域名 是否被注册? 注册域名 前需要查询域名,查询想要注册的域名是否...