include snapshot_id in iceberg class, compare version, schema and snapshot when comparing IcebergMetadata equality

This commit is contained in:
Brett Hoerner 2024-11-15 13:56:13 -07:00
parent afea675901
commit d5e1798474
2 changed files with 19 additions and 15 deletions

View File

@ -57,7 +57,8 @@ IcebergMetadata::IcebergMetadata(
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
Int32 schema_id_,
Int64 snapshot_id_,
DB::NamesAndTypesList schema_)
: WithContext(context_)
, object_storage(object_storage_)
@ -65,7 +66,8 @@ IcebergMetadata::IcebergMetadata(
, metadata_version(metadata_version_)
, format_version(format_version_)
, manifest_list_file(std::move(manifest_list_file_))
, current_schema_id(current_schema_id_)
, schema_id(schema_id_)
, snapshot_id(snapshot_id_)
, schema(std::move(schema_))
, log(getLogger("IcebergMetadata"))
{
@ -266,7 +268,7 @@ NamesAndTypesList parseTableSchema(const Poco::JSON::Object::Ptr & metadata_obje
if (ignore_schema_evolution)
{
/// If we ignore schema evolution, we will just use latest schema for all data files.
/// Find schema with 'schema-id' equal to 'current_schema_id'.
/// Find schema with the correct 'schema_id'.
for (uint32_t i = 0; i != schemas->size(); ++i)
{
auto current_schema = schemas->getObject(i);
@ -405,17 +407,18 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
String manifest_list_file;
Int32 schema_id = -1;
Int64 snapshot_id = -1;
// Use the current snapshot (by default) or find the appropriate snapshot based on timestamp
Int64 query_timestamp = local_context->getSettingsRef()[Setting::iceberg_query_at_timestamp_ms];
if (query_timestamp == 0)
{
// Use current snapshot
auto current_snapshot_id = metadata->getValue<Int64>("current-snapshot-id");
snapshot_id = metadata->getValue<Int64>("current-snapshot-id");
for (size_t i = 0; i < snapshots->size(); ++i)
{
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
if (snapshot->getValue<Int64>("snapshot-id") == snapshot_id)
{
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
@ -424,7 +427,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
}
}
if (manifest_list_file.empty() || schema_id == -1)
if (manifest_list_file.empty() || schema_id == -1 || snapshot_id == -1)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
@ -448,10 +451,11 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
schema_id = snapshot->getValue<Int32>("schema-id");
snapshot_id = snapshot->getValue<Int64>("snapshot-id");
}
}
if (manifest_list_file.empty() || schema_id == -1)
if (manifest_list_file.empty() || schema_id == -1 || snapshot_id == -1)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
@ -466,7 +470,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
auto schema = parseTableSchema(metadata, format_version, schema_id, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
return std::make_unique<IcebergMetadata>(
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, snapshot_id, schema);
}
/**
@ -551,7 +555,7 @@ Strings IcebergMetadata::getDataFiles() const
Poco::Dynamic::Var json = parser.parse(schema_json_string);
Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
if (!context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]
&& schema_object->getValue<int>("schema-id") != current_schema_id)
&& schema_object->getValue<int>("schema-id") != schema_id)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "

View File

@ -74,7 +74,8 @@ public:
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
Int32 schema_id_,
Int64 snapshot_id_,
NamesAndTypesList schema_);
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
@ -91,22 +92,21 @@ public:
bool operator ==(const IDataLakeMetadata & other) const override
{
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
return iceberg_metadata && getManifestListFile() == iceberg_metadata->getManifestListFile();
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
private:
const std::string& getManifestListFile() const { return manifest_list_file; }
size_t getVersion() const { return metadata_version; }
std::tuple<Int32, Int32, Int64> getVersion() const { return std::make_tuple(metadata_version, schema_id, snapshot_id); }
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;
Int32 current_schema_id;
Int32 schema_id;
Int64 snapshot_id;
NamesAndTypesList schema;
mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;