«

checkpoint机制如何实现

时间:2024-3-4 11:55     作者:韩俊     分类: Java


这篇文章主要讲解了“checkpoint机制如何实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“checkpoint机制如何实现”吧!

checkpoint 机制的具体实现

我们都知道为了优化分布式存储系统中 NameNode 的重启性能,我们引进了 checkpoint 机制和 FsImage 快照,使得 FsImage 和 editslog 共同为系统元数据提供持久化功能。

BackNode 节点冷备份

NameNode 的主要工作时维护系统中文件元数据,并实现其持久化;在每执行一个操作之后,NameNode 都要生成一个 editslog,最后刷盘(但是不是每生成一条数据就刷盘一次)。

从这我们可以发现,NameNode 进程,它需要额外分配出来一个线程,后台线程定时的去进行磁盘IO的操作,其实这个是很影响本地 CPU 负载的;另外,假设这时候来了很多操作请求,那么系统中将有大量的线程用来来更新内存的文件目录树,这时候肯定是要加锁的了。此时如果系统还要每隔一段时间,耗费比如说几秒钟,甚至几分钟的时间来对文件目录树进行加锁,读取数据,写入本地磁盘;这样就会导致更新文件目录树,和读取文件目录树写入磁盘,它们之间会产生巨大的锁的冲突。

如果上述所有操作都在 NameNode 上执行的话,就太影响 NameNode 节点的性能了。

为此,我们需要考虑给系统中增加一个角色——BackNode,其实它的功能就有点像 HDFS 中的 SecondaryNameNode。

BackNode 是充当于 NameNode 的一个冷备份的角色,我们可以将 checkpoint 的操作交给其来执行,这样就可以减轻 NameNode 这边的性能消耗了。

checkpoint 的实现

BackNode 在启动的时候会启动一个 checkpoint 的调度任务:

// 调度任务:fsImageCheckpointer
defaultScheduler.schedule("FSImage Checkpoint操作", fsImageCheckpointer,
        backupnodeConfig.getCheckpointInterval(), backupnodeConfig.getCheckpointInterval(), TimeUnit.MILLISECONDS);

这是一个定时任务,每隔一段时间就会被执行一次。

下面我们一起来看看 checkpoint 任务具体需要做些什么?

  • 判断当前系统中的 txid 和上一次 checkpoint 时的是否一致,不一致才继续执行

  • 根据当前系统中的数据生成 FsImage

  • 处理掉旧的 FsImage

具体代码如下:

/**
 * checkpoint 任务
 */
@Override
public void run() {
    log.info("BackupNode启动checkpoint后台线程.");
    try {
        // 如果是正在恢复元数据,则直接返回
        if (nameSystem.isRecovering()) {
            log.info("正在恢复元数据...");
            return;
        }
        // 当前 maxid 和 之前记录的 maxid 相等
        if (nameSystem.getMaxTxId() == lastCheckpointTxId) {
            log.info("EditLog和上次没有变化,不进行checkpoint: [txId={}]", lastCheckpointTxId);
            return;
        }
        // 以下讨论的情况是:当前 maxid 和 之前记录的 maxid 不相等(大于)
        // 对当前内存中的数据生成快照
        FsImage fsImage = nameSystem.getFsImage();
        // 更新记录中的 maxid
        lastCheckpointTxId = fsImage.getMaxTxId();
        // 路径
        String fsImageFile = backupNodeConfig.getFsImageFile(String.valueOf(System.currentTimeMillis()));
        log.info("开始执行checkpoint操作: [maxTxId={}]", fsImage.getMaxTxId());
        // 写入FsImage文件
        doCheckpoint(fsImage, fsImageFile);
        // 上传 FsImage 给 NameNode
        uploadFsImage(fsImageFile);
        // 删除旧的FSImage
        namenodeClient.getDefaultScheduler().scheduleOnce("删除FSImage任务", fsImageClearTask, 0);
    } catch (Exception e) {
        log.error("FSImageCheckPointer error:", e);
    }
}

标签: java

热门推荐