下图就 word count 的 sum 所使用的StreamGroupedReduce类为例讲解了如何在代码中使用 keyed state:
//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/
使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)
//-1.定义一个状态用来存放最大值
private transient ValueState<Long> maxValueState;
//-2.创建一个状态描述符对象
ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
//-3.根据状态描述符获取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
?//-4.使用State
Long historyValue = maxValueState.value();
//判断当前值和历史值谁大
if (historyValue == null || currentValue > historyValue)
//-5.更新状态
maxValueState.update(currentValue);?????
package cn.it.state;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
?* Author lanson
?* Desc
?* 使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
?*/
public class StateDemo01_KeyedState {
????public static void main(String[] args) throws Exception {
????????//1.env
????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
????????env.setParallelism(1);//方便观察
????????//2.Source
????????DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
????????????????Tuple2.of("北京", 1L),
????????????????Tuple2.of("上海", 2L),
????????????????Tuple2.of("北京", 6L),
????????????????Tuple2.of("上海", 8L),
????????????????Tuple2.of("北京", 3L),
????????????????Tuple2.of("上海", 4L)
????????);
????????//3.Transformation
????????//使用KeyState中的ValueState获取流数据中的最大值(实际中直接使用maxBy即可)
????????//实现方式1:直接使用maxBy--开发中使用该方式即可
????????//min只会求出最小的那个字段,其他的字段不管
????????//minBy会求出最小的那个字段和对应的其他的字段
????????//max只会求出最大的那个字段,其他的字段不管
????????//maxBy会求出最大的那个字段和对应的其他的字段
????????SingleOutputStreamOperator<Tuple2<String, Long>> result = tupleDS.keyBy(t -> t.f0)
????????????????.maxBy(1);
????????//实现方式2:使用KeyState中的ValueState---学习测试时使用,或者后续项目中/实际开发中遇到复杂的Flink没有实现的逻辑,才用该方式!
????????SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
????????????????.map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
????????????????????//-1.定义状态用来存储最大值
????????????????????private ValueState<Long> maxValueState = null;
????????????????????@Override
????????????????????public void open(Configuration parameters) throws Exception {
????????????????????????//-2.定义状态描述符:描述状态的名称和里面的数据类型
????????????????????????ValueStateDescriptor descriptor = new ValueStateDescriptor("maxValueState", Long.class);
????????????????????????//-3.根据状态描述符初始化状态
????????????????????????maxValueState = getRuntimeContext().getState(descriptor);
????????????????????}
????????????????????@Override
????????????????????public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
????????????????????????//-4.使用State,取出State中的最大值/历史最大值
????????????????????????Long historyMaxValue = maxValueState.value();
????????????????????????Long currentValue = value.f1;
????????????????????????if (historyMaxValue == null || currentValue > historyMaxValue) {
????????????????????????????//5-更新状态,把当前的作为新的最大值存到状态中
????????????????????????????maxValueState.update(currentValue);
????????????????????????????return Tuple3.of(value.f0, currentValue, currentValue);
????????????????????????} else {
????????????????????????????return Tuple3.of(value.f0, currentValue, historyMaxValue);
????????????????????????}
????????????????????}
????????????????});
????????//4.Sink
????????//result.print();
????????result2.print();
????????//5.execute
????????env.execute();
????}
}
下图对 word count 示例中的FromElementsFunction类进行详解并分享如何在代码中使用 operator state:
//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/
使用ListState存储offset模拟Kafka的offset维护
//-1.声明一个OperatorState来记录offset
private ListState<Long> offsetState = null;
private Long offset = 0L;
//-2.创建状态描述器
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<Long>("offsetState", Long.class);
//-3.根据状态描述器获取State
offsetState = context.getOperatorStateStore().getListState(descriptor);
//-4.获取State中的值
Iterator<Long> iterator = offsetState.get().iterator();
if (iterator.hasNext()) {//迭代器中有值
????offset = iterator.next();//取出的值就是offset
}
offset += 1L;
ctx.collect("subTaskId:" + getRuntimeContext().getIndexOfThisSubtask() + ",当前的offset为:" + offset);
if (offset % 5 == 0) {//每隔5条消息,模拟一个异常
//-5.保存State到Checkpoint中
offsetState.clear();//清理内存中存储的offset到Checkpoint中
//-6.将offset存入State中
offsetState.add(offset);
package cn.it.state;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
?* Author lanson
?* Desc
?* 需求:
?* 使用OperatorState支持的数据结构ListState存储offset信息, 模拟Kafka的offset维护,
?* 其实就是FlinkKafkaConsumer底层对应offset的维护!
?*/
public class StateDemo02_OperatorState {
????public static void main(String[] args) throws Exception {
????????//1.env
????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
????????env.setParallelism(1);
????????//先直接使用下面的代码设置Checkpoint时间间隔和磁盘路径以及代码遇到异常后的重启策略,下午会学
????????env.enableCheckpointing(1000);//每隔1s执行一次Checkpoint
????????env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
????????env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
????????env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
????????//固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
????????env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));
????????//2.Source
????????DataStreamSource<String> sourceData = env.addSource(new MyKafkaSource());
????????//3.Transformation
????????//4.Sink
????????sourceData.print();
????????//5.execute
????????env.execute();
????}
????/**
?????* MyKafkaSource就是模拟的FlinkKafkaConsumer并维护offset
?????*/
????public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
????????//-1.声明一个OperatorState来记录offset
????????private ListState<Long> offsetState = null;
????????private Long offset = 0L;
????????private boolean flag = true;
????????@Override
????????public void initializeState(FunctionInitializationContext context) throws Exception {
????????????//-2.创建状态描述器
????????????ListStateDescriptor descriptor = new ListStateDescriptor("offsetState", Long.class);
????????????//-3.根据状态描述器初始化状态
????????????offsetState = context.getOperatorStateStore().getListState(descriptor);
????????}
????????@Override
????????public void run(SourceContext<String> ctx) throws Exception {
????????????//-4.获取并使用State中的值
????????????Iterator<Long> iterator = offsetState.get().iterator();
????????????if (iterator.hasNext()){
????????????????offset = iterator.next();
????????????}
????????????while (flag){
????????????????offset += 1;
????????????????int id = getRuntimeContext().getIndexOfThisSubtask();
????????????????ctx.collect("分区:"+id+"消费到的offset位置为:" + offset);//1 2 3 4 5 6
????????????????//Thread.sleep(1000);
????????????????TimeUnit.SECONDS.sleep(2);
????????????????if(offset % 5 == 0){
????????????????????System.out.println("程序遇到异常了.....");
????????????????????throw new Exception("程序遇到异常了.....");
????????????????}
????????????}
????????}
????????@Override
????????public void cancel() {
????????????flag = false;
????????}
????????/**
?????????* 下面的snapshotState方法会按照固定的时间间隔将State信息存储到Checkpoint/磁盘中,也就是在磁盘做快照!
?????????*/
????????@Override
????????public void snapshotState(FunctionSnapshotContext context) throws Exception {
????????????//-5.保存State到Checkpoint中
????????????offsetState.clear();//清理内存中存储的offset到Checkpoint中
????????????//-6.将offset存入State中
????????????offsetState.add(offset);
????????}
????}
}