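For large workloads Dremio may spill to disk (aggregation operators, for example, can trigger a spill and write data into temporary files), so frequently run job queries can leave a fair number of temporary files behind.
TemporaryFolderManager exists to manage these temporary files: creating them, cleaning them up, and handling the files left behind by executor nodes that went away abnormally. For the cleanup
work Dremio does not use its own task scheduler; it simply uses CloseableSchedulerThreadPool, an extension of Java's ScheduledThreadPoolExecutor.
To guard against accidental deletion, temporary folders are created with a naming format of their own. Some brief notes follow.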
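As a rough illustration of what "an extension of ScheduledThreadPoolExecutor" can look like, here is a minimal sketch of a scheduler pool that can be used in try-with-resources. The class name CloseableSchedulerSketch and the choice to call shutdownNow() on close are assumptions made for this example; Dremio's CloseableSchedulerThreadPool may differ in detail.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Dremio's class: a scheduled pool that is also AutoCloseable.
final class CloseableSchedulerSketch extends ScheduledThreadPoolExecutor implements AutoCloseable {
  CloseableSchedulerSketch(int corePoolSize) {
    super(corePoolSize);
  }

  // Assumption for this sketch: closing cancels pending tasks and stops the workers.
  @Override
  public void close() {
    shutdownNow();
  }

  public static void main(String[] args) throws Exception {
    try (CloseableSchedulerSketch pool = new CloseableSchedulerSketch(1)) {
      // One-shot task, similar in spirit to the delayed cleanup described in this post.
      ScheduledFuture<String> result = pool.schedule(() -> "cleaned", 10, TimeUnit.MILLISECONDS);
      System.out.println(result.get());
    }
  }
}
```

The pool is shut down when the try block exits, so no scheduler threads outlive the component that owns them.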

Temporary file creation

The createTmpDirectory method of DefaultTemporaryFolderManager

public Path createTmpDirectory(Path rootPath) throws IOException {
    if (closed) {
      throw new IOException("Temporary Folder Manager already closed");
    }
    if (thisExecutor == null || thisExecutor.get() == null) {
      // retain old behaviour
      return rootPath;
    }
    final String prefix = thisExecutor.get().toPrefix(purpose);
    logger.info("Registering path '{}' for temporary file monitoring and cleanup", rootPath.toString());
    List<Path> pathsToDelete = new ArrayList<>();
    fsWrapper.visitDirectory(rootPath, prefix, fileStatus -> {
      if (fileStatus.isDirectory()) {
        try {
          final long incarnation = Long.parseLong(fileStatus.getPath().getName());
          if (incarnation > Instant.EPOCH.toEpochMilli() && incarnation < Instant.now().toEpochMilli()) {
            pathsToDelete.add(fileStatus.getPath());
          } else {
            logger.warn("Not deleting old incarnation {} as it is too recent or invalid", incarnation);
          }
        } catch (NumberFormatException ignored) {
          logger.debug("Ignoring directory {} as it is not a valid incarnation", fileStatus.getPath().getName());
        }
      }
    });
    if (!pathsToDelete.isEmpty()) {
      // One-shot task: compares last access time and modification time against the allowed expiry time (90s by default)
      this.oneShotTask = executorService.schedule(() -> doCleanupOneShot(pathsToDelete),
        cleanupConfig.getOneShotCleanupDelaySeconds(), TimeUnit.SECONDS);
    }
    // Create the temporary folder
    final Path newIncarnation = fsWrapper.createTmpDirectory(rootPath, prefix);
    if (folderMonitor != null) {
      // Start monitoring the folder
      folderMonitor.startMonitoring(rootPath);
    }
    return newIncarnation;
  }
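The incarnation check in the method above can be isolated into a small sketch: a sub-directory is a cleanup candidate only if its name parses as epoch milliseconds strictly between the epoch and the current time. IncarnationFilter and its method names are hypothetical helpers written for this example, not Dremio code.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the incarnation check in createTmpDirectory.
final class IncarnationFilter {
  // True if dirName parses as epoch millis strictly between the epoch and nowMillis.
  static boolean isOldIncarnation(String dirName, long nowMillis) {
    try {
      long incarnation = Long.parseLong(dirName);
      return incarnation > Instant.EPOCH.toEpochMilli() && incarnation < nowMillis;
    } catch (NumberFormatException e) {
      return false; // not an incarnation directory, leave it alone
    }
  }

  // Collects the directory names that qualify as old incarnations.
  static List<String> selectForCleanup(List<String> dirNames, long nowMillis) {
    List<String> result = new ArrayList<>();
    for (String name : dirNames) {
      if (isOldIncarnation(name, nowMillis)) {
        result.add(name);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long now = Instant.now().toEpochMilli();
    List<String> names = List.of("1700000000000", "notes", "-5", String.valueOf(now + 60_000));
    System.out.println(selectForCleanup(names, now));
  }
}
```

Names that are not numeric, not positive, or lie in the future are skipped, which is what keeps unrelated directories under the spill root from being deleted.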

The temporary folder name follows the pattern purpose_hostname_port, where purpose is the reason the folder was created, which is mainly spilling.
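A minimal sketch of how such a prefix could be built and recognised. ExecutorIdSketch, its fields, the underscore separator handling, and the purpose value "spilling" are assumptions for illustration; the real ExecutorId.toPrefix may format the name differently.

```java
// Hypothetical stand-in for ExecutorId; field names and formatting are assumptions.
final class ExecutorIdSketch {
  private final String hostName;
  private final int port;

  ExecutorIdSketch(String hostName, int port) {
    this.hostName = hostName;
    this.port = port;
  }

  // Builds the purpose_hostname_port prefix described in the text.
  String toPrefix(String purpose) {
    return purpose + "_" + hostName + "_" + port;
  }

  // True if folderName is exactly the folder this executor would create for the given purpose.
  boolean owns(String folderName, String purpose) {
    return folderName.equals(toPrefix(purpose));
  }

  public static void main(String[] args) {
    ExecutorIdSketch id = new ExecutorIdSketch("node1", 45678);
    System.out.println(id.toPrefix("spilling"));
  }
}
```

Encoding the owner in the folder name lets a cleanup pass decide which executor a folder belongs to without reading its contents.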

File monitoring
In the DefaultTemporaryFolderManager constructor

if (availableExecutors != null) {
  // if available executors can be monitored, start a background task to clean up on dead executors.
  this.folderMonitor = new TemporaryFolderMonitor(thisExecutor, availableExecutors, purpose,
    cleanupConfig.getStalenessLimitSeconds(), cleanupConfig.getMinUnhealthyCyclesBeforeDelete(), fsWrapper);
} else {
  this.folderMonitor = null;
}
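The constructor above passes a "minimum unhealthy cycles before delete" setting to the monitor. One plausible reading, shown here purely as an assumption, is that a folder whose owning executor is missing from the live set is deleted only after it has been missing for that many consecutive monitoring cycles. DeadExecutorTracker is a hypothetical class, and the staleness limit is left out of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; the actual TemporaryFolderMonitor logic may differ.
final class DeadExecutorTracker {
  private final int minUnhealthyCycles;
  private final Map<String, Integer> unhealthyCycles = new HashMap<>();

  DeadExecutorTracker(int minUnhealthyCycles) {
    this.minUnhealthyCycles = minUnhealthyCycles;
  }

  // One monitoring cycle: returns the folder prefixes whose owner has been absent
  // from the live set for at least minUnhealthyCycles consecutive cycles.
  List<String> onCycle(Set<String> prefixesOnDisk, Set<String> livePrefixes) {
    List<String> toDelete = new ArrayList<>();
    for (String prefix : prefixesOnDisk) {
      if (livePrefixes.contains(prefix)) {
        unhealthyCycles.remove(prefix); // owner is back, reset the counter
        continue;
      }
      int count = unhealthyCycles.merge(prefix, 1, Integer::sum);
      if (count >= minUnhealthyCycles) {
        toDelete.add(prefix);
        unhealthyCycles.remove(prefix);
      }
    }
    return toDelete;
  }

  public static void main(String[] args) {
    DeadExecutorTracker tracker = new DeadExecutorTracker(2);
    Set<String> onDisk = Set.of("spilling_a_1", "spilling_b_1");
    Set<String> live = Set.of("spilling_a_1");
    System.out.println(tracker.onCycle(onDisk, live));
    System.out.println(tracker.onCycle(onDisk, live));
  }
}
```

Requiring several consecutive misses avoids deleting the folders of an executor that only dropped out of the membership list briefly.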

Temporary file cleanup

This lives in the DefaultTemporaryFolderManager class and is invoked by SpillService

  • Reference implementation
public void startMonitoring() {
  if (folderMonitor != null) {
    final int increment = (cleanupConfig.getCleanupDelayMaxVariationSeconds() > 0) ?
      (int) (Instant.now().toEpochMilli() * Thread.currentThread().getId())
        % cleanupConfig.getCleanupDelayMaxVariationSeconds() : 0;
    final int delay = cleanupConfig.getCleanupDelaySeconds() + increment;
    logger.debug("Starting folder monitoring for cleanup with {} seconds as interval", delay);
    // Schedule at a fixed rate
    this.monitorTask = executorService.scheduleAtFixedRate(folderMonitor::doCleanupOther, delay, delay,
      TimeUnit.SECONDS);
  }
}
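The delay computation above adds a per-node variation to the base cleanup delay so that executors do not all run their cleanup at the same moment. Below is a minimal sketch of the same idea. JitteredSchedule and jitteredDelay are hypothetical names, and the sketch uses Math.floorMod instead of a cast followed by %, because in Java the % operator can return a negative value when its left operand is negative.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a jittered fixed-rate schedule.
final class JitteredSchedule {
  // Returns base plus an increment in [0, maxVariation); no jitter if maxVariation <= 0.
  static int jitteredDelay(int base, int maxVariation, long seed) {
    int increment = (maxVariation > 0) ? (int) Math.floorMod(seed, (long) maxVariation) : 0;
    return base + increment;
  }

  public static void main(String[] args) throws InterruptedException {
    // Milliseconds are used here only to keep the demo short; the quoted code uses seconds.
    int delayMillis = jitteredDelay(50, 20, System.nanoTime());
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    pool.scheduleAtFixedRate(() -> System.out.println("cleanup pass"),
        delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    Thread.sleep(250);
    pool.shutdownNow();
  }
}
```

The same value serves as both the initial delay and the period, matching the scheduleAtFixedRate call in the quoted method.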
  • SpillService invocation
    This is handled in the start method of SpillService, which covers creation, cleanup, and health checks
    public void start() throws Exception {
    // TODO: Implement the following:
    // TODO: 1. global pool of compression buffers
    // TODO: 2. pool of I/O completion threads (Note: for local FS only)
    // TODO: 3. create the spill filesystem adapter

    for (String spillDir : this.spillDirs) {
      try {
        final Path spillDirPath = new Path(spillDir);
        final FileSystem fileSystem = spillDirPath.getFileSystem(getSpillingConfig());
        healthCheckEnabled = healthCheckEnabled || isHealthCheckEnabled(fileSystem.getUri().getScheme());
      } catch (Exception ignored) {}
    }

    // healthySpillDirs set at start()
    this.healthySpillDirs = Lists.newArrayList();
    this.monitoredSpillDirectoryMap = new ConcurrentHashMap<>();
    final Supplier<Set<ExecutorId>> nodesConverter =
      (nodesProvider == null) ? null : () -> convertEndpointsToId(nodesProvider);
    final Supplier<ExecutorId> identityConverter =
      (identityProvider == null) ? null : () -> convertEndpointToId(identityProvider);
    this.folderManager = new DefaultTemporaryFolderManager(identityConverter, getSpillingConfig(), nodesConverter,
      TEMP_FOLDER_PURPOSE);

    minDiskSpace = options.minDiskSpace();
    minDiskSpacePercentage = options.minDiskSpacePercentage();
    healthCheckInterval = options.healthCheckInterval();
    healthCheckEnabled = healthCheckEnabled && options.enableHealthCheck();
    spillSweepInterval = options.spillSweepInterval();
    spillSweepThreshold = options.spillSweepThreshold();
    // Start the cleanup task
    folderManager.startMonitoring();

    // Create spill directories, in case it doesn't already exist
    assert healthySpillDirs.isEmpty();
    for (String spillDir : this.spillDirs) {
      try {
        final Path spillDirPath = new Path(spillDir);
        final FileSystem fileSystem = spillDirPath.getFileSystem(getSpillingConfig());
        if (fileSystem.exists(spillDirPath) || fileSystem.mkdirs(spillDirPath, PERMISSIONS)) {
          // Create the temporary directory
          monitoredSpillDirectoryMap.put(spillDir, folderManager.createTmpDirectory(spillDirPath));
          if (healthCheckEnabled) {
            healthySpillDirs.add(spillDir);
          }
        } else {
          logger.warn("Unable to find or create spill directory {} due to lack of permissions", spillDir);
        }
      } catch (Exception e) {
        logger.info("Sub directory creation in spill directory {} hit a temporary error `{}` " +
          "and is not added to healthy list. Will monitor periodically", spillDir, e.getMessage());
      }
    }
    // Health check; the data gathered here is mainly used in SpillManager
    if (healthCheckEnabled) {
      healthCheckTask = schedulerService.get()
        .schedule(Schedule.Builder
            .everyMillis(healthCheckInterval)
            .startingAt(Instant.now())
            .build(),
          new SpillHealthCheckTask()
        );
    }
    }
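The start method reads minDiskSpace and minDiskSpacePercentage before scheduling the health check. The snippet does not show how SpillHealthCheckTask uses them, so the following is only a sketch under the assumption that a spill directory counts as healthy when both thresholds are met. SpillDirHealth and its methods are hypothetical names; the disk figures come from the standard java.io.File API.

```java
import java.io.File;

// Hypothetical sketch of a disk-space health check for a spill directory.
final class SpillDirHealth {
  // Assumption: healthy means at least minBytes usable AND at least minPercent of the disk free.
  static boolean hasEnoughSpace(long usableBytes, long totalBytes, long minBytes, double minPercent) {
    if (totalBytes <= 0) {
      return false; // unknown or missing file system
    }
    double percentFree = 100.0 * usableBytes / totalBytes;
    return usableBytes >= minBytes && percentFree >= minPercent;
  }

  // Applies the check to a local directory.
  static boolean isHealthy(File dir, long minBytes, double minPercent) {
    return dir.isDirectory()
        && hasEnoughSpace(dir.getUsableSpace(), dir.getTotalSpace(), minBytes, minPercent);
  }

  public static void main(String[] args) {
    File tmp = new File(System.getProperty("java.io.tmpdir"));
    System.out.println(tmp + " healthy: " + isHealthy(tmp, 1024L * 1024L, 1.0));
  }
}
```

A periodic task could run such a check for every spill directory and refresh the list of healthy directories that later spill requests choose from.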

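Notes

The above is only a brief overview. Direct operations on temporary files go through SpillManager, which calls the file operations wrapped by the SpillService service
(there is also a BoostBufferManager, which mainly handles the Arrow cache). The original figure showed some of these operations; they are fairly involved and touch on
flow control and details of data execution, so they will be covered later.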

References

services/spill/src/main/java/com/dremio/common/io/TemporaryFolderManager.java
services/spill/src/main/java/com/dremio/common/io/DefaultTemporaryFolderManager.java
services/spill/src/main/java/com/dremio/service/spill/SpillService.java
sabot/kernel/src/main/java/com/dremio/sabot/op/sort/external/SpillManager.java
services/spill/src/main/java/com/dremio/exec/store/LocalSyncableFileSystem.java
services/spill/src/main/java/com/dremio/common/io/TemporaryFolderMonitor.java
common/legacy/src/main/java/com/dremio/common/concurrent/CloseableSchedulerThreadPool.java
services/spill/src/main/java/com/dremio/common/io/ExecutorId.java
sabot/kernel/src/main/java/com/dremio/exec/store/parquet/BoostBufferManager.java
