Merge pull request #67931 from ClickHouse/backport/24.3/65057

Backport #65057 to 24.3: Refactor query plan prewhere optimization for Merge
commit da2e926102 (robot-clickhouse-ci-2, 2024-08-06 22:54:10 +02:00, committed by GitHub)
13 changed files with 210 additions and 308 deletions
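In short: this backport removes the StorageDummy pre-run (collectFiltersForAnalysis in InterpreterSelectQuery and the optimized_prewhere_info plumbing in SelectQueryInfo) and instead lets the filter push-down optimization hand filters directly to ReadFromMerge, which replays them as FilterStep nodes on top of each child table's plan and then optimizes every child plan individually.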


@@ -2371,49 +2371,6 @@ UInt64 InterpreterSelectQuery::maxBlockSizeByLimit() const
return 0;
}
/** Storages can rely on filters for the storage being available for analysis before
 * the plan is fully constructed and optimized.
 *
 * StorageMerge common header calculation and prewhere push-down rely on this.
 *
 * This is similar to Planner::collectFiltersForAnalysis
 */
void collectFiltersForAnalysis(
const ASTPtr & query_ptr,
const ContextPtr & query_context,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryOptions & options,
SelectQueryInfo & query_info)
{
auto get_column_options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects().withVirtuals();
auto dummy = std::make_shared<StorageDummy>(
storage_snapshot->storage.getStorageID(), ColumnsDescription(storage_snapshot->getColumns(get_column_options)), storage_snapshot);
QueryPlan query_plan;
InterpreterSelectQuery(query_ptr, query_context, dummy, dummy->getInMemoryMetadataPtr(), options).buildQueryPlan(query_plan);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(query_context);
query_plan.optimize(optimization_settings);
std::vector<QueryPlan::Node *> nodes_to_process;
nodes_to_process.push_back(query_plan.getRootNode());
while (!nodes_to_process.empty())
{
const auto * node_to_process = nodes_to_process.back();
nodes_to_process.pop_back();
nodes_to_process.insert(nodes_to_process.end(), node_to_process->children.begin(), node_to_process->children.end());
auto * read_from_dummy = typeid_cast<ReadFromDummy *>(node_to_process->step.get());
if (!read_from_dummy)
continue;
query_info.filter_actions_dag = read_from_dummy->getFilterActionsDAG();
query_info.optimized_prewhere_info = read_from_dummy->getPrewhereInfo();
}
}
void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan)
{
auto & query = getSelectQuery();
@@ -2540,10 +2497,6 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
}
else if (storage)
{
if (shouldMoveToPrewhere() && settings.query_plan_optimize_prewhere && settings.query_plan_enable_optimizations
&& typeid_cast<const StorageMerge *>(storage.get()))
collectFiltersForAnalysis(query_ptr, context, storage_snapshot, options, query_info);
/// Table.
if (max_streams == 0)
max_streams = 1;
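
For context on the removed helper: it walks the optimized dummy plan iteratively with an explicit stack and collects filters from every ReadFromDummy step it finds. A minimal standalone sketch of that traversal pattern, with hypothetical Step/ReadStep/Node stand-ins rather than the real ClickHouse classes:

// Standalone sketch: iterative DFS over a plan tree, as in the removed
// collectFiltersForAnalysis above. Step/ReadStep/Node are hypothetical.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Step { virtual ~Step() = default; };
struct ReadStep : Step
{
    std::string filter;
    explicit ReadStep(std::string f) : filter(std::move(f)) {}
};

struct Node
{
    std::unique_ptr<Step> step;
    std::vector<Node *> children;
};

int main()
{
    Node leaf{std::make_unique<ReadStep>("x > 200"), {}};
    Node root{std::make_unique<Step>(), {&leaf}};

    std::vector<Node *> nodes_to_process{&root};  // explicit DFS stack
    while (!nodes_to_process.empty())
    {
        const Node * node = nodes_to_process.back();
        nodes_to_process.pop_back();
        nodes_to_process.insert(nodes_to_process.end(), node->children.begin(), node->children.end());

        // Only reading steps carry the filters we want to collect.
        if (const auto * read = dynamic_cast<const ReadStep *>(node->step.get()))
            std::cout << "collected filter: " << read->filter << '\n';
    }
}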


@@ -164,7 +164,7 @@ FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr &
continue;
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
if (typeid_cast<const StorageDistributed *>(storage.get()) || typeid_cast<const StorageMerge *>(storage.get())
if (typeid_cast<const StorageDistributed *>(storage.get())
|| (parallel_replicas_estimation_enabled && std::dynamic_pointer_cast<MergeTreeData>(storage)))
{
collect_filters = true;


@@ -645,7 +645,6 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
auto table_expression_query_info = select_query_info;
table_expression_query_info.table_expression = table_expression;
table_expression_query_info.filter_actions_dag = table_expression_data.getFilterActions();
table_expression_query_info.optimized_prewhere_info = table_expression_data.getPrewhereInfo();
table_expression_query_info.analyzer_can_use_parallel_replicas_on_follower = table_node == planner_context->getGlobalPlannerContext()->parallel_replicas_table;
size_t max_streams = settings.max_threads;


@@ -19,6 +19,7 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Storages/StorageMerge.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
@@ -480,6 +481,14 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
return 3;
}
if (auto * read_from_merge = typeid_cast<ReadFromMerge *>(child.get()))
{
FilterDAGInfo info{filter->getExpression(), filter->getFilterColumnName(), filter->removesFilterColumn()};
read_from_merge->addFilter(std::move(info));
std::swap(*parent_node, *child_node);
return 1;
}
return 0;
}
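
The std::swap(*parent_node, *child_node) above is the usual trick in these passes: once ReadFromMerge has absorbed the filter, swapping the two nodes' contents moves the reading step into the filter node's slot and orphans the filter. A minimal standalone sketch, with a hypothetical Node type rather than QueryPlan::Node:

// Standalone sketch of the swap trick used in tryPushDownFilter above.
// Node is a hypothetical stand-in for QueryPlan::Node.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Node
{
    std::string step;            // stand-in for the IQueryPlanStep
    std::vector<Node *> children;
};

int main()
{
    Node read_node{"ReadFromMerge", {}};
    Node filter_node{"Filter", {&read_node}};
    Node * root = &filter_node;  // the rest of the plan keeps pointing here

    // The filter has already been handed to the reading step, so swapping
    // the two nodes' contents effectively drops the filter from the plan:
    // the root slot now holds the source step with no children.
    std::swap(*root, read_node);

    std::cout << root->step << ", children: " << root->children.size() << '\n';
    // prints "ReadFromMerge, children: 0"
}

Swapping contents rather than re-linking means the grandparent's child pointer never has to be touched.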


@@ -4,10 +4,10 @@
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Storages/MergeTree/MergeTreeWhereOptimizer.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageMerge.h>
#include <Interpreters/ActionsDAG.h>
#include <Functions/FunctionsLogical.h>
#include <Functions/IFunctionAdaptors.h>
namespace DB
{
@@ -30,7 +30,7 @@ static void removeFromOutput(ActionsDAG & dag, const std::string name)
void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
{
if (stack.size() < 3)
if (stack.size() < 2)
return;
auto & frame = stack.back();
@@ -45,6 +45,9 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes &)
if (!source_step_with_filter)
return;
if (typeid_cast<ReadFromMerge *>(frame.node->step.get()))
return;
const auto & storage_snapshot = source_step_with_filter->getStorageSnapshot();
const auto & storage = storage_snapshot->storage;
if (!storage.canMoveConditionsToPrewhere())
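
Note the new early return: ReadFromMerge is deliberately excluded from this pass, since filters now reach it via ReadFromMerge::addFilter and the PREWHERE decision is made per child table when the children plans are optimized, not once against the common header. The required stack depth also drops from 3 to 2, apparently because the pass now only needs the source node and its parent on the stack.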


@@ -49,11 +49,6 @@ public:
filter_dags.push_back(std::move(filter_dag));
}
void addFilterFromParentStep(const ActionsDAG::Node * filter_node)
{
filter_nodes.nodes.push_back(filter_node);
}
/// Apply filters that can optimize reading from storage.
void applyFilters()
{


@@ -208,10 +208,6 @@ struct SelectQueryInfo
bool need_aggregate = false;
PrewhereInfoPtr prewhere_info;
/// Generated by pre-run optimization with StorageDummy.
/// Currently it's used to support StorageMerge PREWHERE optimization.
PrewhereInfoPtr optimized_prewhere_info;
/// If query has aggregate functions
bool has_aggregates = false;


@@ -34,9 +34,10 @@
#include <Parsers/ASTSelectQuery.h>
#include <Planner/PlannerActionsVisitor.h>
#include <Planner/Utils.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
@@ -410,10 +411,14 @@ ReadFromMerge::ReadFromMerge(
{
}
void ReadFromMerge::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value)
void ReadFromMerge::addFilter(FilterDAGInfo filter)
{
SourceStepWithFilter::updatePrewhereInfo(prewhere_info_value);
common_header = applyPrewhereActions(common_header, prewhere_info);
output_stream->header = FilterTransform::transformHeader(
output_stream->header,
filter.actions.get(),
filter.column_name,
filter.do_remove_column);
pushed_down_filters.push_back(std::move(filter));
}
void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
@@ -443,21 +448,7 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
Names column_names_as_aliases;
Aliases aliases;
Names real_column_names = column_names;
if (child_plan.row_policy_data_opt)
child_plan.row_policy_data_opt->extendNames(real_column_names);
auto modified_query_info = getModifiedQueryInfo(modified_context, table, nested_storage_snaphsot, real_column_names, column_names_as_aliases, aliases);
auto source_pipeline = createSources(
child_plan.plan,
nested_storage_snaphsot,
modified_query_info,
common_processed_stage,
common_header,
child_plan.table_aliases,
child_plan.row_policy_data_opt,
table);
auto source_pipeline = buildPipeline(child_plan, common_processed_stage);
if (source_pipeline && source_pipeline->initialized())
{
@@ -575,10 +566,8 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
if (sampling_requested && !storage->supportsSampling())
throw Exception(ErrorCodes::SAMPLING_NOT_SUPPORTED, "Illegal SAMPLE: table {} doesn't support sampling", storage->getStorageID().getNameForLogs());
res.emplace_back();
auto & aliases = res.back().table_aliases;
auto & row_policy_data_opt = res.back().row_policy_data_opt;
Aliases aliases;
RowPolicyDataOpt row_policy_data_opt;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, modified_context);
@@ -657,7 +646,7 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
}
res.back().plan = createPlanForTable(
auto child = createPlanForTable(
nested_storage_snaphsot,
modified_query_info,
common_processed_stage,
@@ -667,9 +656,32 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
row_policy_data_opt,
modified_context,
current_streams);
res.back().plan.addInterpreterContext(modified_context);
}
child.plan.addInterpreterContext(modified_context);
if (child.plan.isInitialized())
{
addVirtualColumns(child, modified_query_info, common_processed_stage, table);
/// Subordinate tables could have different but convertible types, like numeric types of different width.
/// We must return streams with a structure equal to the structure of the Merge table.
convertAndFilterSourceStream(common_header, modified_query_info, nested_storage_snaphsot, aliases, row_policy_data_opt, context, child);
for (const auto & filter_info : pushed_down_filters)
{
auto filter_step = std::make_unique<FilterStep>(
child.plan.getCurrentDataStream(),
filter_info.actions->clone(),
filter_info.column_name,
filter_info.do_remove_column);
child.plan.addStep(std::move(filter_step));
}
child.plan.optimize(QueryPlanOptimizationSettings::fromContext(modified_context));
}
res.emplace_back(std::move(child));
}
return res;
}
@@ -884,8 +896,6 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const ContextMutablePtr & mo
const StorageID current_storage_id = storage->getStorageID();
SelectQueryInfo modified_query_info = query_info;
if (modified_query_info.optimized_prewhere_info && !modified_query_info.prewhere_info)
modified_query_info.prewhere_info = modified_query_info.optimized_prewhere_info;
if (modified_query_info.planner_context)
modified_query_info.planner_context = std::make_shared<PlannerContext>(modified_context, modified_query_info.planner_context);
@@ -1027,31 +1037,101 @@ bool recursivelyApplyToReadingSteps(QueryPlan::Node * node, const std::function<
return ok;
}
QueryPipelineBuilderPtr ReadFromMerge::createSources(
QueryPlan & plan,
const StorageSnapshotPtr & storage_snapshot_,
void ReadFromMerge::addVirtualColumns(
ChildPlan & child,
SelectQueryInfo & modified_query_info,
QueryProcessingStage::Enum processed_stage,
const Block & header,
const Aliases & aliases,
const RowPolicyDataOpt & row_policy_data_opt,
const StorageWithLockAndName & storage_with_lock,
bool concat_streams) const
const StorageWithLockAndName & storage_with_lock) const
{
if (!plan.isInitialized())
return std::make_unique<QueryPipelineBuilder>();
QueryPipelineBuilderPtr builder;
const auto & [database_name, storage, _, table_name] = storage_with_lock;
const auto & [database_name, _, storage, table_name] = storage_with_lock;
bool allow_experimental_analyzer = context->getSettingsRef().allow_experimental_analyzer;
auto storage_stage
= storage->getQueryProcessingStage(context, processed_stage, storage_snapshot_, modified_query_info);
builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
/// Add virtual columns if we don't already have them.
if (processed_stage > storage_stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
Block plan_header = child.plan.getCurrentDataStream().header;
if (allow_experimental_analyzer)
{
String table_alias = modified_query_info.query_tree->as<QueryNode>()->getJoinTree()->as<TableNode>()->getAlias();
String database_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_database" : table_alias + "._database";
String table_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_table" : table_alias + "._table";
if (has_database_virtual_column && common_header.has(database_column)
&& child.stage == QueryProcessingStage::FetchColumns && !plan_header.has(database_column))
{
ColumnWithTypeAndName column;
column.name = database_column;
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(database_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto expression_step = std::make_unique<ExpressionStep>(child.plan.getCurrentDataStream(), adding_column_dag);
child.plan.addStep(std::move(expression_step));
plan_header = child.plan.getCurrentDataStream().header;
}
if (has_table_virtual_column && common_header.has(table_column)
&& child.stage == QueryProcessingStage::FetchColumns && !plan_header.has(table_column))
{
ColumnWithTypeAndName column;
column.name = table_column;
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(table_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto expression_step = std::make_unique<ExpressionStep>(child.plan.getCurrentDataStream(), adding_column_dag);
child.plan.addStep(std::move(expression_step));
plan_header = child.plan.getCurrentDataStream().header;
}
}
else
{
if (has_database_virtual_column && common_header.has("_database") && !plan_header.has("_database"))
{
ColumnWithTypeAndName column;
column.name = "_database";
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(database_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto expression_step = std::make_unique<ExpressionStep>(child.plan.getCurrentDataStream(), adding_column_dag);
child.plan.addStep(std::move(expression_step));
plan_header = child.plan.getCurrentDataStream().header;
}
if (has_table_virtual_column && common_header.has("_table") && !plan_header.has("_table"))
{
ColumnWithTypeAndName column;
column.name = "_table";
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(table_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto expression_step = std::make_unique<ExpressionStep>(child.plan.getCurrentDataStream(), adding_column_dag);
child.plan.addStep(std::move(expression_step));
plan_header = child.plan.getCurrentDataStream().header;
}
}
}
QueryPipelineBuilderPtr ReadFromMerge::buildPipeline(
ChildPlan & child,
QueryProcessingStage::Enum processed_stage) const
{
if (!child.plan.isInitialized())
return nullptr;
auto optimisation_settings = QueryPlanOptimizationSettings::fromContext(context);
/// All optimisations are already done at plan creation
optimisation_settings.optimize_plan = false;
auto builder = child.plan.buildQueryPipeline(optimisation_settings, BuildQueryPipelineSettings::fromContext(context));
if (!builder->initialized())
return builder;
bool allow_experimental_analyzer = context->getSettingsRef().allow_experimental_analyzer;
if (processed_stage > child.stage || (allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns))
{
/** Materialization is needed, since constants arrive materialized from distributed storage.
* If you do not do this, columns of different kinds (Const and non-Const) will be produced in different threads,
@@ -1060,99 +1140,10 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
builder->addSimpleTransform([](const Block & stream_header) { return std::make_shared<MaterializingTransform>(stream_header); });
}
if (builder->initialized())
{
if (concat_streams && builder->getNumStreams() > 1)
{
// It's possible to have many tables read from merge; resize(1) might open too many files at the same time.
// Use concat instead.
builder->addTransform(std::make_shared<ConcatProcessor>(builder->getHeader(), builder->getNumStreams()));
}
/// Add virtual columns if we don't already have them.
Block pipe_header = builder->getHeader();
if (allow_experimental_analyzer)
{
String table_alias = modified_query_info.query_tree->as<QueryNode>()->getJoinTree()->as<TableNode>()->getAlias();
String database_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_database" : table_alias + "._database";
String table_column = table_alias.empty() || processed_stage == QueryProcessingStage::FetchColumns ? "_table" : table_alias + "._table";
if (has_database_virtual_column && common_header.has(database_column)
&& storage_stage == QueryProcessingStage::FetchColumns && !pipe_header.has(database_column))
{
ColumnWithTypeAndName column;
column.name = database_column;
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(database_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
if (has_table_virtual_column && common_header.has(table_column)
&& storage_stage == QueryProcessingStage::FetchColumns && !pipe_header.has(table_column))
{
ColumnWithTypeAndName column;
column.name = table_column;
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(table_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
}
else
{
if (has_database_virtual_column && common_header.has("_database") && !pipe_header.has("_database"))
{
ColumnWithTypeAndName column;
column.name = "_database";
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(database_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
if (has_table_virtual_column && common_header.has("_table") && !pipe_header.has("_table"))
{
ColumnWithTypeAndName column;
column.name = "_table";
column.type = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
column.column = column.type->createColumnConst(0, Field(table_name));
auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column));
auto adding_column_actions = std::make_shared<ExpressionActions>(
std::move(adding_column_dag), ExpressionActionsSettings::fromContext(context, CompileExpressions::yes));
builder->addSimpleTransform([&](const Block & stream_header)
{ return std::make_shared<ExpressionTransform>(stream_header, adding_column_actions); });
}
}
/// Subordinate tables could have different but convertible types, like numeric types of different width.
/// We must return streams with a structure equal to the structure of the Merge table.
convertAndFilterSourceStream(
header, modified_query_info, storage_snapshot_, aliases, row_policy_data_opt, context, *builder, storage_stage);
}
return builder;
}
QueryPlan ReadFromMerge::createPlanForTable(
ReadFromMerge::ChildPlan ReadFromMerge::createPlanForTable(
const StorageSnapshotPtr & storage_snapshot_,
SelectQueryInfo & modified_query_info,
QueryProcessingStage::Enum processed_stage,
@@ -1189,35 +1180,14 @@ QueryPlan ReadFromMerge::createPlanForTable(
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot_->metadata->getColumns().getAllPhysical()).name);
StorageView * view = dynamic_cast<StorageView *>(storage.get());
if (!view || allow_experimental_analyzer)
{
storage->read(plan,
real_column_names,
storage_snapshot_,
modified_query_info,
modified_context,
processed_stage,
max_block_size,
UInt32(streams_num));
}
else
{
/// For view storage, we need to rewrite the `modified_query_info.view_query` to optimize read.
/// The most intuitive way is to use InterpreterSelectQuery.
/// Intercept the settings
modified_context->setSetting("max_threads", streams_num);
modified_context->setSetting("max_streams_to_max_threads_ratio", 1);
modified_context->setSetting("max_block_size", max_block_size);
InterpreterSelectQuery interpreter(modified_query_info.query,
modified_context,
storage,
view->getInMemoryMetadataPtr(),
SelectQueryOptions(processed_stage));
interpreter.buildQueryPlan(plan);
}
storage->read(plan,
real_column_names,
storage_snapshot_,
modified_query_info,
modified_context,
processed_stage,
max_block_size,
UInt32(streams_num));
if (!plan.isInitialized())
return {};
@@ -1256,7 +1226,7 @@
}
}
return plan;
return ChildPlan{std::move(plan), storage_stage};
}
ReadFromMerge::RowPolicyData::RowPolicyData(RowPolicyFilterPtr row_policy_filter_ptr,
@@ -1314,12 +1284,10 @@ void ReadFromMerge::RowPolicyData::addStorageFilter(SourceStepWithFilter * step)
step->addFilter(actions_dag, filter_column_name);
}
void ReadFromMerge::RowPolicyData::addFilterTransform(QueryPipelineBuilder & builder) const
void ReadFromMerge::RowPolicyData::addFilterTransform(QueryPlan & plan) const
{
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<FilterTransform>(stream_header, filter_actions, filter_column_name, true /* remove filter column */);
});
auto filter_step = std::make_unique<FilterStep>(plan.getCurrentDataStream(), actions_dag, filter_column_name, true /* remove filter column */);
plan.addStep(std::move(filter_step));
}
StorageMerge::StorageListWithLocks ReadFromMerge::getSelectedTables(
@@ -1498,13 +1466,12 @@ void ReadFromMerge::convertAndFilterSourceStream(
const Aliases & aliases,
const RowPolicyDataOpt & row_policy_data_opt,
ContextPtr local_context,
QueryPipelineBuilder & builder,
QueryProcessingStage::Enum processed_stage)
ChildPlan & child)
{
Block before_block_header = builder.getHeader();
Block before_block_header = child.plan.getCurrentDataStream().header;
auto storage_sample_block = snapshot->metadata->getSampleBlock();
auto pipe_columns = builder.getHeader().getNamesAndTypesList();
auto pipe_columns = before_block_header.getNamesAndTypesList();
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
@@ -1527,13 +1494,8 @@
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected to have 1 output but got {}", nodes.size());
actions_dag->addOrReplaceInOutputs(actions_dag->addAlias(*nodes.front(), alias.name));
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
auto expression_step = std::make_unique<ExpressionStep>(child.plan.getCurrentDataStream(), actions_dag);
child.plan.addStep(std::move(expression_step));
}
}
else
@@ -1547,37 +1509,26 @@
auto dag = std::make_shared<ActionsDAG>(pipe_columns);
auto actions_dag = expression_analyzer.getActionsDAG(true, false);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
auto expression_step = std::make_unique<ExpressionStep>(child.plan.getCurrentDataStream(), actions_dag);
child.plan.addStep(std::move(expression_step));
}
}
ActionsDAG::MatchColumnsMode convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Name;
if (local_context->getSettingsRef().allow_experimental_analyzer
&& (processed_stage != QueryProcessingStage::FetchColumns || dynamic_cast<const StorageDistributed *>(&snapshot->storage) != nullptr))
&& (child.stage != QueryProcessingStage::FetchColumns || dynamic_cast<const StorageDistributed *>(&snapshot->storage) != nullptr))
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;
if (row_policy_data_opt)
{
row_policy_data_opt->addFilterTransform(builder);
}
row_policy_data_opt->addFilterTransform(child.plan);
auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),
auto convert_actions_dag = ActionsDAG::makeConvertingActions(child.plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
convert_actions_match_columns_mode);
auto actions = std::make_shared<ExpressionActions>(
std::move(convert_actions_dag),
ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<ExpressionTransform>(stream_header, actions);
});
auto expression_step = std::make_unique<ExpressionStep>(child.plan.getCurrentDataStream(), convert_actions_dag);
child.plan.addStep(std::move(expression_step));
}
const ReadFromMerge::StorageListWithLocks & ReadFromMerge::getSelectedTables()
@@ -1614,29 +1565,14 @@ bool ReadFromMerge::requestReadingInOrder(InputOrderInfoPtr order_info_)
return true;
}
void ReadFromMerge::applyFilters(const QueryPlan & plan, const ActionDAGNodes & added_filter_nodes) const
{
auto apply_filters = [&added_filter_nodes](ReadFromMergeTree & read_from_merge_tree)
{
for (const auto & node : added_filter_nodes.nodes)
read_from_merge_tree.addFilterFromParentStep(node);
read_from_merge_tree.SourceStepWithFilter::applyFilters();
return true;
};
recursivelyApplyToReadingSteps(plan.getRootNode(), apply_filters);
}
void ReadFromMerge::applyFilters(ActionDAGNodes added_filter_nodes)
{
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
for (const auto & filter_info : pushed_down_filters)
added_filter_nodes.nodes.push_back(&filter_info.actions->findInOutputs(filter_info.column_name));
SourceStepWithFilter::applyFilters(added_filter_nodes);
filterTablesAndCreateChildrenPlans();
for (const auto & child_plan : *child_plans)
if (child_plan.plan.isInitialized())
applyFilters(child_plan.plan, added_filter_nodes);
}
QueryPlanRawPtrs ReadFromMerge::getChildPlans()
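
Summing up the StorageMerge.cpp changes: child plans are now finished eagerly in createChildrenPlans. Virtual _database/_table columns, the converting expressions, and the row-policy filter become plan steps (addVirtualColumns, convertAndFilterSourceStream), the filters pushed down from the outer plan are appended as FilterStep nodes, and each child plan is optimized individually. buildPipeline then only lowers the already-optimized plan (optimize_plan = false), and applyFilters forwards the pushed-down filters into the step's own filter nodes instead of recursing into the child reading steps.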


@@ -164,7 +164,7 @@ public:
QueryPlanRawPtrs getChildPlans() override;
void updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info_value) override;
void addFilter(FilterDAGInfo filter);
private:
const size_t required_max_block_size;
@@ -220,7 +220,7 @@ private:
/// Create explicit filter transform to exclude
/// rows that do not conform to the row-level policy
void addFilterTransform(QueryPipelineBuilder &) const;
void addFilterTransform(QueryPlan &) const;
private:
std::string filter_column_name; // complex filter, may contain logic operations
@@ -234,21 +234,21 @@ private:
struct ChildPlan
{
QueryPlan plan;
Aliases table_aliases;
RowPolicyDataOpt row_policy_data_opt;
QueryProcessingStage::Enum stage;
};
/// Store the read plan for each child table.
/// Needed to guarantee that the lifetime of child steps matches the lifetime of this step (mainly for EXPLAIN PIPELINE).
std::optional<std::vector<ChildPlan>> child_plans;
/// Store filters pushed down from query plan optimization. Filters are added on top of child plans.
std::vector<FilterDAGInfo> pushed_down_filters;
std::vector<ChildPlan> createChildrenPlans(SelectQueryInfo & query_info_) const;
void filterTablesAndCreateChildrenPlans();
void applyFilters(const QueryPlan & plan, const ActionDAGNodes & added_filter_nodes) const;
QueryPlan createPlanForTable(
ChildPlan createPlanForTable(
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
QueryProcessingStage::Enum processed_stage,
@@ -259,16 +259,15 @@ private:
ContextMutablePtr modified_context,
size_t streams_num) const;
QueryPipelineBuilderPtr createSources(
QueryPlan & plan,
const StorageSnapshotPtr & storage_snapshot,
void addVirtualColumns(
ChildPlan & child,
SelectQueryInfo & modified_query_info,
QueryProcessingStage::Enum processed_stage,
const Block & header,
const Aliases & aliases,
const RowPolicyDataOpt & row_policy_data_opt,
const StorageWithLockAndName & storage_with_lock,
bool concat_streams = false) const;
const StorageWithLockAndName & storage_with_lock) const;
QueryPipelineBuilderPtr buildPipeline(
ChildPlan & child,
QueryProcessingStage::Enum processed_stage) const;
static void convertAndFilterSourceStream(
const Block & header,
@@ -277,15 +276,12 @@ private:
const Aliases & aliases,
const RowPolicyDataOpt & row_policy_data_opt,
ContextPtr context,
QueryPipelineBuilder & builder,
QueryProcessingStage::Enum processed_stage);
ChildPlan & child);
StorageMerge::StorageListWithLocks getSelectedTables(
ContextPtr query_context,
bool filter_by_database_virtual_column,
bool filter_by_table_virtual_column) const;
// static VirtualColumnsDescription createVirtuals(StoragePtr first_table);
};
}


@@ -1,6 +1,3 @@
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
@@ -8,8 +5,15 @@
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
2
Filter column: and(equals(k, 3), notEmpty(v)) (removed)
Filter column: and(equals(k, 3), notEmpty(v)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
2
Filter column: and(equals(k, 3), notEmpty(v)) (removed)
Filter column: and(equals(k, 3), notEmpty(v)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
Prewhere info
Prewhere filter
Prewhere filter column: and(notEmpty(v), equals(k, 3)) (removed)
2
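
Reading the new reference: the pushed-down predicate now shows up as a plain Filter on the Merge side, while each MergeTree child, whose plan is optimized separately, still turns it into a PREWHERE, hence the per-child Prewhere info blocks alongside the Filter column lines.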


@@ -0,0 +1 @@
59900 1000 1396


@@ -0,0 +1,7 @@
create table merge_kek_1 (x UInt32, y UInt32) engine = MergeTree order by x;
create table merge_kek_2 (x UInt32, y UInt32) engine = MergeTree order by x;
insert into merge_kek_1 select number, number from numbers(100);
insert into merge_kek_2 select number + 500, number + 500 from numbers(1e6);
select sum(x), min(x + x), max(x + x) from merge(currentDatabase(), '^merge_kek_.$') where x > 200 and y in (select 500 + number * 2 from numbers(100)) settings max_threads=2;
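
The expected values check out by hand: in both tables y equals x, and the IN subquery produces the 100 even numbers 500, 502, ..., 698. merge_kek_1 holds x = 0..99 (all fail x > 200), so only the 100 matching rows of merge_kek_2 survive: sum(x) = 100 * (500 + 698) / 2 = 59900, min(x + x) = 2 * 500 = 1000, and max(x + x) = 2 * 698 = 1396, i.e. the reference line 59900 1000 1396.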


@@ -7,6 +7,9 @@ Expression ((Projection + Before ORDER BY))
Aggregating
Expression (Before GROUP BY)
ReadFromMerge
ReadFromMergeTree (default.mt1)
ReadFromMergeTree (default.mt2)
ReadFromStorage (TinyLog)
Expression
ReadFromMergeTree (default.mt1)
Expression
ReadFromMergeTree (default.mt2)
Expression
ReadFromStorage (TinyLog)
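
The Expression steps wrapping each child reading step are the visible effect of the refactor: header conversion for every child table is now an ExpressionStep inside the child plan rather than a pipeline transform, so it appears in EXPLAIN.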