Add setting to query Iceberg tables as of a specific timestamp

This commit is contained in:
Brett Hoerner 2024-10-25 16:38:26 -06:00
parent e00e06b162
commit afea675901
5 changed files with 172 additions and 23 deletions

View File

@ -5519,6 +5519,9 @@ Allow to ignore schema evolution in Iceberg table engine and read all data using
:::note
Enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema.
:::
)", 0) \
DECLARE(Int64, iceberg_query_at_timestamp_ms, 0, R"(
Query Iceberg table using the snapshot that was current at a specific timestamp.
)", 0) \
DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"(
Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)

View File

@ -60,6 +60,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{
{"24.12",
{
{"iceberg_query_at_timestamp_ms", 0, 0, "New setting."}
}
},
{"24.11",

View File

@ -39,6 +39,7 @@ namespace DB
namespace Setting
{
extern const SettingsBool iceberg_engine_ignore_schema_evolution;
extern const SettingsInt64 iceberg_query_at_timestamp_ms;
}
namespace ErrorCodes
@ -248,10 +249,9 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
NamesAndTypesList parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, int schema_id, bool ignore_schema_evolution)
{
Poco::JSON::Object::Ptr schema;
Int32 current_schema_id;
/// First check if schema was evolved, because we don't support it yet.
/// For version 2 we can check it by using field schemas, but in version 1
@ -259,7 +259,6 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
/// (we will compare schema id from manifest file and currently used schema).
if (format_version == 2)
{
current_schema_id = metadata_object->getValue<int>("current-schema-id");
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
@ -271,7 +270,7 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
for (uint32_t i = 0; i != schemas->size(); ++i)
{
auto current_schema = schemas->getObject(i);
if (current_schema->getValue<int>("schema-id") == current_schema_id)
if (current_schema->getValue<int>("schema-id") == schema_id)
{
schema = current_schema;
break;
@ -289,16 +288,15 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
/// Now we sure that there is only one schema.
/// Now we are sure that there is only one schema.
schema = schemas->getObject(0);
if (schema->getValue<int>("schema-id") != current_schema_id)
if (schema->getValue<int>("schema-id") != schema_id)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
}
}
else
{
schema = metadata_object->getObject("schema");
current_schema_id = schema->getValue<int>("schema-id");
/// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well.
if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
@ -318,7 +316,7 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
names_and_types.push_back({name, getFieldType(field, "type", required)});
}
return {std::move(names_and_types), current_schema_id};
return names_and_types;
}
MutableColumns parseAvro(
@ -400,26 +398,72 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
const Poco::JSON::Object::Ptr & metadata = json.extract<Poco::JSON::Object::Ptr>();
auto format_version = object->getValue<int>("format-version");
auto [schema, schema_id]
= parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
auto format_version = metadata->getValue<int>("format-version");
auto snapshots = metadata->get("snapshots").extract<Poco::JSON::Array::Ptr>();
String manifest_list_file;
for (size_t i = 0; i < snapshots->size(); ++i)
Int32 schema_id = -1;
// Use the current snapshot (by default) or find the appropriate snapshot based on timestamp
Int64 query_timestamp = local_context->getSettingsRef()[Setting::iceberg_query_at_timestamp_ms];
if (query_timestamp == 0)
{
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
// Use current snapshot
auto current_snapshot_id = metadata->getValue<Int64>("current-snapshot-id");
for (size_t i = 0; i < snapshots->size(); ++i)
{
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
break;
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
{
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
schema_id = snapshot->getValue<Int32>("schema-id");
break;
}
}
if (manifest_list_file.empty() || schema_id == -1)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Current snapshot not found"
);
}
}
else
{
// Find the most recent snapshot at or before the query timestamp
Int64 closest_timestamp = 0;
for (size_t i = 0; i < snapshots->size(); ++i)
{
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
Int64 snapshot_timestamp = snapshot->getValue<Int64>("timestamp-ms");
// The spec doesn't say these have to be ordered, so we do an exhaustive search just to be safe
if (snapshot_timestamp <= query_timestamp && snapshot_timestamp > closest_timestamp)
{
closest_timestamp = snapshot_timestamp;
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
schema_id = snapshot->getValue<Int32>("schema-id");
}
}
if (manifest_list_file.empty() || schema_id == -1)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"No Iceberg snapshot found at or before timestamp {}",
query_timestamp
);
}
}
chassert(schema_id == metadata->getValue<int>("current-schema-id"));
auto schema = parseTableSchema(metadata, format_version, schema_id, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
return std::make_unique<IcebergMetadata>(
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);

View File

@ -91,12 +91,14 @@ public:
bool operator ==(const IDataLakeMetadata & other) const override
{
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
return iceberg_metadata && getManifestListFile() == iceberg_metadata->getManifestListFile();
}
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
private:
const std::string& getManifestListFile() const { return manifest_list_file; }
size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage;

View File

@ -4,7 +4,7 @@ import logging
import os
import time
import uuid
from datetime import datetime
from datetime import datetime, timezone
import pyspark
import pytest
@ -424,6 +424,105 @@ def test_multiple_iceberg_files(started_cluster, format_version, storage_type):
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_iceberg_snapshot_reads(started_cluster, format_version, storage_type):
if is_arm() and storage_type == "hdfs":
pytest.skip("Disabled test IcebergHDFS for aarch64")
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = (
"test_iceberg_snapshot_reads"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode="overwrite",
format_version=format_version,
)
default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
snapshot1_timestamp = datetime.now(timezone.utc)
time.sleep(0.1)
write_iceberg_from_df(
spark,
generate_data(spark, 100, 200),
TABLE_NAME,
mode="append",
format_version=format_version,
)
default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
snapshot2_timestamp = datetime.now(timezone.utc)
time.sleep(0.1)
write_iceberg_from_df(
spark,
generate_data(spark, 200, 300),
TABLE_NAME,
mode="append",
format_version=format_version,
)
default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
snapshot3_timestamp = datetime.now(timezone.utc)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY 1") == instance.query(
"SELECT number, toString(number + 1) FROM numbers(300)"
)
# Validate that each snapshot timestamp only sees the data inserted by that time.
assert (
instance.query(
f"""
SELECT * FROM {TABLE_NAME} ORDER BY 1
SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot1_timestamp.timestamp() * 1000)}"""
)
== instance.query("SELECT number, toString(number + 1) FROM numbers(100)")
)
assert (
instance.query(
f"""
SELECT * FROM {TABLE_NAME} ORDER BY 1
SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot2_timestamp.timestamp() * 1000)}"""
)
== instance.query("SELECT number, toString(number + 1) FROM numbers(200)")
)
assert (
instance.query(
f"""SELECT * FROM {TABLE_NAME} ORDER BY 1
SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot3_timestamp.timestamp() * 1000)}"""
)
== instance.query("SELECT number, toString(number + 1) FROM numbers(300)")
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_types(started_cluster, format_version, storage_type):