From 5746970ea96ac70a2bcdc0e7f33f61afd23403aa Mon Sep 17 00:00:00 2001 From: divanik Date: Sun, 17 Nov 2024 13:41:23 +0000 Subject: [PATCH 1/3] To checkout --- src/Common/DateLUTImpl.cpp | 12 + src/Common/DateLUTImpl.h | 4 + .../{ => Iceberg}/IcebergMetadata.cpp | 127 +++++---- .../DataLakes/Iceberg/IcebergMetadata.h | 267 ++++++++++++++++++ .../DataLakes/Iceberg/PartitionPruning.cpp | 136 +++++++++ .../ObjectStorage/DataLakes/IcebergMetadata.h | 117 -------- 6 files changed, 484 insertions(+), 179 deletions(-) rename src/Storages/ObjectStorage/DataLakes/{ => Iceberg}/IcebergMetadata.cpp (89%) create mode 100644 src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h create mode 100644 src/Storages/ObjectStorage/DataLakes/Iceberg/PartitionPruning.cpp delete mode 100644 src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h diff --git a/src/Common/DateLUTImpl.cpp b/src/Common/DateLUTImpl.cpp index 355d39780f2..80e2fb8c98b 100644 --- a/src/Common/DateLUTImpl.cpp +++ b/src/Common/DateLUTImpl.cpp @@ -291,3 +291,15 @@ namespace cctz_extension ZoneInfoSourceFactory zone_info_source_factory = custom_factory; } + +DateLUTImpl::Values DateLUTImpl::lutIndexByMonthSinceEpochStartsZeroIndexing(Int32 months) const +{ + Int16 year = 1970 + months / 12; + UInt8 month = months % 12 + 1; + return lut[makeLUTIndex(year, month, 1)]; +} + +DateLUTImpl::Values DateLUTImpl::lutIndexByYearSinceEpochStartsZeroIndexing(Int16 years) const +{ + return lut[makeLUTIndex(years + 1970, 1, 1)]; +} diff --git a/src/Common/DateLUTImpl.h b/src/Common/DateLUTImpl.h index f1a937d1c65..b23d06aded6 100644 --- a/src/Common/DateLUTImpl.h +++ b/src/Common/DateLUTImpl.h @@ -1166,6 +1166,10 @@ public: return LUTIndex{std::min(index, static_cast(DATE_LUT_SIZE - 1))}; } + Values lutIndexByMonthSinceEpochStartsZeroIndexing(Int32 months) const; + + Values lutIndexByYearSinceEpochStartsZeroIndexing(Int16 years) const; + /// Create DayNum from year, month, day of month. 
ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month, Int32 default_error_day_num = 0) const { diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp similarity index 89% rename from src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp rename to src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index f0a80a41d4e..5af933eec2a 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -2,43 +2,44 @@ #if USE_AVRO -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include -#include -#include -#include -#include +# include +# include +# include + +# include namespace DB { namespace Setting { - extern const SettingsBool iceberg_engine_ignore_schema_evolution; +extern const SettingsBool iceberg_engine_ignore_schema_evolution; } namespace ErrorCodes @@ -245,10 +246,10 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t } throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString()); - } -std::pair parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution) +std::tuple> +parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution) { Poco::JSON::Object::Ptr schema; Int32 current_schema_id; @@ -279,20 +280,24 @@ std::pair parseTableSchema(const Poco::JSON::Object::P } if (!schema) - throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)"); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)"); } else { if (schemas->size() != 1) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, + throw Exception( + ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not " "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting " + "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, " + "enable setting " "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); /// Now we are sure that there is only one schema. 
schema = schemas->getObject(0); if (schema->getValue("schema-id") != current_schema_id) - throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)"); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)"); } } else @@ -301,30 +306,32 @@ std::pair parseTableSchema(const Poco::JSON::Object::P current_schema_id = schema->getValue("schema-id"); /// Field "schemas" is optional for version 1, but after version 2 was introduced, /// in most cases this field is added for new tables in version 1 as well. - if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract()->size() > 1) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, + if (!ignore_schema_evolution && metadata_object->has("schemas") + && metadata_object->get("schemas").extract()->size() > 1) + throw Exception( + ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not " - "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting " + "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable " + "setting " "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); } NamesAndTypesList names_and_types; auto fields = schema->get("fields").extract(); + std::unordered_map name_and_type_by_source_id; for (size_t i = 0; i != fields->size(); ++i) { auto field = fields->getObject(static_cast(i)); auto name = field->getValue("name"); bool required = field->getValue("required"); names_and_types.push_back({name, getFieldType(field, "type", required)}); + name_and_type_by_source_id[field->getValue("id")] = {name, names_and_types.back().type}; } - return {std::move(names_and_types), current_schema_id}; + return {std::move(names_and_types), current_schema_id, name_and_type_by_source_id}; } -MutableColumns parseAvro( - avro::DataFileReaderBase & file_reader, - const Block & header, - const FormatSettings & settings) +MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings) { auto deserializer = std::make_unique(header, file_reader.dataSchema(), true, true, settings); MutableColumns columns = header.cloneEmptyColumns(); @@ -345,17 +352,14 @@ MutableColumns parseAvro( * 1) v.metadata.json, where V - metadata version. 
* 2) -.metadata.json, where V - metadata version */ -std::pair getMetadataFileAndVersion( - ObjectStoragePtr object_storage, - const StorageObjectStorage::Configuration & configuration) +std::pair +getMetadataFileAndVersion(ObjectStoragePtr object_storage, const StorageObjectStorage::Configuration & configuration) { const auto metadata_files = listFiles(*object_storage, configuration, "metadata", ".metadata.json"); if (metadata_files.empty()) { throw Exception( - ErrorCodes::FILE_DOESNT_EXIST, - "The metadata file for Iceberg table with path {} doesn't exist", - configuration.getPath()); + ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", configuration.getPath()); } std::vector> metadata_files_with_versions; @@ -372,7 +376,8 @@ std::pair getMetadataFileAndVersion( version_str = String(file_name.begin(), file_name.begin() + file_name.find_first_of('-')); if (!std::all_of(version_str.begin(), version_str.end(), isdigit)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name); metadata_files_with_versions.emplace_back(std::stoi(version_str), path); } @@ -403,7 +408,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt const Poco::JSON::Object::Ptr & object = json.extract(); auto format_version = object->getValue("format-version"); - auto [schema, schema_id] + auto [schema, schema_id, name_and_type_by_source_id] = parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]); auto current_snapshot_id = object->getValue("current-snapshot-id"); @@ -456,7 +461,6 @@ Strings IcebergMetadata::getDataFiles() const if (!data_files.empty()) return data_files; - Strings manifest_files; if (manifest_list_file.empty()) return data_files; @@ -465,7 +469,8 @@ Strings IcebergMetadata::getDataFiles() const auto context = getContext(); StorageObjectStorageSource::ObjectInfo object_info(manifest_list_file); auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log); - auto manifest_list_file_reader = std::make_unique(std::make_unique(*manifest_list_buf)); + auto manifest_list_file_reader + = std::make_unique(std::make_unique(*manifest_list_buf)); auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0)); Block header{{data_type->createColumn(), data_type, "manifest_path"}}; @@ -511,7 +516,8 @@ Strings IcebergMetadata::getDataFiles() const throw Exception( ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not " - "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting " + "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable " + "setting " "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); avro::NodePtr root_node = manifest_file_reader->dataSchema().root(); @@ -520,9 +526,7 @@ Strings IcebergMetadata::getDataFiles() const if (leaves_num < expected_min_num) { throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Unexpected number of columns {}. 
Expected at least {}", - root_node->leaves(), expected_min_num); + ErrorCodes::BAD_ARGUMENTS, "Unexpected number of columns {}. Expected at least {}", root_node->leaves(), expected_min_num); } avro::NodePtr status_node = root_node->leafAt(0); @@ -615,7 +619,8 @@ Strings IcebergMetadata::getDataFiles() const { Int32 content_type = content_int_column->getElement(i); if (DataFileContent(content_type) != DataFileContent::DATA) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported"); + throw Exception( + ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported"); } const auto status = status_int_column->getInt(i); @@ -635,14 +640,12 @@ Strings IcebergMetadata::getDataFiles() const { LOG_TEST(log, "Processing data file for path: {}", file_path); files.insert(file_path); + data_files.push_back(file_path); } } } - - data_files = std::vector(files.begin(), files.end()); return data_files; } } - #endif diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h new file mode 100644 index 00000000000..c4f74ad8181 --- /dev/null +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -0,0 +1,267 @@ +#pragma once + +#include "Core/NamesAndTypes.h" +#include "DataTypes/DataTypeNullable.h" +#include "config.h" + +#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format. + +# include +# include +# include +# include +# include + +namespace DB +{ + +/** + * Useful links: + * - https://iceberg.apache.org/spec/ + * + * Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version. + * + * Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`. + * Metadata file - json file. + * Manifest list – an Avro file that lists manifest files; one per snapshot. + * Manifest file – an Avro file that lists data or delete files; a subset of a snapshot. + * All changes to table state create a new metadata file and replace the old metadata with an atomic swap. + * + * In order to find out which data files to read, we need to find the `manifest list` + * which corresponds to the latest snapshot. We find it by checking a list of snapshots + * in metadata's "snapshots" section. + * + * Example of metadata.json file. + * { + * "format-version" : 1, + * "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5", + * "location" : "/iceberg_data/db/table_name", + * "last-updated-ms" : 1680206743150, + * "last-column-id" : 2, + * "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {}, {}, ... ] }, + * "current-schema-id" : 0, + * "schemas" : [ ], + * ... 
+ * "current-snapshot-id" : 2819310504515118887, + * "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } }, + * "snapshots" : [ { + * "snapshot-id" : 2819310504515118887, + * "timestamp-ms" : 1680206743150, + * "summary" : { + * "operation" : "append", "spark.app.id" : "local-1680206733239", + * "added-data-files" : "1", "added-records" : "100", + * "added-files-size" : "1070", "changed-partition-count" : "1", + * "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0", + * "total-position-deletes" : "0", "total-equality-deletes" : "0" + * }, + * "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro", + * "schema-id" : 0 + * } ], + * "statistics" : [ ], + * "snapshot-log" : [ ... ], + * "metadata-log" : [ ] + * } + */ + +enum class PartitionTransform +{ + Year, + Month, + Day, + Hour, + Identity, + Unsupported +}; + +struct CommonPartitionInfo +{ + std::vector partition_columns; + std::vector partition_transforms; + std::vector partition_source_ids; +}; + +struct SpecificSchemaPartitionInfo +{ + std::vector> ranges; + NamesAndTypesList partition_names_and_types; +}; + +class PartitionPruningProcessor +{ +public: + CommonPartitionInfo getCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple); + + SpecificSchemaPartitionInfo getSpecificPartitionPruning( + const CommonPartitionInfo & common_info, + Int32 schema_version, + const std::unordered_map & name_and_type_by_source_id); + + std::vector getPruningMask(const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context); + +private: + static PartitionTransform getTransform(const String & transform_name) + { + if (transform_name == "year") + { + return PartitionTransform::Year; + } + else if (transform_name == "month") + { + return PartitionTransform::Month; + } + else if (transform_name == "day") + { + return PartitionTransform::Day; + } + else + { + return PartitionTransform::Unsupported; + } + } + + static DateLUTImpl::Values getValues(Int32 value, PartitionTransform transform) + { + if (transform == PartitionTransform::Year) + { + return DateLUT::instance().lutIndexByYearSinceEpochStartsZeroIndexing(value); + } + else if (transform == PartitionTransform::Month) + { + return DateLUT::instance().lutIndexByMonthSinceEpochStartsZeroIndexing(static_cast(value)); + } + else if (transform == PartitionTransform::Day) + { + return DateLUT::instance().getValues(static_cast(value)); + } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition transform for get day function: {}", transform); + } + + static Int64 getTime(Int32 value, PartitionTransform transform) + { + DateLUTImpl::Values values = getValues(value, transform); + // LOG_DEBUG(&Poco::Logger::get("Get field"), "Values: {}", values); + return values.date; + } + + static Int16 getDay(Int32 value, PartitionTransform transform) + { + DateLUTImpl::Time got_time = getTime(value, transform); + LOG_DEBUG(&Poco::Logger::get("Get field"), "Time: {}", got_time); + return DateLUT::instance().toDayNum(got_time); + } + + static Range + getPartitionRange(PartitionTransform partition_transform, UInt32 index, ColumnPtr partition_column, DataTypePtr column_data_type) + { + if (partition_transform == PartitionTransform::Year || partition_transform == PartitionTransform::Month + || partition_transform == PartitionTransform::Day) + { + auto column = 
dynamic_cast(partition_column.get())->getNestedColumnPtr(); + const auto * casted_inner_column = assert_cast(column.get()); + Int32 value = static_cast(casted_inner_column->getInt(index)); + LOG_DEBUG(&Poco::Logger::get("Partition"), "Partition value: {}, transform: {}", value, partition_transform); + auto nested_data_type = dynamic_cast(column_data_type.get())->getNestedType(); + if (WhichDataType(nested_data_type).isDate()) + { + const UInt16 begin_range_value = getDay(value, partition_transform); + const UInt16 end_range_value = getDay(value + 1, partition_transform); + return Range{begin_range_value, true, end_range_value, false}; + } + else if (WhichDataType(nested_data_type).isDateTime64()) + { + const UInt64 begin_range_value = getTime(value, partition_transform); + const UInt64 end_range_value = getTime(value + 1, partition_transform); + return Range{begin_range_value, true, end_range_value, false}; + } + else + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Partition transform {} is not supported for the type: {}", + partition_transform, + nested_data_type); + } + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition transform: {}", partition_transform); + } + } + + std::unordered_map common_partition_info_by_manifest_file; + std::map, SpecificSchemaPartitionInfo> specific_partition_info_by_manifest_file_and_schema; +}; + + +class IcebergMetadata : public IDataLakeMetadata, private WithContext +{ +public: + using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr; + + static constexpr auto name = "Iceberg"; + + enum class PartitionTransform + { + Year, + Month, + Day, + Hour, + Unsupported + }; + + IcebergMetadata( + ObjectStoragePtr object_storage_, + ConfigurationObserverPtr configuration_, + ContextPtr context_, + Int32 metadata_version_, + Int32 format_version_, + String manifest_list_file_, + Int32 current_schema_id_, + NamesAndTypesList schema_); + + /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. + /// All subsequent calls will return the saved list of files (because it cannot be changed without changing metadata file) + Strings getDataFiles() const override; + + /// Get table schema parsed from metadata. 
+ NamesAndTypesList getTableSchema() const override { return schema; } + + const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } + + const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; } + + bool operator==(const IDataLakeMetadata & other) const override + { + const auto * iceberg_metadata = dynamic_cast(&other); + return iceberg_metadata && getVersion() == iceberg_metadata->getVersion(); + } + + static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context); + + Strings makePartitionPruning(const ActionsDAG & filter_dag); + +private: + size_t getVersion() const { return metadata_version; } + + const ObjectStoragePtr object_storage; + const ConfigurationObserverPtr configuration; + Int32 metadata_version; + Int32 format_version; + String manifest_list_file; + Int32 current_schema_id; + NamesAndTypesList schema; + std::unordered_map column_name_to_physical_name; + DataLakePartitionColumns partition_columns; + LoggerPtr log; + + std::unordered_map name_and_type_by_source_id; + + std::vector> manifest_files_with_start_index; + + mutable Strings data_files; + mutable Strings manifest_files; +}; + +} + +#endif diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/PartitionPruning.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/PartitionPruning.cpp new file mode 100644 index 00000000000..36c4a22642d --- /dev/null +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/PartitionPruning.cpp @@ -0,0 +1,136 @@ +#include +#include "Common/DateLUT.h" +#include "Core/NamesAndTypes.h" +#include "config.h" + +#if USE_AVRO + +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include + + +# include +# include +# include + +# include + +// # include + +# include +namespace DB +{ + +CommonPartitionInfo +PartitionPruningProcessor::getCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple) +{ + CommonPartitionInfo common_info; + for (size_t i = 0; i != partition_specification->size(); ++i) + { + auto current_field = partition_specification->getObject(static_cast(i)); + + auto source_id = current_field->getValue("source-id"); + PartitionTransform transform = getTransform(current_field->getValue("transform")); + + if (transform == PartitionTransform::Unsupported) + { + continue; + } + auto partition_name = current_field->getValue("name"); + LOG_DEBUG(&Poco::Logger::get("Partition Spec"), "Name: {}", partition_name); + + common_info.partition_columns.push_back(big_partition_tuple->getColumnPtr(i)); + common_info.partition_transforms.push_back(transform); + common_info.partition_source_ids.push_back(source_id); + } + return common_info; +} + +SpecificSchemaPartitionInfo PartitionPruningProcessor::getSpecificPartitionPruning( + const CommonPartitionInfo & common_info, + [[maybe_unused]] Int32 schema_version, + const std::unordered_map & name_and_type_by_source_id) +{ + SpecificSchemaPartitionInfo specific_info; + + for (size_t i = 0; i < common_info.partition_columns.size(); ++i) + { + Int32 source_id = common_info.partition_source_ids[i]; + auto it = name_and_type_by_source_id.find(source_id); + if (it == name_and_type_by_source_id.end()) + { + continue; + } + size_t column_size = 
common_info.partition_columns[i]->size(); + if (specific_info.ranges.empty()) + { + specific_info.ranges.resize(column_size); + } + else + { + assert(specific_info.ranges.size() == column_size); + } + NameAndTypePair name_and_type = it->second; + specific_info.partition_names_and_types.push_back(name_and_type); + for (size_t j = 0; j < column_size; ++j) + { + specific_info.ranges[j].push_back(getPartitionRange( + common_info.partition_transforms[i], static_cast(j), common_info.partition_columns[i], name_and_type.type)); + } + } + return specific_info; +} + +std::vector PartitionPruningProcessor::getPruningMask( + const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context) +{ + std::vector pruning_mask; + if (!specific_info.partition_names_and_types.empty()) + { + ExpressionActionsPtr partition_minmax_idx_expr = std::make_shared( + ActionsDAG(specific_info.partition_names_and_types), ExpressionActionsSettings::fromContext(context)); + const KeyCondition partition_key_condition( + filter_dag, context, specific_info.partition_names_and_types.getNames(), partition_minmax_idx_expr); + for (size_t j = 0; j < specific_info.ranges.size(); ++j) + { + if (!partition_key_condition.checkInHyperrectangle(specific_info.ranges[j], specific_info.partition_names_and_types.getTypes()) + .can_be_true) + { + LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Partition pruning was successful for file: {}", j); + pruning_mask.push_back(false); + } + else + { + LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Partition pruning failed for file: {}", j); + pruning_mask.push_back(true); + } + } + } + return pruning_mask; +} + +} + +#endif diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h deleted file mode 100644 index eb5cac591f2..00000000000 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h +++ /dev/null @@ -1,117 +0,0 @@ -#pragma once - -#include "config.h" - -#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format. - -#include -#include -#include -#include -#include - -namespace DB -{ - -/** - * Useful links: - * - https://iceberg.apache.org/spec/ - * - * Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version. - * - * Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`. - * Metadata file - json file. - * Manifest list – an Avro file that lists manifest files; one per snapshot. - * Manifest file – an Avro file that lists data or delete files; a subset of a snapshot. - * All changes to table state create a new metadata file and replace the old metadata with an atomic swap. - * - * In order to find out which data files to read, we need to find the `manifest list` - * which corresponds to the latest snapshot. We find it by checking a list of snapshots - * in metadata's "snapshots" section. - * - * Example of metadata.json file. - * { - * "format-version" : 1, - * "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5", - * "location" : "/iceberg_data/db/table_name", - * "last-updated-ms" : 1680206743150, - * "last-column-id" : 2, - * "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {}, {}, ... ] }, - * "current-schema-id" : 0, - * "schemas" : [ ], - * ... 
- * "current-snapshot-id" : 2819310504515118887, - * "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } }, - * "snapshots" : [ { - * "snapshot-id" : 2819310504515118887, - * "timestamp-ms" : 1680206743150, - * "summary" : { - * "operation" : "append", "spark.app.id" : "local-1680206733239", - * "added-data-files" : "1", "added-records" : "100", - * "added-files-size" : "1070", "changed-partition-count" : "1", - * "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0", - * "total-position-deletes" : "0", "total-equality-deletes" : "0" - * }, - * "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro", - * "schema-id" : 0 - * } ], - * "statistics" : [ ], - * "snapshot-log" : [ ... ], - * "metadata-log" : [ ] - * } - */ -class IcebergMetadata : public IDataLakeMetadata, private WithContext -{ -public: - using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr; - - static constexpr auto name = "Iceberg"; - - IcebergMetadata( - ObjectStoragePtr object_storage_, - ConfigurationObserverPtr configuration_, - ContextPtr context_, - Int32 metadata_version_, - Int32 format_version_, - String manifest_list_file_, - Int32 current_schema_id_, - NamesAndTypesList schema_); - - /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. - /// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file) - Strings getDataFiles() const override; - - /// Get table schema parsed from metadata. - NamesAndTypesList getTableSchema() const override { return schema; } - - const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } - - const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; } - - bool operator ==(const IDataLakeMetadata & other) const override - { - const auto * iceberg_metadata = dynamic_cast(&other); - return iceberg_metadata && getVersion() == iceberg_metadata->getVersion(); - } - - static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context); - -private: - size_t getVersion() const { return metadata_version; } - - const ObjectStoragePtr object_storage; - const ConfigurationObserverPtr configuration; - Int32 metadata_version; - Int32 format_version; - String manifest_list_file; - Int32 current_schema_id; - NamesAndTypesList schema; - mutable Strings data_files; - std::unordered_map column_name_to_physical_name; - DataLakePartitionColumns partition_columns; - LoggerPtr log; -}; - -} - -#endif From 63621bd38129e6996aa6694fca281b8c74c35077 Mon Sep 17 00:00:00 2001 From: divanik Date: Mon, 18 Nov 2024 16:27:11 +0000 Subject: [PATCH 2/3] Divided logic by files --- src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h | 2 +- src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 1a694a25dff..32ea79e864f 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include #include diff 
--git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index c4f74ad8181..b02a60603ab 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -69,7 +69,6 @@ enum class PartitionTransform Month, Day, Hour, - Identity, Unsupported }; From f76df37cfa03fe3df2f7f08756a52be5afa02e88 Mon Sep 17 00:00:00 2001 From: divanik Date: Mon, 18 Nov 2024 17:17:26 +0000 Subject: [PATCH 3/3] Add getAllFilesMask --- src/CMakeLists.txt | 1 + .../ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp | 4 +++- .../ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h | 8 +++++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3627d760d4c..2ddc1fb3e69 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -135,6 +135,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/S3) add_headers_and_sources(dbms Storages/ObjectStorage/HDFS) add_headers_and_sources(dbms Storages/ObjectStorage/Local) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes) +add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg) add_headers_and_sources(dbms Common/NamedCollections) add_headers_and_sources(dbms Common/Scheduler/Workload) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 5af933eec2a..6148e8103ac 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1,5 +1,6 @@ #include "config.h" + #if USE_AVRO # include @@ -88,6 +89,7 @@ enum class DataFileContent : uint8_t EQUALITY_DELETES = 2, }; + /** * Iceberg supports the next data types (see https://iceberg.apache.org/spec/#schemas-and-data-types): * - Primitive types: @@ -307,7 +309,7 @@ parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_ver /// Field "schemas" is optional for version 1, but after version 2 was introduced, /// in most cases this field is added for new tables in version 1 as well. 
if (!ignore_schema_evolution && metadata_object->has("schemas") - && metadata_object->get("schemas").extract()->size() > 1) + && metadata_object->get("schemas").extract()->size() > 1) throw Exception( ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not " "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable " "setting " "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)"); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index b02a60603ab..94a5acf28b5 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -88,7 +88,7 @@ struct SpecificSchemaPartitionInfo class PartitionPruningProcessor { public: - CommonPartitionInfo getCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple); + CommonPartitionInfo addCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple); SpecificSchemaPartitionInfo getSpecificPartitionPruning( const CommonPartitionInfo & common_info, @@ -97,6 +97,8 @@ public: std::vector getPruningMask(const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context); + std::vector getAllFilesMask(const ActionsDAG * filter_dag, ContextPtr context); + private: static PartitionTransform getTransform(const String & transform_name) { @@ -189,6 +191,8 @@ private: std::unordered_map common_partition_info_by_manifest_file; std::map, SpecificSchemaPartitionInfo> specific_partition_info_by_manifest_file_and_schema; + + std::vector common_partition_infos; }; @@ -259,6 +263,8 @@ private: mutable Strings data_files; mutable Strings manifest_files; + + PartitionPruningProcessor pruning_processor; }; }
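
A note on the range semantics the new DateLUT helpers and getPartitionRange encode: Iceberg's "month" partition transform stores a value as the number of months since 1970-01 (zero-indexed), and "year" as years since 1970, so a partition value V selects the half-open interval [start(V), start(V+1)). Below is a minimal standalone sketch of that contract in plain C++20 <chrono>, independent of the ClickHouse DateLUT API; the helper name and the sample value are illustrative, not part of the patch.

    #include <chrono>
    #include <iostream>

    // Illustrative stand-in for lutIndexByMonthSinceEpochStartsZeroIndexing:
    // first day of the month lying `months_since_epoch` months after 1970-01.
    // Like the patch code, it assumes a non-negative value.
    static std::chrono::sys_days month_start(int months_since_epoch)
    {
        using namespace std::chrono;
        int y = 1970 + months_since_epoch / 12;
        unsigned m = static_cast<unsigned>(months_since_epoch % 12 + 1);
        return sys_days{year{y} / month{m} / day{1}};
    }

    int main()
    {
        using namespace std::chrono;
        int v = 659;                       // hypothetical partition value: 659 months after 1970-01 is 2024-12
        sys_days begin = month_start(v);   // inclusive lower bound: 2024-12-01
        sys_days end = month_start(v + 1); // exclusive upper bound: 2025-01-01
        // getPartitionRange builds exactly this kind of interval
        // (Range{begin, true, end, false}), and getPruningMask hands it to
        // KeyCondition::checkInHyperrectangle to decide whether the query's
        // date predicate can intersect it; if not, the data file is skipped.
        std::cout << begin << " .. " << end << '\n';
    }

The same arithmetic covers the "year" transform: a value Y maps to [Jan 1 of year 1970+Y, Jan 1 of year 1971+Y), which lutIndexByYearSinceEpochStartsZeroIndexing provides via the LUT.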