This commit is contained in:
Brett Hoerner 2024-11-20 15:10:04 -08:00 committed by GitHub
commit 6605acfb6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 183 additions and 30 deletions

View File

@ -5537,6 +5537,9 @@ Allow to ignore schema evolution in Iceberg table engine and read all data using
:::note
Enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema.
:::
)", 0) \
DECLARE(Int64, iceberg_query_at_timestamp_ms, 0, R"(
Query Iceberg table using the snapshot that was current at a specific timestamp.
)", 0) \
DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"(
Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)

View File

@ -60,6 +60,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{
{"24.12",
{
{"iceberg_query_at_timestamp_ms", 0, 0, "New setting."}
}
},
{"24.11",

View File

@ -39,6 +39,7 @@ namespace DB
namespace Setting
{
extern const SettingsBool iceberg_engine_ignore_schema_evolution;
extern const SettingsInt64 iceberg_query_at_timestamp_ms;
}
namespace ErrorCodes
@ -56,7 +57,8 @@ IcebergMetadata::IcebergMetadata(
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
Int32 schema_id_,
Int64 snapshot_id_,
DB::NamesAndTypesList schema_)
: WithContext(context_)
, object_storage(object_storage_)
@ -64,7 +66,8 @@ IcebergMetadata::IcebergMetadata(
, metadata_version(metadata_version_)
, format_version(format_version_)
, manifest_list_file(std::move(manifest_list_file_))
, current_schema_id(current_schema_id_)
, schema_id(schema_id_)
, snapshot_id(snapshot_id_)
, schema(std::move(schema_))
, log(getLogger("IcebergMetadata"))
{
@ -248,10 +251,9 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
NamesAndTypesList parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, int schema_id, bool ignore_schema_evolution)
{
Poco::JSON::Object::Ptr schema;
Int32 current_schema_id;
/// First check if schema was evolved, because we don't support it yet.
/// For version 2 we can check it by using field schemas, but in version 1
@ -259,7 +261,6 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
/// (we will compare schema id from manifest file and currently used schema).
if (format_version == 2)
{
current_schema_id = metadata_object->getValue<int>("current-schema-id");
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
@ -267,11 +268,11 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
if (ignore_schema_evolution)
{
/// If we ignore schema evolution, we will just use latest schema for all data files.
/// Find schema with 'schema-id' equal to 'current_schema_id'.
/// Find schema with the correct 'schema_id'.
for (uint32_t i = 0; i != schemas->size(); ++i)
{
auto current_schema = schemas->getObject(i);
if (current_schema->getValue<int>("schema-id") == current_schema_id)
if (current_schema->getValue<int>("schema-id") == schema_id)
{
schema = current_schema;
break;
@ -289,16 +290,15 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
/// Now we sure that there is only one schema.
/// Now we are sure that there is only one schema.
schema = schemas->getObject(0);
if (schema->getValue<int>("schema-id") != current_schema_id)
if (schema->getValue<int>("schema-id") != schema_id)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
}
}
else
{
schema = metadata_object->getObject("schema");
current_schema_id = schema->getValue<int>("schema-id");
/// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well.
if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
@ -318,7 +318,7 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
names_and_types.push_back({name, getFieldType(field, "type", required)});
}
return {std::move(names_and_types), current_schema_id};
return names_and_types;
}
MutableColumns parseAvro(
@ -400,29 +400,77 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
const Poco::JSON::Object::Ptr & metadata = json.extract<Poco::JSON::Object::Ptr>();
auto format_version = object->getValue<int>("format-version");
auto [schema, schema_id]
= parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
auto format_version = metadata->getValue<int>("format-version");
auto snapshots = metadata->get("snapshots").extract<Poco::JSON::Array::Ptr>();
String manifest_list_file;
for (size_t i = 0; i < snapshots->size(); ++i)
Int32 schema_id = -1;
Int64 snapshot_id = -1;
// Use the current snapshot (by default) or find the appropriate snapshot based on timestamp
Int64 query_timestamp = local_context->getSettingsRef()[Setting::iceberg_query_at_timestamp_ms];
if (query_timestamp == 0)
{
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
// Use current snapshot
snapshot_id = metadata->getValue<Int64>("current-snapshot-id");
for (size_t i = 0; i < snapshots->size(); ++i)
{
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
break;
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == snapshot_id)
{
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
schema_id = snapshot->getValue<Int32>("schema-id");
break;
}
}
if (manifest_list_file.empty() || schema_id == -1 || snapshot_id == -1)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Current snapshot not found"
);
}
}
else
{
// Find the most recent snapshot at or before the query timestamp
Int64 closest_timestamp = 0;
for (size_t i = 0; i < snapshots->size(); ++i)
{
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
Int64 snapshot_timestamp = snapshot->getValue<Int64>("timestamp-ms");
// The spec doesn't say these have to be ordered, so we do an exhaustive search just to be safe
if (snapshot_timestamp <= query_timestamp && snapshot_timestamp > closest_timestamp)
{
closest_timestamp = snapshot_timestamp;
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
schema_id = snapshot->getValue<Int32>("schema-id");
snapshot_id = snapshot->getValue<Int64>("snapshot-id");
}
}
if (manifest_list_file.empty() || schema_id == -1 || snapshot_id == -1)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"No Iceberg snapshot found at or before timestamp {}",
query_timestamp
);
}
}
chassert(schema_id == metadata->getValue<int>("current-schema-id"));
auto schema = parseTableSchema(metadata, format_version, schema_id, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
return std::make_unique<IcebergMetadata>(
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, snapshot_id, schema);
}
/**
@ -507,7 +555,7 @@ Strings IcebergMetadata::getDataFiles() const
Poco::Dynamic::Var json = parser.parse(schema_json_string);
Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
if (!context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]
&& schema_object->getValue<int>("schema-id") != current_schema_id)
&& schema_object->getValue<int>("schema-id") != schema_id)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "

View File

@ -74,7 +74,8 @@ public:
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
Int32 schema_id_,
Int64 snapshot_id_,
NamesAndTypesList schema_);
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
@ -97,14 +98,15 @@ public:
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
private:
size_t getVersion() const { return metadata_version; }
std::tuple<Int32, Int32, Int64> getVersion() const { return std::make_tuple(metadata_version, schema_id, snapshot_id); }
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;
Int32 current_schema_id;
Int32 schema_id;
Int64 snapshot_id;
NamesAndTypesList schema;
mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;

View File

@ -4,7 +4,7 @@ import logging
import os
import time
import uuid
from datetime import datetime
from datetime import datetime, timezone
import pyspark
import pytest
@ -471,6 +471,105 @@ def test_multiple_iceberg_files(started_cluster, format_version, storage_type):
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_iceberg_snapshot_reads(started_cluster, format_version, storage_type):
if is_arm() and storage_type == "hdfs":
pytest.skip("Disabled test IcebergHDFS for aarch64")
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = (
"test_iceberg_snapshot_reads"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode="overwrite",
format_version=format_version,
)
default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
snapshot1_timestamp = datetime.now(timezone.utc)
time.sleep(0.1)
write_iceberg_from_df(
spark,
generate_data(spark, 100, 200),
TABLE_NAME,
mode="append",
format_version=format_version,
)
default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
snapshot2_timestamp = datetime.now(timezone.utc)
time.sleep(0.1)
write_iceberg_from_df(
spark,
generate_data(spark, 200, 300),
TABLE_NAME,
mode="append",
format_version=format_version,
)
default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
snapshot3_timestamp = datetime.now(timezone.utc)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300
assert instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY 1") == instance.query(
"SELECT number, toString(number + 1) FROM numbers(300)"
)
# Validate that each snapshot timestamp only sees the data inserted by that time.
assert (
instance.query(
f"""
SELECT * FROM {TABLE_NAME} ORDER BY 1
SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot1_timestamp.timestamp() * 1000)}"""
)
== instance.query("SELECT number, toString(number + 1) FROM numbers(100)")
)
assert (
instance.query(
f"""
SELECT * FROM {TABLE_NAME} ORDER BY 1
SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot2_timestamp.timestamp() * 1000)}"""
)
== instance.query("SELECT number, toString(number + 1) FROM numbers(200)")
)
assert (
instance.query(
f"""SELECT * FROM {TABLE_NAME} ORDER BY 1
SETTINGS iceberg_query_at_timestamp_ms = {int(snapshot3_timestamp.timestamp() * 1000)}"""
)
== instance.query("SELECT number, toString(number + 1) FROM numbers(300)")
)
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_types(started_cluster, format_version, storage_type):