From a87dc7a3e24c599254503008110c6e3f11cb8ac1 Mon Sep 17 00:00:00 2001 From: divanik Date: Fri, 22 Nov 2024 13:40:07 +0000 Subject: [PATCH] Add processing of incorrect icebrg 1 format --- .../DataLakes/IcebergMetadata.cpp | 154 ++++++++++++------ 1 file changed, 108 insertions(+), 46 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp index f0a80a41d4e..bed3e78a0de 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp @@ -139,6 +139,7 @@ enum class DataFileContent : uint8_t * } */ + DataTypePtr getSimpleTypeByName(const String & type_name) { if (type_name == "boolean") @@ -248,7 +249,83 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t } -std::pair parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution) +std::pair +parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object, bool ignore_schema_evolution) +{ + Poco::JSON::Object::Ptr schema; + if (!metadata_object->has("current-schema-id")) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'current-schema-id' field is missing in metadata"); + auto current_schema_id = metadata_object->getValue("current-schema-id"); + if (!metadata_object->has("schemas")) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schemas' field is missing in metadata"); + auto schemas = metadata_object->get("schemas").extract(); + if (schemas->size() == 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty"); + + if (ignore_schema_evolution) + { + /// If we ignore schema evolution, we will just use latest schema for all data files. + /// Find schema with 'schema-id' equal to 'current_schema_id'. + for (uint32_t i = 0; i != schemas->size(); ++i) + { + auto current_schema = schemas->getObject(i); + if (!current_schema->has("schema-id")) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema-id' field is missing in schema"); + } + if (current_schema->getValue("schema-id") == current_schema_id) + { + schema = current_schema; + break; + } + } + + if (!schema) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)"); + } + else + { + if (schemas->size() == 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty"); + if (schemas->size() > 1) + throw Exception( + ErrorCodes::UNSUPPORTED_METHOD, + "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is " + "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable " + "setting " + "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); + + /// Now we sure that there is only one schema. + schema = schemas->getObject(0); + if (schema->getValue("schema-id") != current_schema_id) + throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)"); + } + return {schema, current_schema_id}; +} + +std::pair +parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object, bool ignore_schema_evolution) +{ + if (!metadata_object->has("schema")) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema' field is missing in metadata"); + Poco::JSON::Object::Ptr schema = metadata_object->getObject("schema"); + if (!metadata_object->has("schema")) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema-id' field is missing in schema"); + auto current_schema_id = schema->getValue("schema-id"); + if (!ignore_schema_evolution && metadata_object->has("schemas") + && metadata_object->get("schemas").extract()->size() > 1) + throw Exception( + ErrorCodes::UNSUPPORTED_METHOD, + "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not " + "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable " + "setting " + "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); + return {schema, current_schema_id}; +} + +std::pair parseTableSchema( + const Poco::JSON::Object::Ptr & metadata_object, LoggerPtr metadata_logger, int format_version, bool ignore_schema_evolution) { Poco::JSON::Object::Ptr schema; Int32 current_schema_id; @@ -259,55 +336,40 @@ std::pair parseTableSchema(const Poco::JSON::Object::P /// (we will compare schema id from manifest file and currently used schema). if (format_version == 2) { - current_schema_id = metadata_object->getValue("current-schema-id"); - auto schemas = metadata_object->get("schemas").extract(); - if (schemas->size() == 0) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty"); - - if (ignore_schema_evolution) - { - /// If we ignore schema evolution, we will just use latest schema for all data files. - /// Find schema with 'schema-id' equal to 'current_schema_id'. - for (uint32_t i = 0; i != schemas->size(); ++i) - { - auto current_schema = schemas->getObject(i); - if (current_schema->getValue("schema-id") == current_schema_id) - { - schema = current_schema; - break; - } - } - - if (!schema) - throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)"); - } - else - { - if (schemas->size() != 1) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, - "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is " - "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting " - "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); - - /// Now we sure that there is only one schema. - schema = schemas->getObject(0); - if (schema->getValue("schema-id") != current_schema_id) - throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)"); - } + std::tie(schema, current_schema_id) = parseTableSchemaV2Method(metadata_object, ignore_schema_evolution); } else { - schema = metadata_object->getObject("schema"); - current_schema_id = schema->getValue("schema-id"); - /// Field "schemas" is optional for version 1, but after version 2 was introduced, - /// in most cases this field is added for new tables in version 1 as well. - if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract()->size() > 1) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, - "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not " - "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting " - "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); + try + { + std::tie(schema, current_schema_id) = parseTableSchemaV1Method(metadata_object, ignore_schema_evolution); + } + catch (const Exception & first_error) + { + try + { + std::tie(schema, current_schema_id) = parseTableSchemaV2Method(metadata_object, ignore_schema_evolution); + LOG_WARNING( + metadata_logger, + "Iceberg table schema was parsed using v2 specification, but it was impossible to parse it using v1 specification. Be " + "aware that you Iceberg writing Engine violates Iceberg specification. Error during parsing {}", + first_error.displayText()); + } + catch (const Exception & second_error) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Cannot parse Iceberg table schema both with v1 and v2 methods. Old method error: {}. New method error: {}", + first_error.displayText(), + second_error.displayText()); + } + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, e.displayText()); + } + } } + NamesAndTypesList names_and_types; auto fields = schema->get("fields").extract(); for (size_t i = 0; i != fields->size(); ++i) @@ -404,7 +466,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt auto format_version = object->getValue("format-version"); auto [schema, schema_id] - = parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]); + = parseTableSchema(object, log, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]); auto current_snapshot_id = object->getValue("current-snapshot-id"); auto snapshots = object->get("snapshots").extract();