Resolve issues

This commit is contained in:
divanik 2024-11-07 18:35:32 +00:00
parent ba67604364
commit 34da26df32
4 changed files with 56 additions and 61 deletions

View File

@ -47,17 +47,12 @@ public:
BaseStorageConfiguration::update(object_storage, local_context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
// Metadata must have been updated by this point in the updateAndGetCurrentSchema method and should be the same here
if (!current_metadata || (*current_metadata != *new_metadata))
{
throw Exception(
ErrorCodes::FORMAT_VERSION_TOO_OLD,
"Storage thinks that actual metadata version is {}, but actual metadata version is {}",
(dynamic_cast<IcebergMetadata *>(current_metadata.get()) != nullptr)
? std::to_string(dynamic_cast<IcebergMetadata *>(current_metadata.get())->getVersion())
: "Absent",
(dynamic_cast<IcebergMetadata *>(new_metadata.get()) != nullptr)
? std::to_string(dynamic_cast<IcebergMetadata *>(new_metadata.get())->getVersion())
: "Absent");
"Metadata is not consinsent with the one which was used to infer table schema. Please, retry the query.");
}
}

View File

@ -119,6 +119,27 @@ bool operator==(const Poco::JSON::Object & first, const Poco::JSON::Object & sec
}
return first_string_stream.str() == second_string_stream.str();
}
/// Feeds the table schema(s) found in an Iceberg metadata JSON document into
/// `schema_processor` and returns the id of the current schema.
///
/// When "format-version" equals 2, every element of the "schemas" array is registered
/// and the id comes from the top-level "current-schema-id" field. For any other
/// version, the single "schema" object is registered and its own "schema-id" is returned.
Int32 parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor)
{
    const Int32 version = metadata_object->getValue<Int32>("format-version");

    if (version != 2)
    {
        /// Single-schema layout: the only schema present is also the current one.
        auto single_schema = metadata_object->getObject("schema");
        schema_processor.addIcebergTableSchema(single_schema);
        return single_schema->getValue<Int32>("schema-id");
    }

    auto schema_list = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
    for (size_t pos = 0; pos < schema_list->size(); ++pos)
        schema_processor.addIcebergTableSchema(schema_list->getObject(static_cast<UInt32>(pos)));

    /// The current id is looked up only after all listed schemas have been registered.
    return metadata_object->getValue<Int32>("current-schema-id");
}
}
DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name)
@ -242,27 +263,6 @@ bool IcebergSchemaProcessor::allowPrimitiveTypeConversion(const String & old_typ
return allowed_type_conversion;
}
/// Feeds the table schema(s) found in an Iceberg metadata JSON document into
/// `schema_processor` and returns the id of the current schema.
///
/// When "format-version" equals 2, every element of the "schemas" array is registered
/// and the id comes from the top-level "current-schema-id" field. For any other
/// version, the single "schema" object is registered and its own "schema-id" is returned.
Int32 parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor)
{
    const Int32 version = metadata_object->getValue<Int32>("format-version");

    if (version != 2)
    {
        /// Single-schema layout: the only schema present is also the current one.
        auto single_schema = metadata_object->getObject("schema");
        schema_processor.addIcebergTableSchema(single_schema);
        return single_schema->getValue<Int32>("schema-id");
    }

    auto schema_list = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
    for (size_t pos = 0; pos < schema_list->size(); ++pos)
        schema_processor.addIcebergTableSchema(schema_list->getObject(static_cast<UInt32>(pos)));

    /// The current id is looked up only after all listed schemas have been registered.
    return metadata_object->getValue<Int32>("current-schema-id");
}
std::shared_ptr<ActionsDAG>
IcebergSchemaProcessor::getSchemaTransformationDag(const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema)
{
@ -372,31 +372,28 @@ std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformatio
current_new_id = -1;
});
Poco::JSON::Object::Ptr old_schema, new_schema;
if (transform_dags_by_ids.contains({old_id, new_id}))
auto required_transform_dag_it = transform_dags_by_ids.find({old_id, new_id});
if (required_transform_dag_it != transform_dags_by_ids.end())
{
return transform_dags_by_ids.at({old_id, new_id});
}
try
{
old_schema = iceberg_table_schemas_by_ids.at(old_id);
}
catch (std::exception &)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id);
return required_transform_dag_it->second;
}
if (old_id == new_id)
{
return nullptr;
}
try
auto old_schema_it = iceberg_table_schemas_by_ids.find(old_id);
if (old_schema_it == iceberg_table_schemas_by_ids.end())
{
new_schema = iceberg_table_schemas_by_ids.at(new_id);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id);
}
catch (std::exception &)
auto new_schema_it = iceberg_table_schemas_by_ids.find(new_id);
if (new_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id);
}
return transform_dags_by_ids[{old_id, new_id}] = getSchemaTransformationDag(old_schema, new_schema);
return transform_dags_by_ids[{old_id, new_id}] = getSchemaTransformationDag(old_schema_it->second, new_schema_it->second);
}
void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr)
@ -425,14 +422,10 @@ void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schem
/// Returns the entry of `clickhouse_table_schemas_by_ids` stored under the given schema id.
/// Throws an Exception with code BAD_ARGUMENTS when no entry with this id exists.
std::shared_ptr<NamesAndTypesList> IcebergSchemaProcessor::getClickhouseTableSchemaById(Int32 id)
{
    /// One find() serves both the existence check and the value access, so no
    /// exception has to be thrown and caught just to detect a missing key.
    auto it = clickhouse_table_schemas_by_ids.find(id);
    if (it == clickhouse_table_schemas_by_ids.end())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with id {} is unknown", id);
    return it->second;
}
MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings)

View File

@ -1,7 +1,8 @@
#pragma once
#include <memory>
#include "config.h"
#include <memory>
#include <unordered_map>
#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata in Avro format.
@ -80,16 +81,20 @@ public:
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id);
private:
std::map<Int32, Poco::JSON::Object::Ptr> iceberg_table_schemas_by_ids;
std::map<Int32, std::shared_ptr<NamesAndTypesList>> clickhouse_table_schemas_by_ids;
std::map<std::pair<Int32, Int32>, std::shared_ptr<ActionsDAG>> transform_dags_by_ids;
std::unordered_map<Int32, Poco::JSON::Object::Ptr>
iceberg_table_schemas_by_ids;
std::unordered_map<Int32, std::shared_ptr<NamesAndTypesList>>
clickhouse_table_schemas_by_ids;
std::map<std::pair<Int32, Int32>, std::shared_ptr<ActionsDAG>>
transform_dags_by_ids;
NamesAndTypesList getSchemaType(const Poco::JSON::Object::Ptr & schema);
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type);
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required);
DataTypePtr getSimpleType(const String & type_name);
NamesAndTypesList getSchemaType(const Poco::JSON::Object::Ptr &schema);
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr &type);
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr &field,
const String &type_key, bool required);
DataTypePtr getSimpleType(const String &type_name);
/// Takes the JSON objects of an old and a new schema and returns a shared ActionsDAG.
/// NOTE(review): presumably the DAG converts data laid out by `old_schema` into the
/// layout of `new_schema` — confirm against the definition in the .cpp file.
std::shared_ptr<ActionsDAG> getSchemaTransformationDag(
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema);
bool allowPrimitiveTypeConversion(const String & old_type, const String & new_type);
@ -97,9 +102,6 @@ private:
Int32 current_old_id = -1;
Int32 current_new_id = -1;
// std::pair<const Node *, const Node *>
// getRemappingForStructField(const Poco::JSON::Array::Ptr & old_node, const Poco::JSON::Array::Ptr & new_node, const Node * input_node);
};

View File

@ -18,6 +18,11 @@ class ReadBufferIterator;
class SchemaCache;
class NamedCollection;
/// Declares the NOT_IMPLEMENTED error code so that it can be referenced in this file.
/// Being `extern`, the constant is only declared here; its definition is not in this file.
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/**
* A general class containing implementation for external table engines