Fix broken tests

This commit is contained in:
divanik 2024-11-25 14:36:09 +00:00
parent e05b64064e
commit deb3751a3a
2 changed files with 39 additions and 27 deletions

View File

@ -270,31 +270,11 @@ bool IcebergSchemaProcessor::allowPrimitiveTypeConversion(const String & old_typ
return allowed_type_conversion;
}
std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id)
// Ids are passed only for error logging purposes
std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id)
{
std::lock_guard lock(mutex);
Poco::JSON::Object::Ptr old_schema, new_schema;
auto required_transform_dag_it = transform_dags_by_ids.find({old_id, new_id});
if (required_transform_dag_it != transform_dags_by_ids.end())
{
return required_transform_dag_it->second;
}
if (old_id == new_id)
{
return nullptr;
}
auto old_schema_it = iceberg_table_schemas_by_ids.find(old_id);
if (old_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id);
}
auto new_schema_it = iceberg_table_schemas_by_ids.find(new_id);
if (new_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id);
}
std::unordered_map<size_t, std::pair<Poco::JSON::Object::Ptr, const ActionsDAG::Node *>> old_schema_entries;
auto old_schema_fields = old_schema->get("fields").extract<Poco::JSON::Array::Ptr>();
std::shared_ptr<ActionsDAG> dag = std::make_shared<ActionsDAG>();
@ -325,7 +305,8 @@ std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformatio
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Schema evolution is not supported for complex types yet, field id is {}, old schema id is {}, new schema id is {}",
"Schema evolution is not supported for complex types yet, field id is {}, old schema id is {}, new schema id "
"is {}",
id,
old_id,
new_id);
@ -370,7 +351,8 @@ std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformatio
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Adding a default column with id {} and complex type is not supported yet. Old schema id is {}, new schema id is {}",
"Adding a default column with id {} and complex type is not supported yet. Old schema id is {}, new schema id is "
"{}",
id,
old_id,
new_id);
@ -391,7 +373,34 @@ std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformatio
outputs.push_back(&constant);
}
}
return transform_dags_by_ids[{old_id, new_id}] = dag;
return dag;
}
std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id)
{
if (old_id == new_id)
{
return nullptr;
}
std::lock_guard lock(mutex);
auto required_transform_dag_it = transform_dags_by_ids.find({old_id, new_id});
if (required_transform_dag_it != transform_dags_by_ids.end())
{
return required_transform_dag_it->second;
}
auto old_schema_it = iceberg_table_schemas_by_ids.find(old_id);
if (old_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id);
}
auto new_schema_it = iceberg_table_schemas_by_ids.find(new_id);
if (new_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id);
}
return transform_dags_by_ids[{old_id, new_id}]
= getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, old_id, new_id);
}
void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr)

View File

@ -99,6 +99,9 @@ private:
bool allowPrimitiveTypeConversion(const String & old_type, const String & new_type);
const Node * getDefaultNodeForField(const Poco::JSON::Object::Ptr & field);
std::shared_ptr<ActionsDAG> getSchemaTransformationDag(
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id);
std::mutex mutex;
};