温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Driver容错安全性怎么实现

发布时间:2021-12-16 16:32:55 来源:亿速云 阅读:119 作者:iii 栏目:云计算

这篇文章主要介绍“Driver容错安全性怎么实现”,在日常操作中,相信很多人在Driver容错安全性怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Driver容错安全性怎么实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

  • ·  第一、看ReceiverTracker的容错,主要是ReceiverTracker接收元数据的进入WAL,看ReceiverTracker的addBlock方法,代码如下

    def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {

     try {

       val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))

       if (writeResult) {

         synchronized {

           getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo

         }

         logDebug(s"Stream ${receivedBlockInfo.streamId} received " +

           s"block ${receivedBlockInfo.blockStoreResult.blockId}")

       } else {

         logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +

           s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")

       }

       writeResult

     } catch {

       case NonFatal(e) =>

         logError(s"Error adding block $receivedBlockInfo", e)

         false

     }

    }

    writeToLog方法就是进行WAL的操作,看writeToLog的代码

    private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {

     if (isWriteAheadLogEnabled) {

       logTrace(s"Writing record: $record")

       try {

         writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),

           clock.getTimeMillis())

         true

       } catch {

         case NonFatal(e) =>

           logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)

           false

       }

     } else {

       true

     }

    }

    首先判断是否开启了WAL,根据一下isWriteAheadLogEnabled值

    private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

    接着看writeAheadLogOption

    private val writeAheadLogOption = createWriteAheadLog()

    再看createWriteAheadLog()方法

    private def createWriteAheadLog(): Option[WriteAheadLog] = {

     checkpointDirOption.map { checkpointDir =>

       val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)

       WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)

     }

    }

    根据checkpoint的配置,获取checkpoint的目录,这里可以看出,checkpoint可以有多个目录。
    写完WAL才将receivedBlockInfo放到内存队列getReceivedBlockQueue中

    ·  第二、看ReceivedBlockTracker的allocateBlocksToBatch方法,代码如下

    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {

     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {

       val streamIdToBlocks = streamIds.map { streamId =>

           (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))

       }.toMap

       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

         lastAllocatedBatchTime = batchTime

       } else {

         logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

       }

     } else {

       // This situation occurs when:

       // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,

       // possibly processed batch job or half-processed batch job need to be processed again,

       // so the batchTime will be equal to lastAllocatedBatchTime.

       // 2. Slow checkpointing makes recovered batch time older than WAL recovered

       // lastAllocatedBatchTime.

       // This situation will only occurs in recovery time.

       logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

     }

    }

    首先从getReceivedBlockQueue中获取每一个receiver的ReceivedBlockQueue队列赋值给streamIdToBlocks,然后包装一下

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

    allocatedBlocks就是根据时间获取的一批元数据,交给对应batchDuration的job,job在执行的时候就可以使用,在使用前先进行WAL,如果job出错恢复后,可以知道数据计算到什么位置

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

         lastAllocatedBatchTime = batchTime

       } else {

         logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

    }

    ·  第三、看cleanupOldBatches方法,cleanupOldBatches的功能是从内存中清楚不用的batches元数据,再删除WAL的数据,再删除之前把要删除的batches信息也进行WAL

    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {

     require(cleanupThreshTime.milliseconds < clock.getTimeMillis())

     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq

     logInfo("Deleting batches " + timesToCleanup)

     if (writeToLog(BatchCleanupEvent(timesToCleanup))) {

       timeToAllocatedBlocks --= timesToCleanup

       writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))

     } else {

       logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")

     }

    }

    ·  总结一下上面的三种WAL,对应下面的三种事件,这就是ReceiverTracker的容错

    /** Trait representing any event in the ReceivedBlockTracker that updates its state. */

    private[streaming] sealed trait ReceivedBlockTrackerLogEvent

    private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)

    extends ReceivedBlockTrackerLogEvent

    private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)

    extends ReceivedBlockTrackerLogEvent

    private[streaming] case class BatchCleanupEvent(times: Seq[Time])  extends ReceivedBlockTrackerLogEvent

    ·  看一下Dstream.graph和JobGenerator的容错,从开始

    private def generateJobs(time: Time) {

    SparkEnv has been removed.

     SparkEnv.set(ssc.env)

     Try {

     

       // allocate received blocks to batch

       // 分配接收到的数据给batch

       jobScheduler.receiverTracker.allocateBlocksToBatch(time)

       // 使用分配的块生成jobs

       graph.generateJobs(time) // generate jobs using allocated block

     } match {

       case Success(jobs) =>

         // 获取元数据信息

         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

         // 提交jobSet

         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

       case Failure(e) =>

         jobScheduler.reportError("Error generating jobs for time " + time, e)

     }

     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

    }

    jobs生成完成后发送DoCheckpoint消息,最终调用doCheckpoint方法,代码如下

    private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {

     if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {

       logInfo("Checkpointing graph for time " + time)

       ssc.graph.updateCheckpointData(time)

       checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

     }

    }

     

到此,关于“Driver容错安全性怎么实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI