commit 3f4dd36336
Daniil Ivanik, 2024-11-21 00:40:00 -05:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
8 changed files with 493 additions and 180 deletions

View File

@@ -135,6 +135,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/S3)
 add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
 add_headers_and_sources(dbms Storages/ObjectStorage/Local)
 add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
+add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
 add_headers_and_sources(dbms Common/NamedCollections)
 add_headers_and_sources(dbms Common/Scheduler/Workload)

View File

@@ -291,3 +291,15 @@ namespace cctz_extension
 ZoneInfoSourceFactory zone_info_source_factory = custom_factory;
 }
+
+DateLUTImpl::Values DateLUTImpl::lutIndexByMonthSinceEpochStartsZeroIndexing(Int32 months) const
+{
+    Int16 year = 1970 + months / 12;
+    UInt8 month = months % 12 + 1;
+    return lut[makeLUTIndex(year, month, 1)];
+}
+
+DateLUTImpl::Values DateLUTImpl::lutIndexByYearSinceEpochStartsZeroIndexing(Int16 years) const
+{
+    return lut[makeLUTIndex(years + 1970, 1, 1)];
+}
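
These two helpers serve Iceberg's partition transforms, which store "year" and "month" partition values as counts since 1970, zero-indexed. A minimal standalone sketch of the index arithmetic, not part of the commit (the lut[makeLUTIndex(...)] lookup itself is ClickHouse-internal):

#include <cstdint>
#include <cstdio>

int main()
{
    // Iceberg's "month" transform stores months since 1970-01; the helper above
    // converts that to a (year, month) pair before indexing the date LUT.
    for (int32_t months : {0, 11, 12, 659})
    {
        int16_t year = 1970 + months / 12;
        uint8_t month = months % 12 + 1;
        std::printf("months=%d -> %04d-%02u-01\n", months, year, (unsigned)month);
    }
    // Prints 1970-01-01, 1970-12-01, 1971-01-01, 2024-12-01.
    // The "year" transform is the same idea: years=54 -> 2024-01-01.
}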

View File

@@ -1166,6 +1166,10 @@ public:
         return LUTIndex{std::min(index, static_cast<UInt32>(DATE_LUT_SIZE - 1))};
     }

+    Values lutIndexByMonthSinceEpochStartsZeroIndexing(Int32 months) const;
+    Values lutIndexByYearSinceEpochStartsZeroIndexing(Int16 years) const;
+
     /// Create DayNum from year, month, day of month.
     ExtendedDayNum makeDayNum(Int16 year, UInt8 month, UInt8 day_of_month, Int32 default_error_day_num = 0) const
     {

View File

@@ -5,7 +5,7 @@
 #include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
 #include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
 #include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
-#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
+#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
 #include <Storages/ObjectStorage/HDFS/Configuration.h>
 #include <Storages/ObjectStorage/Local/Configuration.h>
 #include <Storages/ObjectStorage/S3/Configuration.h>

View File

@@ -1,44 +1,46 @@
 #include "config.h"

 #if USE_AVRO

-#include <Common/logger_useful.h>
-#include <Core/Settings.h>
-#include <Columns/ColumnString.h>
-#include <Columns/ColumnTuple.h>
-#include <Columns/IColumn.h>
-#include <DataTypes/DataTypeArray.h>
-#include <DataTypes/DataTypeDate.h>
-#include <DataTypes/DataTypeDateTime64.h>
-#include <DataTypes/DataTypeFactory.h>
-#include <DataTypes/DataTypeFixedString.h>
-#include <DataTypes/DataTypeMap.h>
-#include <DataTypes/DataTypeNullable.h>
-#include <DataTypes/DataTypeString.h>
-#include <DataTypes/DataTypeTuple.h>
-#include <DataTypes/DataTypeUUID.h>
-#include <DataTypes/DataTypesDecimal.h>
-#include <DataTypes/DataTypesNumber.h>
-#include <Formats/FormatFactory.h>
-#include <IO/ReadBufferFromString.h>
-#include <IO/ReadBufferFromFileBase.h>
-#include <IO/ReadHelpers.h>
-#include <Processors/Formats/Impl/AvroRowInputFormat.h>
-#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
-#include <Storages/ObjectStorage/DataLakes/Common.h>
-#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
-#include <Poco/JSON/Array.h>
-#include <Poco/JSON/Object.h>
-#include <Poco/JSON/Parser.h>
-#include <filesystem>
+# include <Columns/ColumnString.h>
+# include <Columns/ColumnTuple.h>
+# include <Columns/IColumn.h>
+# include <Core/Settings.h>
+# include <DataTypes/DataTypeArray.h>
+# include <DataTypes/DataTypeDate.h>
+# include <DataTypes/DataTypeDateTime64.h>
+# include <DataTypes/DataTypeFactory.h>
+# include <DataTypes/DataTypeFixedString.h>
+# include <DataTypes/DataTypeMap.h>
+# include <DataTypes/DataTypeNullable.h>
+# include <DataTypes/DataTypeString.h>
+# include <DataTypes/DataTypeTuple.h>
+# include <DataTypes/DataTypeUUID.h>
+# include <DataTypes/DataTypesDecimal.h>
+# include <DataTypes/DataTypesNumber.h>
+# include <Formats/FormatFactory.h>
+# include <IO/ReadBufferFromFileBase.h>
+# include <IO/ReadBufferFromString.h>
+# include <IO/ReadHelpers.h>
+# include <Processors/Formats/Impl/AvroRowInputFormat.h>
+# include <Storages/ObjectStorage/DataLakes/Common.h>
+# include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
+# include <Storages/ObjectStorage/StorageObjectStorageSource.h>
+# include <Common/logger_useful.h>
+
+# include <Poco/JSON/Array.h>
+# include <Poco/JSON/Object.h>
+# include <Poco/JSON/Parser.h>
+
+# include <filesystem>

 namespace DB
 {

 namespace Setting
 {
     extern const SettingsBool iceberg_engine_ignore_schema_evolution;
 }

 namespace ErrorCodes
@@ -87,6 +89,7 @@ enum class DataFileContent : uint8_t
     EQUALITY_DELETES = 2,
 };
+
 /**
  * Iceberg supports the next data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
  * - Primitive types:
@@ -245,10 +248,10 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
     }
     throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());
 }

-std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
+std::tuple<NamesAndTypesList, Int32, std::unordered_map<Int32, NameAndTypePair>>
+parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
 {
     Poco::JSON::Object::Ptr schema;
     Int32 current_schema_id;
@@ -279,20 +282,24 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
         }

         if (!schema)
-            throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
+            throw Exception(
+                ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
     }
     else
     {
         if (schemas->size() != 1)
-            throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
+            throw Exception(
+                ErrorCodes::UNSUPPORTED_METHOD,
                 "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
-                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
+                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, "
+                "enable setting "
                 "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");

         /// Now we sure that there is only one schema.
         schema = schemas->getObject(0);
         if (schema->getValue<int>("schema-id") != current_schema_id)
-            throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
+            throw Exception(
+                ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
     }
 }
 else
@@ -301,30 +308,32 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
         current_schema_id = schema->getValue<int>("schema-id");
         /// Field "schemas" is optional for version 1, but after version 2 was introduced,
         /// in most cases this field is added for new tables in version 1 as well.
-        if (!ignore_schema_evolution && metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
-            throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
+        if (!ignore_schema_evolution && metadata_object->has("schemas")
+            && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
+            throw Exception(
+                ErrorCodes::UNSUPPORTED_METHOD,
                 "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
-                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
+                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
+                "setting "
                 "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
     }

     NamesAndTypesList names_and_types;
     auto fields = schema->get("fields").extract<Poco::JSON::Array::Ptr>();
+    std::unordered_map<Int32, NameAndTypePair> name_and_type_by_source_id;
     for (size_t i = 0; i != fields->size(); ++i)
     {
         auto field = fields->getObject(static_cast<UInt32>(i));
         auto name = field->getValue<String>("name");
         bool required = field->getValue<bool>("required");
         names_and_types.push_back({name, getFieldType(field, "type", required)});
+        name_and_type_by_source_id[field->getValue<int>("id")] = {name, names_and_types.back().type};
     }

-    return {std::move(names_and_types), current_schema_id};
+    return {std::move(names_and_types), current_schema_id, name_and_type_by_source_id};
 }

-MutableColumns parseAvro(
-    avro::DataFileReaderBase & file_reader,
-    const Block & header,
-    const FormatSettings & settings)
+MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings)
 {
     auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
     MutableColumns columns = header.cloneEmptyColumns();
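
parseTableSchema now additionally returns a map from Iceberg's field id to the column's name and type; partition specs reference columns by "source-id", so this map is what later resolves them. A standalone sketch of that bookkeeping with Poco's JSON API, using a toy schema and plain strings where the committed code stores DataTypePtr:

#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

int main()
{
    const std::string schema_json = R"({"type":"struct","schema-id":0,"fields":[
        {"id":1,"name":"ts","required":true,"type":"timestamp"},
        {"id":2,"name":"value","required":false,"type":"long"}]})";

    Poco::JSON::Parser parser;
    auto object = parser.parse(schema_json).extract<Poco::JSON::Object::Ptr>();
    auto fields = object->get("fields").extract<Poco::JSON::Array::Ptr>();

    // Same shape as name_and_type_by_source_id in the hunk above.
    std::unordered_map<int, std::pair<std::string, std::string>> by_source_id;
    for (size_t i = 0; i != fields->size(); ++i)
    {
        auto field = fields->getObject(static_cast<unsigned>(i));
        by_source_id[field->getValue<int>("id")]
            = {field->getValue<std::string>("name"), field->getValue<std::string>("type")};
    }

    std::cout << by_source_id.at(1).first << '\n'; // "ts"
}
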
@@ -345,17 +354,14 @@ MutableColumns parseAvro(
  * 1) v<V>.metadata.json, where V - metadata version.
  * 2) <V>-<random-uuid>.metadata.json, where V - metadata version
  */
-std::pair<Int32, String> getMetadataFileAndVersion(
-    ObjectStoragePtr object_storage,
-    const StorageObjectStorage::Configuration & configuration)
+std::pair<Int32, String>
+getMetadataFileAndVersion(ObjectStoragePtr object_storage, const StorageObjectStorage::Configuration & configuration)
 {
     const auto metadata_files = listFiles(*object_storage, configuration, "metadata", ".metadata.json");
     if (metadata_files.empty())
     {
         throw Exception(
-            ErrorCodes::FILE_DOESNT_EXIST,
-            "The metadata file for Iceberg table with path {} doesn't exist",
-            configuration.getPath());
+            ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", configuration.getPath());
     }

     std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
@@ -372,7 +378,8 @@ std::pair<Int32, String> getMetadataFileAndVersion(
         version_str = String(file_name.begin(), file_name.begin() + file_name.find_first_of('-'));

     if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
+        throw Exception(
+            ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);

     metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
 }
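
Both accepted file names reduce to pulling a leading integer out of the base name; a standalone sketch with a hypothetical metadataVersion helper (the committed logic lives in getMetadataFileAndVersion above):

#include <algorithm>
#include <cctype>
#include <iostream>
#include <stdexcept>
#include <string>

// Handles "v<V>.metadata.json" and "<V>-<random-uuid>.metadata.json".
int metadataVersion(const std::string & file_name)
{
    std::string version_str;
    if (!file_name.empty() && file_name.front() == 'v')
        version_str = file_name.substr(1, file_name.find_first_of('.') - 1);
    else
        version_str = file_name.substr(0, file_name.find_first_of('-'));

    if (version_str.empty()
        || !std::all_of(version_str.begin(), version_str.end(), [](unsigned char c) { return std::isdigit(c); }))
        throw std::invalid_argument("Bad metadata file name: " + file_name);
    return std::stoi(version_str);
}

int main()
{
    std::cout << metadataVersion("v3.metadata.json") << '\n';         // 3
    std::cout << metadataVersion("00002-aaff.metadata.json") << '\n'; // 2
}
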
@@ -403,7 +410,7 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
     const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();

     auto format_version = object->getValue<int>("format-version");
-    auto [schema, schema_id]
+    auto [schema, schema_id, name_and_type_by_source_id]
         = parseTableSchema(object, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);

     auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
@@ -456,7 +463,6 @@ Strings IcebergMetadata::getDataFiles() const
     if (!data_files.empty())
         return data_files;

-    Strings manifest_files;
     if (manifest_list_file.empty())
         return data_files;
@@ -465,7 +471,8 @@ Strings IcebergMetadata::getDataFiles() const
     auto context = getContext();
     StorageObjectStorageSource::ObjectInfo object_info(manifest_list_file);
     auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
-    auto manifest_list_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
+    auto manifest_list_file_reader
+        = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));

     auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
     Block header{{data_type->createColumn(), data_type, "manifest_path"}};
@@ -511,7 +518,8 @@ Strings IcebergMetadata::getDataFiles() const
             throw Exception(
                 ErrorCodes::UNSUPPORTED_METHOD,
                 "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
-                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
+                "supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
+                "setting "
                 "iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");

         avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
@@ -520,9 +528,7 @@ Strings IcebergMetadata::getDataFiles() const
         if (leaves_num < expected_min_num)
         {
             throw Exception(
-                ErrorCodes::BAD_ARGUMENTS,
-                "Unexpected number of columns {}. Expected at least {}",
-                root_node->leaves(), expected_min_num);
+                ErrorCodes::BAD_ARGUMENTS, "Unexpected number of columns {}. Expected at least {}", root_node->leaves(), expected_min_num);
         }

         avro::NodePtr status_node = root_node->leafAt(0);
@@ -615,7 +621,8 @@ Strings IcebergMetadata::getDataFiles() const
             {
                 Int32 content_type = content_int_column->getElement(i);
                 if (DataFileContent(content_type) != DataFileContent::DATA)
-                    throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
+                    throw Exception(
+                        ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
             }

             const auto status = status_int_column->getInt(i);
@@ -635,14 +642,12 @@ Strings IcebergMetadata::getDataFiles() const
             {
                 LOG_TEST(log, "Processing data file for path: {}", file_path);
                 files.insert(file_path);
+                data_files.push_back(file_path);
             }
         }
     }

-    data_files = std::vector<std::string>(files.begin(), files.end());
     return data_files;
 }

 }

 #endif
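
The last hunk changes the de-duplication strategy: previously data_files was rebuilt from the std::set at the end, which also silently sorted it; now paths are appended as manifests are processed, so manifest order is preserved. A toy sketch of set-guarded, insertion-order de-duplication (the insert().second guard here is an assumption; the committed membership check sits outside the visible hunk):

#include <iostream>
#include <set>
#include <string>
#include <vector>

int main()
{
    std::set<std::string> files;         // membership guard
    std::vector<std::string> data_files; // keeps first-seen order

    for (std::string path : {"b.parquet", "a.parquet", "b.parquet"})
        if (files.insert(path).second)   // true only for the first occurrence
            data_files.push_back(path);

    for (const auto & f : data_files)
        std::cout << f << '\n';          // b.parquet, then a.parquet
}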

View File

@@ -0,0 +1,272 @@
#pragma once
#include "Core/NamesAndTypes.h"
#include "DataTypes/DataTypeNullable.h"
#include "config.h"
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
# include <Core/Types.h>
# include <Disks/ObjectStorages/IObjectStorage.h>
# include <Interpreters/Context_fwd.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/StorageObjectStorage.h>
namespace DB
{
/**
* Useful links:
* - https://iceberg.apache.org/spec/
*
* Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version.
*
* Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
* Metadata file - json file.
* Manifest list - an Avro file that lists manifest files; one per snapshot.
* Manifest file - an Avro file that lists data or delete files; a subset of a snapshot.
* All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
*
* In order to find out which data files to read, we need to find the `manifest list`
* which corresponds to the latest snapshot. We find it by checking a list of snapshots
* in metadata's "snapshots" section.
*
* Example of metadata.json file.
* {
* "format-version" : 1,
* "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
* "location" : "/iceberg_data/db/table_name",
* "last-updated-ms" : 1680206743150,
* "last-column-id" : 2,
* "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
* "current-schema-id" : 0,
* "schemas" : [ ],
* ...
* "current-snapshot-id" : 2819310504515118887,
* "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
* "snapshots" : [ {
* "snapshot-id" : 2819310504515118887,
* "timestamp-ms" : 1680206743150,
* "summary" : {
* "operation" : "append", "spark.app.id" : "local-1680206733239",
* "added-data-files" : "1", "added-records" : "100",
* "added-files-size" : "1070", "changed-partition-count" : "1",
* "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
* "total-position-deletes" : "0", "total-equality-deletes" : "0"
* },
* "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
* "schema-id" : 0
* } ],
* "statistics" : [ ],
* "snapshot-log" : [ ... ],
* "metadata-log" : [ ]
* }
*/
enum class PartitionTransform
{
Year,
Month,
Day,
Hour,
Unsupported
};
struct CommonPartitionInfo
{
std::vector<ColumnPtr> partition_columns;
std::vector<PartitionTransform> partition_transforms;
std::vector<Int32> partition_source_ids;
};
struct SpecificSchemaPartitionInfo
{
std::vector<std::vector<Range>> ranges;
NamesAndTypesList partition_names_and_types;
};
class PartitionPruningProcessor
{
public:
CommonPartitionInfo getCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple);
SpecificSchemaPartitionInfo getSpecificPartitionPruning(
const CommonPartitionInfo & common_info,
Int32 schema_version,
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id);
std::vector<bool> getPruningMask(const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context);
std::vector<bool> getAllFilesMask(const ActionsDAG * filter_dag, ContextPtr context);
private:
static PartitionTransform getTransform(const String & transform_name)
{
if (transform_name == "year")
{
return PartitionTransform::Year;
}
else if (transform_name == "month")
{
return PartitionTransform::Month;
}
else if (transform_name == "day")
{
return PartitionTransform::Day;
}
else
{
return PartitionTransform::Unsupported;
}
}
static DateLUTImpl::Values getValues(Int32 value, PartitionTransform transform)
{
if (transform == PartitionTransform::Year)
{
return DateLUT::instance().lutIndexByYearSinceEpochStartsZeroIndexing(value);
}
else if (transform == PartitionTransform::Month)
{
return DateLUT::instance().lutIndexByMonthSinceEpochStartsZeroIndexing(static_cast<UInt32>(value));
}
else if (transform == PartitionTransform::Day)
{
return DateLUT::instance().getValues(static_cast<UInt16>(value));
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition transform for get day function: {}", transform);
}
static Int64 getTime(Int32 value, PartitionTransform transform)
{
DateLUTImpl::Values values = getValues(value, transform);
// LOG_DEBUG(&Poco::Logger::get("Get field"), "Values: {}", values);
return values.date;
}
static Int16 getDay(Int32 value, PartitionTransform transform)
{
DateLUTImpl::Time got_time = getTime(value, transform);
LOG_DEBUG(&Poco::Logger::get("Get field"), "Time: {}", got_time);
return DateLUT::instance().toDayNum(got_time);
}
static Range
getPartitionRange(PartitionTransform partition_transform, UInt32 index, ColumnPtr partition_column, DataTypePtr column_data_type)
{
if (partition_transform == PartitionTransform::Year || partition_transform == PartitionTransform::Month
|| partition_transform == PartitionTransform::Day)
{
auto column = dynamic_cast<const ColumnNullable *>(partition_column.get())->getNestedColumnPtr();
const auto * casted_inner_column = assert_cast<const ColumnInt32 *>(column.get());
Int32 value = static_cast<Int32>(casted_inner_column->getInt(index));
LOG_DEBUG(&Poco::Logger::get("Partition"), "Partition value: {}, transform: {}", value, partition_transform);
auto nested_data_type = dynamic_cast<const DataTypeNullable *>(column_data_type.get())->getNestedType();
if (WhichDataType(nested_data_type).isDate())
{
const UInt16 begin_range_value = getDay(value, partition_transform);
const UInt16 end_range_value = getDay(value + 1, partition_transform);
return Range{begin_range_value, true, end_range_value, false};
}
else if (WhichDataType(nested_data_type).isDateTime64())
{
const UInt64 begin_range_value = getTime(value, partition_transform);
const UInt64 end_range_value = getTime(value + 1, partition_transform);
return Range{begin_range_value, true, end_range_value, false};
}
else
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Partition transform {} is not supported for the type: {}",
partition_transform,
nested_data_type);
}
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition transform: {}", partition_transform);
}
}
std::unordered_map<String, CommonPartitionInfo> common_partition_info_by_manifest_file;
std::map<std::pair<String, Int32>, SpecificSchemaPartitionInfo> specific_partition_info_by_manifest_file_and_schema;
std::vector<CommonPartitionInfo> common_partition_infos;
};
class IcebergMetadata : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Iceberg";
enum class PartitionTransform
{
Year,
Month,
Day,
Hour,
Unsupported
};
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
NamesAndTypesList schema_);
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file)
Strings getDataFiles() const override;
/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override { return schema; }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
bool operator==(const IDataLakeMetadata & other) const override
{
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
Strings makePartitionPruning(const ActionsDAG & filter_dag);
private:
size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;
Int32 current_schema_id;
NamesAndTypesList schema;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
LoggerPtr log;
std::unordered_map<Int32, NameAndTypePair> name_and_type_by_source_id;
std::vector<std::pair<String, Int32>> manifest_files_with_start_index;
mutable Strings data_files;
mutable Strings manifest_files;
PartitionPruningProcessor pruning_processor;
};
}
#endif
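
The doc comment in this header describes the resolution chain: metadata.json holds "current-snapshot-id", the matching entry in "snapshots" holds the "manifest-list" path, and the manifest list points at manifest files. A standalone sketch of just that lookup with Poco's JSON API, using a document shaped like the example above (the committed path goes through IcebergMetadata::create and getDataFiles):

#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/Types.h>
#include <iostream>
#include <string>

int main()
{
    const std::string metadata_json = R"({
        "current-snapshot-id": 2819310504515118887,
        "snapshots": [{
            "snapshot-id": 2819310504515118887,
            "manifest-list": "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro"
        }]})";

    Poco::JSON::Parser parser;
    auto object = parser.parse(metadata_json).extract<Poco::JSON::Object::Ptr>();
    auto current_snapshot_id = object->getValue<Poco::Int64>("current-snapshot-id");
    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    for (size_t i = 0; i < snapshots->size(); ++i)
    {
        auto snapshot = snapshots->getObject(static_cast<unsigned>(i));
        if (snapshot->getValue<Poco::Int64>("snapshot-id") == current_snapshot_id)
            std::cout << snapshot->getValue<std::string>("manifest-list") << '\n';
    }
}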

View File

@@ -0,0 +1,136 @@
#include <exception>
#include "Common/DateLUT.h"
#include "Core/NamesAndTypes.h"
#include "config.h"
#if USE_AVRO
# include <Columns/ColumnString.h>
# include <Columns/ColumnTuple.h>
# include <Columns/IColumn.h>
# include <Core/Settings.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeDateTime64.h>
# include <DataTypes/DataTypeFactory.h>
# include <DataTypes/DataTypeFixedString.h>
# include <DataTypes/DataTypeMap.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypeTuple.h>
# include <DataTypes/DataTypeUUID.h>
# include <DataTypes/DataTypesDecimal.h>
# include <DataTypes/DataTypesNumber.h>
# include <Formats/FormatFactory.h>
# include <IO/ReadBufferFromFileBase.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Processors/Formats/Impl/AvroRowInputFormat.h>
# include <Storages/ObjectStorage/DataLakes/Common.h>
# include <Storages/ObjectStorage/StorageObjectStorageSource.h>
# include <Common/logger_useful.h>
# include <Poco/JSON/Array.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <DataFile.hh>
// # include <filesystem>
# include <Interpreters/ActionsDAG.h>
# include <Interpreters/ExpressionActions.h>
# include <Storages/MergeTree/KeyCondition.h>
# include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
namespace DB
{
CommonPartitionInfo
PartitionPruningProcessor::getCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple)
{
CommonPartitionInfo common_info;
for (size_t i = 0; i != partition_specification->size(); ++i)
{
auto current_field = partition_specification->getObject(static_cast<UInt32>(i));
auto source_id = current_field->getValue<Int32>("source-id");
PartitionTransform transform = getTransform(current_field->getValue<String>("transform"));
if (transform == PartitionTransform::Unsupported)
{
continue;
}
auto partition_name = current_field->getValue<String>("name");
LOG_DEBUG(&Poco::Logger::get("Partition Spec"), "Name: {}", partition_name);
common_info.partition_columns.push_back(big_partition_tuple->getColumnPtr(i));
common_info.partition_transforms.push_back(transform);
common_info.partition_source_ids.push_back(source_id);
}
return common_info;
}
SpecificSchemaPartitionInfo PartitionPruningProcessor::getSpecificPartitionPruning(
const CommonPartitionInfo & common_info,
[[maybe_unused]] Int32 schema_version,
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id)
{
SpecificSchemaPartitionInfo specific_info;
for (size_t i = 0; i < common_info.partition_columns.size(); ++i)
{
Int32 source_id = common_info.partition_source_ids[i];
auto it = name_and_type_by_source_id.find(source_id);
if (it == name_and_type_by_source_id.end())
{
continue;
}
size_t column_size = common_info.partition_columns[i]->size();
if (specific_info.ranges.empty())
{
specific_info.ranges.resize(column_size);
}
else
{
assert(specific_info.ranges.size() == column_size);
}
NameAndTypePair name_and_type = it->second;
specific_info.partition_names_and_types.push_back(name_and_type);
for (size_t j = 0; j < column_size; ++j)
{
specific_info.ranges[j].push_back(getPartitionRange(
common_info.partition_transforms[i], static_cast<UInt32>(j), common_info.partition_columns[i], name_and_type.type));
}
}
return specific_info;
}
std::vector<bool> PartitionPruningProcessor::getPruningMask(
const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context)
{
std::vector<bool> pruning_mask;
if (!specific_info.partition_names_and_types.empty())
{
ExpressionActionsPtr partition_minmax_idx_expr = std::make_shared<ExpressionActions>(
ActionsDAG(specific_info.partition_names_and_types), ExpressionActionsSettings::fromContext(context));
const KeyCondition partition_key_condition(
filter_dag, context, specific_info.partition_names_and_types.getNames(), partition_minmax_idx_expr);
for (size_t j = 0; j < specific_info.ranges.size(); ++j)
{
if (!partition_key_condition.checkInHyperrectangle(specific_info.ranges[j], specific_info.partition_names_and_types.getTypes())
.can_be_true)
{
LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Partition pruning was successful for file: {}", j);
pruning_mask.push_back(false);
}
else
{
LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Partition pruning failed for file: {}", j);
pruning_mask.push_back(true);
}
}
}
return pruning_mask;
}
}
#endif
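
End to end, the processor is driven per manifest file: build a CommonPartitionInfo from the partition spec, specialize it per schema into per-file ranges, then ask for a mask. In the committed code, KeyCondition::checkInHyperrectangle does the interval test against DB::Range objects; the standalone sketch below reduces that to a point-in-half-open-range check with hand-computed day numbers and hypothetical file names:

#include <iostream>
#include <string>
#include <vector>

struct FileRange
{
    std::string file;
    long begin; // inclusive, like getPartitionRange's Range{begin, true, end, false}
    long end;   // exclusive
};

int main()
{
    // One month-transformed partition range per data file (days since epoch).
    std::vector<FileRange> files = {
        {"part-0.parquet", 19723, 19754}, // 2024-01
        {"part-1.parquet", 19754, 19783}, // 2024-02
    };

    const long wanted_day = 19760; // predicate: partition date == 2024-02-07

    for (const auto & fr : files)
    {
        bool can_be_true = wanted_day >= fr.begin && wanted_day < fr.end;
        std::cout << fr.file << (can_be_true ? ": kept" : ": pruned") << '\n';
    }
}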

View File

@@ -1,117 +0,0 @@
#pragma once
#include "config.h"
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
namespace DB
{
/**
* Useful links:
* - https://iceberg.apache.org/spec/
*
* Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version.
*
* Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
* Metadata file - json file.
* Manifest list - an Avro file that lists manifest files; one per snapshot.
* Manifest file - an Avro file that lists data or delete files; a subset of a snapshot.
* All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
*
* In order to find out which data files to read, we need to find the `manifest list`
* which corresponds to the latest snapshot. We find it by checking a list of snapshots
* in metadata's "snapshots" section.
*
* Example of metadata.json file.
* {
* "format-version" : 1,
* "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
* "location" : "/iceberg_data/db/table_name",
* "last-updated-ms" : 1680206743150,
* "last-column-id" : 2,
* "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
* "current-schema-id" : 0,
* "schemas" : [ ],
* ...
* "current-snapshot-id" : 2819310504515118887,
* "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
* "snapshots" : [ {
* "snapshot-id" : 2819310504515118887,
* "timestamp-ms" : 1680206743150,
* "summary" : {
* "operation" : "append", "spark.app.id" : "local-1680206733239",
* "added-data-files" : "1", "added-records" : "100",
* "added-files-size" : "1070", "changed-partition-count" : "1",
* "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
* "total-position-deletes" : "0", "total-equality-deletes" : "0"
* },
* "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
* "schema-id" : 0
* } ],
* "statistics" : [ ],
* "snapshot-log" : [ ... ],
* "metadata-log" : [ ]
* }
*/
class IcebergMetadata : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
static constexpr auto name = "Iceberg";
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
NamesAndTypesList schema_);
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file)
Strings getDataFiles() const override;
/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override { return schema; }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
bool operator ==(const IDataLakeMetadata & other) const override
{
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
private:
size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;
Int32 current_schema_id;
NamesAndTypesList schema;
mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
LoggerPtr log;
};
}
#endif