欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

Apache Iceberg 小文件合并原理及实践

本文是《Apache Iceberg 入门教程》专题的第 1 篇,共 10 篇:

《一条数据在 Apache Iceberg 之旅:写过程分析》 这篇文章中我们分析了 Apache Iceberg 写数据的源码。如下是我们使用 Spark 写两次数据到 Iceberg 表的数据目录布局(测试代码在 这里):

/data/hive/warehouse/default.db/iteblog
├── data
│   └── ts_year=2020
│       ├── id_bucket=0
│       │   ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet
│       │   ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet
│       │   ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet
│       │   └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet
│       └── id_bucket=1
│           ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet
│           └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet
└── metadata
    ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json
    ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json
    ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json
    ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro
    ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro
    ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro
    └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro

5 directories, 13 files

因为我们每次写入的数据就几条,Iceberg 每个分区写文件的时候都是产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。如果我们是使用 Spark Streaming 的方式7*24小时不断地往 Apache Iceberg 里面写数据,这将产生大量的小文件。

使用 Iceberg 来压缩文件

值得高兴的是,Apache Iceberg 给我们提供了相关 Actions API 来合并这些小文件,具体如下:

Configuration conf = new Configuration();
conf.set(METASTOREURIS.varname, "thrift://localhost:9083");

Map<String, String> maps = Maps.newHashMap();
maps.put("path", "default.iteblog");
DataSourceOptions options = new DataSourceOptions(maps);

Table table = findTable(options, conf);

SparkSession.builder()
        .master("local[2]")
        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
        .config("spark.hadoop." + METASTOREURIS.varname, "thrift://localhost:9083")
        .config("spark.executor.heartbeatInterval", "100000")
        .config("spark.network.timeoutInterval", "100000")
        .enableHiveSupport()
        .getOrCreate();

Actions.forTable(table).rewriteDataFiles()
        .targetSizeInBytes(10 * 1024) // 10KB
        .execute();

运行完上面代码之后,可以将 Iceberg 的小文件进行合并,得到的新数据目录如下:

⇒  tree /data/hive/warehouse/default.db/iteblog
/data/hive/warehouse/default.db/iteblog
├── data
│   └── ts_year=2020
│       ├── id_bucket=0
│       │   ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet
│       │   ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet
│       │   ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet
│       │   ├── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet
│       │   └── 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet
│       └── id_bucket=1
│           ├── 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet
│           ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet
│           └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet
└── metadata
    ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json
    ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json
    ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json
    ├── 00003-d987d15f-2c7c-427c-849e-b8842d77d28e.metadata.json
    ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro
    ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro
    ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m0.avro
    ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m1.avro
    ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m2.avro
    ├── snap-3634417817414108593-1-25126b97-5a87-42b7-b45a-499aa41e7359.avro
    ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro
    └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro

5 directories, 20 files

对比最新的结果可以得出:

  • ts_year=2020/id_bucket=0 新增了名为 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet 的数据文件,这个其实就是把之前四个文件进行和合并得到的新文件;
  • ts_year=2020/id_bucket=1 新增了名为 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet 的数据文件,这个其实就是把之前两个文件进行和合并得到的新文件。

Iceberg 小文件合并原理

Iceberg 小文件合并是在 org.apache.iceberg.actions.RewriteDataFilesAction 类里面实现的。小文件合并其实是通过 Spark 并行计算的,这也就是上面 DEMO 初始化了一个 SparkSession 的原因。我们可以通过 RewriteDataFilesAction 类的 targetSizeInBytes 方法来设置输出的合并文件大小。

注意:最终合并的文件并不是都小于或等于 targetSizeInBytes,甚至​会出现文件根本没合并的情况。​

当我们调用了 execute() 方法,RewriteDataFilesAction 类会先创建出一个 org.apache.iceberg.DataTableScan,然后会把对应表的最新快照(Snapshot)拿出来,紧接着拿出这个快照对应的底层所有数据文件。然后按照分区 Key 进行分组(group),同一个分区的文件放到一起,并将这些信息放到 Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks 的结果里面,groupedTasks 的 Key 就是分区信息,如果表不是分区表,那就是空分区;groupedTasks 的 value 就是对应分区底下的文件列表。

由于分区里面可能存在一个文件,这时候就没必要去执行文件合并,这时候可以去掉这部分分区,得到了 Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks。如果 filteredGroupedTasks 里面没有需要合并的分区那就直接返回了。

如果 filteredGroupedTasks 不为空,则对每个分区里面的文件进行 split 和 combine 操作,如下:

// Split and combine tasks under each partition
List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
        .map(scanTasks -> {
          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
        })
        .flatMap(Streams::stream)
        .collect(Collectors.toList());

combinedScanTasks 结构如下:

Apache iceberg write path
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

combinedScanTasks 里面其实就是封装了 BaseCombinedScanTask 类,这个类里面的 task 就是标识哪些 Iceberg 的数据文件需要合并到新文件里面。得到 combinedScanTasks 之后会构造出一个 RDD:

JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());

然后最终会调用 taskRDD 的 map 方法,遍历 combinedScanTasks 里面的 task,将 task 里面对应的 Iceberg 读出来,再写到新文件里面:

public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
    JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);

    return taskCommitRDD.collect().stream()
        .flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
        .collect(Collectors.toList());
}

rewriteDataForTask 的实现如下:

private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
    TaskContext context = TaskContext.get();
    int partitionId = context.partitionId();
    long taskId = context.taskAttemptId();

    RowDataReader dataReader = new RowDataReader(
        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);

    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
        properties, schema, SparkSchemaUtil.convert(schema));
    OutputFileFactory fileFactory = new OutputFileFactory(
        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);

    BaseWriter writer;
    if (spec.fields().isEmpty()) {
      writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE);
    } else {
      writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema);
    }

    try {
      while (dataReader.next()) {
        InternalRow row = dataReader.get();
        writer.write(row);
      }

      dataReader.close();
      dataReader = null;
      return writer.complete();

    } catch (Throwable originalThrowable) {
    ......
    }
}

rewriteDataForTasks 执行完会返回新创建文件的路径,最后会写到新的快照里面。在快照里面会将新建的文件表示为 org.apache.iceberg.ManifestEntry.Status#ADDED,上一个快照里面的文件标记为 org.apache.iceberg.ManifestEntry.Status#DELETED。

为什么有些情况下文件并没有合并或者拆成 targetSizeInBytes 大小?

假设我们运行小文件合并程序之前文件的分布如下:

-rw-r--r--  1 iteblog  wheel   1.1M 11 24 10:14 00000-0-690008c5-2389-4cc3-9a06-9ac3c93a7b2d-00001.parquet
-rw-r--r--  1 iteblog  wheel    11M 11 24 10:15 00000-0-9784820b-2a40-4957-817a-9cf28b6bd84c-00001.parquet
-rw-r--r--  1 iteblog  wheel   1.1M 11 24 10:14 00001-1-5e6f32e2-157d-4416-acf3-0111e1ddd150-00001.parquet
-rw-r--r--  1 iteblog  wheel    11M 11 24 10:15 00001-1-d7aa2425-94e6-4a54-a873-f4351df3f497-00001.parquet

我们的压缩程序如下:

Actions.forTable(table).rewriteDataFiles()
        .targetSizeInBytes(5 * 1024 * 1024) // 5M
        .execute();

其实运行完上面的程序,底层的数据文件大小还是分布为 1.1MB、11MB、1.1MB 以及 11MB?这里我主要介绍一下为什么 11MB 这两个文件为什么没有被拆分成两个 5MB以及一个1MB的文件。

其实在程序里面确实生成了8个分区信息如下:

Apache iceberg write path
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众号:iteblog_hadoop

如上,targetSizeInBytes = 5242880,正好就是 5MB,filteredGroupedTasks 里面也正好是我们上面四个需要压缩的文件。combinedScanTasks 也正好是我们按照 targetSizeInBytes 拆出来的8个分片。但其实分片1、2、4以及5其实并没有读取底层 Parquet 的文件。我们以分片以为例进行介绍。

从图中可以看出,分片1的数据如下:

BaseCombinedScanTask{tasks=SplitScanTask{len=5242880, offset=0, fileScanTask=BaseFileScanTask{file=/data/hive/warehouse/default.db/iteblog1/data/ts_year=2020/id_bucket=0/00000-0-9784820b-2a40-4957-817a-9cf28b6bd84c-00001.parquet, partition_data=PartitionData{ts_year=50, id_bucket=0}, residual=true}}}

对应的是 00000-0-9784820b-2a40-4957-817a-9cf28b6bd84c-00001.parquet 文件从偏移量为0,长度为5242880的数据。

在上面介绍的 dataReader.next() 这行中,Iceberg 会先打开对应分片的文件,如下:

public boolean next() throws IOException {
    while (true) {
      if (currentIterator.hasNext()) {
        this.current = currentIterator.next();
        return true;
      } else if (tasks.hasNext()) {
        this.currentIterator.close();
        this.currentIterator = open(tasks.next());
      } else {
        return false;
      }
    }
}

也就是上面的 open(tasks.next()),这个行代码的最底层会调用 ParquetFileReader.open(ParquetIO.file(file), options);,其中 options 里面就包含了分片的起始和长度信息。ParquetFileReader 的初始化如下:

  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
    this.converter = new ParquetMetadataConverter(options);
    this.file = file;
    this.f = file.newStream();
    this.options = options;
    try {
      this.footer = readFooter(file, options, f, converter);
    } catch (Exception e) {
      // In case that reading footer throws an exception in the constructor, the new stream
      // should be closed. Otherwise, there's no way to close this outside.
      f.close();
      throw e;
    }
    this.fileMetaData = footer.getFileMetaData();
    this.blocks = filterRowGroups(footer.getBlocks());
    this.blockIndexStores = listWithNulls(this.blocks.size());
    this.blockRowRanges = listWithNulls(this.blocks.size());
    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
      paths.put(ColumnPath.get(col.getPath()), col);
    }
    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
  }

注意上面的 readFooter 调用,在 readFooter 里面最终会调用到 converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter());,readParquetMetadata 里面会调用到下面代码:

@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
        return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
}

filterFileMetaDataByMidpoint 的实现如下:

static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
    List<RowGroup> rowGroups = metaData.getRow_groups();
    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
    for (RowGroup rowGroup : rowGroups) {
      long totalSize = 0;
      long startIndex = getOffset(rowGroup.getColumns().get(0));
      for (ColumnChunk col : rowGroup.getColumns()) {
        totalSize += col.getMeta_data().getTotal_compressed_size();
      }
      long midPoint = startIndex + totalSize / 2;
      if (filter.contains(midPoint)) {
        newRowGroups.add(rowGroup);
      }
    }
    metaData.setRow_groups(newRowGroups);
    return metaData;
}

大家注意了,RangeMetadataFilter filter 其实就是我们分片的范围信息,在我们这个例子中,filter = (start:0, end:5242880)。上面会取到 Parquet 的 rowGroups,然后判断没个 rowGroup 的中点(midPoint)是不是在 filter 范围里面,如果是的话那么这个 rowGroup 就是由这个分区处理;如果不是,则对应分区就相当于空跑。

我们再回到 ReadConf 的初始化:

ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
           Function<MessageType, ParquetValueReader<?>> readerFunc, Function<MessageType,
           VectorizedReader<?>> batchedReaderFunc, NameMapping nameMapping, boolean reuseContainers,
           boolean caseSensitive, Integer bSize) {
    this.file = file;
    this.options = options;
    this.reader = newReader(file, options);

    .....

    this.rowGroups = reader.getRowGroups();
    this.shouldSkip = new boolean[rowGroups.size()];

    .....

    long computedTotalValues = 0L;
    for (int i = 0; i < shouldSkip.length; i += 1) {
      BlockMetaData rowGroup = rowGroups.get(i);
      boolean shouldRead = filter == null || (
          statsFilter.shouldRead(typeWithIds, rowGroup) &&
              dictFilter.shouldRead(typeWithIds, rowGroup, reader.getDictionaryReader(rowGroup)));
      this.shouldSkip[i] = !shouldRead;
      if (shouldRead) {
        computedTotalValues += rowGroup.getRowCount();
      }
    }

    .....
    
    this.totalValues = computedTotalValues;
    .....
}

可以看到 this.reader = newReader(file, options); 其实就是上面介绍的 ParquetFileReader 初始化,reader.getRowGroups(); 就是在 ParquetFileReader 初始化过程中调用 filterFileMetaDataByMidpoint 计算到的 rowGroups 列表。我们当前分片filter = (start:0, end:5242880)其实并不包含对应 rowGroup 的中点,所以 rowGroups 为空列表。这就导致 shouldSkip 初始化为长度为0的数组,其实那个 for 循环没有跑。这就导致 computedTotalValues 变量的值为0。

我们再回到 dataReader.next() 的实现,其实现打开对应分片的文件,然后初始化 currentIterator,初始化完 currentIterator 紧接着会调用 currentIterator.hasNext(),下面就是这个 hasNext 的实现:

@Override
 public boolean hasNext() {
      return valuesRead < totalValues;
}

其中 totalValues 就是初始化 ReadConf 时由 computedTotalValues 变量决定的。因为在这个例子中,computedTotalValues 是为0的,所以导致 valuesRead < totalValues 结果为 false,自然就读不到数据了。到这里,我们已经完全明白了为啥有些分片并没有读到数据,进而导致文件并没有按照 targetSizeInBytes 大小进行拆分。好了,本文就分享到这里。

本博客文章除特别声明,全部都是原创!
转载本文请加上:转载自过往记忆(https://www.iteblog.com/)
本文链接: 【Apache Iceberg 小文件合并原理及实践】(https://www.iteblog.com/archives/9896.html)
喜欢 (3)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!