Understanding the Flink Graph Construction Process

Contents:
1. Building the StreamGraph
 1.1 transform(): the core of construction
 1.2 transformOneInputTransform
 1.3 Building the vertices
 1.4 Building the edges
 1.5 transformSource
 1.6 transformPartition
 1.7 transformSink

1. Building the StreamGraph
Link: see the companion article on other important Flink source code.
env.execute() submits and runs the job. Before execution, the job is turned into a StreamGraph and then a JobGraph, and the JobGraph is what actually gets submitted. Let's analyze how the StreamGraph is built. In a production setup, StreamContextEnvironment.execute() is called:
```java
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    // get the StreamGraph
    return execute(getStreamGraph(jobName));
}

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    // build the StreamGraph
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        // the transformations list can be cleared once the StreamGraph is built
        this.transformations.clear();
    }
    return streamGraph;
}
```

The implementation lives in StreamGraphGenerator.generate(this, transformations). The transformations list here is the one populated earlier by calls to map, flatMap, filter and so on; each entry is the StreamTransformation that describes the resulting DataStream. Let's continue:
```java
public StreamGraph generate() {
    // instantiate the StreamGraph
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    streamGraph.setStateBackend(stateBackend);
    streamGraph.setChaining(chaining);
    streamGraph.setScheduleMode(scheduleMode);
    streamGraph.setUserArtifacts(userArtifacts);
    streamGraph.setTimeCharacteristic(timeCharacteristic);
    streamGraph.setJobName(jobName);
    streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);

    // records the StreamTransformations that have already been transformed
    alreadyTransformed = new HashMap<>();

    for (Transformation<?> transformation : transformations) {
        // build up the StreamGraph from each transformation
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}
```

1.1 transform(): the core of construction
```java
private Collection<Integer> transform(Transformation<?> transform) {

    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

    Collection<Integer> transformedIds;
    // dispatch on the concrete Transformation type
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof AbstractMultipleInputTransformation<?>) {
        transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
    } else if (transform instanceof SourceTransformation) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof LegacySourceTransformation<?>) {
        transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else if (transform instanceof SideOutputTransformation<?>) {
        transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    if (transform.getBufferTimeout() >= 0) {
        streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
    } else {
        streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
    }

    if (transform.getUid() != null) {
        streamGraph.setTransformationUID(transform.getId(), transform.getUid());
    }
    if (transform.getUserProvidedNodeHash() != null) {
        streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
    }

    if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
        if (transform instanceof PhysicalTransformation
                && transform.getUserProvidedNodeHash() == null
                && transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
        }
    }

    if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
        streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
    }

    streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());

    return transformedIds;
}
```

Take WordCount as an example. During flatMap, map, reduce, and addSink, the StreamTransformation of each generated DataStream is added to the transformations list. addSource does not add its StreamTransformation to transformations, but the input of the StreamTransformation generated by flatMap holds a reference to the SourceTransformation. Similarly, the keyBy operator produces a KeyedStream whose StreamTransformation is not added to the transformations list either; instead, the StreamTransformation of the DataStream produced by reduce holds the KeyedStream's StreamTransformation as its input. So WordCount registers 4 StreamTransformations: the first three are OneInputTransformations, and the last is a SinkTransformation.
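The chain of input references described above can be sketched with a toy model. This is a hypothetical simplification, not Flink's real classes: each node keeps only a name and a reference to its input, which is enough to show that the whole WordCount pipeline is reachable even though only 4 of the 6 transformations are registered in the list.

```java
import java.util.*;

// A minimal sketch (hypothetical classes, not Flink's API) of how each
// DataStream's transformation holds a reference to its input, so the pipeline
// source -> flatMap -> map -> keyBy -> reduce -> sink is fully recoverable
// even though only flatMap, map, reduce and sink are in the transformations list.
public class TransformationChainSketch {

    static class Transformation {
        final String name;
        final Transformation input; // null for the source
        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    // Walk the input references back to the source, mimicking how
    // StreamGraphGenerator.transform() recurses into transform.getInput().
    static List<String> lineage(Transformation t) {
        List<String> names = new ArrayList<>();
        for (Transformation cur = t; cur != null; cur = cur.input) {
            names.add(cur.name);
        }
        Collections.reverse(names);
        return names;
    }

    // Build the WordCount chain and return the lineage seen from the sink.
    static List<String> wordCountLineage() {
        Transformation source  = new Transformation("source", null);     // not registered itself
        Transformation flatMap = new Transformation("flatMap", source);  // OneInputTransformation
        Transformation map     = new Transformation("map", flatMap);     // OneInputTransformation
        Transformation keyBy   = new Transformation("keyBy", map);       // PartitionTransformation, not registered
        Transformation reduce  = new Transformation("reduce", keyBy);    // OneInputTransformation
        Transformation sink    = new Transformation("sink", reduce);     // SinkTransformation
        return lineage(sink);
    }

    public static void main(String[] args) {
        // the entire pipeline is reachable through the input references
        System.out.println(wordCountLineage());
    }
}
```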
1.2 transformOneInputTransform
Since the first entry in transformations is a OneInputTransformation, execution first reaches transformOneInputTransform((OneInputTransformation<?, ?>) transform):
```java
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

    // first, recursively transform the input of this OneInputTransformation
    Collection<Integer> inputIds = transform(transform.getInput());

    // if the input was already transformed during the recursion, just fetch the result
    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    // determine the slot sharing group; the default group name is "default"
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    // add this StreamTransformation to the StreamGraph as a vertex
    streamGraph.addOperator(transform.getId(),
            slotSharingGroup,
            transform.getCoLocationGroupKey(),
            transform.getOperator(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    // set the parallelism and max parallelism of the vertex
    streamGraph.setParallelism(transform.getId(), transform.getParallelism());
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    // add one edge (input -> current) to the StreamGraph per input of this StreamTransformation
    for (Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    // return this StreamTransformation's id; a OneInputTransformation returns only its own id
    return Collections.singleton(transform.getId());
}
```

transformOneInputTransform() works as follows:
1. Recursively transform the input of this OneInputTransformation; if the input has already been transformed, the result is fetched directly from the map.
2. Add the StreamTransformation to the StreamGraph as a graph vertex, and set the vertex's parallelism and slot sharing group.
3. Build the graph's edges from the StreamTransformation's inputs: one edge is created per input. As its name suggests, a OneInputTransformation has a single input, so a single edge input -> currentId is created.
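The memoized recursion in steps 1 and 2 can be sketched in isolation. This is a hypothetical simplification, not Flink code: each node transforms its input first, and an alreadyTransformed map guarantees every node is processed exactly once, even when it is visited again later.

```java
import java.util.*;

// A minimal sketch (hypothetical, not Flink code) of the memoized recursion in
// transform(): inputs are transformed before their consumers, and the
// alreadyTransformed map prevents any node from being processed twice.
public class RecursiveTransformSketch {

    static class Node {
        final int id;
        final Node input; // null for a source
        Node(int id, Node input) { this.id = id; this.input = input; }
    }

    static final Map<Node, Integer> alreadyTransformed = new HashMap<>();
    static final List<Integer> processedOrder = new ArrayList<>();

    static int transform(Node node) {
        // the node may already have been transformed by an earlier recursive call
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node);
        }
        if (node.input != null) {
            transform(node.input); // recurse into the input first
        }
        processedOrder.add(node.id); // "add vertex + edges" would happen here
        alreadyTransformed.put(node, node.id);
        return node.id;
    }

    public static void main(String[] args) {
        Node source = new Node(1, null);
        Node map = new Node(2, source);
        Node sink = new Node(3, map);
        transform(sink);
        transform(map); // memoized: no second processing
        System.out.println(processedOrder); // inputs come out before their consumers
    }
}
```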
1.3 Building the vertices
```java
// StreamGraph class
public <IN, OUT> void addOperator(
        Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    // add the StreamTransformation to streamNodes as a vertex
    if (operatorObject instanceof StoppableStreamSource) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
    } else if (operatorObject instanceof StreamSource) {
        // if the operator is a StreamSource, the task class is SourceStreamTask
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName);
    } else {
        // if the operator is not a StreamSource, the task class is OneInputStreamTask
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName);
    }

    TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo)
            ? inTypeInfo.createSerializer(executionConfig) : null;
    TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo)
            ? outTypeInfo.createSerializer(executionConfig) : null;

    setSerializers(vertexID, inSerializer, null, outSerializer);

    if (operatorObject instanceof OutputTypeConfigurable && outTypeInfo != null) {
        @SuppressWarnings("unchecked")
        OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
        // sets the output type which must be known at StreamGraph creation time
        outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
    }

    if (operatorObject instanceof InputTypeConfigurable) {
        InputTypeConfigurable inputTypeConfigurable = (InputTypeConfigurable) operatorObject;
        inputTypeConfigurable.setInputType(inTypeInfo, executionConfig);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: {}", vertexID);
    }
}

protected StreamNode addNode(
        Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperator<?> operatorObject,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    // build the vertex and add it to streamNodes, which is a Map
    StreamNode vertex = new StreamNode(
            environment,
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorObject,
            operatorName,
            new ArrayList<OutputSelector<?>>(),
            vertexClass);

    // add to streamNodes
    streamNodes.put(vertexID, vertex);

    return vertex;
}
```

1.4 Building the edges

```java
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(upStreamVertexID,
            downStreamVertexID,
            typeNumber,
            null,                    // the partitioner passed in here is always null
            new ArrayList<String>(),
            null                     // outputTag is only relevant for side outputs
    );
}

private void addEdgeInternal(
        Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag) {

    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        // side-output case
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        // the PartitionTransformation produced by keyBy is the input of the downstream
        // StreamTransformation; adding the downstream edge ends up here
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
    } else {
        // an ordinary OneInputTransformation ends up here
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode +
                        " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode +
                        " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        // an edge holds the upstream and downstream vertices, the partitioner between them, etc.
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);

        // register the edge as an out-edge of the source vertex and an in-edge of the target vertex
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
```

An edge carries the upstream and downstream vertices and the partitioner between them. If no partitioner was set explicitly, a ForwardPartitioner is used when the upstream and downstream parallelism match, and a RebalancePartitioner otherwise. If a partitioner was set explicitly, for example by keyBy, which internally fixes the partitioner to KeyGroupStreamPartitioner, then that partitioner is used for the edge.
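The default-partitioner rule above can be condensed into a few lines. This is a hypothetical sketch using strings in place of Flink's partitioner classes: an explicitly set partitioner always wins; otherwise forward is chosen when the parallelism matches, rebalance when it does not.

```java
// A minimal sketch (hypothetical names, not Flink's API) of the default
// partitioner choice in addEdgeInternal.
public class PartitionerChoiceSketch {

    static String choosePartitioner(String explicit, int upstreamParallelism, int downstreamParallelism) {
        if (explicit != null) {
            return explicit; // e.g. keyBy already fixed KeyGroupStreamPartitioner
        }
        // forward when parallelism matches, rebalance otherwise
        return upstreamParallelism == downstreamParallelism ? "forward" : "rebalance";
    }

    public static void main(String[] args) {
        System.out.println(choosePartitioner(null, 4, 4));       // forward
        System.out.println(choosePartitioner(null, 4, 8));       // rebalance
        System.out.println(choosePartitioner("keygroup", 4, 8)); // keygroup
    }
}
```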
1.5 transformSource
As noted above, transformOneInputTransform first recursively transforms the input of the OneInputTransformation. In WordCount, transforming the first OneInputTransformation therefore first transforms its input, the SourceTransformation, in transformSource():
```java
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
    String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());

    // add the source to the StreamGraph both as a vertex and as a source of the graph
    streamGraph.addSource(source.getId(),
            slotSharingGroup,
            source.getCoLocationGroupKey(),
            source.getOperator(),
            null,
            source.getOutputType(),
            "Source: " + source.getName());

    if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) {
        InputFormatSourceFunction<T> fs = (InputFormatSourceFunction<T>) source.getOperator().getUserFunction();
        streamGraph.setInputFormat(source.getId(), fs.getFormat());
    }

    // set the parallelism of the source
    streamGraph.setParallelism(source.getId(), source.getParallelism());
    streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());

    // return a singleton list containing its own id
    return Collections.singleton(source.getId());
}

// StreamGraph class
public <IN, OUT> void addSource(
        Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
    sources.add(vertexID);
}
```

The logic of transformSource() is straightforward: add the source to the StreamGraph. Note that a source is a root node of the graph with no input, so no edges are added for it.
1.6 transformPartition
In WordCount, the StreamTransformation of the DataStream produced by reduce is a OneInputTransformation. Transforming it likewise starts by transforming its input, which is the PartitionTransformation generated for the KeyedStream. So execution reaches transformPartition((PartitionTransformation<?>) transform):
```java
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    StreamTransformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    // first transform the input of the PartitionTransformation
    Collection<Integer> transformedIds = transform(input);

    for (Integer transformedId : transformedIds) {
        // generate a new virtual node id
        int virtualId = StreamTransformation.getNewNodeId();
        // add the virtual node to the StreamGraph
        streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
        resultIds.add(virtualId);
    }

    return resultIds;
}

// StreamGraph class
public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
    if (virtualPartitionNodes.containsKey(virtualId)) {
        throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
    }
    // virtualPartitionNodes is a Map that stores the virtual partition nodes
    virtualPartitionNodes.put(virtualId,
            new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
}
```

transformPartition generates a new virtual node for the PartitionTransformation and stores it in the StreamGraph's virtualPartitionNodes map, together with the id of the PartitionTransformation's input. The PartitionTransformation's partitioner is recorded as the partitioner of that input; in WordCount, that means it becomes the partitioner of the DataStream produced by the map operator.
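How the stored virtual node is resolved later can be sketched as follows. This is a hypothetical simplification of StreamGraph.virtualPartitionNodes using strings for partitioners: when the downstream vertex adds its edge with a virtual id as upstream, the virtual id is swapped for the real upstream id and the stored partitioner is picked up.

```java
import java.util.*;

// A minimal sketch (hypothetical, not Flink's API) of storing a virtual
// partition node and resolving it in the virtualPartitionNodes branch of
// addEdgeInternal.
public class VirtualPartitionSketch {

    // virtualId -> (originalUpstreamId, partitionerName)
    static final Map<Integer, Map.Entry<Integer, String>> virtualPartitionNodes = new HashMap<>();

    static void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, Map.entry(originalId, partitioner));
    }

    // The virtual id is replaced by the real upstream id, and the stored
    // partitioner is used for the edge; non-virtual ids fall back to forward.
    static String addEdge(int upStreamVertexId, int downStreamVertexId) {
        String partitioner = null;
        if (virtualPartitionNodes.containsKey(upStreamVertexId)) {
            Map.Entry<Integer, String> entry = virtualPartitionNodes.get(upStreamVertexId);
            upStreamVertexId = entry.getKey();
            partitioner = entry.getValue();
        }
        return upStreamVertexId + " --" + (partitioner == null ? "forward" : partitioner) + "--> " + downStreamVertexId;
    }

    public static void main(String[] args) {
        // vertex 2 is "map"; virtual node 6 stands for the keyBy after it
        addVirtualPartitionNode(2, 6, "KeyGroupStreamPartitioner");
        // the downstream vertex 3 adds its edge against the virtual id 6
        System.out.println(addEdge(6, 3));
    }
}
```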
When the OneInputTransformation produced by reduce is transformed, its inputIds are the virtual node ids generated for the PartitionTransformation during transformPartition. When its edges are added, execution reaches the code below. The resulting edge has roughly the form OneInputTransform(map) — KeyGroupStreamPartitioner — OneInputTransform(window):
```java
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
    // the PartitionTransformation produced by keyBy is the downstream's input;
    // adding the downstream edge ends up here
    int virtualId = upStreamVertexID;
    upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
    if (partitioner == null) {
        partitioner = virtualPartitionNodes.get(virtualId).f1;
    }
    addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
```

1.7 transformSink
In WordCount, the last StreamTransformation in transformations is a SinkTransformation, handled in transformSink():
```java
private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {

    // first transform the input
    Collection<Integer> inputIds = transform(sink.getInput());

    String slotSharingGroup = determineSlotSharingGroup(sink.getSlotSharingGroup(), inputIds);

    // add the sink to the graph, building and adding its vertex
    streamGraph.addSink(sink.getId(),
            slotSharingGroup,
            sink.getCoLocationGroupKey(),
            sink.getOperator(),
            sink.getInput().getOutputType(),
            null,
            "Sink: " + sink.getName());

    streamGraph.setParallelism(sink.getId(), sink.getParallelism());
    streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism());

    // build and add the edges of the StreamGraph
    for (Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, sink.getId(), 0);
    }

    if (sink.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(sink.getId(), sink.getStateKeySelector(), keySerializer);
    }

    // the sink is the terminal node of the graph with no downstream output, so return an empty list
    return Collections.emptyList();
}
```
```java
// StreamGraph class
public <IN, OUT> void addSink(
        Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {
    addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
    sinks.add(vertexID);
}
```

The logic in transformSink() is also simple and resembles transformSource(). The differences are that a sink does have edges, and that a sink has no downstream output, so it never serves as another transformation's input; that is why an empty list is returned.
After transformSink(), every StreamTransformation has been transformed: the StreamGraph's vertices, edges, and virtual partition nodes are all built, and the StreamGraph is returned. The next step is constructing the JobGraph from the StreamGraph. As you can see, the structure of the StreamGraph is fairly simple: each DataStream's StreamTransformation becomes a vertex of the graph (a PartitionTransformation becomes a virtual vertex), and the graph's edges are built from each StreamTransformation's inputs.