From deb3751a3a607459a9296c84c472ece1c33af986 Mon Sep 17 00:00:00 2001 From: divanik Date: Mon, 25 Nov 2024 14:36:09 +0000 Subject: [PATCH] Fix broken tests --- .../DataLakes/IcebergMetadata.cpp | 63 +++++++++++-------- .../ObjectStorage/DataLakes/IcebergMetadata.h | 3 + 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp index c63a61860df..36f901575f9 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp @@ -270,31 +270,11 @@ bool IcebergSchemaProcessor::allowPrimitiveTypeConversion(const String & old_typ return allowed_type_conversion; } -std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id) + +// Ids are passed only for error logging purposes +std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDag( + const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id) { - std::lock_guard lock(mutex); - Poco::JSON::Object::Ptr old_schema, new_schema; - auto required_transform_dag_it = transform_dags_by_ids.find({old_id, new_id}); - if (required_transform_dag_it != transform_dags_by_ids.end()) - { - return required_transform_dag_it->second; - } - - if (old_id == new_id) - { - return nullptr; - } - - auto old_schema_it = iceberg_table_schemas_by_ids.find(old_id); - if (old_schema_it == iceberg_table_schemas_by_ids.end()) - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id); - } - auto new_schema_it = iceberg_table_schemas_by_ids.find(new_id); - if (new_schema_it == iceberg_table_schemas_by_ids.end()) - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id); - } std::unordered_map> old_schema_entries; auto old_schema_fields = old_schema->get("fields").extract(); std::shared_ptr dag = std::make_shared(); @@ -325,7 +305,8 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformatio { throw Exception( ErrorCodes::UNSUPPORTED_METHOD, - "Schema evolution is not supported for complex types yet, field id is {}, old schema id is {}, new schema id is {}", + "Schema evolution is not supported for complex types yet, field id is {}, old schema id is {}, new schema id " + "is {}", id, old_id, new_id); @@ -370,7 +351,8 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformatio { throw Exception( ErrorCodes::UNSUPPORTED_METHOD, - "Adding a default column with id {} and complex type is not supported yet. Old schema id is {}, new schema id is {}", + "Adding a default column with id {} and complex type is not supported yet. Old schema id is {}, new schema id is " + "{}", id, old_id, new_id); @@ -391,7 +373,34 @@ std::shared_ptr IcebergSchemaProcessor::getSchemaTransformatio outputs.push_back(&constant); } } - return transform_dags_by_ids[{old_id, new_id}] = dag; + return dag; +} + +std::shared_ptr IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id) +{ + if (old_id == new_id) + { + return nullptr; + } + std::lock_guard lock(mutex); + auto required_transform_dag_it = transform_dags_by_ids.find({old_id, new_id}); + if (required_transform_dag_it != transform_dags_by_ids.end()) + { + return required_transform_dag_it->second; + } + + auto old_schema_it = iceberg_table_schemas_by_ids.find(old_id); + if (old_schema_it == iceberg_table_schemas_by_ids.end()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id); + } + auto new_schema_it = iceberg_table_schemas_by_ids.find(new_id); + if (new_schema_it == iceberg_table_schemas_by_ids.end()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id); + } + return transform_dags_by_ids[{old_id, new_id}] + = getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, old_id, new_id); } void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr) diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h index 4bb55adf9da..fb5a7800228 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h @@ -99,6 +99,9 @@ private: bool allowPrimitiveTypeConversion(const String & old_type, const String & new_type); const Node * getDefaultNodeForField(const Poco::JSON::Object::Ptr & field); + std::shared_ptr getSchemaTransformationDag( + const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id); + std::mutex mutex; };