From 4892618aa9b922f5191796849851f001a84a1f6e Mon Sep 17 00:00:00 2001 From: avogar Date: Tue, 17 Oct 2023 11:42:19 +0000 Subject: [PATCH] Update with master, fix comments --- contrib/avro | 2 +- .../DataLakes/Iceberg/IcebergMetadata.cpp | 16 +- .../DataLakes/IcebergMetadataParser.cpp | 361 ------------------ 3 files changed, 7 insertions(+), 372 deletions(-) delete mode 100644 src/Storages/DataLakes/IcebergMetadataParser.cpp diff --git a/contrib/avro b/contrib/avro index 7832659ec98..2fb8a8a6ec0 160000 --- a/contrib/avro +++ b/contrib/avro @@ -1 +1 @@ -Subproject commit 7832659ec986075d560f930c288e973c64679552 +Subproject commit 2fb8a8a6ec0eab9109b68abf3b4857e8c476b918 diff --git a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp index c0bda44d28c..d10b3c2ceba 100644 --- a/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp @@ -252,17 +252,13 @@ std::pair parseTableSchema(const Poco::JSON::Object::P { current_schema_id = metadata_object->getValue("current-schema-id"); auto schemas = metadata_object->get("schemas").extract(); - if (schemas->size() > 1) + if (schemas->size() != 1) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported"); - for (size_t i = 0; i != schemas->size(); ++i) - { - auto current_schema = schemas->getObject(static_cast(i)); - if (current_schema->getValue("schema-id") == current_schema_id) - { - schema = current_schema; - break; - } - } + + /// Now we sure that there is only one schema. + schema = schemas->getObject(0); + if (schema->getValue("schema-id") != current_schema_id) + throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)"); } else { diff --git a/src/Storages/DataLakes/IcebergMetadataParser.cpp b/src/Storages/DataLakes/IcebergMetadataParser.cpp deleted file mode 100644 index c4194cd7814..00000000000 --- a/src/Storages/DataLakes/IcebergMetadataParser.cpp +++ /dev/null @@ -1,361 +0,0 @@ -#include "config.h" - -#if USE_AWS_S3 && USE_AVRO - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int FILE_DOESNT_EXIST; - extern const int ILLEGAL_COLUMN; - extern const int BAD_ARGUMENTS; -} - -template -struct IcebergMetadataParser::Impl -{ - Poco::Logger * log = &Poco::Logger::get("IcebergMetadataParser"); - - /** - * Useful links: - * - https://iceberg.apache.org/spec/ - */ - - /** - * Iceberg has two format versions, currently we support only format V1. - * - * Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`. - * Metadata file - json file. - * Manifest list – a file that lists manifest files; one per snapshot. - * Manifest file – a file that lists data or delete files; a subset of a snapshot. - * All changes to table state create a new metadata file and replace the old metadata with an atomic swap. - */ - - static constexpr auto metadata_directory = "metadata"; - - /** - * Each version of table metadata is stored in a `metadata` directory and - * has format: v.metadata.json, where V - metadata version. 
- */ - String getMetadataFile(const Configuration & configuration) - { - static constexpr auto metadata_file_suffix = ".metadata.json"; - - const auto metadata_files = MetadataReadHelper::listFiles(configuration, metadata_directory, metadata_file_suffix); - if (metadata_files.empty()) - { - throw Exception( - ErrorCodes::FILE_DOESNT_EXIST, - "The metadata file for Iceberg table with path {} doesn't exist", - configuration.url.key); - } - - /// Get the latest version of metadata file: v.metadata.json - return *std::max_element(metadata_files.begin(), metadata_files.end()); - } - - /** - * In order to find out which data files to read, we need to find the `manifest list` - * which corresponds to the latest snapshot. We find it by checking a list of snapshots - * in metadata's "snapshots" section. - * - * Example of metadata.json file. - * - * { - * "format-version" : 1, - * "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5", - * "location" : "/iceberg_data/db/table_name", - * "last-updated-ms" : 1680206743150, - * "last-column-id" : 2, - * "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {}, {}, ... ] }, - * "current-schema-id" : 0, - * "schemas" : [ ], - * ... - * "current-snapshot-id" : 2819310504515118887, - * "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } }, - * "snapshots" : [ { - * "snapshot-id" : 2819310504515118887, - * "timestamp-ms" : 1680206743150, - * "summary" : { - * "operation" : "append", "spark.app.id" : "local-1680206733239", - * "added-data-files" : "1", "added-records" : "100", - * "added-files-size" : "1070", "changed-partition-count" : "1", - * "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0", - * "total-position-deletes" : "0", "total-equality-deletes" : "0" - * }, - * "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro", - * "schema-id" : 0 - * } ], - * "statistics" : [ ], - * "snapshot-log" : [ ... ], - * "metadata-log" : [ ] - * } - */ - struct Metadata - { - int format_version; - String manifest_list; - Strings manifest_files; - }; - Metadata processMetadataFile(const Configuration & configuration, ContextPtr context) - { - const auto metadata_file_path = getMetadataFile(configuration); - auto buf = MetadataReadHelper::createReadBuffer(metadata_file_path, context, configuration); - String json_str; - readJSONObjectPossiblyInvalid(json_str, *buf); - - Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file - Poco::Dynamic::Var json = parser.parse(json_str); - Poco::JSON::Object::Ptr object = json.extract(); - - Metadata result; - result.format_version = object->getValue("format-version"); - - auto current_snapshot_id = object->getValue("current-snapshot-id"); - auto snapshots = object->get("snapshots").extract(); - - for (size_t i = 0; i < snapshots->size(); ++i) - { - const auto snapshot = snapshots->getObject(static_cast(i)); - if (snapshot->getValue("snapshot-id") == current_snapshot_id) - { - const auto path = snapshot->getValue("manifest-list"); - result.manifest_list = std::filesystem::path(configuration.url.key) / metadata_directory / std::filesystem::path(path).filename(); - break; - } - } - return result; - } - - /** - * Manifest list has Avro as default format (and currently we support only Avro). 
- * Manifest list file format of manifest list is: snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro - * - * `manifest list` has the following contents: - * ┌─manifest_path────────────────────────────────────────────────────────────────────────────────────────┬─manifest_length─┬─partition_spec_id─┬───added_snapshot_id─┬─added_data_files_count─┬─existing_data_files_count─┬─deleted_data_files_count─┬─partitions─┬─added_rows_count─┬─existing_rows_count─┬─deleted_rows_count─┐ - * │ /iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro │ 5813 │ 0 │ 2819310504515118887 │ 1 │ 0 │ 0 │ [] │ 100 │ 0 │ 0 │ - * └──────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────┴───────────────────┴─────────────────────┴────────────────────────┴───────────────────────────┴──────────────────────────┴────────────┴──────────────────┴─────────────────────┴────────────────────┘ - */ - void processManifestList(Metadata & metadata, const Configuration & configuration, ContextPtr context) - { - auto buf = MetadataReadHelper::createReadBuffer(metadata.manifest_list, context, configuration); - auto file_reader = std::make_unique(std::make_unique(*buf)); - - auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0)); - Block header{{data_type->createColumn(), data_type, "manifest_path"}}; - auto columns = parseAvro(*file_reader, header, getFormatSettings(context)); - auto & col = columns.at(0); - - if (col->getDataType() != TypeIndex::String) - { - throw Exception( - ErrorCodes::ILLEGAL_COLUMN, - "The parsed column from Avro file of `manifest_path` field should be String type, got {}", - col->getFamilyName()); - } - - const auto * col_str = typeid_cast(col.get()); - for (size_t i = 0; i < col_str->size(); ++i) - { - const auto file_path = col_str->getDataAt(i).toView(); - const auto filename = std::filesystem::path(file_path).filename(); - metadata.manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / metadata_directory / filename); - } - } - - /** - * Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro' - * - * `manifest file` is different in format version V1 and V2 and has the following contents: - * v1 v2 - * status req req - * snapshot_id req opt - * sequence_number opt - * file_sequence_number opt - * data_file req req - * Example format version V1: - * ┌─status─┬─────────snapshot_id─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ - * │ 1 │ 2819310504515118887 │ ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │ - * └────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ - * Example format version V2: - * 
┌─status─┬─────────snapshot_id─┬─sequence_number─┬─file_sequence_number─┬─data_file───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ - * │ 1 │ 5887006101709926452 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │ - * └────────┴─────────────────────┴─────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ - * In case of partitioned data we'll have extra directory partition=value: - * ─status─┬─────────snapshot_id─┬─data_file──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ - * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0) │ - * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0) │ - * │ 1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0) │ - * └────────┴─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ - */ - Strings getFilesForRead(const Metadata & metadata, const Configuration & configuration, ContextPtr context) - { - NameSet keys; - for (const auto & manifest_file : metadata.manifest_files) - { - auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration); - auto file_reader = std::make_unique(std::make_unique(*buffer)); - - avro::NodePtr root_node = file_reader->dataSchema().root(); - size_t leaves_num = root_node->leaves(); - size_t expected_min_num = metadata.format_version == 1 ? 3 : 2; - if (leaves_num < expected_min_num) - { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Unexpected number of columns {}. 
Expected at least {}", - root_node->leaves(), expected_min_num); - } - - avro::NodePtr status_node = root_node->leafAt(0); - if (status_node->type() != avro::Type::AVRO_INT) - { - throw Exception( - ErrorCodes::ILLEGAL_COLUMN, - "The parsed column from Avro file of `status` field should be Int type, got {}", - magic_enum::enum_name(status_node->type())); - } - - avro::NodePtr data_file_node = root_node->leafAt(static_cast(leaves_num) - 1); - if (data_file_node->type() != avro::Type::AVRO_RECORD) - { - throw Exception( - ErrorCodes::ILLEGAL_COLUMN, - "The parsed column from Avro file of `data_file` field should be Tuple type, got {}", - magic_enum::enum_name(data_file_node->type())); - } - - auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node); - auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node); - Block header{ - {status_col_data_type->createColumn(), status_col_data_type, "status"}, - {data_col_data_type->createColumn(), data_col_data_type, "data_file"}}; - - const auto columns = parseAvro(*file_reader, header, getFormatSettings(context)); - if (columns.size() != 2) - { - throw Exception(ErrorCodes::ILLEGAL_COLUMN, - "Unexpected number of columns. Expected 2, got {}", columns.size()); - } - - if (columns.at(0)->getDataType() != TypeIndex::Int32) - { - throw Exception(ErrorCodes::ILLEGAL_COLUMN, - "The parsed column from Avro file of `status` field should be Int32 type, got {}", - columns.at(0)->getFamilyName()); - } - if (columns.at(1)->getDataType() != TypeIndex::Tuple) - { - throw Exception(ErrorCodes::ILLEGAL_COLUMN, - "The parsed column from Avro file of `file_path` field should be Tuple type, got {}", - columns.at(1)->getFamilyName()); - } - - const auto status_int_column = assert_cast(columns.at(0).get()); - const auto data_file_tuple_column = assert_cast(columns.at(1).get()); - - if (status_int_column->size() != data_file_tuple_column->size()) - { - throw Exception(ErrorCodes::ILLEGAL_COLUMN, - "The parsed column from Avro file of `file_path` and `status` have different rows number: {} and {}", - status_int_column->size(), data_file_tuple_column->size()); - } - - const auto * data_file_name_column = metadata.format_version == 1 - ? 
data_file_tuple_column->getColumnPtr(0).get() - : data_file_tuple_column->getColumnPtr(1).get(); - - if (data_file_name_column->getDataType() != TypeIndex::String) - { - throw Exception(ErrorCodes::ILLEGAL_COLUMN, - "The parsed column from Avro file of `file_path` field should be String type, got {}", - data_file_name_column->getFamilyName()); - } - auto file_name_str_column = assert_cast(data_file_name_column); - - for (size_t i = 0; i < status_int_column->size(); ++i) - { - const auto status = status_int_column->getInt(i); - const auto data_path = std::string(file_name_str_column->getDataAt(i).toView()); - const auto pos = data_path.find(configuration.url.key); - const auto file_path = data_path.substr(pos); - if (pos == std::string::npos) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path); - - if (status == 2) - { - LOG_TEST(log, "Processing delete file for path: {}", file_path); - chassert(!keys.contains(file_path)); - } - else - keys.insert(file_path); - } - } - - return std::vector(keys.begin(), keys.end()); - } - - MutableColumns parseAvro( - avro::DataFileReaderBase & file_reader, - const Block & header, - const FormatSettings & settings) - { - auto deserializer = std::make_unique(header, file_reader.dataSchema(), true, true, settings); - MutableColumns columns = header.cloneEmptyColumns(); - - file_reader.init(); - RowReadExtension ext; - while (file_reader.hasMore()) - { - file_reader.decr(); - deserializer->deserializeRow(columns, file_reader.decoder(), ext); - } - return columns; - } - -}; - - -template -IcebergMetadataParser::IcebergMetadataParser() : impl(std::make_unique()) -{ -} - -template -Strings IcebergMetadataParser::getFiles(const Configuration & configuration, ContextPtr context) -{ - auto metadata = impl->processMetadataFile(configuration, context); - - /// When table first created and does not have any data - if (metadata.manifest_list.empty()) - return {}; - - impl->processManifestList(metadata, configuration, context); - return impl->getFilesForRead(metadata, configuration, context); -} - - -template IcebergMetadataParser::IcebergMetadataParser(); -template Strings IcebergMetadataParser::getFiles(const StorageS3::Configuration & configuration, ContextPtr); - -} - -#endif
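
For reference (not part of the patch): a minimal standalone sketch of the schema check that the IcebergMetadata.cpp hunk above introduces — accept the table only when the "schemas" array holds exactly one entry and that entry's "schema-id" matches "current-schema-id". It uses Poco::JSON the same way the surrounding code does, but the function name parseCurrentSchema, the plain std::runtime_error, and the trimmed sample JSON are illustrative assumptions, not code from this change.

#include <iostream>
#include <stdexcept>
#include <string>

#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>

/// Illustrative stand-in for the validation added in parseTableSchema():
/// exactly one schema must be present and it must be the current one.
static Poco::JSON::Object::Ptr parseCurrentSchema(const std::string & metadata_json)
{
    Poco::JSON::Parser parser;
    auto object = parser.parse(metadata_json).extract<Poco::JSON::Object::Ptr>();

    const auto current_schema_id = object->getValue<int>("current-schema-id");
    auto schemas = object->get("schemas").extract<Poco::JSON::Array::Ptr>();

    if (schemas->size() != 1)
        throw std::runtime_error("Reading Iceberg tables with evolved schema is not supported");

    auto schema = schemas->getObject(0);
    if (schema->getValue<int>("schema-id") != current_schema_id)
        throw std::runtime_error("\"schema-id\" of the only schema does not match \"current-schema-id\"");

    return schema;
}

int main()
{
    /// Trimmed metadata.json fragment in the shape shown in the deleted parser's comment.
    const std::string metadata_json = R"({
        "current-schema-id" : 0,
        "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [] } ]
    })";

    auto schema = parseCurrentSchema(metadata_json);
    std::cout << "current schema-id: " << schema->getValue<int>("schema-id") << "\n";
}

Compared with the removed lookup loop, this sketch (like the hunk) also rejects an empty "schemas" array (size() != 1 instead of > 1) and a mismatched id, so malformed metadata fails fast instead of being silently skipped.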
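
In the same hedged spirit, a sketch of the snapshot-resolution step documented in the deleted IcebergMetadataParser.cpp (the metadata.json example and processMetadataFile): pick the "manifest-list" of the snapshot whose "snapshot-id" equals "current-snapshot-id". The name findManifestList and the trimmed JSON sample are assumptions for illustration; the real code additionally rewrites the returned path so the manifest list is read from the table's metadata/ directory.

#include <iostream>
#include <stdexcept>
#include <string>

#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/Types.h>

/// Return the "manifest-list" path of the current snapshot,
/// as described in the metadata.json example above.
static std::string findManifestList(const std::string & metadata_json)
{
    Poco::JSON::Parser parser;
    auto object = parser.parse(metadata_json).extract<Poco::JSON::Object::Ptr>();

    const auto current_snapshot_id = object->getValue<Poco::Int64>("current-snapshot-id");
    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    for (size_t i = 0; i < snapshots->size(); ++i)
    {
        auto snapshot = snapshots->getObject(static_cast<unsigned>(i));
        if (snapshot->getValue<Poco::Int64>("snapshot-id") == current_snapshot_id)
            return snapshot->getValue<std::string>("manifest-list");
    }

    throw std::runtime_error("\"current-snapshot-id\" not found in \"snapshots\"");
}

int main()
{
    /// Trimmed to the two fields this step actually needs.
    const std::string metadata_json = R"({
        "current-snapshot-id" : 2819310504515118887,
        "snapshots" : [ {
            "snapshot-id" : 2819310504515118887,
            "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro"
        } ]
    })";

    std::cout << findManifestList(metadata_json) << "\n";
}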