当前位置: 首页 > news >正文

那个平台的网页游戏好玩南宁seo教程

那个平台的网页游戏好玩,南宁seo教程,做网站包括哪些,app开发公司赚钱吗一文搞懂 Flink Graph 构建过程 1. StreamGraph构建过程1.1 transform(): 构建的核心1.2 transformOneInputTransform1.3 构造顶点1.4 构造边1.5 transformSource1.6 transformPartition1.7 transformSink 1. StreamGraph构建过程 链接: 一文搞懂 Flink 其他重要源码点击我 e… 一文搞懂 Flink Graph 构建过程 1. StreamGraph构建过程1.1 transform(): 构建的核心1.2 transformOneInputTransform1.3 构造顶点1.4 构造边1.5 transformSource1.6 transformPartition1.7 transformSink 1. StreamGraph构建过程 链接: 一文搞懂 Flink 其他重要源码点击我 env.execute将进行任务的提交和执行在执行之前会对任务进行StreamGraph和JobGraph的构建然后再提交JobGraph。 那么现在就来分析一下StreamGraph的构建过程在正式环境下会调用StreamContextEnvironment.execute()方法 public JobExecutionResult execute(String jobName) throws Exception {Preconditions.checkNotNull(jobName, Streaming Job name should not be null.);// 获取 StreamGraphreturn execute(getStreamGraph(jobName)); }public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {// 构建StreamGraphStreamGraph streamGraph getStreamGraphGenerator().setJobName(jobName).generate();if (clearTransformations) {// 构建完StreamGraph之后可以清空transformations列表this.transformations.clear();}return streamGraph; }实现在StreamGraphGenerator.generate(this, transformations);这里的transformations列表就是在之前调用map、flatMap、filter算子时添加进去的生成的DataStream的StreamTransformation是DataStream的描述信息。 接着看 public StreamGraph generate() {// 实例化 StreamGraphstreamGraph new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);streamGraph.setStateBackend(stateBackend);streamGraph.setChaining(chaining);streamGraph.setScheduleMode(scheduleMode);streamGraph.setUserArtifacts(userArtifacts);streamGraph.setTimeCharacteristic(timeCharacteristic);streamGraph.setJobName(jobName);streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);// 记录了已经transformed的StreamTransformation。alreadyTransformed new HashMap();for (Transformation? transformation: transformations) {// 根据 transformation 创建StreamGraphtransform(transformation);}final StreamGraph builtStreamGraph streamGraph;alreadyTransformed.clear();alreadyTransformed null;streamGraph null;return builtStreamGraph; }1.1 transform(): 构建的核心 private CollectionInteger transform(Transformation? transform) {if (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}LOG.debug(Transforming transform);if (transform.getMaxParallelism() 0) {// if the max parallelism hasnt been set, then first use the job wide max parallelism// from the ExecutionConfig.int globalMaxParallelismFromConfig executionConfig.getMaxParallelism();if (globalMaxParallelismFromConfig 0) {transform.setMaxParallelism(globalMaxParallelismFromConfig);}}// call at least once to trigger exceptions about MissingTypeInfotransform.getOutputType();CollectionInteger transformedIds;// 判断 transform 属于哪类 Transformationif (transform instanceof OneInputTransformation?, ?) {transformedIds transformOneInputTransform((OneInputTransformation?, ?) transform);} else if (transform instanceof TwoInputTransformation?, ?, ?) {transformedIds transformTwoInputTransform((TwoInputTransformation?, ?, ?) transform);} else if (transform instanceof AbstractMultipleInputTransformation?) {transformedIds transformMultipleInputTransform((AbstractMultipleInputTransformation?) transform);} else if (transform instanceof SourceTransformation) {transformedIds transformSource((SourceTransformation?) transform);} else if (transform instanceof LegacySourceTransformation?) {transformedIds transformLegacySource((LegacySourceTransformation?) transform);} else if (transform instanceof SinkTransformation?) {transformedIds transformSink((SinkTransformation?) transform);} else if (transform instanceof UnionTransformation?) {transformedIds transformUnion((UnionTransformation?) transform);} else if (transform instanceof SplitTransformation?) {transformedIds transformSplit((SplitTransformation?) transform);} else if (transform instanceof SelectTransformation?) {transformedIds transformSelect((SelectTransformation?) transform);} else if (transform instanceof FeedbackTransformation?) {transformedIds transformFeedback((FeedbackTransformation?) transform);} else if (transform instanceof CoFeedbackTransformation?) {transformedIds transformCoFeedback((CoFeedbackTransformation?) transform);} else if (transform instanceof PartitionTransformation?) {transformedIds transformPartition((PartitionTransformation?) transform);} else if (transform instanceof SideOutputTransformation?) {transformedIds transformSideOutput((SideOutputTransformation?) transform);} else {throw new IllegalStateException(Unknown transformation: transform);}// need this check because the iterate transformation adds itself before// transforming the feedback edgesif (!alreadyTransformed.containsKey(transform)) {alreadyTransformed.put(transform, transformedIds);}if (transform.getBufferTimeout() 0) {streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());} else {streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);}if (transform.getUid() ! null) {streamGraph.setTransformationUID(transform.getId(), transform.getUid());}if (transform.getUserProvidedNodeHash() ! null) {streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());}if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {if (transform instanceof PhysicalTransformation transform.getUserProvidedNodeHash() null transform.getUid() null) {throw new IllegalStateException(Auto generated UIDs have been disabled but no UID or hash has been assigned to operator transform.getName());}}if (transform.getMinResources() ! null transform.getPreferredResources() ! null) {streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());}streamGraph.setManagedMemoryWeight(transform.getId(), transform.getManagedMemoryWeight());return transformedIds; }拿WordCount举例在flatMap、map、reduce、addSink过程中会将生成的DataStream的StreamTransformation添加到transformations列表中。 addSource没有将StreamTransformation添加到transformations但是flatMap生成的StreamTransformation的input持有SourceTransformation的引用。 keyBy算子会生成KeyedStream但是它的StreamTransformation并不会添加到transformations列表中不过reduce生成的DataStream中的StreamTransformation中持有了KeyedStream的StreamTransformation的引用作为它的input。 所以WordCount中有4个StreamTransformation前3个算子均为OneInputTransformation最后一个为SinkTransformation。 1.2 transformOneInputTransform 由于transformations中第一个为OneInputTransformation所以代码首先会走到transformOneInputTransform((OneInputTransformation?, ?) transform) private IN, OUT CollectionInteger transformOneInputTransform(OneInputTransformationIN, OUT transform) {//首先递归的transform该OneInputTransformation的inputCollectionInteger inputIds transform(transform.getInput());//如果递归transform input时发现input已经被transform那么直接获取结果即可// the recursive call might have already transformed thisif (alreadyTransformed.containsKey(transform)) {return alreadyTransformed.get(transform);}//获取共享slot资源组默认分组名”default”String slotSharingGroup determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);//将该StreamTransformation添加到StreamGraph中当做一个顶点streamGraph.addOperator(transform.getId(),slotSharingGroup,transform.getCoLocationGroupKey(),transform.getOperator(),transform.getInputType(),transform.getOutputType(),transform.getName());if (transform.getStateKeySelector() ! null) {TypeSerializer? keySerializer transform.getStateKeyType().createSerializer(env.getConfig());streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);}//设置顶点的并行度和最大并行度streamGraph.setParallelism(transform.getId(), transform.getParallelism());streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());//根据该StreamTransformation有多少个input依次给StreamGraph添加边即input——currentfor (Integer inputId: inputIds) {streamGraph.addEdge(inputId, transform.getId(), 0);}//返回该StreamTransformation的idOneInputTransformation只有自身的一个id列表return Collections.singleton(transform.getId()); }transformOneInputTransform()方法的实现如下 1、先递归的transform该OneInputTransformation的input如果input已经transformed那么直接从map中取数据即可 2、将该StreamTransformation作为一个图的顶点添加到StreamGraph中并设置顶点的并行度和共享资源组 3、根据该StreamTransformation的input构造图的边有多少个input就会生成多少边不过OneInputTransformation顾名思义就是一个input所以会构造一条边即input——currentId 1.3 构造顶点 //StreamGraph类 public IN, OUT void addOperator(Integer vertexID,String slotSharingGroup,Nullable String coLocationGroup,StreamOperatorOUT operatorObject,TypeInformationIN inTypeInfo,TypeInformationOUT outTypeInfo,String operatorName) {//将StreamTransformation作为一个顶点添加到streamNodes中if (operatorObject instanceof StoppableStreamSource) {addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);} else if (operatorObject instanceof StreamSource) {//如果operator是StreamSource则Task类型为SourceStreamTaskaddNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName);} else {//如果operator不是StreamSourceTask类型为OneInputStreamTaskaddNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName);}TypeSerializerIN inSerializer inTypeInfo ! null !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;TypeSerializerOUT outSerializer outTypeInfo ! null !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;setSerializers(vertexID, inSerializer, null, outSerializer);if (operatorObject instanceof OutputTypeConfigurable outTypeInfo ! null) {SuppressWarnings(unchecked)OutputTypeConfigurableOUT outputTypeConfigurable (OutputTypeConfigurableOUT) operatorObject;// sets the output type which must be know at StreamGraph creation timeoutputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);}if (operatorObject instanceof InputTypeConfigurable) {InputTypeConfigurable inputTypeConfigurable (InputTypeConfigurable) operatorObject;inputTypeConfigurable.setInputType(inTypeInfo, executionConfig);}if (LOG.isDebugEnabled()) {LOG.debug(Vertex: {}, vertexID);} }protected StreamNode addNode(Integer vertexID,String slotSharingGroup,Nullable String coLocationGroup,Class? extends AbstractInvokable vertexClass,StreamOperator? operatorObject,String operatorName) {if (streamNodes.containsKey(vertexID)) {throw new RuntimeException(Duplicate vertexID vertexID);}//构造顶点添加到streamNodes中streamNodes是一个MapStreamNode vertex new StreamNode(environment,vertexID,slotSharingGroup,coLocationGroup,operatorObject,operatorName,new ArrayListOutputSelector?(),vertexClass);// 添加到 streamNodesstreamNodes.put(vertexID, vertex);return vertex; }1.4 构造边 public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,null, //这里开始传递的partitioner都是nullnew ArrayListString(),null //outputTag对侧输出有效);}private void addEdgeInternal(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner? partitioner,ListString outputNames,OutputTag outputTag) {if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {//针对侧输出int virtualId upStreamVertexID;upStreamVertexID virtualSideOutputNodes.get(virtualId).f0;if (outputTag null) {outputTag virtualSideOutputNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {int virtualId upStreamVertexID;upStreamVertexID virtualSelectNodes.get(virtualId).f0;if (outputNames.isEmpty()) {// selections that happen downstream override earlier selectionsoutputNames virtualSelectNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {//keyBy算子产生的PartitionTransform作为下游的input下游的StreamTransformation添加边时会走到这int virtualId upStreamVertexID;upStreamVertexID virtualPartitionNodes.get(virtualId).f0;if (partitioner null) {partitioner virtualPartitionNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);} else {//一般的OneInputTransform会走到这里StreamNode upstreamNode getStreamNode(upStreamVertexID);StreamNode downstreamNode getStreamNode(downStreamVertexID);// If no partitioner was specified and the parallelism of upstream and downstream// operator matches use forward partitioning, use rebalance otherwise.//如果上下顶点的并行度一致则用ForwardPartitioner否则用RebalancePartitionerif (partitioner null upstreamNode.getParallelism() downstreamNode.getParallelism()) {partitioner new ForwardPartitionerObject();} else if (partitioner null) {partitioner new RebalancePartitionerObject();}if (partitioner instanceof ForwardPartitioner) {if (upstreamNode.getParallelism() ! downstreamNode.getParallelism()) {throw new UnsupportedOperationException(Forward partitioning does not allow change of parallelism. Upstream operation: upstreamNode parallelism: upstreamNode.getParallelism() , downstream operation: downstreamNode parallelism: downstreamNode.getParallelism() You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.);}}//一条边包括上下顶点顶点之间的分区器等信息StreamEdge edge new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);//分别给顶点添加出边和入边getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);} }边的属性包括上下游的顶点和顶点之间的partitioner等信息。如果上下游的并行度一致那么他们之间的partitioner是ForwardPartitioner如果上下游的并行度不一致是RebalancePartitioner当然这前提是没有设置partitioner的前提下。如果显示设置了partitioner的情况例如keyBy算子在内部就确定了分区器是KeyGroupStreamPartitioner那么它们之间的分区器就是KeyGroupStreamPartitioner。 1.5 transformSource 上述说道transformOneInputTransform会先递归的transform该OneInputTransform的input那么对于WordCount中在transform第一个OneInputTransform时会首先transform它的input也就是SourceTransformation方法在transformSource() private T CollectionInteger transformSource(SourceTransformationT source) {String slotSharingGroup determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());//将source作为图的顶点和图的source添加StreamGraph中streamGraph.addSource(source.getId(),slotSharingGroup,source.getCoLocationGroupKey(),source.getOperator(),null,source.getOutputType(),Source: source.getName());if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) {InputFormatSourceFunctionT fs (InputFormatSourceFunctionT) source.getOperator().getUserFunction();streamGraph.setInputFormat(source.getId(), fs.getFormat());}//设置source的并行度streamGraph.setParallelism(source.getId(), source.getParallelism());streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());//返回自身id的单列表return Collections.singleton(source.getId()); }//StreamGraph类 public IN, OUT void addSource(Integer vertexID,String slotSharingGroup,Nullable String coLocationGroup,StreamOperatorOUT operatorObject,TypeInformationIN inTypeInfo,TypeInformationOUT outTypeInfo,String operatorName) {addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);sources.add(vertexID); }transformSource()逻辑也比较简单就是将source添加到StreamGraph中注意Source是图的根节点没有input所以它不需要添加边。 1.6 transformPartition 在WordCount中由reduce生成的DataStream的StreamTransformation是一个OneInputTransformation同样在transform它的时候会首先transform它的input而它的input就是KeyedStream中生成的PartitionTransformation。所以代码会执行transformPartition((PartitionTransformation?) transform) private T CollectionInteger transformPartition(PartitionTransformationT partition) {StreamTransformationT input partition.getInput();ListInteger resultIds new ArrayList();//首先会transform PartitionTransformation的inputCollectionInteger transformedIds transform(input);for (Integer transformedId: transformedIds) {//生成一个新的虚拟节点int virtualId StreamTransformation.getNewNodeId();//将虚拟节点添加到StreamGraph中streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());resultIds.add(virtualId);}return resultIds; }//StreamGraph类 public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner? partitioner) {if (virtualPartitionNodes.containsKey(virtualId)) {throw new IllegalStateException(Already has virtual partition node with id virtualId);}// virtualPartitionNodes是一个Map存储虚拟的Partition节点virtualPartitionNodes.put(virtualId,new Tuple2Integer, StreamPartitioner?(originalId, partitioner)); }transformPartition会为PartitionTransformation生成一个新的虚拟节点同时将该虚拟节点保存到StreamGraph的virtualPartitionNodes中并且会保存该PartitionTransformation的input将PartitionTransformation的partitioner作为其input的分区器在WordCount中也就是作为map算子生成的DataStream的分区器。 在进行transform reduce生成的OneInputTransform时它的inputIds便是transformPartition时为PartitionTransformation生成新的虚拟节点在添加边的时候会走到下面的代码。边的形式大致是OneInputTransform(map)—KeyGroupStreamPartitioner—OneInputTransform(window) } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {//keyBy算子产生的PartitionTransform作为下游的input下游的StreamTransform添加边时会走到这int virtualId upStreamVertexID;upStreamVertexID virtualPartitionNodes.get(virtualId).f0;if (partitioner null) {partitioner virtualPartitionNodes.get(virtualId).f1;}addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);1.7 transformSink 在WordCount中transformations的最后一个StreamTransformation是SinkTransformation方法在 transformSink() private T CollectionInteger transformSink(SinkTransformationT sink) {//首先transform inputCollectionInteger inputIds transform(sink.getInput());String slotSharingGroup determineSlotSharingGroup(sink.getSlotSharingGroup(), inputIds);//给图添加sink并且构造并添加图的顶点streamGraph.addSink(sink.getId(),slotSharingGroup,sink.getCoLocationGroupKey(),sink.getOperator(),sink.getInput().getOutputType(),null,Sink: sink.getName());streamGraph.setParallelism(sink.getId(), sink.getParallelism());streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism());//构造并添加StreamGraph的边for (Integer inputId: inputIds) {streamGraph.addEdge(inputId,sink.getId(),0);}if (sink.getStateKeySelector() ! null) {TypeSerializer? keySerializer sink.getStateKeyType().createSerializer(env.getConfig());streamGraph.setOneInputStateKey(sink.getId(), sink.getStateKeySelector(), keySerializer);}//因为sink是图的末尾节点没有下游的输出所以返回空了return Collections.emptyList(); } //StreamGraph类 public IN, OUT void addSink(Integer vertexID,String slotSharingGroup,Nullable String coLocationGroup,StreamOperatorOUT operatorObject,TypeInformationIN inTypeInfo,TypeInformationOUT outTypeInfo,String operatorName) {addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);sinks.add(vertexID); }transformSink()中的逻辑也比较简单和transformSource()类似不同的是sink是有边的而且sink的下游没有输出了也就不需要作为下游的input所以返回空列表。 在transformSink()之后也就把所有的StreamTransformation的都进行transform了那么这时候StreamGraph中的顶点、边、partition的虚拟顶点都构建好了返回StreamGraph即可。下一步就是根据StreamGraph构造JobGraph了。 可以看出StreamGraph的结构还是比较简单的每个DataStream的StreamTransformation会作为一个图的顶点PartitionTransform是虚拟顶点根据StreamTransformation的input来构建图的边。
http://www.hkea.cn/news/14437921/

相关文章:

  • 类似直播平台网站的建设费用企业网站配色
  • 怎么添加网站内锚点高端的佛山网站建设价格
  • 重庆御临建筑公司官网wordpress 4.9优化
  • 平顶山市哪里有做网站的广州网站关键排名
  • 怎样做能直接上传微信的视频网站哪家代理注册公司好
  • 昆明做网站的张雪峰对市场营销专业的建议
  • 爱站网官网查询域名跳转链接
  • js与asp.net做的网站wordpress侧边栏作者
  • 企业网站适合响应式嘛广州网站建设优化
  • 贵州网站建设系统自己做网站麻烦吗
  • 代刷网站搭建教程北京东城网站建设
  • 如何攻击Wordpress站点谷歌网站提交入口
  • c 做游戏的网站教学教育网站制作定制
  • 网站 昆明网站建设管理总结
  • 商品展示网站源码合肥站建设
  • 鼎湖网站建设公司优秀网页设计作品分析ppt
  • seo优化网站建设张戈博客wordpress主题
  • 网站建设怎么挣钱国内主机wordpress
  • 广告设计学的是什么成都关键词优化技术
  • 南通制作网站高埗东莞微信网站建设
  • 网站返回500错误页面网站推广公司兴田德润
  • 有哪些可以做推广的网站dw网页设计个人介绍
  • 网站备案许可证号wordpress的安装过程
  • 网站开发工作经验简历php网站开发程序
  • 国内好的seo网站律师免费咨询
  • 加盟网站制作公司西宁seo快速排名
  • 网站设计需求书黄岩区建设规划局网站
  • wordpress 全站ssl网址转化短链接
  • 兰州网站建设redu上海网络维护培训班
  • 网站开发实现的环境logo素材大图