This commit is contained in:
Nikita Taranov 2023-09-26 00:20:40 +02:00
parent 694792fa5d
commit 595027b1c6
2 changed files with 18 additions and 18 deletions

View File

@ -1,4 +1,5 @@
#include <algorithm>
#include <functional>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/Utils.h>
@ -588,8 +589,6 @@ void ReadFromMerge::createChildPlans()
context,
current_streams));
}
applyFilters();
}
SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & query_info,
@ -655,13 +654,15 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer
return modified_query_info;
}
template <typename Func>
bool recursivelyApplyToReadingSteps(QueryPlan::Node * node, const Func & func)
bool recursivelyApplyToReadingSteps(QueryPlan::Node * node, const std::function<bool(ReadFromMergeTree &)> & func)
{
bool ok = true;
for (auto * child : node->children)
ok &= recursivelyApplyToReadingSteps(child, func);
// This code is mainly meant to be used to call `requestReadingInOrder` on child steps.
// In this case it is ok if one child will read in order and other will not (though I don't know when it is possible),
// the only important part is to acknowledge this at the parent and don't rely on any particular ordering of input data.
if (!ok)
return false;
@ -823,16 +824,7 @@ QueryPlan ReadFromMerge::createPlanForTable(
if (!plan.isInitialized())
return {};
auto apply_filters = [this](ReadFromMergeTree & read_from_merge_tree)
{
size_t filters_dags_size = filter_dags.size();
for (size_t i = 0; i < filters_dags_size; ++i)
read_from_merge_tree.addFilter(filter_dags[i], filter_nodes.nodes[i]);
read_from_merge_tree.applyFilters();
return true;
};
recursivelyApplyToReadingSteps(plan.getRootNode(), apply_filters);
applyFilters(plan);
}
else if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
{
@ -1088,10 +1080,10 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
if (order_info_->direction != 1 && InterpreterSelectQuery::isQueryWithFinal(query_info))
return false;
auto request_read_in_order = [order_info = order_info_](ReadFromMergeTree & read_from_merge_tree)
auto request_read_in_order = [order_info_](ReadFromMergeTree & read_from_merge_tree)
{
return read_from_merge_tree.requestReadingInOrder(
order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
order_info_->used_prefix_of_sorting_key_size, order_info_->direction, order_info_->limit);
};
bool ok = true;
@ -1107,7 +1099,7 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
return true;
}
void ReadFromMerge::applyFilters()
void ReadFromMerge::applyFilters(const QueryPlan & plan) const
{
auto apply_filters = [this](ReadFromMergeTree & read_from_merge_tree)
{
@ -1119,9 +1111,14 @@ void ReadFromMerge::applyFilters()
return true;
};
recursivelyApplyToReadingSteps(plan.getRootNode(), apply_filters);
}
void ReadFromMerge::applyFilters()
{
for (const auto & plan : child_plans)
if (plan.isInitialized())
recursivelyApplyToReadingSteps(plan.getRootNode(), apply_filters);
applyFilters(plan);
}
IStorage::ColumnSizeByName StorageMerge::getColumnSizes() const

View File

@ -1,5 +1,6 @@
#pragma once
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
@ -183,6 +184,8 @@ private:
void createChildPlans();
void applyFilters(const QueryPlan & plan) const;
QueryPlan createPlanForTable(
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,