Allow to ignore schema evolution in Iceberg table engine under a setting

This commit is contained in:
avogar 2024-01-23 17:45:41 +00:00
parent aa5e4b418b
commit be3b5dc45f
5 changed files with 60 additions and 14 deletions

View File

@ -5197,3 +5197,7 @@ The value 0 means that you can delete all tables without any restrictions.
:::note
This query setting overwrites its server setting equivalent, see [max_table_size_to_drop](/docs/en/operations/server-configuration-parameters/settings.md/#max-table-size-to-drop)
:::
## iceberg_engine_ignore_schema_evolution {#iceberg_engine_ignore_schema_evolution}
Allow to ignore schema evolution in Iceberg table engine and read all data using latest schema saved on storage creation.

View File

@ -856,7 +856,8 @@ class IColumn;
M(UInt64, cache_warmer_threads, 4, "Only available in ClickHouse Cloud", 0) \
M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
M(Bool, enable_order_by_all, true, "Enable sorting expression ORDER BY ALL.", 0)\
M(Bool, enable_order_by_all, true, "Enable sorting expression ORDER BY ALL.", 0) \
M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.

View File

@ -98,7 +98,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"max_rows_in_set_to_optimize_join", 100000, 0, "Disable join optimization as it prevents from read in order optimization"},
{"output_format_pretty_color", true, "auto", "Setting is changed to allow also for auto value, disabling ANSI escapes if output is not a tty"},
{"function_visible_width_behavior", 0, 1, "We changed the default behavior of `visibleWidth` to be more precise"},
{"max_estimated_execution_time", 0, 0, "Separate max_execution_time and max_estimated_execution_time"}}},
{"max_estimated_execution_time", 0, 0, "Separate max_execution_time and max_estimated_execution_time"},
{"iceberg_engine_ignore_schema_evolution", false, false, "Allow to ignore schema evolution in Iceberg table engine"}}},
{"23.12", {{"allow_suspicious_ttl_expressions", true, false, "It is a new setting, and in previous versions the behavior was equivalent to allowing."},
{"input_format_parquet_allow_missing_columns", false, true, "Allow missing columns in Parquet files by default"},
{"input_format_orc_allow_missing_columns", false, true, "Allow missing columns in ORC files by default"},

View File

@ -240,7 +240,7 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version)
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
{
Poco::JSON::Object::Ptr schema;
Int32 current_schema_id;
@ -253,22 +253,51 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
{
current_schema_id = metadata_object->getValue<int>("current-schema-id");
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
if (ignore_schema_evolution)
{
/// If we ignore schema evolution, we will just use latest schema for all data files.
/// Find schema with 'schema-id' equal to 'current_schema_id'.
for (uint32_t i = 0; i != schemas->size(); ++i)
{
auto current_schema = schemas->getObject(i);
if (current_schema->getValue<int>("schema-id") == current_schema_id)
{
schema = current_schema;
break;
}
}
if (!schema)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
}
else
{
if (schemas->size() != 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
/// Now we sure that there is only one schema.
schema = schemas->getObject(0);
if (schema->getValue<int>("schema-id") != current_schema_id)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
}
}
else
{
schema = metadata_object->getObject("schema");
current_schema_id = schema->getValue<int>("schema-id");
/// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well.
if (metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
}
NamesAndTypesList names_and_types;
@ -356,7 +385,7 @@ std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configura
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
auto format_version = object->getValue<int>("format-version");
auto [schema, schema_id] = parseTableSchema(object, format_version);
auto [schema, schema_id] = parseTableSchema(object, format_version, context_->getSettingsRef().iceberg_engine_ignore_schema_evolution);
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
@ -453,8 +482,12 @@ Strings IcebergMetadata::getDataFiles()
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(schema_json_string);
Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
if (schema_object->getValue<int>("schema-id") != current_schema_id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
if (!getContext()->getSettingsRef().iceberg_engine_ignore_schema_evolution && schema_object->getValue<int>("schema-id") != current_schema_id)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
size_t leaves_num = root_node->leaves();

View File

@ -399,6 +399,8 @@ def test_evolved_schema(started_cluster, format_version):
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
expected_data = instance.query(f"SELECT * FROM {TABLE_NAME} order by a, b")
spark.sql(f"ALTER TABLE {TABLE_NAME} ADD COLUMNS (x bigint)")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
@ -407,6 +409,11 @@ def test_evolved_schema(started_cluster, format_version):
error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}")
assert "UNSUPPORTED_METHOD" in error
data = instance.query(
f"SELECT * FROM {TABLE_NAME} SETTINGS iceberg_engine_ignore_schema_evolution=1"
)
assert data == expected_data
def test_row_based_deletes(started_cluster):
instance = started_cluster.instances["node1"]