2026/6/11 6:57:44
网站建设
项目流程
网站降权的原因,中山学校的网站建设,网站建设填空题,郑州发布官网前面我们介绍了 Flink 状态的分类和应用。今天从源码层面再看一下 Flink 是如何管理状态的。State 概述
关于 State 的详细介绍可以参考 Flink学习笔记#xff1a;状态类型和应用 和 Flink学习笔记#xff1a;状态后端这两篇文章#xff0c;为了方面阅读#xff0c;这里我…前面我们介绍了 Flink 状态的分类和应用。今天从源码层面再看一下 Flink 是如何管理状态的。State 概述关于 State 的详细介绍可以参考 Flink学习笔记状态类型和应用 和 Flink学习笔记状态后端这两篇文章为了方面阅读这里我们再简单介绍一下。State 使用State 是 Flink 做复杂逻辑所依赖的核心组件。它的分类如下常见的是 Keyed State 和 Operator StateKeyed State 作用于 KeyedStream 上Operator State 可以作用于所有的 Operator 上。Keyed State 使用时需要先创建 StateDescriptor然后再调用 getState 获取。ValueStateDescriptorTuple2Long,LongdescriptornewValueStateDescriptor(average,TypeInformation.of(newTypeHintTuple2Long,Long(){}));ValueStateTuple2Long,LongsumgetRuntimeContext().getState(descriptor);Opeartor State 的获取方式与 Keyed State 类似都需要 StateDescriptor。Operator State 在定义时需要实现 CheckpointedFunction。State 存储State Backend 用来管理 State 存储根据存储格式和存储类型的组合可以分为三类MemoryStateBackendHashMapStateBackend 和 JobManagerCheckpointStorage 的组合即将 State 以 Java 对象的形式存储在 JobManager 内存中。FsStateBackendHashMapStateBackend 和 FileSystemCheckpointStorage 的组合将 State 以 Java 对象的形式存储在远端文件系统中。RocksDBStateBackendEmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage 的组合State 序列化后存储在 RocksDB。创建 State Backend创建 State Backend 的入口在 StreamTaskStreamTask 是 Flink 部署和运行在 TaskManager 的基本单元。在 StreamTask 的 invoke 方法中会先调用 restoreStateAndGates 方法去创建 State Backend。完整的调用链路如下图所示。在 streamOperatorStateContext 方法中分别调用了 keyedStatedBackend 和 operatorStateBackend 来创建两种 State Backend。我们先来看 keyedStateBackend 的逻辑。protectedK,RextendsDisposableCloseableRkeyedStatedBackend(TypeSerializerKkeySerializer,StringoperatorIdentifierText,PrioritizedOperatorSubtaskStateprioritizedOperatorSubtaskStates,CloseableRegistrybackendCloseableRegistry,MetricGroupmetricGroup,doublemanagedMemoryFraction,StateObject.StateObjectSizeStatsCollectorstatsCollector,KeyedStateBackendCreatorK,RkeyedStateBackendCreator)throwsException{if(keySerializernull){returnnull;}......finalKeyGroupRangekeyGroupRangeKeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(),taskInfo.getNumberOfParallelSubtasks(),taskInfo.getIndexOfThisSubtask());// Now restore processing is included in backend building/constructing process, so we need// to make sure// each stream constructed in restore could also be closed in case of task cancel, for// example the data// input stream opened for serDe during restore.CloseableRegistrycancelStreamRegistryForRestorenewCloseableRegistry();backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);BackendRestorerProcedureR,KeyedStateHandlebackendRestorernewBackendRestorerProcedure((stateHandles)-{KeyedStateBackendParametersImplKparametersnewKeyedStateBackendParametersImpl(...);returnkeyedStateBackendCreator.create(...),parameters);},backendCloseableRegistry,logDescription);try{returnbackendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState(),statsCollector);}finally{if(backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)){IOUtils.closeQuietly(cancelStreamRegistryForRestore);}}}这里的创建过程也比较简单先是获取 KeyGroupRange它表示的是当前 Operator 上处理的 key 的范围。然后就是创建 StateBackend 实例这里通过 BackendRestorerProcedure 封装统一的恢复、异常处理和资源清理逻辑。operatorStateBackend 方法的逻辑相比较来说只是少了 KeyGroupRange 的处理直接创建 StateBackend 实例。创建和使用 State创建 KeyedStateKeyedState 是通过调用 StreamingRuntimeContext.getState 方法获取的。我们先来看完整的调用流程。在调用 getState 这些方法时都会先调用 keyedStateStore 提供的方法它是 Flink 提供的一个封装 keyedStateBackend 的接口。调用流程的最后是调用 keyedStateBackend 中的 createOrUpdateInternalState 方法这里我们以 HeapStateBackend 为例。publicN,SV,SEV,SextendsState,ISextendsSIScreateOrUpdateInternalState(NonnullTypeSerializerNnamespaceSerializer,NonnullStateDescriptorS,SVstateDesc,NonnullStateSnapshotTransformFactorySEVsnapshotTransformFactory,booleanallowFutureMetadataUpdates)throwsException{StateTableK,N,SVstateTabletryRegisterStateTable(namespaceSerializer,stateDesc,getStateSnapshotTransformFactory(stateDesc,snapshotTransformFactory),allowFutureMetadataUpdates);SuppressWarnings(unchecked)IScreatedState(IS)createdKVStates.get(stateDesc.getName());if(createdStatenull){StateCreateFactorystateCreateFactorySTATE_CREATE_FACTORIES.get(stateDesc.getType());if(stateCreateFactorynull){thrownewFlinkRuntimeException(stateNotSupportedMessage(stateDesc));}createdStatestateCreateFactory.createState(stateDesc,stateTable,getKeySerializer());}else{StateUpdateFactorystateUpdateFactorySTATE_UPDATE_FACTORIES.get(stateDesc.getType());if(stateUpdateFactorynull){thrownewFlinkRuntimeException(stateNotSupportedMessage(stateDesc));}createdStatestateUpdateFactory.updateState(stateDesc,stateTable,createdState);}createdKVStates.put(stateDesc.getName(),createdState);returncreatedState;}privatestaticfinalMapStateDescriptor.Type,StateCreateFactorySTATE_CREATE_FACTORIESStream.of(Tuple2.of(StateDescriptor.Type.VALUE,(StateCreateFactory)HeapValueState::create),Tuple2.of(StateDescriptor.Type.LIST,(StateCreateFactory)HeapListState::create),Tuple2.of(StateDescriptor.Type.MAP,(StateCreateFactory)HeapMapState::create),Tuple2.of(StateDescriptor.Type.AGGREGATING,(StateCreateFactory)HeapAggregatingState::create),Tuple2.of(StateDescriptor.Type.REDUCING,(StateCreateFactory)HeapReducingState::create)).collect(Collectors.toMap(t-t.f0,t-t.f1));这里首先是注册了一个 StateTable这个是 State 中一个非常重要的成员变量它内部是一个类似 Map 的结构用来保存 key 和 key 的状态。STATE_CREATE_FACTORIES 这个变量保存了不同类型的 State 和它对应的创建方法同理 STATE_UPDATE_FACTORIES 保存的是不同 State 对应的 更新方法。创建 OperatorState看完了 KeyedState 的创建过程后我们再来看下 OperatorState 的创建过程。OperatorState 的创建方法是通过 FunctionInitializationContext 先获取到 OperatorStateStore它与 KeyedStateStore 类似都是对 StateBackend 的方法进行了封装。OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{ListStateDescriptorTuple2String,IntegerdescriptornewListStateDescriptor(buffered-elements,TypeInformation.of(newTypeHintTuple2String,Integer(){}));checkpointedStatecontext.getOperatorStateStore().getListState(descriptor);if(context.isRestored()){for(Tuple2String,Integerelement:checkpointedState.get()){bufferedElements.add(element);}}}OperatorStateStore 的 getListState 方法中直接创建出了 PartitionableListState同时也做了一些缓存操作。privateSListStateSgetListState(ListStateDescriptorSstateDescriptor,OperatorStateHandle.Modemode)throwsStateMigrationException{......PartitionableListStateSpartitionableListState(PartitionableListStateS)registeredOperatorStates.get(name);if(nullpartitionableListState){// no restored state for the state name; simply create new state holderpartitionableListStatenewPartitionableListState(newRegisteredOperatorStateBackendMetaInfo(name,partitionStateSerializer,mode));registeredOperatorStates.put(name,partitionableListState);}else{......}accessedStatesByName.put(name,partitionableListState);returnpartitionableListState;}PartitionableListState 内部有一个 ArrayList 用于保存数据。使用 KeyedState了解完 State 的创建之后接下来就是 State 的使用了。我们以 HeapValueState 为例来看如何获取 State。// HeapValueState 类publicVvalue(){finalVresultstateTable.get(currentNamespace);if(resultnull){returngetDefaultValue();}returnresult;}在 HeapValueState 类的 value 方法中直接调用 StateTable 的 get 方法最终调用的是 CopyOnWriteStateMap 的 get 方法这个方法与 HashMap 的 get 方法比较类似。publicSget(Kkey,Nnamespace){finalinthashcomputeHashForOperationAndDoIncrementalRehash(key,namespace);finalintrequiredVersionhighestRequiredSnapshotVersion;finalStateMapEntryK,N,S[]tabselectActiveTable(hash);intindexhash(tab.length-1);for(StateMapEntryK,N,Setab[index];e!null;ee.next){finalKeKeye.key;finalNeNamespacee.namespace;if((e.hashhashkey.equals(eKey)namespace.equals(eNamespace))){// copy-on-write check for stateif(e.stateVersionrequiredVersion){// copy-on-write check for entryif(e.entryVersionrequiredVersion){ehandleChainedEntryCopyOnWrite(tab,hash(tab.length-1),e);}e.stateVersionstateMapVersion;e.stategetStateSerializer().copy(e.state);}returne.state;}}returnnull;}使用 OperatorStateOperatorState 底层使用的是 PartitionableListState前面也提到了它的内部用了一个 ArrayList 来保存数据对于 OperatorState 的各种操作也都是来操作这个 ArrayList。Overridepublicvoidclear(){internalList.clear();}OverridepublicIterableSget(){returninternalList;}Overridepublicvoidadd(Svalue){Preconditions.checkNotNull(value,You cannot add null to a ListState.);internalList.add(value);}Overridepublicvoidupdate(ListSvalues){internalList.clear();addAll(values);}OverridepublicvoidaddAll(ListSvalues){Preconditions.checkNotNull(values,List of values to add cannot be null.);if(!values.isEmpty()){for(Svalue:values){checkNotNull(value,Any value to add to a list cannot be null.);add(value);}}}总结本文对 State 的相关代码进行了梳理。包括 StateBackend 的创建KeyedState 和 OperatorState 的创建和使用。State 和 Checkpoint 两者需要结合使用因此后面我们会再梳理 Checkpoint 的相关代码。