Add processing of incorrect icebrg 1 format

This commit is contained in:
divanik 2024-11-22 13:40:07 +00:00
parent 59fe7e1951
commit a87dc7a3e2

View File

@ -139,6 +139,7 @@ enum class DataFileContent : uint8_t
* }
*/
DataTypePtr getSimpleTypeByName(const String & type_name)
{
if (type_name == "boolean")
@ -248,18 +249,15 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
std::pair<Poco::JSON::Object::Ptr, Int32>
parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object, bool ignore_schema_evolution)
{
Poco::JSON::Object::Ptr schema;
Int32 current_schema_id;
/// First check if schema was evolved, because we don't support it yet.
/// For version 2 we can check it by using field schemas, but in version 1
/// this field is optional and we will check it later during parsing manifest files
/// (we will compare schema id from manifest file and currently used schema).
if (format_version == 2)
{
current_schema_id = metadata_object->getValue<int>("current-schema-id");
if (!metadata_object->has("current-schema-id"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'current-schema-id' field is missing in metadata");
auto current_schema_id = metadata_object->getValue<int>("current-schema-id");
if (!metadata_object->has("schemas"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schemas' field is missing in metadata");
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
@ -271,6 +269,10 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
for (uint32_t i = 0; i != schemas->size(); ++i)
{
auto current_schema = schemas->getObject(i);
if (!current_schema->has("schema-id"))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema-id' field is missing in schema");
}
if (current_schema->getValue<int>("schema-id") == current_schema_id)
{
schema = current_schema;
@ -279,14 +281,19 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
}
if (!schema)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
throw Exception(
ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
}
else
{
if (schemas->size() != 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
if (schemas->size() > 1)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
"setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
/// Now we sure that there is only one schema.
@ -294,19 +301,74 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
if (schema->getValue<int>("schema-id") != current_schema_id)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
}
return {schema, current_schema_id};
}
std::pair<Poco::JSON::Object::Ptr, Int32>
parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object, bool ignore_schema_evolution)
{
if (!metadata_object->has("schema"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema' field is missing in metadata");
Poco::JSON::Object::Ptr schema = metadata_object->getObject("schema");
if (!metadata_object->has("schema"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema-id' field is missing in schema");
auto current_schema_id = schema->getValue<int>("schema-id");
if (!ignore_schema_evolution && metadata_object->has("schemas")
&& metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
"setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
return {schema, current_schema_id};
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, LoggerPtr metadata_logger, int format_version, bool ignore_schema_evolution)
{
Poco::JSON::Object::Ptr schema;
Int32 current_schema_id;
/// First check if schema was evolved, because we don't support it yet.
/// For version 2 we can check it by using field schemas, but in version 1
/// this field is optional and we will check it later during parsing manifest files
/// (we will compare schema id from manifest file and currently used schema).
if (format_version == 2)
{
std::tie(schema, current_schema_id) = parseTableSchemaV2Method(metadata_object, ignore_schema_evolution);
}
else
{
schema = metadata_object->getObject("schema");
current_schema_id = schema->getValue<int>("schema-id");
/// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well.
if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
try
{
std::tie(schema, current_schema_id) = parseTableSchemaV1Method(metadata_object, ignore_schema_evolution);
}
catch (const Exception & first_error)
{
try
{
std::tie(schema, current_schema_id) = parseTableSchemaV2Method(metadata_object, ignore_schema_evolution);
LOG_WARNING(
metadata_logger,
"Iceberg table schema was parsed using v2 specification, but it was impossible to parse it using v1 specification. Be "
"aware that you Iceberg writing Engine violates Iceberg specification. Error during parsing {}",
first_error.displayText());
}
catch (const Exception & second_error)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot parse Iceberg table schema both with v1 and v2 methods. Old method error: {}. New method error: {}",
first_error.displayText(),
second_error.displayText());
}
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, e.displayText());
}
}
}
NamesAndTypesList names_and_types;
auto fields = schema->get("fields").extract<Poco::JSON::Array::Ptr>();
@ -404,7 +466,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
auto format_version = object->getValue<int>("format-version");
auto [schema, schema_id]
= parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
= parseTableSchema(object, log, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();