diff --git a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
index aa960d5003f..e0bb3b28f39 100644
--- a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
+++ b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp
@@ -6,8 +6,19 @@
 #if USE_AWS_S3
 #include
 #include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
+namespace fs = std::filesystem;
+
 namespace DB
 {
@@ -34,13 +45,57 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
      * For example:
      * ./_delta_log/00000000000000000000.json
      */
-    std::vector<String> getMetadataFiles(const Configuration & configuration)
-    {
-        /// DeltaLake format stores all metadata json files in _delta_log directory
-        static constexpr auto deltalake_metadata_directory = "_delta_log";
-        static constexpr auto metadata_file_suffix = ".json";
+    static constexpr auto deltalake_metadata_directory = "_delta_log";
+    static constexpr auto metadata_file_suffix = ".json";
 
-        return MetadataReadHelper::listFiles(configuration, deltalake_metadata_directory, metadata_file_suffix);
+    std::string withPadding(size_t version)
+    {
+        /// File names are zero-padded to 20 digits.
+        static constexpr auto padding = 20;
+
+        const auto version_str = toString(version);
+        return std::string(padding - version_str.size(), '0') + version_str;
+    }
+
+    /**
+     * A delta file, n.json, contains an atomic set of actions that should be applied to the
+     * previous table state (n-1.json) in order to construct the nth snapshot of the table.
+     * An action changes one aspect of the table's state, for example, adding or removing a file.
+     * Note: the file is not a single valid JSON document, but a list of JSON objects, so we read them in a loop.
+     */
+    std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context)
+    {
+        std::set<String> result_files;
+        const auto checkpoint_version = getCheckpointIfExists(result_files, configuration, context);
+
+        if (checkpoint_version)
+        {
+            auto current_version = checkpoint_version;
+            while (true)
+            {
+                const auto filename = withPadding(++current_version) + metadata_file_suffix;
+                const auto file_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / filename;
+
+                if (!MetadataReadHelper::exists(file_path, configuration))
+                    break;
+
+                processMetadataFile(file_path, result_files, configuration, context);
+            }
+
+            LOG_TRACE(
+                log, "Processed metadata files from checkpoint {} to {}",
+                checkpoint_version, current_version - 1);
+        }
+        else
+        {
+            const auto keys = MetadataReadHelper::listFiles(
+                configuration, deltalake_metadata_directory, metadata_file_suffix);
+
+            for (const String & key : keys)
+                processMetadataFile(key, result_files, configuration, context);
+        }
+
+        return result_files;
     }
 
     /**
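For context on the replay logic above: processMetadataFiles() starts from the latest checkpoint, if there is one, and then applies commit files version by version until the next zero-padded name is missing. Below is a minimal standalone sketch of that naming and lookup convention; it assumes a local _delta_log directory and the {fmt} library, and commitFileName / commitsAfterCheckpoint are illustrative names, not part of the patch.

// Sketch only: local-filesystem version of the commit-file discovery described above.
#include <cstdint>
#include <filesystem>
#include <string>
#include <vector>
#include <fmt/format.h>

namespace fs = std::filesystem;

/// Delta commit files are zero-padded to 20 digits: version 17 -> "00000000000000000017.json".
std::string commitFileName(uint64_t version)
{
    return fmt::format("{:020}.json", version);
}

/// Collect the commit files that have to be replayed on top of checkpoint `checkpoint_version`.
/// The first missing file marks the end of the log.
std::vector<fs::path> commitsAfterCheckpoint(const fs::path & delta_log_dir, uint64_t checkpoint_version)
{
    std::vector<fs::path> commits;
    for (uint64_t version = checkpoint_version + 1; ; ++version)
    {
        const auto path = delta_log_dir / commitFileName(version);
        if (!fs::exists(path))
            break;
        commits.push_back(path);
    }
    return commits;
}

The same stop-at-the-first-gap behaviour is what the exists() check inside the while (true) loop above relies on.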
@@ -72,60 +127,207 @@
      * \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}}
      * "
      */
-    void handleJSON(const JSON & json, const String & prefix, std::set<String> & result)
+    void processMetadataFile(
+        const String & key,
+        std::set<String> & result,
+        const Configuration & configuration,
+        ContextPtr context)
     {
-        if (json.has("add"))
+        auto buf = MetadataReadHelper::createReadBuffer(key, context, configuration);
+
+        char c;
+        while (!buf->eof())
         {
-            const auto path = json["add"]["path"].getString();
-            const auto [_, inserted] = result.insert(fs::path(prefix) / path);
-            if (!inserted)
-                throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
-        }
-        else if (json.has("remove"))
-        {
-            const auto path = json["remove"]["path"].getString();
-            const bool erase = result.erase(fs::path(prefix) / path);
-            if (!erase)
-                throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path);
+            /// There may be some invalid characters before the JSON object.
+            while (buf->peek(c) && c != '{')
+                buf->ignore();
+
+            if (buf->eof())
+                break;
+
+            String json_str;
+            readJSONObjectPossiblyInvalid(json_str, *buf);
+
+            if (json_str.empty())
+                continue;
+
+            const JSON json(json_str);
+            if (json.has("add"))
+            {
+                const auto path = json["add"]["path"].getString();
+                const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / path);
+                if (!inserted)
+                    throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
+            }
+            else if (json.has("remove"))
+            {
+                const auto path = json["remove"]["path"].getString();
+                const bool erase = result.erase(fs::path(configuration.getPath()) / path);
+                if (!erase)
+                    throw Exception(ErrorCodes::INCORRECT_DATA, "File doesn't exist {}", path);
+            }
         }
     }
 
     /**
-     * A delta file, n.json, contains an atomic set of actions that should be applied to the
-     * previous table state (n-1.json) in order to the construct nth snapshot of the table.
-     * An action changes one aspect of the table's state, for example, adding or removing a file.
-     * Note: it is not a valid json, but a list of json's, so we read it in a while cycle.
+     * Checkpoints in DeltaLake are created every 10 commits by default.
+     * The latest checkpoint is recorded in the _last_checkpoint file: _delta_log/_last_checkpoint
+     *
+     * _last_checkpoint contains the following:
+     * {"version":20,
+     *  "size":23,
+     *  "sizeInBytes":14057,
+     *  "numOfAddFiles":21,
+     *  "checkpointSchema":{...}}
+     *
+     * We need to get "version", which is the version of the checkpoint we need to read.
      */
-    std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context)
+    size_t readLastCheckpointIfExists(const Configuration & configuration, ContextPtr context)
     {
-        std::set<String> result;
-        const auto keys = getMetadataFiles(configuration);
-        for (const String & key : keys)
+        const auto last_checkpoint_file = fs::path(configuration.getPath()) / deltalake_metadata_directory / "_last_checkpoint";
+        if (!MetadataReadHelper::exists(last_checkpoint_file, configuration))
+            return 0;
+
+        String json_str;
+        auto buf = MetadataReadHelper::createReadBuffer(last_checkpoint_file, context, configuration);
+        readJSONObjectPossiblyInvalid(json_str, *buf);
+
+        const JSON json(json_str);
+        const auto version = json["version"].getUInt();
+
+        LOG_TRACE(log, "Last checkpoint file version: {}", version);
+        return version;
+    }
+
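The _last_checkpoint file parsed by readLastCheckpointIfExists() is a single JSON object whose "version" field identifies the checkpoint to load. A minimal standalone sketch of the same lookup, reading a local file and using nlohmann/json instead of ClickHouse's JSON helpers (illustrative only; lastCheckpointVersion is not a name from the patch):

#include <cstdint>
#include <filesystem>
#include <fstream>
#include <nlohmann/json.hpp>

/// Returns 0 when no checkpoint has been written yet, mirroring the patch's convention.
uint64_t lastCheckpointVersion(const std::filesystem::path & delta_log_dir)
{
    std::ifstream in(delta_log_dir / "_last_checkpoint");
    if (!in)
        return 0;

    /// The file holds one JSON object, e.g. {"version":20,"size":23,"sizeInBytes":14057,...}.
    nlohmann::json doc;
    in >> doc;
    return doc.value("version", uint64_t{0});
}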
+    /**
+     * The format of the checkpoint file name can take one of two forms:
+     * 1. A single checkpoint file for version n of the table will be named n.checkpoint.parquet.
+     *    For example:
+     *        00000000000000000010.checkpoint.parquet
+     * 2. A multi-part checkpoint for version n can be fragmented into p files. Fragment o of p is
+     *    named n.checkpoint.o.p.parquet. For example:
+     *        00000000000000000010.checkpoint.0000000001.0000000003.parquet
+     *        00000000000000000010.checkpoint.0000000002.0000000003.parquet
+     *        00000000000000000010.checkpoint.0000000003.0000000003.parquet
+     * TODO: Only (1) is supported, need to support (2).
+     *
+     * Such checkpoint files contain parquet data with the following contents:
+     *
+     * Row 1:
+     * ──────
+     * txn:      (NULL,NULL,NULL)
+     * add:      ('part-00000-1e9cd0c1-57b5-43b4-9ed8-39854287b83a-c000.parquet',{},1070,1680614340485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":13,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"14"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}')
+     * remove:   (NULL,NULL,NULL,NULL,{},NULL,{})
+     * metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL)
+     * protocol: (NULL,NULL)
+     *
+     * Row 2:
+     * ──────
+     * txn:      (NULL,NULL,NULL)
+     * add:      ('part-00000-8887e898-91dd-4951-a367-48f7eb7bd5fd-c000.parquet',{},1063,1680614318485,false,{},'{"numRecords":1,"minValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"maxValues":{"col-360dade5-6d0e-4831-8467-a25d64695975":2,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":"3"},"nullCount":{"col-360dade5-6d0e-4831-8467-a25d64695975":0,"col-e27b0253-569a-4fe1-8f02-f3342c54d08b":0}}')
+     * remove:   (NULL,NULL,NULL,NULL,{},NULL,{})
+     * metaData: (NULL,NULL,NULL,(NULL,{}),NULL,[],{},NULL)
+     * protocol: (NULL,NULL)
+     *
+     * ...
+     */
+    #define THROW_ARROW_NOT_OK(status)                                    \
+        do                                                                \
+        {                                                                 \
+            if (const ::arrow::Status & _s = (status); !_s.ok())          \
+                throw Exception::createDeprecated(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
+        } while (false)
+
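getCheckpointIfExists() below extracts the `add` and `remove` struct columns from the checkpoint parquet through ClickHouse's ReadBuffer and ArrowColumnToCHColumn machinery. For inspecting such a checkpoint outside ClickHouse, here is a rough sketch against the plain Arrow C++ API; the function name is illustrative, only the add.path field is dumped, and remove handling is omitted.

#include <iostream>
#include <memory>
#include <string>
#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>

/// Print the data files referenced by the `add` column of a checkpoint parquet file.
arrow::Status dumpAddedFiles(const std::string & checkpoint_path)
{
    ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(checkpoint_path));

    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader));

    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));

    /// `add` is a struct column; its `path` child holds the data file name
    /// (null or empty for rows that describe other action types).
    auto add_column = table->GetColumnByName("add");
    if (!add_column)
        return arrow::Status::Invalid("checkpoint file has no 'add' column");

    for (const auto & chunk : add_column->chunks())
    {
        auto add_struct = std::static_pointer_cast<arrow::StructArray>(chunk);
        auto path_array = std::static_pointer_cast<arrow::StringArray>(add_struct->GetFieldByName("path"));
        for (int64_t i = 0; i < path_array->length(); ++i)
            if (!path_array->IsNull(i) && !path_array->GetString(i).empty())
                std::cout << path_array->GetString(i) << '\n';
    }
    return arrow::Status::OK();
}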
+    size_t getCheckpointIfExists(std::set<String> & result, const Configuration & configuration, ContextPtr context)
+    {
+        const auto version = readLastCheckpointIfExists(configuration, context);
+        if (!version)
+            return 0;
+
+        const auto checkpoint_filename = withPadding(version) + ".checkpoint.parquet";
+        const auto checkpoint_path = fs::path(configuration.getPath()) / deltalake_metadata_directory / checkpoint_filename;
+
+        LOG_TRACE(log, "Using checkpoint file: {}", checkpoint_path.string());
+
+        auto buf = MetadataReadHelper::createReadBuffer(checkpoint_path, context, configuration);
+        auto format_settings = getFormatSettings(context);
+
+        /// Force nullable, because this parquet file for some reason does not have nullable
+        /// in parquet file metadata while the types are in fact nullable.
+        format_settings.schema_inference_make_columns_nullable = true;
+        auto columns = ParquetSchemaReader(*buf, format_settings).readSchema();
+
+        /// Read only the columns that we need.
+        columns.filter(NameSet{"add", "remove"});
+        Block header;
+        for (const auto & column : columns)
+            header.insert({column.type->createColumn(), column.type, column.name});
+
+        std::atomic<int> is_stopped{0};
+        auto arrow_file = asArrowFile(*buf, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
+
+        std::unique_ptr<parquet::arrow::FileReader> reader;
+        THROW_ARROW_NOT_OK(
+            parquet::arrow::OpenFile(arrow_file, arrow::default_memory_pool(), &reader));
+
+        std::shared_ptr<arrow::Schema> schema;
+        THROW_ARROW_NOT_OK(reader->GetSchema(&schema));
+
+        ArrowColumnToCHColumn column_reader(
+            header, "Parquet",
+            format_settings.parquet.import_nested,
+            format_settings.parquet.allow_missing_columns,
+            /* null_as_default */true,
+            format_settings.parquet.case_insensitive_column_matching);
+
+        Chunk res;
+        std::shared_ptr<arrow::Table> table;
+        THROW_ARROW_NOT_OK(reader->ReadTable(&table));
+
+        column_reader.arrowTableToCHChunk(res, table, reader->parquet_reader()->metadata()->num_rows());
+        const auto & res_columns = res.getColumns();
+
+        if (res_columns.size() != 2)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of columns: {}", res_columns.size());
+
+        /// Process `add` column.
         {
-            auto buf = MetadataReadHelper::createReadBuffer(key, context, configuration);
-
-            char c;
-            while (!buf->eof())
+            const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[0].get());
+            const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0));
+            const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn());
+            for (size_t i = 0; i < path_column.size(); ++i)
             {
-                /// May be some invalid characters before json.
-                while (buf->peek(c) && c != '{')
-                    buf->ignore();
-
-                if (buf->eof())
-                    break;
-
-                String json_str;
-                readJSONObjectPossiblyInvalid(json_str, *buf);
-
-                if (json_str.empty())
+                const auto filename = String(path_column.getDataAt(i));
+                if (filename.empty())
                     continue;
-
-                const JSON json(json_str);
-                handleJSON(json, configuration.getPath(), result);
+                LOG_TEST(log, "Adding {}", filename);
+                const auto [_, inserted] = result.insert(fs::path(configuration.getPath()) / filename);
+                if (!inserted)
+                    throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", filename);
+            }
         }
 
-        return result;
+        /// Process `remove` column.
+        {
+            const auto * tuple_column = assert_cast<const ColumnTuple *>(res_columns[1].get());
+            const auto & nullable_column = assert_cast<const ColumnNullable &>(tuple_column->getColumn(0));
+            const auto & path_column = assert_cast<const ColumnString &>(nullable_column.getNestedColumn());
+            for (size_t i = 0; i < path_column.size(); ++i)
+            {
+                const auto filename = String(path_column.getDataAt(i));
+                if (filename.empty())
+                    continue;
+                LOG_TEST(log, "Removing {}", filename);
+                result.erase(fs::path(configuration.getPath()) / filename);
+            }
+        }
+
+        return version;
     }
+
+    Poco::Logger * log = &Poco::Logger::get("DeltaLakeMetadataParser");
 };
diff --git a/src/Storages/DataLakes/S3MetadataReader.cpp b/src/Storages/DataLakes/S3MetadataReader.cpp
index 20ce725e930..f62c440bc2f 100644
--- a/src/Storages/DataLakes/S3MetadataReader.cpp
+++ b/src/Storages/DataLakes/S3MetadataReader.cpp
@@ -33,6 +33,11 @@ S3DataLakeMetadataReadHelper::createReadBuffer(const String & key, ContextPtr co
         context->getReadSettings());
 }
 
+bool S3DataLakeMetadataReadHelper::exists(const String & key, const StorageS3::Configuration & configuration)
+{
+    return S3::objectExists(*configuration.client, configuration.url.bucket, key);
+}
+
 std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
     const StorageS3::Configuration & base_configuration, const String & prefix, const String & suffix)
 {
diff --git a/src/Storages/DataLakes/S3MetadataReader.h b/src/Storages/DataLakes/S3MetadataReader.h
index f057346efe2..cae7dd1fa3d 100644
--- a/src/Storages/DataLakes/S3MetadataReader.h
+++ b/src/Storages/DataLakes/S3MetadataReader.h
@@ -16,6 +16,8 @@ struct S3DataLakeMetadataReadHelper
     static std::shared_ptr<ReadBuffer> createReadBuffer(
         const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration);
 
+    static bool exists(const String & key, const StorageS3::Configuration & configuration);
+
     static std::vector<String> listFiles(const StorageS3::Configuration & configuration, const std::string & prefix = "", const std::string & suffix = "");
 };
 }
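The new S3DataLakeMetadataReadHelper::exists() delegates to ClickHouse's S3::objectExists helper; such a check is essentially a HEAD request on the key. A rough standalone sketch against the raw AWS C++ SDK, for illustration only (a failed HEAD may also mean missing permissions, which this sketch does not distinguish):

#include <aws/s3/S3Client.h>
#include <aws/s3/model/HeadObjectRequest.h>

/// HEAD returns object metadata only, so it is a cheap way to probe whether a key exists.
bool objectExists(const Aws::S3::S3Client & client, const Aws::String & bucket, const Aws::String & key)
{
    Aws::S3::Model::HeadObjectRequest request;
    request.SetBucket(bucket);
    request.SetKey(key);
    return client.HeadObject(request).IsSuccess();
}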
diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py
index 14e3ae2334c..0d11a147f64 100644
--- a/tests/integration/test_storage_delta/test.py
+++ b/tests/integration/test_storage_delta/test.py
@@ -176,37 +176,76 @@ def test_partition_by(started_cluster):
     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 10
 
 
-# def test_checkpoint(started_cluster):
-#     instance = started_cluster.instances["node1"]
-#     minio_client = started_cluster.minio_client
-#     bucket = started_cluster.minio_bucket
-#     spark = get_spark()
-#     TABLE_NAME = "test_checkpoint"
-#
-#     write_delta_from_df(
-#         spark,
-#         generate_data(spark, 0, 1),
-#         f"/{TABLE_NAME}",
-#         mode="overwrite",
-#     )
-#     for i in range(1, 25):
-#         write_delta_from_df(
-#             spark,
-#             generate_data(spark, i, i + 1),
-#             f"/{TABLE_NAME}",
-#             mode="append",
-#         )
-#
-#     files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
-#     time.sleep(500)
-#     ok = false
-#     for file in files:
-#         if file.endswith("last_checkpoint"):
-#             ok = true
-#     assert ok
-#
-#     create_delta_table(instance, TABLE_NAME)
-#     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 16
+def test_checkpoint(started_cluster):
+    instance = started_cluster.instances["node1"]
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    spark = get_spark()
+    TABLE_NAME = "test_checkpoint"
+
+    write_delta_from_df(
+        spark,
+        generate_data(spark, 0, 1),
+        f"/{TABLE_NAME}",
+        mode="overwrite",
+    )
+    for i in range(1, 25):
+        write_delta_from_df(
+            spark,
+            generate_data(spark, i, i + 1),
+            f"/{TABLE_NAME}",
+            mode="append",
+        )
+    files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
+    # 25 data files
+    # 25 metadata files
+    # 1 _last_checkpoint file
+    # 2 checkpoints
+    assert len(files) == 25 * 2 + 3
+
+    ok = False
+    for file in files:
+        if file.endswith("last_checkpoint"):
+            ok = True
+    assert ok
+
+    create_delta_table(instance, TABLE_NAME)
+    assert (
+        int(
+            instance.query(
+                f"SELECT count() FROM {TABLE_NAME} SETTINGS input_format_parquet_allow_missing_columns=1"
+            )
+        )
+        == 25
+    )
+
+    table = DeltaTable.forPath(spark, f"/{TABLE_NAME}")
+    table.delete("a < 10")
+    files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 15
+
+    for i in range(0, 5):
+        write_delta_from_df(
+            spark,
+            generate_data(spark, i, i + 1),
+            f"/{TABLE_NAME}",
+            mode="append",
+        )
+    # + 1 metadata file (for the delete)
+    # + 5 data files
+    # + 5 metadata files
+    # + 1 checkpoint file
+    # + 1 ?
+    files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
+    assert len(files) == 53 + 1 + 5 * 2 + 1 + 1
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 20
+
+    assert instance.query(
+        f"SELECT * FROM {TABLE_NAME} ORDER BY 1"
+    ).strip() == instance.query(
+        "SELECT number, toString(number + 1) FROM numbers(5) "
+        "UNION ALL SELECT number, toString(number + 1) FROM numbers(10, 15) ORDER BY 1"
+    ).strip()
 
 
 def test_multiple_log_files(started_cluster):