Add a separate optimisation to enable memory bound aggregation.

This commit is contained in:
Nikolai Kochetov 2022-11-30 14:31:39 +00:00
parent 500d54847e
commit 43b3c10e9a
14 changed files with 197 additions and 97 deletions

View File

@ -16,31 +16,6 @@
#include <QueryPipeline/Pipe.h>
#include <Storages/SelectQueryInfo.h>
using namespace DB;
namespace
{
/// We determine output stream sort properties by a local plan (local because otherwise table could be unknown).
/// If no local shard exists for this cluster, no sort properties will be provided, c'est la vie.
auto getRemoteShardsOutputStreamSortingProperties(const std::vector<QueryPlanPtr> & plans, ContextMutablePtr context)
{
SortDescription sort_description;
DataStream::SortScope sort_scope = DataStream::SortScope::None;
if (!plans.empty())
{
if (const auto * step = dynamic_cast<const ITransformingStep *>(plans.front()->getRootNode()->step.get());
step && step->getDataStreamTraits().can_enforce_sorting_properties_in_distributed_query)
{
step->adjustSettingsToEnforceSortingPropertiesInDistributedQuery(context);
sort_description = step->getOutputStream().sort_description;
sort_scope = step->getOutputStream().sort_scope;
}
}
return std::make_pair(sort_description, sort_scope);
}
}
namespace DB
{
@ -216,8 +191,6 @@ void executeQuery(
"_shard_count", Block{{DataTypeUInt32().createColumnConst(1, shards), std::make_shared<DataTypeUInt32>(), "_shard_count"}});
auto external_tables = context->getExternalTables();
auto && [sort_description, sort_scope] = getRemoteShardsOutputStreamSortingProperties(plans, new_context);
auto plan = std::make_unique<QueryPlan>();
auto read_from_remote = std::make_unique<ReadFromRemote>(
std::move(remote_shards),
@ -231,9 +204,7 @@ void executeQuery(
std::move(external_tables),
log,
shards,
query_info.storage_limits,
std::move(sort_description),
std::move(sort_scope));
query_info.storage_limits);
read_from_remote->setStepDescription("Read from remote replica");
plan->addStep(std::move(read_from_remote));
@ -329,7 +300,6 @@ void executeQueryWithParallelReplicas(
if (!remote_shards.empty())
{
auto new_context = Context::createCopy(context);
auto && [sort_description, sort_scope] = getRemoteShardsOutputStreamSortingProperties(plans, new_context);
for (const auto & shard : remote_shards)
{
@ -345,9 +315,7 @@ void executeQueryWithParallelReplicas(
scalars,
external_tables,
&Poco::Logger::get("ReadFromParallelRemoteReplicasStep"),
query_info.storage_limits,
sort_description,
sort_scope);
query_info.storage_limits);
auto remote_plan = std::make_unique<QueryPlan>();
remote_plan->addStep(std::move(read_from_remote));

View File

@ -2485,6 +2485,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
group_by_info = std::make_shared<InputOrderInfo>(
group_by_sort_description, group_by_sort_description.size(), 1 /* direction */, 0 /* limit */);
sort_description_for_merging = group_by_info->sort_description_for_merging;
}
auto merge_threads = max_streams;

View File

@ -32,16 +32,15 @@ static bool memoryBoundMergingWillBeUsed(
return should_produce_results_in_order_of_bucket_number && memory_bound_merging_of_aggregation_results_enabled && !sort_description_for_merging.empty();
}
static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number, bool memory_bound_merging_will_be_used)
static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
.returns_single_stream = should_produce_results_in_order_of_bucket_number || memory_bound_merging_will_be_used,
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
.preserves_number_of_streams = false,
.preserves_sorting = false,
.can_enforce_sorting_properties_in_distributed_query = memory_bound_merging_will_be_used,
},
{
.preserves_number_of_rows = false,
@ -106,10 +105,7 @@ AggregatingStep::AggregatingStep(
: ITransformingStep(
input_stream_,
appendGroupingColumn(params_.getHeader(input_stream_.header, final_), params_.keys, grouping_sets_params_, group_by_use_nulls_),
getTraits(
should_produce_results_in_order_of_bucket_number_,
DB::memoryBoundMergingWillBeUsed(
should_produce_results_in_order_of_bucket_number_, memory_bound_merging_of_aggregation_results_enabled_, sort_description_for_merging_)),
getTraits(should_produce_results_in_order_of_bucket_number_),
false)
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
@ -141,6 +137,7 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
{
output_stream->sort_description = group_by_sort_description;
output_stream->sort_scope = DataStream::SortScope::Global;
output_stream->has_single_port = true;
}
}
@ -476,13 +473,6 @@ void AggregatingStep::updateOutputStream()
getDataStreamTraits());
}
void AggregatingStep::adjustSettingsToEnforceSortingPropertiesInDistributedQuery(ContextMutablePtr context) const
{
    /// Boolean settings switched on in the given context, applied in the listed order.
    static constexpr const char * settings_to_enable[] = {
        "enable_memory_bound_merging_of_aggregation_results",
        "optimize_aggregation_in_order",
        "force_aggregation_in_order",
    };

    for (const char * setting_name : settings_to_enable)
        context->setSetting(setting_name, true);
}
bool AggregatingStep::memoryBoundMergingWillBeUsed() const
{
return DB::memoryBoundMergingWillBeUsed(

View File

@ -56,14 +56,11 @@ public:
bool inOrder() const { return !sort_description_for_merging.empty(); }
bool isGroupingSets() const { return !grouping_sets_params.empty(); }
void applyOrder(SortDescription sort_description_for_merging_, SortDescription group_by_sort_description_);
void adjustSettingsToEnforceSortingPropertiesInDistributedQuery(ContextMutablePtr context) const override;
bool memoryBoundMergingWillBeUsed() const;
private:
void updateOutputStream() override;
bool memoryBoundMergingWillBeUsed() const;
Aggregator::Params params;
GroupingSetsParamsList grouping_sets_params;
bool final;

View File

@ -34,9 +34,6 @@ public:
/// Doesn't change row order.
/// Examples: true for FilterStep, false for PartialSortingStep
bool preserves_sorting;
/// See adjustSettingsToEnforceSortingPropertiesInDistributedQuery().
bool can_enforce_sorting_properties_in_distributed_query = false;
};
/// These flags are used by QueryPlan optimizers.

View File

@ -19,7 +19,7 @@ static bool memoryBoundMergingWillBeUsed(
&& input_stream.sort_scope >= DataStream::SortScope::Stream && input_stream.sort_description.hasPrefix(group_by_sort_description);
}
static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number, bool memory_bound_merging_will_be_used)
static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_of_bucket_number)
{
return ITransformingStep::Traits
{
@ -28,7 +28,6 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
.preserves_number_of_streams = false,
.preserves_sorting = false,
.can_enforce_sorting_properties_in_distributed_query = memory_bound_merging_will_be_used,
},
{
.preserves_number_of_rows = false,
@ -51,10 +50,7 @@ MergingAggregatedStep::MergingAggregatedStep(
: ITransformingStep(
input_stream_,
params_.getHeader(input_stream_.header, final_),
getTraits(
should_produce_results_in_order_of_bucket_number_,
DB::memoryBoundMergingWillBeUsed(
input_stream_, memory_bound_merging_of_aggregation_results_enabled_, group_by_sort_description_)))
getTraits(should_produce_results_in_order_of_bucket_number_))
, params(std::move(params_))
, final(final_)
, memory_efficient_aggregation(memory_efficient_aggregation_)
@ -77,6 +73,19 @@ MergingAggregatedStep::MergingAggregatedStep(
}
}
/// Overrides the sorting properties recorded for the (single) input stream of this step and,
/// if that makes memory bound merging applicable while results must be produced in order of
/// bucket number, marks the output stream as globally sorted by the GROUP BY sort description.
///
/// @param sort_description  New sort description of the input stream (taken by value, consumed).
/// @param sort_scope        New sort scope of the input stream.
void MergingAggregatedStep::updateInputSortDescription(SortDescription sort_description, DataStream::SortScope sort_scope)
{
    auto & input_stream = input_streams.front();
    input_stream.sort_scope = sort_scope;
    /// The argument is already our own copy: move it instead of copying it a second time.
    input_stream.sort_description = std::move(sort_description);

    /// The check is done after the input stream was updated, so it sees the new properties.
    if (memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number)
    {
        output_stream->sort_description = group_by_sort_description;
        output_stream->sort_scope = DataStream::SortScope::Global;
    }
}
void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
@ -151,11 +160,6 @@ void MergingAggregatedStep::updateOutputStream()
output_stream->distinct_columns.insert(key);
}
/// Switches on `enable_memory_bound_merging_of_aggregation_results` in the given context.
/// The step itself is not modified (the method is const); only `context` is changed.
void MergingAggregatedStep::adjustSettingsToEnforceSortingPropertiesInDistributedQuery(ContextMutablePtr context) const
{
context->setSetting("enable_memory_bound_merging_of_aggregation_results", true);
}
bool MergingAggregatedStep::memoryBoundMergingWillBeUsed() const
{
return DB::memoryBoundMergingWillBeUsed(

View File

@ -33,12 +33,13 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
void adjustSettingsToEnforceSortingPropertiesInDistributedQuery(ContextMutablePtr context) const override;
void updateInputSortDescription(SortDescription input_sort_description, DataStream::SortScope sort_scope);
bool memoryBoundMergingWillBeUsed() const;
private:
void updateOutputStream() override;
bool memoryBoundMergingWillBeUsed() const;
Aggregator::Params params;
bool final;

View File

@ -94,6 +94,10 @@ void optimizePrimaryKeyCondition(const Stack & stack);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
/// Enable memory bound merging of aggregation states for remote queries
/// in case it was enabled for local plan
void enableMemoryBoundMerging(QueryPlan::Node & node, QueryPlan::Nodes &);
}
}

View File

@ -0,0 +1,95 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/UnionStep.h>
namespace DB::QueryPlanOptimizations
{
/// We are trying to find a part of plan like
///
/// - ReadFromRemote (x N)
/// - Union - ReadFromParallelRemoteReplicasStep (x M)
/// - Aggregating/MergingAggregated
///
/// and enable memory bound merging for remote steps if it was enabled for local aggregation.
/// Looks for a MergingAggregated root whose child is a Union over remote reading steps plus
/// exactly one other ("local") plan. If the local plan (Aggregating or MergingAggregated)
/// uses memory bound merging, the same is enforced for every remote reading step and the
/// root step is told that its input is sorted within each stream.
///
/// Does nothing when:
///  - `node` is not a MergingAggregatedStep, or its child is not a UnionStep;
///  - the union has more than one non-remote child, no non-remote child, or no remote child;
///  - the local plan is of another step type or does not use memory bound merging.
void enableMemoryBoundMerging(QueryPlan::Node & node, QueryPlan::Nodes &)
{
    auto * root_merging_aggregated = typeid_cast<MergingAggregatedStep *>(node.step.get());
    if (!root_merging_aggregated)
        return;

    const auto & union_node = *node.children.front();
    auto * union_step = typeid_cast<UnionStep *>(union_node.step.get());
    if (!union_step)
        return;

    std::vector<ReadFromRemote *> reading_steps;
    std::vector<ReadFromParallelRemoteReplicasStep *> async_reading_steps;
    IQueryPlanStep * local_plan = nullptr;

    reading_steps.reserve(union_node.children.size());
    async_reading_steps.reserve(union_node.children.size());

    /// Classify the children of the union: remote readers of both kinds, and a single local plan.
    for (const auto & child : union_node.children)
    {
        auto * child_step = child->step.get();
        if (auto * reading_step = typeid_cast<ReadFromRemote *>(child_step))
            reading_steps.push_back(reading_step);
        else if (auto * async_reading_step = typeid_cast<ReadFromParallelRemoteReplicasStep *>(child_step))
            async_reading_steps.push_back(async_reading_step);
        else if (local_plan)
            /// Usually there is a single local plan.
            /// TODO: we can support many local plans and calculate common sort description prefix. Do we need it?
            return;
        else
            local_plan = child_step;
    }

    /// We determine output stream sort properties by a local plan (local because otherwise table could be unknown).
    /// If no local shard exists for this cluster, no sort properties will be provided, c'est la vie.
    if (local_plan == nullptr || (reading_steps.empty() && async_reading_steps.empty()))
        return;

    SortDescription sort_description;
    bool enforce_aggregation_in_order = false;

    if (auto * aggregating_step = typeid_cast<AggregatingStep *>(local_plan))
    {
        if (aggregating_step->memoryBoundMergingWillBeUsed())
        {
            sort_description = aggregating_step->getOutputStream().sort_description;
            enforce_aggregation_in_order = true;
        }
    }
    else if (auto * merging_aggregated = typeid_cast<MergingAggregatedStep *>(local_plan))
    {
        if (merging_aggregated->memoryBoundMergingWillBeUsed())
            sort_description = merging_aggregated->getOutputStream().sort_description;
    }
    else
        return;

    /// The local plan does not use memory bound merging (or provides no sorting to rely on):
    /// there is nothing to enforce, so do not touch the settings of remote queries
    /// and do not overwrite the input properties of the root step.
    if (sort_description.empty())
        return;

    for (auto * reading : reading_steps)
    {
        reading->enforceSorting(sort_description);
        if (enforce_aggregation_in_order)
            reading->enforceAggregationInOrder();
    }

    for (auto * reading : async_reading_steps)
    {
        reading->enforceSorting(sort_description);
        if (enforce_aggregation_in_order)
            reading->enforceAggregationInOrder();
    }

    /// NOTE(review): refreshing the union's own output sort description was left disabled;
    /// the root step's input properties are set directly instead. Confirm this is intended.
    //union_step->updateOutputSortDescription();

    root_merging_aggregated->updateInputSortDescription(std::move(sort_description), DataStream::SortScope::Stream);
}
}

View File

@ -1,6 +1,8 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Common/Exception.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <stack>
namespace DB
@ -129,6 +131,7 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
}
optimizePrimaryKeyCondition(stack);
enableMemoryBoundMerging(*frame.node, nodes);
stack.pop_back();
}

View File

@ -51,6 +51,32 @@ static void addConvertingActions(Pipe & pipe, const Block & header)
});
}
/// Declares the output of a remote reading step as sorted within each stream.
///
/// Throws LOGICAL_ERROR unless the step reads up to the WithMergeableState stage.
/// Otherwise enables `enable_memory_bound_merging_of_aggregation_results` in `context`
/// and stores the given description with SortScope::Stream in `output_stream`.
static void enforceSorting(QueryProcessingStage::Enum stage, DataStream & output_stream, Context & context, SortDescription output_sort_description)
{
    const bool reads_mergeable_state = (stage == QueryProcessingStage::WithMergeableState);
    if (!reads_mergeable_state)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "Cannot enforce sorting for ReadFromRemote step up to stage {}",
            QueryProcessingStage::toString(stage));
    }

    /// The setting is changed first, so the stream is left untouched if that call throws.
    context.setSetting("enable_memory_bound_merging_of_aggregation_results", true);

    output_stream.sort_scope = DataStream::SortScope::Stream;
    output_stream.sort_description = std::move(output_sort_description);
}
/// Forces aggregation in order for the query sent by a remote reading step.
///
/// Throws LOGICAL_ERROR unless the step reads up to the WithMergeableState stage.
/// Otherwise enables `optimize_aggregation_in_order` and then `force_aggregation_in_order`
/// in `context`.
static void enforceAggregationInOrder(QueryProcessingStage::Enum stage, Context & context)
{
    if (stage == QueryProcessingStage::WithMergeableState)
    {
        context.setSetting("optimize_aggregation_in_order", true);
        context.setSetting("force_aggregation_in_order", true);
        return;
    }

    throw Exception(
        ErrorCodes::LOGICAL_ERROR,
        "Cannot enforce aggregation in order for ReadFromRemote step up to stage {}",
        QueryProcessingStage::toString(stage));
}
static String formattedAST(const ASTPtr & ast)
{
if (!ast)
@ -70,15 +96,13 @@ ReadFromRemote::ReadFromRemote(
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
SortDescription output_sort_description_,
DataStream::SortScope output_sort_scope_)
std::shared_ptr<const StorageLimitsList> storage_limits_)
: ISourceStep(DataStream{.header = std::move(header_)})
, shards(std::move(shards_))
, stage(stage_)
@ -92,8 +116,16 @@ ReadFromRemote::ReadFromRemote(
, log(log_)
, shard_count(shard_count_)
{
output_stream->sort_description = std::move(output_sort_description_);
output_stream->sort_scope = output_sort_scope_;
}
/// Marks the output of this step as sorted by `output_sort_description` within each stream
/// and enables memory bound merging of aggregation results in the step's context.
/// Throws LOGICAL_ERROR if the step does not read up to the WithMergeableState stage.
void ReadFromRemote::enforceSorting(SortDescription output_sort_description)
{
    /// The helper takes the description by value: move our copy instead of copying it again.
    DB::enforceSorting(stage, *output_stream, *context, std::move(output_sort_description));
}
/// Forces aggregation in order for the remote query by delegating to the file-local helper
/// with this step's stage and context. The helper throws LOGICAL_ERROR when the stage is not
/// WithMergeableState and otherwise enables `optimize_aggregation_in_order` and
/// `force_aggregation_in_order` in the context.
void ReadFromRemote::enforceAggregationInOrder()
{
DB::enforceAggregationInOrder(stage, *context);
}
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
@ -238,14 +270,12 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
SortDescription output_sort_description_,
DataStream::SortScope output_sort_scope_)
std::shared_ptr<const StorageLimitsList> storage_limits_)
: ISourceStep(DataStream{.header = std::move(header_)})
, coordinator(std::move(coordinator_))
, shard(std::move(shard_))
@ -266,11 +296,17 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
description.push_back(fmt::format("Replica: {}", address.host_name));
setStepDescription(boost::algorithm::join(description, ", "));
output_stream->sort_description = std::move(output_sort_description_);
output_stream->sort_scope = output_sort_scope_;
}
/// Marks the output of this step as sorted by `output_sort_description` within each stream
/// and enables memory bound merging of aggregation results in the step's context.
/// Throws LOGICAL_ERROR if the step does not read up to the WithMergeableState stage.
void ReadFromParallelRemoteReplicasStep::enforceSorting(SortDescription output_sort_description)
{
    /// The helper takes the description by value: move our copy instead of copying it again.
    DB::enforceSorting(stage, *output_stream, *context, std::move(output_sort_description));
}
/// Forces aggregation in order for the remote query by delegating to the file-local helper
/// with this step's stage and context. The helper throws LOGICAL_ERROR when the stage is not
/// WithMergeableState and otherwise enables `optimize_aggregation_in_order` and
/// `force_aggregation_in_order` in the context.
void ReadFromParallelRemoteReplicasStep::enforceAggregationInOrder()
{
DB::enforceAggregationInOrder(stage, *context);
}
void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{

View File

@ -27,34 +27,29 @@ public:
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
SortDescription output_sort_description_,
DataStream::SortScope output_sort_scope_);
std::shared_ptr<const StorageLimitsList> storage_limits_);
String getName() const override { return "ReadFromRemote"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
private:
enum class Mode
{
PerReplica,
PerShard
};
void enforceSorting(SortDescription output_sort_description);
void enforceAggregationInOrder();
private:
ClusterProxy::SelectStreamFactory::Shards shards;
QueryProcessingStage::Enum stage;
StorageID main_table;
ASTPtr table_func_ptr;
ContextPtr context;
ContextMutablePtr context;
ThrottlerPtr throttler;
Scalars scalars;
@ -80,19 +75,20 @@ public:
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
SortDescription output_sort_description_,
DataStream::SortScope output_sort_scope_);
std::shared_ptr<const StorageLimitsList> storage_limits_);
String getName() const override { return "ReadFromRemoteParallelReplicas"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void enforceSorting(SortDescription output_sort_description);
void enforceAggregationInOrder();
private:
void addPipeForSingeReplica(Pipes & pipes, std::shared_ptr<ConnectionPoolWithFailover> pool, IConnections::ReplicaInfo replica_info);
@ -104,7 +100,7 @@ private:
StorageID main_table;
ASTPtr table_func_ptr;
ContextPtr context;
ContextMutablePtr context;
ThrottlerPtr throttler;
Scalars scalars;

View File

@ -37,6 +37,11 @@ UnionStep::UnionStep(DataStreams input_streams_, size_t max_threads_)
else
output_stream = DataStream{.header = header};
updateOutputSortDescription();
}
void UnionStep::updateOutputSortDescription()
{
SortDescription common_sort_description = input_streams.front().sort_description;
DataStream::SortScope sort_scope = input_streams.front().sort_scope;
for (const auto & input_stream : input_streams)

View File

@ -19,6 +19,8 @@ public:
size_t getMaxThreads() const { return max_threads; }
void updateOutputSortDescription();
private:
Block header;
size_t max_threads;