Merge pull request #30310 from amosbird/projection-fix20

Fix IN set filtering when projection is used.
This commit is contained in:
alexey-milovidov 2021-10-31 15:00:45 +03:00 committed by GitHub
commit 34009ea4a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 60 additions and 7 deletions

View File

@ -1951,7 +1951,7 @@ void Context::shutdownKeeperDispatcher() const
}
void Context::updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config)
void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::AbstractConfiguration & config)
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_dispatcher_mutex);

View File

@ -156,6 +156,15 @@ InterpreterSelectQuery::InterpreterSelectQuery(
{
}
/// Constructor that starts analysis with an already built collection of sets.
/// It only delegates to the full constructor: `std::nullopt` and `nullptr` are passed in the
/// positions before `options_` (presumably "no input pipe" and "no explicit storage" - confirm
/// against the full constructor's parameter list), the two arguments after `options_` are
/// default-constructed, and `prepared_sets_` is handed over by move so the sets are not copied again.
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const SelectQueryOptions & options_,
PreparedSets prepared_sets_)
: InterpreterSelectQuery(query_ptr_, context_, std::nullopt, nullptr, options_, {}, {}, std::move(prepared_sets_))
{
}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
@ -258,13 +267,15 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const StoragePtr & storage_,
const SelectQueryOptions & options_,
const Names & required_result_column_names,
const StorageMetadataPtr & metadata_snapshot_)
const StorageMetadataPtr & metadata_snapshot_,
PreparedSets prepared_sets_)
/// NOTE: the query almost always should be cloned because it will be modified during analysis.
: IInterpreterUnionOrSelectQuery(options_.modify_inplace ? query_ptr_ : query_ptr_->clone(), context_, options_)
, storage(storage_)
, input_pipe(std::move(input_pipe_))
, log(&Poco::Logger::get("InterpreterSelectQuery"))
, metadata_snapshot(metadata_snapshot_)
, prepared_sets(std::move(prepared_sets_))
{
checkStackSize();
@ -354,7 +365,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Reuse already built sets for multiple passes of analysis
SubqueriesForSets subquery_for_sets;
PreparedSets prepared_sets;
auto analyze = [&] (bool try_move_to_prewhere)
{
@ -517,7 +527,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Reuse already built sets for multiple passes of analysis
subquery_for_sets = std::move(query_analyzer->getSubqueriesForSets());
prepared_sets = std::move(query_analyzer->getPreparedSets());
prepared_sets = query_info.sets.empty() ? std::move(query_analyzer->getPreparedSets()) : std::move(query_info.sets);
/// Do not try move conditions to PREWHERE for the second time.
/// Otherwise, we won't be able to fallback from inefficient PREWHERE to WHERE later.

View File

@ -6,6 +6,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IInterpreterUnionOrSelectQuery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/StorageID.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/ReadInOrderOptimizer.h>
@ -66,6 +67,13 @@ public:
const StorageMetadataPtr & metadata_snapshot_ = nullptr,
const SelectQueryOptions & = {});
/// Analyze the query starting from the already built `prepared_sets_` instead of an empty collection,
/// so that sets built during a previous pass of analysis (possibly by another interpreter) are reused.
/// Note: this overload takes no `storage_` argument.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const SelectQueryOptions &,
PreparedSets prepared_sets_);
~InterpreterSelectQuery() override;
/// Execute a query. Get the stream of blocks to read.
@ -83,7 +91,7 @@ public:
const SelectQueryInfo & getQueryInfo() const { return query_info; }
const SelectQueryExpressionAnalyzer * getQueryAnalyzer() const { return query_analyzer.get(); }
SelectQueryExpressionAnalyzer * getQueryAnalyzer() const { return query_analyzer.get(); }
const ExpressionAnalysisResult & getAnalysisResult() const { return analysis_result; }
@ -104,7 +112,8 @@ private:
const StoragePtr & storage_,
const SelectQueryOptions &,
const Names & required_result_column_names = {},
const StorageMetadataPtr & metadata_snapshot_ = nullptr);
const StorageMetadataPtr & metadata_snapshot_ = nullptr,
PreparedSets prepared_sets_ = {});
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
@ -193,6 +202,9 @@ private:
Poco::Logger * log;
StorageMetadataPtr metadata_snapshot;
/// Reuse already built sets for multiple passes of analysis, possibly across interpreters.
PreparedSets prepared_sets;
};
}

View File

@ -4552,8 +4552,12 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
return false;
InterpreterSelectQuery select(
query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
query_ptr,
query_context,
SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias(),
query_info.sets /* prepared_sets */);
const auto & analysis_result = select.getAnalysisResult();
query_info.sets = std::move(select.getQueryAnalyzer()->getPreparedSets());
bool can_use_aggregate_projection = true;
/// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage,
@ -4897,6 +4901,8 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
{
selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
selected_candidate->subqueries_for_sets
= std::make_shared<SubqueriesForSets>(std::move(select.getQueryAnalyzer()->getSubqueriesForSets()));
}
query_info.projection = std::move(*selected_candidate);

View File

@ -19,6 +19,7 @@
#include <Interpreters/Context.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
@ -374,6 +375,12 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
std::move(pipe),
fmt::format("MergeTree(with {} projection {})", query_info.projection->desc->type, query_info.projection->desc->name));
plan->addStep(std::move(step));
if (query_info.projection->subqueries_for_sets && !query_info.projection->subqueries_for_sets->empty())
{
SizeLimits limits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode);
addCreatingSetsStep(*plan, std::move(*query_info.projection->subqueries_for_sets), limits, context);
}
return plan;
}

View File

@ -42,6 +42,9 @@ using ClusterPtr = std::shared_ptr<Cluster>;
struct MergeTreeDataSelectAnalysisResult;
using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr<MergeTreeDataSelectAnalysisResult>;
struct SubqueryForSet;
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
struct PrewhereInfo
{
/// Actions which are executed in order to alias columns are used for prewhere actions.
@ -121,6 +124,7 @@ struct ProjectionCandidate
ReadInOrderOptimizerPtr order_optimizer;
InputOrderInfoPtr input_order_info;
ManyExpressionActions group_by_elements_actions;
std::shared_ptr<SubqueriesForSets> subqueries_for_sets;
MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr;
MergeTreeDataSelectAnalysisResultPtr merge_tree_normal_select_result_ptr;
};

View File

@ -0,0 +1,2 @@
2 3
2 3

View File

@ -0,0 +1,12 @@
-- Regression test: filtering by `IN (subquery)` must still work when the query is served from a projection.
drop table if exists x;
-- Table with one aggregate projection (`agg`) and one normal projection (`norm`),
-- so both kinds of projection can be selected by the queries below.
create table x (i UInt64, j UInt64, k UInt64, projection agg (select sum(j), avg(k) group by i), projection norm (select j, k order by i)) engine MergeTree order by tuple();
insert into x values (1, 2, 3);
-- Enable projections; disable index usage for IN with subqueries for this test.
set allow_experimental_projection_optimization = 1, use_index_for_in_with_subqueries = 0;
-- The only row has i = 1, which is in numbers(4) = {0, 1, 2, 3}, so each query must return "2 3".
-- First query matches the aggregate projection's shape (sum(j), avg(k)).
select sum(j), avg(k) from x where i in (select number from numbers(4));
-- Second query matches the normal projection's columns (j, k).
select j, k from x where i in (select number from numbers(4));
drop table x;