背景
本文主要是具体说说Flink中的clean操作的实现
杂说闲谈
在flink中主要是CleanFunction
函数:
@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();String instantTime = HoodieActiveTimeline.createNewInstantTime();LOG.info(String.format("exec clean with instant time %s...", instantTime));executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");}@Overridepublic void notifyCheckpointComplete(long l) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {executor.execute(() -> {try {this.writeClient.waitForCleaningFinish();} finally {// ensure to switch the isCleaning flagthis.isCleaning = false;}}, "wait for cleaning finish");}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {try {this.writeClient.startAsyncCleaning();this.isCleaning = true;} catch (Throwable throwable) {// catch the exception to not affect the normal checkpointingLOG.warn("Error while start async cleaning", throwable);}}}
-
open函数
-
writeClient =FlinkWriteClients.createWriteClient(conf, getRuntimeContext())
创建FlinkWriteClient,用于写hudi数据 -
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
创建一个只有一个线程的线程池,改线程池的主要作用来异步执行hudi写操作 -
executor.execute(() -> writeClient.clean(instantTime)
异步执行hudi的清理操作,该clean函数的主要代码如下:if (!tableServicesEnabled(config)) {return null;}final Timer.Context timerContext = metrics.getCleanCtx();CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));HoodieTable table = createTable(config, hadoopConf);if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {LOG.info("Cleaner started");// proceed only if multiple clean schedules are enabled or if there are no pending cleans.if (scheduleInline) {scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);table.getMetaClient().reloadActiveTimeline();}}// Proceeds to execute any requested or inflight clean instances in the timelineHoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking);if (timerContext != null && metadata != null) {long durationMs = metrics.getDurationInMs(timerContext.stop());metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()+ " cleanerElapsedMs" + durationMs);}return metadata;
-
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.CLEAN_ACTION,() -> rollbackFailedWrites *
根据配置hoodie.cleaner.policy.failed.writes* 默认是EAGER,也就是在写数据失败的时候,会立即进行这次写失败的数据的清理,在这种情况下,
就不会执行rollbackFailedWrites操作,也就是回滚写失败文件的操作 -
HoodieTable table = createTable *
创建HoodieFlinkMergeOnReadTable*类型的hudi表,用来做clean等操作 -
scheduleTableServiceInternal
如果hoodie.clean.allow.multiple为true(默认为true)或者没有正在运行中clean操作,则会生成Clean计划
这里最终调用的是FlinkWriteClient.scheduleCleaning方法,即CleanPlanActionExecutor.execute方法这里最重要的就是requestClean方法:
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config); Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain(); List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant) int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism).stream().collect(Collectors.toMap(Pair::getKey, Pair::getValue)) Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue()))) List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey).collect(Collectors.toList()) return new HoodieCleanerPlan(earliestInstant.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),planner.getLastCompletedCommitTimestamp(),config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete)
- planner.getEarliestCommitToRetain();
根据保留策略,获取到最早需要保留的commit的HoodieInstant,在这里会兼顾考虑到hoodie.cleaner.commits.retained(默认是10)以及hoodie.cleaner.hours.retained默认是24小时以及hoodie.cleaner.policy策略(默认是KEEP_LATEST_COMMITS) - planner.getPartitionPathsToClean(earliestInstant);
根据保留的最新commit的HoodieInstant,得到要删除的分区,这里会根据配置hoodie.cleaner.incremental.mode(默认是true)来进行增量清理,
这个时候就会根据上一次已经clean的信息,只需要删除差量的分区数据就行 - cleanOpsWithPartitionMeta = context
根据上面得到的需要删除的分区信息,获取需要删除的文件信息,具体的实现可以参考CleanPlanner.getFilesToCleanKeepingLatestCommits
这里的操作主要是先通过fileSystemView获取分区下所有的FileGroup,之后再获取每个FileGroup下的所有的FileSlice(这里的FileSlice就有版本的概念,也就是commit的版本),之后再与最新保留的commit的时间戳进行比较得到需要删除的文件信息 - new HoodieCleanerPlan
最后组装成HoodieCleanPlan的计划,并且在外层调用table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan)); 方法把clean request的状态存储到对应的.hoodie目录下,并建立一个xxxx.clean.requested的元数据文件
- planner.getEarliestCommitToRetain();
-
table.getMetaClient().reloadActiveTimeline()
重新加载timeline,便于过滤出来刚才scheduleTableServiceInternal操作生成的xxxxxxxxxxxxxx.clean.requested的元数据文件 -
table.clean(context, cleanInstantTime, skipLocking)
真正执行clean的部分,主要是调用CleanActionExecutor.execute的方法,最终调用的是*runPendingClean(table, hoodieInstant)*方法:HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);return runClean(table, cleanInstant, cleanerPlan);
首先是反序列化CleanPlan,然后在进行清理,主要是删除1. 如果没有满足的分区,直接删除该分区,2. 否则删除该分区下的满足条件的文件,最后返回HoodieCleanStat包含删除的文件信息等。
-
-
-
snapshotState方法
- 如果clean.async.enabled是true(默认是true),并且不是正在进行clean动作,则会进行异步清理
this.writeClient.startAsyncCleaning(); 这里最终也是调用的writeClient.clean方法。 - this.isCleaning = true;
设置标志位,用来保证clean操作的有序性
- 如果clean.async.enabled是true(默认是true),并且不是正在进行clean动作,则会进行异步清理
-
notifyCheckpointComplete方法
- 如果clean.async.enabled是true(默认是true),并且正在进行clean动作,则等待clean操作完成,
并且设置清理标识位,用来和snapshotState方法进行呼应以保证clean操作的有序性
- 如果clean.async.enabled是true(默认是true),并且正在进行clean动作,则等待clean操作完成,