前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >2021年大数据Flink(二十六):???????State代码示例

2021年大数据Flink(二十六):???????State代码示例

作者头像
Lansonli
发布2021-10-09 17:58:24
6270
发布2021-10-09 17:58:24
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

State代码示例

Keyed State

下图就 word count 的 sum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state:

官网代码示例

//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

需求:

使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)

编码步骤

//-1.定义一个状态用来存放最大值

private transient ValueState<Long> maxValueState;

//-2.创建一个状态描述符对象

ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);

//-3.根据状态描述符获取State

maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);

?//-4.使用State

Long historyValue = maxValueState.value();

//判断当前值和历史值谁大

if (historyValue == null || currentValue > historyValue)

//-5.更新状态

maxValueState.update(currentValue);?????

代码示例

代码语言:javascript
复制
package cn.it.state;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
?* Author lanson
?* Desc
?* 使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
?*/
public class StateDemo01_KeyedState {
????public static void main(String[] args) throws Exception {
????????//1.env
????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
????????env.setParallelism(1);//方便观察

????????//2.Source
????????DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
????????????????Tuple2.of("北京", 1L),
????????????????Tuple2.of("上海", 2L),
????????????????Tuple2.of("北京", 6L),
????????????????Tuple2.of("上海", 8L),
????????????????Tuple2.of("北京", 3L),
????????????????Tuple2.of("上海", 4L)
????????);

????????//3.Transformation
????????//使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
????????//实现方式1:直接使用maxBy--开发中使用该方式即可
????????//min只会求出最小的那个字段,其他的字段不管
????????//minBy会求出最小的那个字段和对应的其他的字段
????????//max只会求出最大的那个字段,其他的字段不管
????????//maxBy会求出最大的那个字段和对应的其他的字段
????????SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0)
????????????????.maxBy(1);

????????//实现方式2:使用KeyState中的ValueState---学习测试时使用,或者后续项目中/实际开发中遇到复杂的Flink没有实现的逻辑,才用该方式!
????????SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
????????????????.map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
????????????????????//-1.定义状态用来存储最大值
????????????????????private ValueState<Long> maxValueState = null;

????????????????????@Override
????????????????????public void open(Configuration parameters) throws Exception {
????????????????????????//-2.定义状态描述符:描述状态的名称和里面的数据类型
????????????????????????ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
????????????????????????//-3.根据状态描述符初始化状态
????????????????????????maxValueState = getRuntimeContext().getState(descriptor);
????????????????????}

????????????????????@Override
????????????????????public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
????????????????????????//-4.使用State,取出State中的最大值/历史最大值
????????????????????????Long historyMaxValue = maxValueState.value();
????????????????????????Long currentValue = value.f1;
????????????????????????if (historyMaxValue == null || currentValue > historyMaxValue) {
????????????????????????????//5-更新状态,把当前的作为新的最大值存到状态中
????????????????????????????maxValueState.update(currentValue);
????????????????????????????return Tuple3.of(value.f0, currentValue, currentValue);
????????????????????????} else {
????????????????????????????return Tuple3.of(value.f0, currentValue, historyMaxValue);
????????????????????????}
????????????????????}
????????????????});


????????//4.Sink
????????//result.print();
????????result2.print();

????????//5.execute
????????env.execute();
????}
}

Operator State

下图对 word count 示例中的FromElementsFunction类进行详解并分享如何在代码中使用 operator state:

官网代码示例

//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

需求:

使用ListState存储offset模拟Kafka的offset维护

编码步骤:

//-1.声明一个OperatorState来记录offset

private ListState<Long> offsetState = null;

private Long offset = 0L;

//-2.创建状态描述器

ListStateDescriptor<Long> descriptor = new ListStateDescriptor<Long>("offsetState", Long.class);

//-3.根据状态描述器获取State

offsetState = context.getOperatorStateStore().getListState(descriptor);

//-4.获取State中的值

Iterator<Long> iterator = offsetState.get().iterator();

if (iterator.hasNext()) {//迭代器中有值

????offset = iterator.next();//取出的值就是offset

}

offset += 1L;

ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",当前的offset为:" + offset);

if (offset % 5 == 0) {//每隔5条消息,模拟一个异常

//-5.保存State到Checkpoint中

offsetState.clear();//清理内存中存储的offset到Checkpoint中

//-6.将offset存入State中

offsetState.add(offset);

代码示例

代码语言:javascript
复制
package cn.it.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
?* Author lanson
?* Desc
?* 需求:
?* 使用OperatorState支持的数据结构ListState存储offset信息, 模拟Kafka的offset维护,
?* 其实就是FlinkKafkaConsumer底层对应offset的维护!
?*/
public class StateDemo02_OperatorState {
????public static void main(String[] args) throws Exception {
????????//1.env
????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
????????env.setParallelism(1);
????????//先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学
????????env.enableCheckpointing(1000);//每隔1s执行一次Checkpoint
????????env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
????????env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
????????env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
????????//固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
????????env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

????????//2.Source
????????DataStreamSource<String> sourceData = env.addSource(new MyKafkaSource());

????????//3.Transformation
????????//4.Sink
????????sourceData.print();

????????//5.execute
????????env.execute();
????}

????/**
?????* MyKafkaSource就是模拟的FlinkKafkaConsumer并维护offset
?????*/
????public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
????????//-1.声明一个OperatorState来记录offset
????????private ListState<Long> offsetState = null;
????????private Long offset = 0L;
????????private boolean flag = true;

????????@Override
????????public void initializeState(FunctionInitializationContext context) throws Exception {
????????????//-2.创建状态描述器
????????????ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);
????????????//-3.根据状态描述器初始化状态
????????????offsetState = context.getOperatorStateStore().getListState(descriptor);
????????}

????????@Override
????????public void run(SourceContext<String> ctx) throws Exception {
????????????//-4.获取并使用State中的值
????????????Iterator<Long> iterator = offsetState.get().iterator();
????????????if (iterator.hasNext()){
????????????????offset = iterator.next();
????????????}
????????????while (flag){
????????????????offset += 1;
????????????????int id = getRuntimeContext().getIndexOfThisSubtask();
????????????????ctx.collect("分区:"+id+"消费到的offset位置为:" + offset);//1 2 3 4 5 6
????????????????//Thread.sleep(1000);
????????????????TimeUnit.SECONDS.sleep(2);
????????????????if(offset % 5 == 0){
????????????????????System.out.println("程序遇到异常了.....");
????????????????????throw new Exception("程序遇到异常了.....");
????????????????}
????????????}
????????}

????????@Override
????????public void cancel() {
????????????flag = false;
????????}

????????/**
?????????* 下面的snapshotState方法会按照固定的时间间隔将State信息存储到Checkpoint/磁盘中,也就是在磁盘做快照!
?????????*/
????????@Override
????????public void snapshotState(FunctionSnapshotContext context) throws Exception {
????????????//-5.保存State到Checkpoint中
????????????offsetState.clear();//清理内存中存储的offset到Checkpoint中
????????????//-6.将offset存入State中
????????????offsetState.add(offset);
????????}
????}
}
本文参与?腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-04-30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客?前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • State代码示例
    • Keyed State
      • 官网代码示例
      • 需求:
      • 编码步骤
      • 代码示例
    • Operator State
      • 官网代码示例
      • 需求:
      • 编码步骤:
      • 代码示例
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com