diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp index f300714ea23..b3341702408 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp @@ -57,7 +57,8 @@ IcebergMetadata::IcebergMetadata( Int32 metadata_version_, Int32 format_version_, String manifest_list_file_, - Int32 current_schema_id_, + Int32 schema_id_, + Int64 snapshot_id_, DB::NamesAndTypesList schema_) : WithContext(context_) , object_storage(object_storage_) @@ -65,7 +66,8 @@ IcebergMetadata::IcebergMetadata( , metadata_version(metadata_version_) , format_version(format_version_) , manifest_list_file(std::move(manifest_list_file_)) - , current_schema_id(current_schema_id_) + , schema_id(schema_id_) + , snapshot_id(snapshot_id_) , schema(std::move(schema_)) , log(getLogger("IcebergMetadata")) { @@ -266,7 +268,7 @@ NamesAndTypesList parseTableSchema(const Poco::JSON::Object::Ptr & metadata_obje if (ignore_schema_evolution) { /// If we ignore schema evolution, we will just use latest schema for all data files. - /// Find schema with 'schema-id' equal to 'current_schema_id'. + /// Find schema with the correct 'schema_id'. for (uint32_t i = 0; i != schemas->size(); ++i) { auto current_schema = schemas->getObject(i); @@ -405,17 +407,18 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt String manifest_list_file; Int32 schema_id = -1; + Int64 snapshot_id = -1; // Use the current snapshot (by default) or find the appropriate snapshot based on timestamp Int64 query_timestamp = local_context->getSettingsRef()[Setting::iceberg_query_at_timestamp_ms]; if (query_timestamp == 0) { // Use current snapshot - auto current_snapshot_id = metadata->getValue("current-snapshot-id"); + snapshot_id = metadata->getValue("current-snapshot-id"); for (size_t i = 0; i < snapshots->size(); ++i) { const auto snapshot = snapshots->getObject(static_cast(i)); - if (snapshot->getValue("snapshot-id") == current_snapshot_id) + if (snapshot->getValue("snapshot-id") == snapshot_id) { const auto path = snapshot->getValue("manifest-list"); manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename(); @@ -424,7 +427,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt } } - if (manifest_list_file.empty() || schema_id == -1) + if (manifest_list_file.empty() || schema_id == -1 || snapshot_id == -1) { throw Exception( ErrorCodes::BAD_ARGUMENTS, @@ -448,10 +451,11 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt const auto path = snapshot->getValue("manifest-list"); manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename(); schema_id = snapshot->getValue("schema-id"); + snapshot_id = snapshot->getValue("snapshot-id"); } } - if (manifest_list_file.empty() || schema_id == -1) + if (manifest_list_file.empty() || schema_id == -1 || snapshot_id == -1) { throw Exception( ErrorCodes::BAD_ARGUMENTS, @@ -466,7 +470,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt auto schema = parseTableSchema(metadata, format_version, schema_id, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]); return std::make_unique( - object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema); + object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, snapshot_id, schema); } /** @@ -551,7 +555,7 @@ Strings IcebergMetadata::getDataFiles() const Poco::Dynamic::Var json = parser.parse(schema_json_string); Poco::JSON::Object::Ptr schema_object = json.extract(); if (!context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution] - && schema_object->getValue("schema-id") != current_schema_id) + && schema_object->getValue("schema-id") != schema_id) throw Exception( ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not " diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h index d5ff13543f7..ee4f7463355 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h @@ -74,7 +74,8 @@ public: Int32 metadata_version_, Int32 format_version_, String manifest_list_file_, - Int32 current_schema_id_, + Int32 schema_id_, + Int64 snapshot_id_, NamesAndTypesList schema_); /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. @@ -91,22 +92,21 @@ public: bool operator ==(const IDataLakeMetadata & other) const override { const auto * iceberg_metadata = dynamic_cast(&other); - return iceberg_metadata && getManifestListFile() == iceberg_metadata->getManifestListFile(); + return iceberg_metadata && getVersion() == iceberg_metadata->getVersion(); } static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context); private: - const std::string& getManifestListFile() const { return manifest_list_file; } - - size_t getVersion() const { return metadata_version; } + std::tuple getVersion() const { return std::make_tuple(metadata_version, schema_id, snapshot_id); } const ObjectStoragePtr object_storage; const ConfigurationObserverPtr configuration; Int32 metadata_version; Int32 format_version; String manifest_list_file; - Int32 current_schema_id; + Int32 schema_id; + Int64 snapshot_id; NamesAndTypesList schema; mutable Strings data_files; std::unordered_map column_name_to_physical_name;