To checkout

This commit is contained in:
divanik 2024-11-21 12:46:19 +00:00
parent f76df37cfa
commit 56ab216e63
8 changed files with 139 additions and 39 deletions

View File

@ -56,6 +56,14 @@ public:
return std::nullopt;
}
/// Restricts the paths of this configuration to the ones that the current
/// data lake metadata returns for `filter_dag`.
/// Does nothing when no metadata is held or when the metadata reports that
/// it does not support partition pruning.
void implementPartitionPruning(const ActionsDAG & filter_dag) override
{
    const bool can_prune = current_metadata && current_metadata->supportsPartitionPruning();
    if (can_prune)
        BaseStorageConfiguration::setPaths(current_metadata->makePartitionPruning(filter_dag));
}
private:
DataLakeMetadataPtr current_metadata;

View File

@ -2,6 +2,7 @@
#include <boost/noncopyable.hpp>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include "Interpreters/ActionsDAG.h"
#include "PartitionColumns.h"
namespace DB
@ -16,6 +17,8 @@ public:
virtual bool operator==(const IDataLakeMetadata & other) const = 0;
virtual const DataLakePartitionColumns & getPartitionColumns() const = 0;
virtual const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const = 0;
/// Tells whether makePartitionPruning() may be used with this metadata.
/// The default implementation answers "no".
virtual bool supportsPartitionPruning()
{
    return false;
}
/// Returns a list of strings computed for the given filter DAG.
/// The default implementation ignores the filter and returns an empty list.
virtual Strings makePartitionPruning(const ActionsDAG &)
{
    return Strings{};
}
};
using DataLakeMetadataPtr = std::unique_ptr<IDataLakeMetadata>;

View File

@ -75,21 +75,6 @@ IcebergMetadata::IcebergMetadata(
namespace
{
/// Status of a single manifest entry, stored as one byte.
/// The numeric values are spelled out explicitly because the enum is meant to be
/// compared with integers read from manifest files (see https://iceberg.apache.org/spec/).
enum class ManifestEntryStatus : uint8_t
{
    EXISTING = 0,
    ADDED    = 1,
    DELETED  = 2,
};
/// Kind of content held by a file referenced from a manifest, stored as one byte.
/// The numeric values are spelled out explicitly so that they can be compared with
/// integers read from manifest files (see https://iceberg.apache.org/spec/).
enum class DataFileContent : uint8_t
{
    DATA             = 0,
    POSITION_DELETES = 1,
    EQUALITY_DELETES = 2,
};
/**
* Iceberg supports the following data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
* - Primitive types:
@ -309,7 +294,7 @@ parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_ver
/// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well.
if (!ignore_schema_evolution && metadata_object->has("schemas")
&& metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1§)
&& metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
@ -645,8 +630,27 @@ Strings IcebergMetadata::getDataFiles() const
data_files.push_back(file_path);
}
}
ColumnPtr big_partition_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("partition"));
if (big_partition_column->getDataType() != TypeIndex::Tuple)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be Tuple type, got {}",
columns.at(1)->getFamilyName());
}
const auto * big_partition_tuple = assert_cast<const ColumnTuple *>(big_partition_column.get());
std::vector<uint8_t> partition_spec_json_bytes = avro_metadata["partition-spec"];
String partition_spec_json_string
= String(reinterpret_cast<char *>(partition_spec_json_bytes.data()), partition_spec_json_bytes.size());
Poco::Dynamic::Var partition_spec_json = parser.parse(partition_spec_json_string);
const Poco::JSON::Array::Ptr & partition_spec = partition_spec_json.extract<Poco::JSON::Array::Ptr>();
common_partition_infos.push_back(pruning_processor.getCommonPartitionInfo(partition_spec, big_partition_tuple));
specific_partition_infos.push_back(
pruning_processor.getSpecificPartitionInfo(common_partition_infos.back(), current_schema_id, name_and_type_by_source_id));
}
return data_files;
return pruning_processor.getDataFiles(
common_partition_infos, specific_partition_infos, nullptr, getContext(), configuration_ptr->getPath());
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include "Columns/IColumn.h"
#include "Core/NamesAndTypes.h"
#include "DataTypes/DataTypeNullable.h"
#include "config.h"
@ -15,6 +16,7 @@
namespace DB
{
/**
* Useful links:
* - https://iceberg.apache.org/spec/
@ -72,11 +74,28 @@ enum class PartitionTransform
Unsupported
};
/// Status of a single manifest entry, stored as one byte.
/// The numeric values are spelled out explicitly because integers read from a
/// manifest's status column are converted to this enum
/// (see https://iceberg.apache.org/spec/).
enum class ManifestEntryStatus : uint8_t
{
    EXISTING = 0,
    ADDED    = 1,
    DELETED  = 2,
};
/// Kind of content held by a file referenced from a manifest, stored as one byte.
/// The numeric values are spelled out explicitly so that they can be compared with
/// integers read from manifest files (see https://iceberg.apache.org/spec/).
enum class DataFileContent : uint8_t
{
    DATA             = 0,
    POSITION_DELETES = 1,
    EQUALITY_DELETES = 2,
};
/// Partition-related columns gathered for one manifest file.
/// Filled by PartitionPruningProcessor::getCommonPartitionInfo and consumed by
/// PartitionPruningProcessor::getDataFiles.
struct CommonPartitionInfo
{
/// One entry per field of the partition specification: the matching sub-column
/// of the manifest's `partition` tuple.
/// `partition_columns`, `partition_transforms` and `partition_source_ids` are
/// appended together, so index i in each refers to the same field.
std::vector<ColumnPtr> partition_columns;
/// Transform recorded for each partition field.
std::vector<PartitionTransform> partition_transforms;
/// Source id recorded for each partition field.
std::vector<Int32> partition_source_ids;
/// One row per file listed in the manifest; getDataFiles reads the file path from it.
ColumnPtr file_path_column;
/// One row per file listed in the manifest; getDataFiles reads it with getInt()
/// and interprets the value as ManifestEntryStatus.
/// NOTE(review): no assignment to this member is visible in getCommonPartitionInfo —
/// confirm it is populated before getDataFiles dereferences it.
ColumnPtr status_column;
};
struct SpecificSchemaPartitionInfo
@ -88,16 +107,23 @@ struct SpecificSchemaPartitionInfo
class PartitionPruningProcessor
{
public:
CommonPartitionInfo addCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple);
CommonPartitionInfo
getCommonPartitionInfo(const Poco::JSON::Array::Ptr & partition_specification, const ColumnTuple * big_partition_tuple) const;
SpecificSchemaPartitionInfo getSpecificPartitionPruning(
SpecificSchemaPartitionInfo getSpecificPartitionInfo(
const CommonPartitionInfo & common_info,
Int32 schema_version,
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id);
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id) const;
std::vector<bool> getPruningMask(const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context);
std::vector<bool>
getPruningMask(const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context) const;
std::vector<bool> getAllFilesMask(const ActionsDAG * filter_dag, ContextPtr context);
Strings getDataFiles(
const std::vector<CommonPartitionInfo> & manifest_partitions_infos,
const std::vector<SpecificSchemaPartitionInfo> & specific_infos,
const ActionsDAG * filter_dag,
ContextPtr context,
const std::string & common_path) const;
private:
static PartitionTransform getTransform(const String & transform_name)
@ -147,7 +173,7 @@ private:
/// Converts a partition value to a day number: the value is first turned into a
/// time via getTime(value, transform), then mapped with DateLUT's toDayNum().
/// NOTE(review): the result is returned as Int16 — confirm that the range of
/// toDayNum() fits without truncation.
static Int16 getDay(Int32 value, PartitionTransform transform)
{
DateLUTImpl::Time got_time = getTime(value, transform);
// NOTE(review): the same debug log appears below both live and commented out;
// decide which of the two lines is intended and drop the other.
LOG_DEBUG(&Poco::Logger::get("Get field"), "Time: {}", got_time);
// LOG_DEBUG(&Poco::Logger::get("Get field"), "Time: {}", got_time);
return DateLUT::instance().toDayNum(got_time);
}
@ -188,11 +214,6 @@ private:
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported partition transform: {}", partition_transform);
}
}
std::unordered_map<String, CommonPartitionInfo> common_partition_info_by_manifest_file;
std::map<std::pair<String, Int32>, SpecificSchemaPartitionInfo> specific_partition_info_by_manifest_file_and_schema;
std::vector<CommonPartitionInfo> common_partition_infos;
};
@ -241,7 +262,18 @@ public:
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
Strings makePartitionPruning(const ActionsDAG & filter_dag);
/// Returns the data files selected by the pruning processor for `filter_dag`,
/// using the partition information stored in this metadata object and the path
/// taken from the configuration.
/// Throws LOGICAL_ERROR if the configuration can no longer be locked.
Strings makePartitionPruning(const ActionsDAG & filter_dag) override
{
    if (const auto locked_configuration = configuration.lock())
    {
        return pruning_processor.getDataFiles(
            common_partition_infos, specific_partition_infos, &filter_dag, getContext(), locked_configuration->getPath());
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired");
}
/// This metadata implementation provides makePartitionPruning(), so it answers "yes".
bool supportsPartitionPruning() override
{
    return true;
}
private:
size_t getVersion() const { return metadata_version; }
@ -262,9 +294,12 @@ private:
std::vector<std::pair<String, Int32>> manifest_files_with_start_index;
mutable Strings data_files;
std::vector<CommonPartitionInfo> partition_infos;
mutable Strings manifest_files;
PartitionPruningProcessor pruning_processor;
mutable std::vector<CommonPartitionInfo> common_partition_infos;
mutable std::vector<SpecificSchemaPartitionInfo> specific_partition_infos;
};
}

View File

@ -35,18 +35,18 @@
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <DataFile.hh>
// # include <filesystem>
# include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
# include <DataFile.hh>
namespace DB
{
CommonPartitionInfo
PartitionPruningProcessor::getCommonPartitionInfo(Poco::JSON::Array::Ptr partition_specification, const ColumnTuple * big_partition_tuple)
CommonPartitionInfo PartitionPruningProcessor::getCommonPartitionInfo(
const Poco::JSON::Array::Ptr & partition_specification, const ColumnTuple * data_file_tuple_column) const
{
CommonPartitionInfo common_info;
ColumnPtr big_partition_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("partition"));
common_info.file_path_column = big_partition_column->getColumnPtr(0);
for (size_t i = 0; i != partition_specification->size(); ++i)
{
auto current_field = partition_specification->getObject(static_cast<UInt32>(i));
@ -61,17 +61,17 @@ PartitionPruningProcessor::getCommonPartitionInfo(Poco::JSON::Array::Ptr partiti
auto partition_name = current_field->getValue<String>("name");
LOG_DEBUG(&Poco::Logger::get("Partition Spec"), "Name: {}", partition_name);
common_info.partition_columns.push_back(big_partition_tuple->getColumnPtr(i));
common_info.partition_columns.push_back(big_partition_column->getColumnPtr(i));
common_info.partition_transforms.push_back(transform);
common_info.partition_source_ids.push_back(source_id);
}
return common_info;
}
SpecificSchemaPartitionInfo PartitionPruningProcessor::getSpecificPartitionPruning(
SpecificSchemaPartitionInfo PartitionPruningProcessor::getSpecificPartitionInfo(
const CommonPartitionInfo & common_info,
[[maybe_unused]] Int32 schema_version,
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id)
const std::unordered_map<Int32, NameAndTypePair> & name_and_type_by_source_id) const
{
SpecificSchemaPartitionInfo specific_info;
@ -104,7 +104,7 @@ SpecificSchemaPartitionInfo PartitionPruningProcessor::getSpecificPartitionPruni
}
std::vector<bool> PartitionPruningProcessor::getPruningMask(
const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context)
const SpecificSchemaPartitionInfo & specific_info, const ActionsDAG * filter_dag, ContextPtr context) const
{
std::vector<bool> pruning_mask;
if (!specific_info.partition_names_and_types.empty())
@ -131,6 +131,47 @@ std::vector<bool> PartitionPruningProcessor::getPruningMask(
return pruning_mask;
}
/// Builds the list of data file paths to read from the given manifests.
///
/// @param manifest_partitions_infos  Per-manifest columns (file paths, statuses, partition values).
/// @param specific_infos             Per-manifest schema-specific partition info; must have the same
///                                   length as `manifest_partitions_infos` (entries are matched by index).
/// @param filter_dag                 Filter used for pruning; when null, no file is pruned.
/// @param context                    Context forwarded to getPruningMask().
/// @param common_path                Substring that every data path must contain; the returned path is
///                                   the part of the data path starting at its first occurrence.
/// @return Paths of all files that were not pruned.
/// @throws BAD_ARGUMENTS      if the two vectors differ in length, a manifest lacks its file path or
///                            status column, or a data path does not contain `common_path`.
/// @throws UNSUPPORTED_METHOD if a kept entry has status DELETED.
Strings PartitionPruningProcessor::getDataFiles(
    const std::vector<CommonPartitionInfo> & manifest_partitions_infos,
    const std::vector<SpecificSchemaPartitionInfo> & specific_infos,
    const ActionsDAG * filter_dag,
    ContextPtr context,
    const std::string & common_path) const
{
    /// Both vectors are indexed in lockstep below; a length mismatch would read out of bounds.
    if (manifest_partitions_infos.size() != specific_infos.size())
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Mismatched partition infos: {} manifest entries, but {} schema-specific entries",
            manifest_partitions_infos.size(),
            specific_infos.size());

    Strings data_files;
    for (size_t index = 0; index < manifest_partitions_infos.size(); ++index)
    {
        const auto & manifest_partition_info = manifest_partitions_infos[index];
        const auto & specific_partition_info = specific_infos[index];

        /// Both columns are dereferenced for every row, so fail loudly instead of crashing on a null pointer.
        if (!manifest_partition_info.file_path_column || !manifest_partition_info.status_column)
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS, "Partition info of manifest {} lacks the file path or the status column", index);

        const size_t number_of_files_in_manifest = manifest_partition_info.file_path_column->size();
        LOG_DEBUG(&Poco::Logger::get("Partition pruning"), "Filter dag is null: {}", filter_dag == nullptr);
        const auto pruning_mask = filter_dag ? getPruningMask(specific_partition_info, filter_dag, context) : std::vector<bool>{};
        for (size_t i = 0; i < number_of_files_in_manifest; ++i)
        {
            /// getPruningMask() may return a mask shorter than the manifest (e.g. empty when there are
            /// no partition columns). A row without a mask entry is kept: reading an extra file is
            /// always safe, whereas indexing past the end of the mask is not.
            const bool is_pruned = filter_dag && i < pruning_mask.size() && !pruning_mask[i];
            if (is_pruned)
                continue;

            const auto status = manifest_partition_info.status_column->getInt(i);
            const auto data_path = std::string(manifest_partition_info.file_path_column->getDataAt(i).toView());
            const auto pos = data_path.find(common_path);
            if (pos == std::string::npos)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", common_path, data_path);

            if (static_cast<ManifestEntryStatus>(status) == ManifestEntryStatus::DELETED)
            {
                throw Exception(
                    ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
            }

            /// Keep only the part of the path that starts at `common_path`.
            data_files.push_back(data_path.substr(pos));
        }
    }
    return data_files;
}
}
#endif

View File

@ -191,7 +191,10 @@ public:
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
{
configuration->implementPartitionPruning(*filter_actions_dag);
predicate = filter_actions_dag->getOutputs().at(0);
}
createIterator(predicate);
}

View File

@ -204,6 +204,8 @@ public:
/// Tells whether this is a data lake configuration.
/// The default implementation answers "no".
virtual bool isDataLakeConfiguration() const
{
    return false;
}
/// Hook that receives a filter DAG for partition pruning.
/// The default implementation ignores the filter and does nothing.
virtual void implementPartitionPruning(const ActionsDAG &)
{
}
virtual ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,

View File

@ -91,6 +91,10 @@ StorageObjectStorageSource::~StorageObjectStorageSource()
void StorageObjectStorageSource::setKeyCondition(const std::optional<ActionsDAG> & filter_actions_dag, ContextPtr context_)
{
setKeyConditionImpl(filter_actions_dag, context_, read_from_format_info.format_header);
if (filter_actions_dag.has_value())
{
configuration->implementPartitionPruning(filter_actions_dag.value());
}
}
std::string StorageObjectStorageSource::getUniqueStoragePathIdentifier(