前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Flink(二十四):???????Allowed Lateness案例演示

2021年大数据Flink(二十四):???????Allowed Lateness案例演示

作者头像
Lansonli
发布2021-10-09 18:01:07
5640
发布2021-10-09 18:01:07
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

Allowed Lateness案例演示

需求

有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

要求每隔5s,计算5秒内,每个用户的订单总金额

并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。

并使用OutputTag+allowedLateness解决数据丢失问题

API

代码语言:javascript
复制
package cn.it.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
?* Author lanson
?* Desc
?* 模拟实时订单数据,格式为: (订单ID,用户ID,订单金额,时间戳/事件时间)
?* 要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
?* 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。
?*/
public class WatermakerDemo03_AllowedLateness {
????public static void main(String[] args) throws Exception {
????????//1.env
????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
????????//2.Source
????????//模拟实时订单数据(数据有延迟和乱序)
????????DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
????????????private boolean flag = true;
????????????@Override
????????????public void run(SourceContext<Order> ctx) throws Exception {
????????????????Random random = new Random();
????????????????while (flag) {
????????????????????String orderId = UUID.randomUUID().toString();
????????????????????int userId = random.nextInt(3);
????????????????????int money = random.nextInt(100);
????????????????????//模拟数据延迟和乱序!
????????????????????long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;
????????????????????ctx.collect(new Order(orderId, userId, money, eventTime));

????????????????????//TimeUnit.SECONDS.sleep(1);
????????????????}
????????????}
????????????@Override
????????????public void cancel() {
????????????????flag = false;
????????????}
????????});


????????//3.Transformation
????????DataStream<Order> watermakerDS = orderDS
????????????????.assignTimestampsAndWatermarks(
????????????????????????WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
????????????????????????????????.withTimestampAssigner((event, timestamp) -> event.getEventTime())
????????????????);

????????//代码走到这里,就已经被添加上Watermaker了!接下来就可以进行窗口计算了
????????//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
????????OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));

????????SingleOutputStreamOperator<Order> result = watermakerDS
????????????????.keyBy(Order::getUserId)
????????????????//.timeWindow(Time.seconds(5), Time.seconds(5))
????????????????.window(TumblingEventTimeWindows.of(Time.seconds(5)))
????????????????.allowedLateness(Time.seconds(5))
????????????????.sideOutputLateData(outputTag)
????????????????.sum("money");

????????DataStream<Order> result2 = result.getSideOutput(outputTag);

????????//4.Sink
????????result.print("正常的数据和迟到不严重的数据");
????????result2.print("迟到严重的数据");

????????//5.execute
????????env.execute();
????}
????@Data
????@AllArgsConstructor
????@NoArgsConstructor
????public static class Order {
????????private String orderId;
????????private Integer userId;
????????private Integer money;
????????private Long eventTime;
????}
}
本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-04-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Allowed Lateness案例演示
    • 需求
      • API
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
      http://www.vxiaotou.com