Support reading partitioned DeltaLake columns

kssenii 2024-04-30 10:44:25 +02:00
parent 4701429db4
commit fb4a230eee
17 changed files with 534 additions and 104 deletions

View File

@ -125,7 +125,7 @@ void Chunk::addColumn(size_t position, ColumnPtr column)
if (position >= columns.size())
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
"Position {} out of bound in Chunk::addColumn(), max position = {}",
position, columns.size() - 1);
position, columns.size() ? columns.size() - 1 : 0);
if (empty())
num_rows = column->size();
else if (column->size() != num_rows)
@ -143,7 +143,7 @@ void Chunk::erase(size_t position)
if (position >= columns.size())
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::erase(), max position = {}",
toString(position), toString(columns.size() - 1));
toString(position), toString(columns.size() ? columns.size() - 1 : 0));
columns.erase(columns.begin() + position);
}
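
The change above only affects the error message: columns.size() is unsigned, so for an empty chunk the old "columns.size() - 1" wraps around instead of reporting 0. A minimal standalone illustration of the wraparound (plain C++, not ClickHouse code):

#include <cstdio>
#include <vector>

int main()
{
    std::vector<int> columns; /// empty, like a Chunk that has no columns yet

    /// Unsigned subtraction wraps around: 0 - 1 == SIZE_MAX.
    size_t unguarded = columns.size() - 1;

    /// The guarded form used in this commit reports 0 instead.
    size_t guarded = columns.size() ? columns.size() - 1 : 0;

    std::printf("unguarded: %zu, guarded: %zu\n", unguarded, guarded);
    return 0;
}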

View File

@ -17,6 +17,22 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <parquet/arrow/reader.h>
#include <ranges>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
namespace fs = std::filesystem;
@ -26,6 +42,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -65,9 +82,17 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
* An action changes one aspect of the table's state, for example, adding or removing a file.
* Note: it is not valid JSON, but a list of JSON objects (one per line), so we read it in a loop.
*/
std::set<String> processMetadataFiles(const Configuration & configuration, ContextPtr context)
struct DeltaLakeMetadata
{
NamesAndTypesList schema;
Strings data_files;
DataLakePartitionColumns partition_columns;
};
DeltaLakeMetadata processMetadataFiles(const Configuration & configuration, ContextPtr context)
{
std::set<String> result_files;
NamesAndTypesList current_schema;
DataLakePartitionColumns current_partition_columns;
const auto checkpoint_version = getCheckpointIfExists(result_files, configuration, context);
if (checkpoint_version)
@ -81,7 +106,7 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
if (!MetadataReadHelper::exists(file_path, configuration))
break;
processMetadataFile(file_path, result_files, configuration, context);
processMetadataFile(file_path, result_files, current_schema, current_partition_columns, configuration, context);
}
LOG_TRACE(
@ -94,10 +119,10 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
configuration, deltalake_metadata_directory, metadata_file_suffix);
for (const String & key : keys)
processMetadataFile(key, result_files, configuration, context);
processMetadataFile(key, result_files, current_schema, current_partition_columns, configuration, context);
}
return result_files;
return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns};
}
/**
@ -132,6 +157,8 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
void processMetadataFile(
const String & key,
std::set<String> & result,
NamesAndTypesList & file_schema,
DataLakePartitionColumns & file_partition_columns,
const Configuration & configuration,
ContextPtr context)
{
@ -153,19 +180,216 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
if (json_str.empty())
continue;
const JSON json(json_str);
if (json.has("add"))
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
if (object->has("add"))
{
const auto path = json["add"]["path"].getString();
auto add_object = object->get("add").extract<Poco::JSON::Object::Ptr>();
auto path = add_object->getValue<String>("path");
result.insert(fs::path(configuration.getPath()) / path);
}
else if (json.has("remove"))
auto filename = fs::path(path).filename().string();
auto it = file_partition_columns.find(filename);
if (it == file_partition_columns.end())
{
const auto path = json["remove"]["path"].getString();
auto partition_values = add_object->get("partitionValues").extract<Poco::JSON::Object::Ptr>();
if (partition_values->size())
{
auto & current_partition_columns = file_partition_columns[filename];
for (const auto & name : partition_values->getNames())
{
const auto value = partition_values->getValue<String>(name);
auto name_and_type = file_schema.tryGetByName(name);
if (!name_and_type)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", name);
auto field = getFieldValue(value, name_and_type->type);
current_partition_columns.emplace_back(*name_and_type, field);
LOG_TEST(log, "Partition {} value is {} (for {})", name, value, filename);
}
}
}
}
else if (object->has("remove"))
{
auto path = object->get("remove").extract<Poco::JSON::Object::Ptr>()->getValue<String>("path");
result.erase(fs::path(configuration.getPath()) / path);
}
if (file_schema.empty())
{
// std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
// object->stringify(oss);
// LOG_TEST(log, "Metadata: {}", oss.str());
if (object->has("metaData"))
{
const auto metadata_object = object->get("metaData").extract<Poco::JSON::Object::Ptr>();
const auto schema_object = metadata_object->getValue<String>("schemaString");
Poco::JSON::Parser p;
Poco::Dynamic::Var fields_json = p.parse(schema_object);
Poco::JSON::Object::Ptr fields_object = fields_json.extract<Poco::JSON::Object::Ptr>();
const auto fields = fields_object->get("fields").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i < fields->size(); ++i)
{
const auto field = fields->getObject(static_cast<UInt32>(i));
auto name = field->getValue<String>("name");
auto type = field->getValue<String>("type");
auto is_nullable = field->getValue<bool>("nullable");
file_schema.push_back({name, getFieldType(field, "type", is_nullable)});
}
}
}
/// TODO: Check if schema in each file is the same?
}
}
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable)
{
if (field->isObject(type_key))
return getComplexTypeFromObject(field->getObject(type_key));
auto type = field->get(type_key);
if (type.isString())
{
const String & type_name = type.extract<String>();
auto data_type = getSimpleTypeByName(type_name);
return is_nullable ? makeNullable(data_type) : data_type;
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());
}
Field getFieldValue(const String & value, DataTypePtr data_type)
{
DataTypePtr check_type;
if (data_type->isNullable())
check_type = static_cast<const DataTypeNullable *>(data_type.get())->getNestedType();
else
check_type = data_type;
WhichDataType which(check_type->getTypeId());
if (which.isStringOrFixedString())
return value;
else if (which.isInt8())
return parse<Int8>(value);
else if (which.isInt16())
return parse<Int16>(value);
else if (which.isInt32())
return parse<Int32>(value);
else if (which.isInt64())
return parse<Int64>(value);
else if (which.isFloat32())
return parse<Float32>(value);
else if (which.isFloat64())
return parse<Float64>(value);
else if (which.isDate())
return UInt16{LocalDate{std::string(value)}.getDayNum()};
else if (which.isDate32())
return Int32{LocalDate{std::string(value)}.getExtenedDayNum()};
else if (which.isDateTime64())
{
ReadBufferFromString in(value);
DateTime64 time = 0;
readDateTime64Text(time, 6, in, assert_cast<const DataTypeDateTime64 *>(data_type.get())->getTimeZone());
return time;
}
// else if (which.isDecimal32())
// return parse<Decimal32>(value);
// else if (which.isDecimal64())
// return parse<Decimal64>(value);
// else if (which.isDecimal128())
// return parse<Decimal128>(value);
// else if (which.isDecimal256())
// return parse<Decimal256>(value);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type for {}", check_type->getColumnType());
}
DataTypePtr getSimpleTypeByName(const String & type_name)
{
/// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types
if (type_name == "string" || type_name == "binary")
return std::make_shared<DataTypeString>();
if (type_name == "long")
return std::make_shared<DataTypeInt64>();
if (type_name == "integer")
return std::make_shared<DataTypeInt32>();
if (type_name == "short")
return std::make_shared<DataTypeInt16>();
if (type_name == "byte")
return std::make_shared<DataTypeInt8>();
if (type_name == "float")
return std::make_shared<DataTypeFloat32>();
if (type_name == "double")
return std::make_shared<DataTypeFloat64>();
if (type_name == "boolean")
return DataTypeFactory::instance().get("Bool");
if (type_name == "date")
return std::make_shared<DataTypeDate32>();
if (type_name == "timestamp")
return std::make_shared<DataTypeDateTime64>(6);
if (type_name.starts_with("decimal(") && type_name.ends_with(')'))
{
ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
size_t precision;
size_t scale;
readIntText(precision, buf);
skipWhitespaceIfAny(buf);
assertChar(',', buf);
skipWhitespaceIfAny(buf);
tryReadIntText(scale, buf);
return createDecimal<DataTypeDecimal>(precision, scale);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name);
}
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
{
String type_name = type->getValue<String>("type");
if (type_name == "struct")
{
DataTypes element_types;
Names element_names;
auto fields = type->get("fields").extract<Poco::JSON::Array::Ptr>();
element_types.reserve(fields->size());
element_names.reserve(fields->size());
for (size_t i = 0; i != fields->size(); ++i)
{
auto field = fields->getObject(static_cast<Int32>(i));
element_names.push_back(field->getValue<String>("name"));
auto required = field->getValue<bool>("required");
element_types.push_back(getFieldType(field, "type", required));
}
return std::make_shared<DataTypeTuple>(element_types, element_names);
}
if (type_name == "array")
{
bool is_nullable = type->getValue<bool>("containsNull");
auto element_type = getFieldType(type, "elementType", is_nullable);
return std::make_shared<DataTypeArray>(element_type);
}
if (type_name == "map")
{
bool is_nullable = type->getValue<bool>("containsNull");
auto key_type = getFieldType(type, "keyType", /* is_nullable */false);
auto value_type = getFieldType(type, "valueType", is_nullable);
return std::make_shared<DataTypeMap>(key_type, value_type);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name);
}
/**
* Checkpoints in Delta Lake are created every 10 commits by default.
@ -272,8 +496,8 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
arrow::default_memory_pool(),
&reader));
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(reader->GetSchema(&schema));
std::shared_ptr<arrow::Schema> file_schema;
THROW_ARROW_NOT_OK(reader->GetSchema(&file_schema));
ArrowColumnToCHColumn column_reader(
header, "Parquet",
@ -318,20 +542,19 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
template <typename Configuration, typename MetadataReadHelper>
DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::DeltaLakeMetadataParser() : impl(std::make_unique<Impl>())
{
}
template <typename Configuration, typename MetadataReadHelper>
Strings DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr context)
DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::DeltaLakeMetadataParser(const Configuration & configuration, ContextPtr context)
: impl(std::make_unique<Impl>())
{
auto result = impl->processMetadataFiles(configuration, context);
return Strings(result.begin(), result.end());
data_files = result.data_files;
schema = result.schema;
partition_columns = result.partition_columns;
LOG_TRACE(impl->log, "Found {} data files, {} partition files, schema: {}",
data_files.size(), partition_columns.size(), schema.toString());
}
template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser();
template Strings DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
const StorageS3::Configuration & configuration, ContextPtr);
template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser(const StorageS3::Configuration & configuration, ContextPtr context);
}
#endif
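
For reference, each line of a Delta transaction log file is a standalone JSON object, and partition values arrive as plain strings under add.partitionValues, which is why getFieldValue() has to re-parse them according to the schema type. A minimal, self-contained Poco::JSON sketch of the extraction performed in processMetadataFile() (the file path and values are made up):

#include <iostream>
#include <string>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>

int main()
{
    /// One (made-up) log entry, trimmed to the fields used by the parser above.
    const std::string json_str = R"({
        "add": {
            "path": "c=2000-01-01/part-00000-hypothetical.parquet",
            "partitionValues": {"c": "2000-01-01"}
        }
    })";

    Poco::JSON::Parser parser;
    auto object = parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();

    if (object->has("add"))
    {
        auto add_object = object->get("add").extract<Poco::JSON::Object::Ptr>();
        std::cout << "data file: " << add_object->getValue<std::string>("path") << "\n";

        auto partition_values = add_object->get("partitionValues").extract<Poco::JSON::Object::Ptr>();
        for (const auto & name : partition_values->getNames())
            std::cout << "partition column " << name << " = " << partition_values->getValue<std::string>(name) << "\n";
    }
    return 0;
}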

View File

@ -1,7 +1,9 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
#include "PartitionColumns.h"
namespace DB
{
@ -10,13 +12,20 @@ template <typename Configuration, typename MetadataReadHelper>
struct DeltaLakeMetadataParser
{
public:
DeltaLakeMetadataParser<Configuration, MetadataReadHelper>();
DeltaLakeMetadataParser<Configuration, MetadataReadHelper>(const Configuration & configuration, ContextPtr context);
Strings getFiles(const Configuration & configuration, ContextPtr context);
Strings getFiles() { return data_files; }
NamesAndTypesList getTableSchema() const { return schema; }
DataLakePartitionColumns getPartitionColumns() const { return partition_columns; }
private:
struct Impl;
std::shared_ptr<Impl> impl;
NamesAndTypesList schema;
DataLakePartitionColumns partition_columns;
Strings data_files;
};
}

View File

@ -61,7 +61,7 @@ struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
String key;
UInt64 timestamp = 0;
};
std::unordered_map<Partition, std::unordered_map<FileID, FileInfo>> data_files;
std::unordered_map<Partition, std::unordered_map<FileID, FileInfo>> files;
for (const auto & key : keys)
{
@ -76,7 +76,7 @@ struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
const auto & file_id = file_parts[0];
const auto timestamp = parse<UInt64>(file_parts[2]);
auto & file_info = data_files[partition][file_id];
auto & file_info = files[partition][file_id];
if (file_info.timestamp == 0 || file_info.timestamp < timestamp)
{
file_info.key = std::move(key);
@ -85,7 +85,7 @@ struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
}
Strings result;
for (auto & [partition, partition_data] : data_files)
for (auto & [partition, partition_data] : files)
{
LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size());
for (auto & [file_id, file_data] : partition_data)
@ -97,19 +97,12 @@ struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
template <typename Configuration, typename MetadataReadHelper>
HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser() : impl(std::make_unique<Impl>())
HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser(const Configuration & configuration, ContextPtr) : impl(std::make_unique<Impl>())
{
data_files = impl->processMetadataFiles(configuration);
}
template <typename Configuration, typename MetadataReadHelper>
Strings HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles(const Configuration & configuration, ContextPtr)
{
return impl->processMetadataFiles(configuration);
}
template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser();
template Strings HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles(
const StorageS3::Configuration & configuration, ContextPtr);
template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser(const StorageS3::Configuration & configuration, ContextPtr);
}

View File

@ -1,7 +1,9 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
#include "PartitionColumns.h"
namespace DB
{
@ -10,13 +12,18 @@ template <typename Configuration, typename MetadataReadHelper>
struct HudiMetadataParser
{
public:
HudiMetadataParser<Configuration, MetadataReadHelper>();
HudiMetadataParser<Configuration, MetadataReadHelper>(const Configuration & configuration, ContextPtr context);
Strings getFiles(const Configuration & configuration, ContextPtr context);
Strings getFiles() { return data_files; }
NamesAndTypesList getTableSchema() const { return {}; }
DataLakePartitionColumns getPartitionColumns() const { return {}; }
private:
struct Impl;
std::shared_ptr<Impl> impl;
Strings data_files;
};
}

View File

@ -9,6 +9,7 @@
#include <Databases/LoadingStrictnessLevel.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
#include "PartitionColumns.h"
#include <filesystem>
@ -23,15 +24,51 @@ public:
using Configuration = typename Storage::Configuration;
template <class ...Args>
explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, LoadingStrictnessLevel mode, Args && ...args)
: Storage(getConfigurationForDataRead(configuration_, context_, {}, mode), context_, std::forward<Args>(args)...)
, base_configuration(configuration_)
, log(getLogger(getName())) {} // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
static StoragePtr create(
const Configuration & configuration_,
ContextPtr context_,
LoadingStrictnessLevel mode,
const ColumnsDescription & columns_,
Args && ...args)
{
std::unique_ptr<MetadataParser> metadata;
Configuration read_configuration;
Configuration base_configuration{configuration_};
try
{
base_configuration.update(context_);
metadata = std::make_unique<MetadataParser>(base_configuration, context_);
read_configuration = getConfigurationForDataRead(*metadata, base_configuration, context_);
}
catch (...)
{
if (mode <= LoadingStrictnessLevel::CREATE)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(
configuration_,
read_configuration,
context_,
columns_.empty() && metadata ? ColumnsDescription(metadata->getTableSchema()) : columns_,
std::forward<Args>(args)...);
}
template <class ...Args>
static StoragePtr create(const Configuration & configuration_, ContextPtr context_, LoadingStrictnessLevel mode, Args && ...args)
explicit IStorageDataLake(
const Configuration & base_configuration_,
const Configuration & read_configuration_,
ContextPtr context_,
const ColumnsDescription & columns_,
Args && ...args)
: Storage(read_configuration_,
context_,
columns_,
std::forward<Args>(args)...)
, base_configuration(base_configuration_)
, log(getLogger(getName())) // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
{
return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, mode, std::forward<Args>(args)...);
}
String getName() const override { return name; }
@ -41,8 +78,18 @@ public:
const std::optional<FormatSettings> & format_settings,
const ContextPtr & local_context)
{
auto configuration = getConfigurationForDataRead(base_configuration, local_context);
return Storage::getTableStructureFromData(configuration, format_settings, local_context);
base_configuration.update(local_context);
auto metadata = std::make_unique<MetadataParser>(base_configuration, local_context);
auto schema = metadata->getTableSchema();
if (!schema.empty())
{
return ColumnsDescription(schema);
}
else
{
auto read_configuration = getConfigurationForDataRead(*metadata, base_configuration, local_context);
return Storage::getTableStructureFromData(read_configuration, format_settings, local_context);
}
}
static Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context)
@ -65,51 +112,31 @@ public:
private:
static Configuration getConfigurationForDataRead(
const Configuration & base_configuration, const ContextPtr & local_context, const Strings & keys = {},
LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE)
MetadataParser & metadata_,
const Configuration & base_configuration,
const ContextPtr & local_context)
{
auto configuration{base_configuration};
configuration.update(local_context);
configuration.static_configuration = true;
try
{
if (keys.empty())
configuration.keys = getDataFiles(configuration, local_context);
else
configuration.keys = keys;
LOG_TRACE(
getLogger("DataLake"),
"New configuration path: {}, keys: {}",
configuration.getPath(), fmt::join(configuration.keys, ", "));
configuration.keys = metadata_.getFiles();
configuration.connect(local_context);
return configuration;
}
catch (...)
{
if (mode <= LoadingStrictnessLevel::CREATE)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
return configuration;
}
}
static Strings getDataFiles(const Configuration & configuration, const ContextPtr & local_context)
{
return MetadataParser().getFiles(configuration, local_context);
}
void updateConfigurationImpl(const ContextPtr & local_context)
{
const bool updated = base_configuration.update(local_context);
auto new_keys = getDataFiles(base_configuration, local_context);
auto metadata = MetadataParser(base_configuration, local_context);
auto new_keys = metadata.getFiles();
Storage::partition_columns = metadata.getPartitionColumns();
if (!updated && new_keys == Storage::getConfiguration().keys)
return;
Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
auto read_configuration = getConfigurationForDataRead(metadata, base_configuration, local_context);
Storage::useConfiguration(read_configuration);
}
Configuration base_configuration;
@ -127,7 +154,8 @@ static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
if (configuration.format == "auto")
configuration.format = "Parquet";
return DataLake::create(configuration, args.getContext(), args.mode, args.table_id, args.columns, args.constraints,
return DataLake::create(configuration, args.getContext(), args.mode,
args.columns, args.table_id, args.constraints,
args.comment, getFormatSettings(args.getContext()));
}

View File

@ -9,8 +9,8 @@ StoragePtr StorageIceberg::create(
const DB::StorageIceberg::Configuration & base_configuration,
DB::ContextPtr context_,
LoadingStrictnessLevel mode,
const DB::StorageID & table_id_,
const DB::ColumnsDescription & columns_,
const DB::StorageID & table_id_,
const DB::ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_)
@ -36,8 +36,8 @@ StoragePtr StorageIceberg::create(
std::move(metadata),
configuration,
context_,
table_id_,
columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_,
table_id_,
constraints_,
comment,
format_settings_);
@ -47,12 +47,12 @@ StorageIceberg::StorageIceberg(
std::unique_ptr<IcebergMetadata> metadata_,
const Configuration & configuration_,
ContextPtr context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const StorageID & table_id_,
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_)
: StorageS3(configuration_, context_, table_id_, columns_, constraints_, comment, format_settings_)
: StorageS3(configuration_, context_, columns_, table_id_, constraints_, comment, format_settings_)
, current_metadata(std::move(metadata_))
, base_configuration(configuration_)
{

View File

@ -31,8 +31,8 @@ public:
static StoragePtr create(const Configuration & base_configuration,
ContextPtr context_,
LoadingStrictnessLevel mode,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const StorageID & table_id_,
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_);
@ -41,8 +41,8 @@ public:
std::unique_ptr<IcebergMetadata> metadata_,
const Configuration & configuration_,
ContextPtr context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const StorageID & table_id_,
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_);

View File

@ -0,0 +1,17 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Core/Field.h>
namespace DB
{
struct DataLakePartitionColumn
{
NameAndTypePair name_and_type;
Field value;
};
/// Data file -> partition columns
using DataLakePartitionColumns = std::unordered_map<std::string, std::vector<DataLakePartitionColumn>>;
}
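
The new map is keyed by the data file name alone (not the full S3 key), which is how StorageS3Source::generate() later finds the values for the file it is currently reading. A simplified stand-in using only standard-library types to show the shape of the lookup (ClickHouse stores a NameAndTypePair and a Field rather than plain strings):

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

/// Simplified analogue of DataLakePartitionColumn: column name plus the value
/// taken from add.partitionValues, both kept as strings for this sketch.
struct PartitionColumn
{
    std::string name;
    std::string value;
};

/// Data file name -> partition columns, same shape as DataLakePartitionColumns.
using PartitionColumns = std::unordered_map<std::string, std::vector<PartitionColumn>>;

int main()
{
    PartitionColumns partition_columns;
    partition_columns["part-00000-hypothetical.parquet"] = {{"c", "2000-01-01"}};

    /// The reader looks a file up by its filename only, mirroring generate().
    auto it = partition_columns.find("part-00000-hypothetical.parquet");
    if (it != partition_columns.end())
        for (const auto & column : it->second)
            std::cout << column.name << " = " << column.value << "\n";
    return 0;
}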

View File

@ -45,12 +45,16 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
const auto & bucket = base_configuration.url.bucket;
const auto & client = base_configuration.client;
auto path = std::filesystem::path(table_path) / prefix;
std::vector<String> res;
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
request.SetBucket(bucket);
request.SetPrefix(std::filesystem::path(table_path) / prefix);
request.SetPrefix(path);
LOG_TEST(getLogger("S3DataLakeMetadataReadHelper"), "Listing files in {}", path.string());
bool is_finished{false};
while (!is_finished)
@ -69,6 +73,7 @@ std::vector<String> S3DataLakeMetadataReadHelper::listFiles(
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey();
LOG_TEST(getLogger("S3DataLakeMetadataReadHelper"), "Listed file: {} (searching for suffix: {})", filename, suffix);
if (filename.ends_with(suffix))
res.push_back(filename);
}

View File

@ -148,20 +148,23 @@ public:
const StorageSnapshotPtr & storage_snapshot_,
const ContextPtr & context_,
Block sample_block,
const StorageS3::Configuration & query_configuration_,
StorageS3 & storage_,
ReadFromFormatInfo read_from_format_info_,
bool need_only_count_,
size_t max_block_size_,
size_t num_streams_)
size_t num_streams_,
const DataLakePartitionColumns & partition_columns_)
: SourceStepWithFilter(DataStream{.header = std::move(sample_block)}, column_names_, query_info_, storage_snapshot_, context_)
, column_names(column_names_)
, storage(storage_)
, read_from_format_info(std::move(read_from_format_info_))
, need_only_count(need_only_count_)
, query_configuration(query_configuration_)
, partition_columns(partition_columns_)
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
query_configuration = storage.updateConfigurationAndGetCopy(context);
virtual_columns = storage.getVirtualsList();
}
@ -172,6 +175,7 @@ private:
bool need_only_count;
StorageS3::Configuration query_configuration;
NamesAndTypesList virtual_columns;
DataLakePartitionColumns partition_columns;
size_t max_block_size;
size_t num_streams;
@ -577,7 +581,8 @@ StorageS3Source::StorageS3Source(
const String & url_host_and_port_,
std::shared_ptr<IIterator> file_iterator_,
const size_t max_parsing_threads_,
bool need_only_count_)
bool need_only_count_,
const DataLakePartitionColumns & partition_columns_)
: SourceWithKeyCondition(info.source_header, false)
, WithContext(context_)
, name(std::move(name_))
@ -593,6 +598,7 @@ StorageS3Source::StorageS3Source(
, client(client_)
, sample_block(info.format_header)
, format_settings(format_settings_)
, partition_columns(partition_columns_)
, requested_virtual_columns(info.requested_virtual_columns)
, file_iterator(file_iterator_)
, max_parsing_threads(max_parsing_threads_)
@ -798,8 +804,37 @@ Chunk StorageS3Source::generate()
size_t chunk_size = 0;
if (const auto * input_format = reader.getInputFormat())
chunk_size = reader.getInputFormat()->getApproxBytesReadForChunk();
progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath(), reader.getFileSize());
if (!partition_columns.empty() && chunk_size && chunk.hasColumns())
{
auto filename = fs::path(reader.getPath()).filename().string();
auto partition_values = partition_columns.find(filename);
if (partition_values != partition_columns.end())
for (const auto & [name_and_type, value] : partition_values->second)
{
if (!sample_block.has(name_and_type.name))
continue;
auto column_pos = sample_block.getPositionByName(name_and_type.name);
const auto & type = name_and_type.type;
auto partition_column = type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst();
/// This column is filled with the default value now; remove it.
chunk.erase(column_pos);
/// Add correct values.
if (chunk.hasColumns())
{
chunk.addColumn(column_pos, std::move(partition_column));
}
else
{
chunk.addColumn(std::move(partition_column));
}
}
}
return chunk;
}
@ -1072,8 +1107,8 @@ private:
StorageS3::StorageS3(
const Configuration & configuration_,
const ContextPtr & context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const StorageID & table_id_,
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_,
@ -1190,17 +1225,20 @@ void StorageS3::read(
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
auto query_configuration = updateConfigurationAndGetCopy(local_context);
auto reading = std::make_unique<ReadFromStorageS3Step>(
column_names,
query_info,
storage_snapshot,
local_context,
read_from_format_info.source_header,
query_configuration,
*this,
std::move(read_from_format_info),
need_only_count,
max_block_size,
num_streams);
num_streams,
partition_columns);
query_plan.addStep(std::move(reading));
}
@ -1262,7 +1300,8 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
iterator_wrapper,
max_parsing_threads,
need_only_count);
need_only_count,
partition_columns);
source->setKeyCondition(filter_actions_dag, context);
pipes.emplace_back(std::move(source));
@ -1973,8 +2012,8 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
return std::make_shared<StorageS3>(
std::move(configuration),
args.getContext(),
args.table_id,
args.columns,
args.table_id,
args.constraints,
args.comment,
format_settings,
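
The partition columns are already part of the source header, but the Parquet reader fills them with defaults because they are absent from the data file itself; generate() therefore rebuilds each one as a constant column and materializes it before swapping it into the chunk. A sketch of that single step, assuming it is compiled inside the ClickHouse tree (the Date32 type and the value are arbitrary examples):

#include <memory>
#include <Core/Field.h>
#include <DataTypes/DataTypeDate32.h>

using namespace DB;

/// Build a full (non-const) column holding num_rows copies of a partition value,
/// the way generate() materializes values that do not exist in the Parquet file.
ColumnPtr makePartitionColumn(size_t num_rows)
{
    DataTypePtr type = std::make_shared<DataTypeDate32>();
    Field value = Int32(10957); /// 2000-01-01 as days since 1970-01-01
    return type->createColumnConst(num_rows, value)->convertToFullColumnIfConst();
}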

View File

@ -20,6 +20,7 @@
#include <Storages/StorageConfiguration.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/DataLakes/PartitionColumns.h>
#include <Poco/URI.h>
#include <Common/threadPoolCallbackRunner.h>
@ -141,7 +142,8 @@ public:
const String & url_host_and_port,
std::shared_ptr<IIterator> file_iterator_,
size_t max_parsing_threads,
bool need_only_count_);
bool need_only_count_,
const DataLakePartitionColumns & partition_columns_ = {});
~StorageS3Source() override;
@ -170,6 +172,7 @@ private:
std::shared_ptr<const S3::Client> client;
Block sample_block;
std::optional<FormatSettings> format_settings;
DataLakePartitionColumns partition_columns;
struct ReaderHolder
{
@ -305,8 +308,8 @@ public:
StorageS3(
const Configuration & configuration_,
const ContextPtr & context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const StorageID & table_id_,
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_,
@ -363,6 +366,8 @@ protected:
const Configuration & getConfiguration();
mutable DataLakePartitionColumns partition_columns;
private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;

View File

@ -26,21 +26,28 @@ protected:
const ASTPtr & /*ast_function*/,
ContextPtr context,
const std::string & table_name,
ColumnsDescription /*cached_columns*/,
ColumnsDescription cached_columns,
bool /*is_insert_query*/) const override
{
ColumnsDescription columns;
if (TableFunction::configuration.structure != "auto")
columns = parseColumnsListFromString(TableFunction::configuration.structure, context);
else if (!structure_hint.empty())
columns = structure_hint;
else if (!cached_columns.empty())
columns = cached_columns;
StoragePtr storage = Storage::create(
TableFunction::configuration, context, LoadingStrictnessLevel::CREATE, StorageID(TableFunction::getDatabaseName(), table_name),
columns, ConstraintsDescription{}, String{}, std::nullopt);
TableFunction::configuration, context, LoadingStrictnessLevel::CREATE,
columns, StorageID(TableFunction::getDatabaseName(), table_name),
ConstraintsDescription{}, String{}, std::nullopt);
storage->startup();
return storage;
}
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
const char * getStorageTypeName() const override { return Storage::name; }
ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override
@ -60,6 +67,8 @@ protected:
TableFunction::configuration.format = "Parquet";
TableFunction::parseArguments(ast_function, context);
}
ColumnsDescription structure_hint;
};
}

View File

@ -420,8 +420,8 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
StoragePtr storage = std::make_shared<StorageS3>(
configuration,
context,
StorageID(getDatabaseName(), table_name),
columns,
StorageID(getDatabaseName(), table_name),
ConstraintsDescription{},
String{},
/// No format_settings for table function S3

View File

@ -37,8 +37,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
storage = std::make_shared<StorageS3>(
configuration,
context,
StorageID(getDatabaseName(), table_name),
columns,
StorageID(getDatabaseName(), table_name),
ConstraintsDescription{},
/* comment */String{},
/* format_settings */std::nullopt, /// No format_settings for S3Cluster

View File

@ -511,3 +511,98 @@ def test_restart_broken_table_function(started_cluster):
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
def test_partition_columns(started_cluster):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_partition_columns"
result_file = f"{TABLE_NAME}"
partition_column = "c"
delta_table = (
DeltaTable.create(spark)
.tableName(TABLE_NAME)
.location(f"/{result_file}")
.addColumn("a", "INT")
.addColumn("b", "STRING")
.addColumn("c", "DATE")
.partitionedBy(partition_column)
.execute()
)
num_rows = 9
schema = StructType(
[
StructField("a", IntegerType()),
StructField("b", StringType()),
StructField("c", DateType()),
]
)
for i in range(1, num_rows + 1):
data = [
(
i,
"test" + str(i),
datetime.strptime(f"2000-01-0{i}", "%Y-%m-%d"),
)
]
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.write.mode("append").format("delta").partitionBy(partition_column).save(
f"/{TABLE_NAME}"
)
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert len(files) > 0
print(f"Uploaded files: {files}")
result = instance.query(
f"describe table deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"
).strip()
assert (
result
== "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date)"
)
result = int(
instance.query(
f"""SELECT count()
FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')
"""
)
)
assert result == num_rows
result = int(
instance.query(
f"""SELECT count()
FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')
WHERE c == toDateTime('2000/01/05')
"""
)
)
assert result == 1
# instance.query(
# f"""
# DROP TABLE IF EXISTS {TABLE_NAME};
# CREATE TABLE {TABLE_NAME} (a Int32, b String, c DateTime)
# ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
# )
# assert (
# int(
# instance.query(
# f"SELECT count() FROM {TABLE_NAME} WHERE c != toDateTime('2000/01/05')"
# )
# )
# == num_rows - 1
# )
# instance.query(f"SELECT a, b, c, FROM {TABLE_NAME}")
# assert False