Background
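Spark's repartition() currently distributes records to downstream partitions in a way that does not depend on their content (effectively at random). This causes a problem: if a task is retried in a job that uses repartition(), the job's output data may be incorrect. How can this be solved?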
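To make the problem concrete, here is a small Spark-free sketch. It assumes that records are assigned to partitions by their position in the input (round-robin), which is how I understand repartition() to behave; the object and method names are hypothetical and exist only for this illustration.

```scala
object RoundRobinDemo {
  // Assign each record to a partition purely by its position in the input.
  // This is an illustrative model, not Spark's implementation.
  def assign[T](records: Seq[T], numPartitions: Int, start: Int = 0): Map[T, Int] =
    records.zipWithIndex.map { case (record, i) =>
      record -> ((start + i) % numPartitions)
    }.toMap

  def main(args: Array[String]): Unit = {
    // First attempt: the upstream task emits a, b, c in this order.
    val firstAttempt = assign(Seq("a", "b", "c"), numPartitions = 2)
    // Retry: the same records arrive in a different order.
    val retry = assign(Seq("b", "a", "c"), numPartitions = 2)
    println(s"first attempt: $firstAttempt")
    println(s"retry:         $retry")
  }
}
```

In this model, record "a" lands in partition 0 on the first attempt and in partition 1 on the retry. If some downstream tasks have already read the first attempt's output while others read the retried output, a record can be lost or duplicated.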
Analysis
Spark's RDD class contains a variable named outputDeterministicLevel:

    private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
      if (isReliablyCheckpointed) {
        DeterministicLevel.DETERMINATE
      } else {
        getOutputDeterministicLevel
      }
    }

What is this variable for? Let's trace it. It is ultimately read by the isIndeterminate method of Stage:

    def isIndeterminate: Boolean = {
      rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
    }

That method is in turn called by DAGScheduler, in two places.
The first call is in submitMissingTasks:

    private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
      logDebug("submitMissingTasks(" + stage + ")")

      // Before find missing partition, do the intermediate state clean work first.
      // The operation here can make sure for the partially completed intermediate stage,
      // findMissingPartitions() returns all partitions every time.
      stage match {
        case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
          mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
        case _ =>
      }
      // ...
    }
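This check runs when a failed stage is resubmitted. It decides whether all tasks of the upstream shuffle map stage must be recomputed, rather than only the missing ones.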
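The effect of that check can be modeled without Spark. The sketch below is a simplified stand-in, not the scheduler's code: MapStage and partitionsToRecompute are hypothetical names, and "unregistering all map output" is modeled as simply ignoring the outputs that are still available.

```scala
object RetryModel {
  // Hypothetical stand-in for a shuffle map stage.
  // `available` holds the map partitions whose output still exists.
  final case class MapStage(
      numPartitions: Int,
      isIndeterminate: Boolean,
      available: Set[Int]) {
    def isAvailable: Boolean = available.size == numPartitions
  }

  // Partitions that must be recomputed when the stage is resubmitted.
  def partitionsToRecompute(stage: MapStage): Seq[Int] = {
    val usableOutputs =
      if (stage.isIndeterminate && !stage.isAvailable) {
        // Indeterminate and partially complete: discard every existing output.
        Set.empty[Int]
      } else {
        stage.available
      }
    (0 until stage.numPartitions).filterNot(usableOutputs.contains)
  }
}
```

With four partitions of which only partition 2 is missing, a determinate stage recomputes just partition 2, while an indeterminate stage recomputes all four.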
The second call is in handleTaskCompletion:

    case FetchFailed(bmAddress, shuffleId, _, mapIndex, _, failureMessage) =>
      // ...
      val noResubmitEnqueued = !failedStages.contains(failedStage)
      failedStages += failedStage
      failedStages += mapStage
      if (noResubmitEnqueued) {
        // If the map stage is INDETERMINATE, which means the map tasks may return
        // different result when re-try, we need to re-try all the tasks of the failed
        // stage and its succeeding stages, because the input data will be changed after the
        // map tasks are re-tried.
        // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
        // guaranteed to be determinate, so the input data of the reducers will not change
        // even if the map tasks are re-tried.
        if (mapStage.isIndeterminate) {
          // ...

Here, when a task fails with a fetch failure, a ResubmitFailedStages event is posted to DAGScheduler, which then calls submitMissingTasks. Whether the upstream stage of that shuffle is indeterminate decides whether all upstream tasks are retried or only the individual failed ones.
Coming back to outputDeterministicLevel: it calls getOutputDeterministicLevel, which recursively consults the outputDeterministicLevel of the upstream RDDs to determine its own value.
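The recursion can be illustrated with a simplified, Spark-free model. It assumes that the least deterministic parent wins and that an RDD with no parents is determinate. It deliberately ignores the shuffle-specific rules in Spark's real method, and Node, ownLevel and the other names are hypothetical.

```scala
object DeterminismModel {
  // Ordered from most to least deterministic, so a larger id means "worse".
  object Level extends Enumeration {
    val DETERMINATE, UNORDERED, INDETERMINATE = Value
  }

  // Hypothetical stand-in for an RDD in the lineage graph.
  final case class Node(
      name: String,
      parents: Seq[Node] = Nil,
      checkpointed: Boolean = false,
      ownLevel: Option[Level.Value] = None) {

    // Mirrors the shape of outputDeterministicLevel: a checkpoint short-circuits.
    lazy val outputLevel: Level.Value =
      if (checkpointed) Level.DETERMINATE else computeLevel

    // Mirrors the idea of getOutputDeterministicLevel: an explicit override,
    // otherwise the worst level among the parents.
    private def computeLevel: Level.Value = ownLevel.getOrElse {
      if (parents.isEmpty) Level.DETERMINATE
      else parents.map(_.outputLevel).maxBy(_.id)
    }
  }
}
```

For example, a node that declares itself INDETERMINATE makes every downstream node INDETERMINATE as well, unless a checkpointed node sits in between.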
Conclusion
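Based on the analysis above, we can control whether all tasks of a stage are retried by changing the outputDeterministicLevel variable or the getOutputDeterministicLevel method of the corresponding RDD. Note that outputDeterministicLevel is declared private[spark] final, so changing it means patching Spark itself, whereas getOutputDeterministicLevel can be overridden in an RDD subclass.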
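A minimal sketch of overriding getOutputDeterministicLevel in a custom RDD follows. It assumes a Spark version in which this method is an overridable protected member of RDD and spark-core is on the classpath. ForceIndeterminateRDD is a hypothetical name; the wrapper passes its parent's data through unchanged and only changes the reported level.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{DeterministicLevel, RDD}

// Hypothetical pass-through RDD that reports its output as INDETERMINATE,
// so the scheduler retries all tasks of the stage instead of only the failed ones.
class ForceIndeterminateRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev) {

  // Reuse the parent's partitions one-to-one.
  override protected def getPartitions: Array[Partition] =
    firstParent[T].partitions

  // Pass the parent's records through unchanged.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split, context)

  // The only behavioral change: declare the output indeterminate.
  override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}
```

It would be used by wrapping the RDD just before the shuffle, for example `new ForceIndeterminateRDD(rdd).repartition(n)`. Returning DeterministicLevel.DETERMINATE instead would suppress the full retry, which is only safe when the output really is reproducible.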