From afea67590141a4b897cf58b55afdaaee13a38a54 Mon Sep 17 00:00:00 2001
From: Brett Hoerner
Date: Fri, 25 Oct 2024 16:38:26 -0600
Subject: [PATCH] Add setting to query Iceberg tables as of a specific timestamp

---
 src/Core/Settings.cpp                         |   3 +
 src/Core/SettingsChangesHistory.cpp           |   1 +
 .../DataLakes/IcebergMetadata.cpp             |  86 +++++++++++----
 .../ObjectStorage/DataLakes/IcebergMetadata.h |   4 +-
 .../integration/test_storage_iceberg/test.py  | 101 +++++++++++++++++-
 5 files changed, 172 insertions(+), 23 deletions(-)

diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index ac6b68cb6a1..9f84f8deff0 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -5519,6 +5519,9 @@ Allow to ignore schema evolution in Iceberg table engine and read all data using
 :::note
 Enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema.
 :::
+)", 0) \
+    DECLARE(Int64, iceberg_query_at_timestamp_ms, 0, R"(
+Query the Iceberg table using the snapshot that was current at the given timestamp, in milliseconds since the Unix epoch. The default of 0 means the current snapshot is used.
 )", 0) \
     DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"(
 Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)

diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index 108d3eb71fe..90f0194a657 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -60,6 +60,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory::SettingsChanges>> settings_changes_history_initializer =
+            {"iceberg_query_at_timestamp_ms", 0, 0, "New setting to query Iceberg tables as of a specific timestamp."},
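The new setting takes a timestamp in milliseconds since the Unix epoch. As a usage sketch (the table name and date below are hypothetical, not part of this patch), a client would compute the millisecond value and pass it in the query's SETTINGS clause:

from datetime import datetime, timezone

# Millisecond Unix timestamp for "as of 2024-10-25 00:00:00 UTC", the unit
# iceberg_query_at_timestamp_ms expects; 0 would mean "use the current snapshot".
ts_ms = int(datetime(2024, 10, 25, tzinfo=timezone.utc).timestamp() * 1000)

# iceberg_table is a placeholder for any Iceberg table engine or table function.
query = f"SELECT count() FROM iceberg_table SETTINGS iceberg_query_at_timestamp_ms = {ts_ms}"
print(query)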
diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp
--- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp
@@ -248,7 +248,6 @@
-std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
+NamesAndTypesList parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, int schema_id, bool ignore_schema_evolution)
 {
     Poco::JSON::Object::Ptr schema;
-    Int32 current_schema_id;

     /// First check if schema was evolved, because we don't support it yet.
     /// For version 2 we can check it by using field schemas, but in version 1
@@ -259,7 +259,6 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
     /// (we will compare schema id from manifest file and currently used schema).
     if (format_version == 2)
     {
-        current_schema_id = metadata_object->getValue<int>("current-schema-id");
         auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
         if (schemas->size() == 0)
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
@@ -271,7 +270,7 @@
         for (uint32_t i = 0; i != schemas->size(); ++i)
         {
             auto current_schema = schemas->getObject(i);
-            if (current_schema->getValue<int>("schema-id") == current_schema_id)
+            if (current_schema->getValue<int>("schema-id") == schema_id)
             {
                 schema = current_schema;
                 break;
@@ -289,16 +288,15 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
                 "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
                 "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");

-        /// Now we sure that there is only one schema.
+        /// Now we are sure that there is only one schema.
         schema = schemas->getObject(0);
-        if (schema->getValue<int>("schema-id") != current_schema_id)
+        if (schema->getValue<int>("schema-id") != schema_id)
             throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
     }
     else
     {
         schema = metadata_object->getObject("schema");
-        current_schema_id = schema->getValue<int>("schema-id");
         /// Field "schemas" is optional for version 1, but after version 2 was introduced,
         /// in most cases this field is added for new tables in version 1 as well.
         if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
@@ -318,7 +316,7 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
         names_and_types.push_back({name, getFieldType(field, "type", required)});
     }

-    return {std::move(names_and_types), current_schema_id};
+    return names_and_types;
 }

 MutableColumns parseAvro(
@@ -400,26 +398,72 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
     Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
     Poco::Dynamic::Var json = parser.parse(json_str);
-    const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
+    const Poco::JSON::Object::Ptr & metadata = json.extract<Poco::JSON::Object::Ptr>();

-    auto format_version = object->getValue<int>("format-version");
-    auto [schema, schema_id]
-        = parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
-
-    auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
-    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
+    auto format_version = metadata->getValue<int>("format-version");
+    auto snapshots = metadata->get("snapshots").extract<Poco::JSON::Array::Ptr>();

     String manifest_list_file;
-    for (size_t i = 0; i < snapshots->size(); ++i)
+    Int32 schema_id = -1;
+
+    /// Use the current snapshot (by default) or find the appropriate snapshot based on the timestamp.
+    Int64 query_timestamp = local_context->getSettingsRef()[Setting::iceberg_query_at_timestamp_ms];
+    if (query_timestamp == 0)
     {
-        const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
-        if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
+        /// Use the current snapshot.
+        auto current_snapshot_id = metadata->getValue<Int64>("current-snapshot-id");
+        for (size_t i = 0; i < snapshots->size(); ++i)
         {
-            const auto path = snapshot->getValue<String>("manifest-list");
-            manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
-            break;
+            const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
+            if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
+            {
+                const auto path = snapshot->getValue<String>("manifest-list");
+                manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
+                schema_id = snapshot->getValue<Int32>("schema-id");
+                break;
+            }
+        }
+
+        if (manifest_list_file.empty() || schema_id == -1)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Current Iceberg snapshot not found in table metadata");
     }
+    else
+    {
+        /// Find the most recent snapshot at or before the query timestamp.
+        Int64 closest_timestamp = 0;
+        for (size_t i = 0; i < snapshots->size(); ++i)
+        {
+            const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
+            Int64 snapshot_timestamp = snapshot->getValue<Int64>("timestamp-ms");
+
+            /// The spec doesn't require snapshots to be ordered, so do an exhaustive search to be safe.
+            if (snapshot_timestamp <= query_timestamp && snapshot_timestamp > closest_timestamp)
+            {
+                closest_timestamp = snapshot_timestamp;
+                const auto path = snapshot->getValue<String>("manifest-list");
+                manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
+                schema_id = snapshot->getValue<Int32>("schema-id");
+            }
+        }
+
+        if (manifest_list_file.empty() || schema_id == -1)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "No Iceberg snapshot found at or before timestamp {}", query_timestamp);
+    }
+
+    chassert(schema_id == metadata->getValue<Int32>("current-schema-id"));
+
+    auto schema = parseTableSchema(metadata, format_version, schema_id, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);

     return std::make_unique<IcebergMetadata>(
         object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
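To make the snapshot selection above concrete, here is a minimal Python sketch of the same rule, run against a made-up, trimmed-down metadata.json; the field names ("snapshots", "timestamp-ms", "snapshot-id", "schema-id", "manifest-list") are the ones the patch reads:

def snapshot_at(metadata: dict, query_timestamp_ms: int) -> dict:
    """Return the snapshot that was current at query_timestamp_ms.

    Mirrors the C++ above: scan the whole "snapshots" array (the spec does not
    guarantee ordering) and keep the latest snapshot whose "timestamp-ms" is at
    or before the requested time.
    """
    best = None
    for snapshot in metadata["snapshots"]:
        ts = snapshot["timestamp-ms"]
        if ts <= query_timestamp_ms and (best is None or ts > best["timestamp-ms"]):
            best = snapshot
    if best is None:
        raise ValueError(f"No Iceberg snapshot found at or before timestamp {query_timestamp_ms}")
    return best

# Made-up metadata containing only the fields the patch reads.
metadata = {
    "current-snapshot-id": 2,
    "snapshots": [
        {"snapshot-id": 1, "timestamp-ms": 1000, "schema-id": 0, "manifest-list": "metadata/snap-1.avro"},
        {"snapshot-id": 2, "timestamp-ms": 2000, "schema-id": 0, "manifest-list": "metadata/snap-2.avro"},
    ],
}

assert snapshot_at(metadata, 1500)["snapshot-id"] == 1  # only the first snapshot existed then
assert snapshot_at(metadata, 9999)["snapshot-id"] == 2  # at or after the last snapshot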
diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h
index eb5cac591f2..d5ff13543f7 100644
--- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h
@@ -91,12 +91,14 @@ public:
     bool operator ==(const IDataLakeMetadata & other) const override
     {
         const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
-        return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
+        return iceberg_metadata && getManifestListFile() == iceberg_metadata->getManifestListFile();
     }

     static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);

 private:
+    const std::string & getManifestListFile() const { return manifest_list_file; }
+
     size_t getVersion() const { return metadata_version; }

     const ObjectStoragePtr object_storage;
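The operator == change matters for cache reuse: with timestamp-based reads, two metadata objects built from the same metadata version can point at different snapshots, so comparing metadata_version alone would wrongly report them equal. A small sketch of the idea (the class and file names are hypothetical):

class IcebergMetadataState:
    """Hypothetical stand-in for the cached IcebergMetadata object."""

    def __init__(self, metadata_version: int, manifest_list_file: str):
        self.metadata_version = metadata_version
        self.manifest_list_file = manifest_list_file

    def __eq__(self, other):
        # The old behavior compared metadata_version only; since the same metadata
        # version can now resolve to different snapshots, compare the selected
        # manifest list instead.
        return (
            isinstance(other, IcebergMetadataState)
            and self.manifest_list_file == other.manifest_list_file
        )

current = IcebergMetadataState(5, "metadata/snap-2.avro")  # latest snapshot
as_of = IcebergMetadataState(5, "metadata/snap-1.avro")    # older snapshot, same metadata version
assert current != as_of  # version-only equality would have treated these as equal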
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 36aba550dbd..5b06d7d24f1 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -4,7 +4,7 @@ import logging
 import os
 import time
 import uuid
-from datetime import datetime
+from datetime import datetime, timezone

 import pyspark
 import pytest
@@ -424,6 +424,105 @@ def test_multiple_iceberg_files(started_cluster, format_version, storage_type):
     )


+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
+def test_iceberg_snapshot_reads(started_cluster, format_version, storage_type):
+    if is_arm() and storage_type == "hdfs":
+        pytest.skip("Disabled test IcebergHDFS for aarch64")
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = (
+        "test_iceberg_snapshot_reads"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )
+
+    write_iceberg_from_df(
+        spark,
+        generate_data(spark, 0, 100),
+        TABLE_NAME,
+        mode="overwrite",
+        format_version=format_version,
+    )
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        "",
+    )
+
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
+    snapshot1_timestamp = datetime.now(timezone.utc)
+    time.sleep(0.1)
+
+    write_iceberg_from_df(
+        spark,
+        generate_data(spark, 100, 200),
+        TABLE_NAME,
+        mode="append",
+        format_version=format_version,
+    )
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        "",
+    )
+    snapshot2_timestamp = datetime.now(timezone.utc)
+    time.sleep(0.1)
+
+    write_iceberg_from_df(
+        spark,
+        generate_data(spark, 200, 300),
+        TABLE_NAME,
+        mode="append",
+        format_version=format_version,
+    )
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        "",
+    )
+    snapshot3_timestamp = datetime.now(timezone.utc)
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
+    assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY 1") == instance.query(
+        "SELECT number, toString(number + 1) FROM numbers(300)"
+    )
+
+    # Validate that each snapshot timestamp only sees the data inserted by that time.
+    assert (
+        instance.query(
+            f"""
+            SELECT * FROM {TABLE_NAME} ORDER BY 1
+            SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot1_timestamp.timestamp() * 1000)}"""
+        )
+        == instance.query("SELECT number, toString(number + 1) FROM numbers(100)")
+    )
+
+    assert (
+        instance.query(
+            f"""
+            SELECT * FROM {TABLE_NAME} ORDER BY 1
+            SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot2_timestamp.timestamp() * 1000)}"""
+        )
+        == instance.query("SELECT number, toString(number + 1) FROM numbers(200)")
+    )
+
+    assert (
+        instance.query(
+            f"""
+            SELECT * FROM {TABLE_NAME} ORDER BY 1
+            SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot3_timestamp.timestamp() * 1000)}"""
+        )
+        == instance.query("SELECT number, toString(number + 1) FROM numbers(300)")
+    )
+
+
 @pytest.mark.parametrize("format_version", ["1", "2"])
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
 def test_types(started_cluster, format_version, storage_type):