Style changes around filterKeysForPartitionPruning

This commit is contained in:
vdimir 2023-11-20 18:08:45 +00:00
parent 95e9a27417
commit 31a6c7c1c4
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
2 changed files with 18 additions and 16 deletions

View File

@ -172,7 +172,7 @@ private:
}; };
static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const Strings & keys) static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const std::unordered_set<String> & keys)
{ {
Block virtual_columns_block; Block virtual_columns_block;
fs::path bucket_path(bucket); fs::path bucket_path(bucket);
@ -226,31 +226,31 @@ static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, con
return virtual_columns_block; return virtual_columns_block;
} }
static void filterKeysForPartitionPruning(std::vector<String> & keys, static std::vector<String> filterKeysForPartitionPruning(
const String & bucket, const std::vector<String> & keys,
const NamesAndTypesList & virtual_columns, const String & bucket,
const std::vector<ActionsDAGPtr> & filter_dags, const NamesAndTypesList & virtual_columns,
ContextPtr context) const std::vector<ActionsDAGPtr> & filter_dags,
ContextPtr context)
{ {
std::unordered_set<String> result_keys(keys.begin(), keys.end());
for (const auto & filter_dag : filter_dags) for (const auto & filter_dag : filter_dags)
{ {
if (keys.empty()) if (result_keys.empty())
break; break;
auto block = getBlockWithVirtuals(virtual_columns, bucket, keys); auto block = getBlockWithVirtuals(virtual_columns, bucket, result_keys);
auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs(block, filter_dag, context); auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs(block, filter_dag, context);
if (!filter_actions) if (!filter_actions)
continue; continue;
VirtualColumnUtils::filterBlockWithQuery(filter_actions, block, context); VirtualColumnUtils::filterBlockWithQuery(filter_actions, block, context);
std::unordered_set<String> filtered_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key"); result_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key");
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning {} from {} keys left", filtered_keys.size(), keys.size());
keys.clear();
keys.reserve(filtered_keys.size());
for (auto && key : filtered_keys)
keys.emplace_back(key);
} }
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning {} from {} keys left", result_keys.size(), keys.size());
return std::vector<String>(result_keys.begin(), result_keys.end());
} }
class IOutputFormat; class IOutputFormat;
@ -1161,8 +1161,7 @@ static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
} }
else else
{ {
Strings keys = configuration.keys; Strings keys = filterKeysForPartitionPruning(configuration.keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
filterKeysForPartitionPruning(keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
return std::make_shared<StorageS3Source::KeysIterator>( return std::make_shared<StorageS3Source::KeysIterator>(
*configuration.client, configuration.url.version_id, keys, *configuration.client, configuration.url.version_id, keys,
configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback); configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
@ -1266,6 +1265,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
void ReadFromStorageS3Step::applyFilters() void ReadFromStorageS3Step::applyFilters()
{ {
/// We will use filter_dags in filterKeysForPartitionPruning called from initializePipeline, nothing to do here
} }
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/) SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)

View File

@ -34,6 +34,8 @@ bool prepareFilterBlockWithQuery(const ASTPtr & query, ContextPtr context, Block
/// If `expression_ast` is passed, use it to filter block. /// If `expression_ast` is passed, use it to filter block.
void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr context, ASTPtr expression_ast = {}); void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr context, ASTPtr expression_ast = {});
void filterBlockWithQuery(ActionsDAGPtr dag, Block & block, ContextPtr context); void filterBlockWithQuery(ActionsDAGPtr dag, Block & block, ContextPtr context);
/// Extract subset of filter_dag that can be evaluated using only columns from header
ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context); ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context);
/// Extract from the input stream a set of `name` column values /// Extract from the input stream a set of `name` column values