Nikita Taranov 2023-01-05 19:22:42 +00:00
parent 2e3a3cc4dc
commit a2c9aeb7c9
13 changed files with 525 additions and 55 deletions

View File

@ -1084,6 +1084,7 @@ private:
friend struct AggregatedDataVariants;
friend class ConvertingAggregatedToChunksTransform;
friend class ConvertingAggregatedToChunksSource;
friend class ConvertingAggregatedToChunksWithMergingSource;
friend class AggregatingInOrderTransform;
/// Data structure of source blocks.

View File

@ -166,6 +166,12 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
params.group_by_two_level_threshold_bytes = 0;
}
/// In case of external aggregation we cannot completely avoid merging,
/// because each thread might use multiple hash tables during execution, so the same key might be present in multiple hash tables.
/// Nevertheless, we could save some time by merging only the HTs produced by the same thread (future task).
if (params.max_bytes_before_external_group_by)
skip_merging = false;
/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with storage of temporary data on disk, and it needs to be merged in a memory-efficient way.
@ -228,7 +234,14 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
auto many_data = std::make_shared<ManyAggregatedData>(streams);
for (size_t j = 0; j < streams; ++j)
{
auto aggregation_for_set = std::make_shared<AggregatingTransform>(input_header, transform_params_for_set, many_data, j, merge_threads, temporary_data_merge_threads);
auto aggregation_for_set = std::make_shared<AggregatingTransform>(
input_header,
transform_params_for_set,
many_data,
j,
merge_threads,
temporary_data_merge_threads,
skip_merging);
// For each input stream we have `grouping_sets_size` copies, so port index
// for transform #j should skip ports of first (j-1) streams.
connect(*ports[i + grouping_sets_size * j], aggregation_for_set->getInputs().front());
@ -410,16 +423,19 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
if (pipeline.getNumStreams() > 1)
{
/// Add resize transform to uniformly distribute data between aggregating streams.
if (!storage_has_evenly_distributed_read)
/// But not when we aggregate partitioned data independently, in which case the data streams shouldn't be mixed.
if (!storage_has_evenly_distributed_read && !skip_merging)
pipeline.resize(pipeline.getNumStreams(), true, true);
auto many_data = std::make_shared<ManyAggregatedData>(pipeline.getNumStreams());
size_t counter = 0;
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
pipeline.addSimpleTransform(
[&](const Block & header)
{
return std::make_shared<AggregatingTransform>(
header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads, skip_merging);
});
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);

View File

@ -59,6 +59,7 @@ public:
bool isGroupingSets() const { return !grouping_sets_params.empty(); }
void applyOrder(SortDescription sort_description_for_merging_, SortDescription group_by_sort_description_);
bool memoryBoundMergingWillBeUsed() const;
void skipMerging() { skip_merging = true; }
private:
void updateOutputStream() override;
@ -70,6 +71,7 @@ private:
size_t aggregation_in_order_max_block_bytes;
size_t merge_threads;
size_t temporary_data_merge_threads;
bool skip_merging = false; // If we aggregate partitioned data independently, merging is not needed.
bool storage_has_evenly_distributed_read;
bool group_by_use_nulls;

View File

@ -68,17 +68,22 @@ void tryRemoveRedundantSorting(QueryPlan::Node * root);
/// - Something - - Expression - Something -
size_t tryLiftUpUnion(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
size_t tryAggregateEachPartitionIndependently(QueryPlan::Node * node, QueryPlan::Nodes &);
inline const auto & getOptimizations()
{
static const std::array<Optimization, 8> optimizations = {{
static const std::array<Optimization, 9> optimizations = {{
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan},
{tryReuseStorageOrderingForWindowFunctions,
"reuseStorageOrderingForWindowFunctions",
&QueryPlanOptimizationSettings::optimize_plan},
{tryLiftUpUnion, "liftUpUnion", &QueryPlanOptimizationSettings::optimize_plan},
{tryAggregateEachPartitionIndependently, "aggregationPartitionsIndepedently", &QueryPlanOptimizationSettings::optimize_plan},
}};
return optimizations;

View File

@ -0,0 +1,70 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
using namespace DB;
namespace
{
bool isPartitionKeySuitsGroupByKey(const ReadFromMergeTree & reading, const AggregatingStep & aggregating)
{
const auto & gb_keys = aggregating.getParams().keys;
if (aggregating.isGroupingSets() || gb_keys.size() != 1)
return false;
const auto & pkey_nodes = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG().getNodes();
LOG_DEBUG(&Poco::Logger::get("debug"), "{}", reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG().dumpDAG());
if (!pkey_nodes.empty())
{
const auto & func_node = pkey_nodes.back();
LOG_DEBUG(&Poco::Logger::get("debug"), "{} {} {}", func_node.type, func_node.is_deterministic, func_node.children.size());
if (func_node.type == ActionsDAG::ActionType::FUNCTION && func_node.function->getName() == "modulo"
&& func_node.children.size() == 2)
{
const auto & arg1 = func_node.children.front();
const auto & arg2 = func_node.children.back();
LOG_DEBUG(&Poco::Logger::get("debug"), "{} {} {}", arg1->type, arg1->result_name, arg2->type);
if (arg1->type == ActionsDAG::ActionType::INPUT && arg1->result_name == gb_keys[0]
&& arg2->type == ActionsDAG::ActionType::COLUMN && typeid_cast<const ColumnConst *>(arg2->column.get()))
return true;
}
}
return false;
}
}
namespace DB::QueryPlanOptimizations
{
size_t tryAggregateEachPartitionIndependently(QueryPlan::Node * node, QueryPlan::Nodes &)
{
if (!node || node->children.size() != 1)
return 0;
auto * aggregating_step = typeid_cast<AggregatingStep *>(node->step.get());
if (!aggregating_step)
return 0;
const auto * expression_node = node->children.front();
if (expression_node->children.size() != 1 || !typeid_cast<const ExpressionStep *>(expression_node->step.get()))
return 0;
auto * reading_step = expression_node->children.front()->step.get();
auto * reading = typeid_cast<ReadFromMergeTree *>(reading_step);
if (!reading)
return 0;
if (!reading->willOutputEachPartitionThroughSeparatePort() && isPartitionKeySuitsGroupByKey(*reading, *aggregating_step))
{
if (reading->requestOutputEachPartitionThroughSeparatePort())
aggregating_step->skipMerging();
}
return 0;
}
}
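For orientation, a minimal SQL sketch of the shape this rule matches (the table and column names are illustrative, not part of this commit): the partition key must be `<group-by column> % <constant>`, so rows with the same GROUP BY key never cross partition boundaries and the per-partition aggregation results need no final merge.
create table part_agg_example(a UInt32) engine = MergeTree order by tuple() partition by a % 16;
insert into part_agg_example select number from numbers(1000000);
-- every value of `a` falls into exactly one partition, so partitions can be aggregated independently
select a, count() from part_agg_example group by a format Null;
drop table part_agg_example;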

View File

@ -1,13 +1,14 @@
#include <algorithm>
#include <functional>
#include <iterator>
#include <memory>
#include <numeric>
#include <queue>
#include <stdexcept>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
@ -30,13 +31,31 @@
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeSource.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/logger_useful.h>
#include <base/sort.h>
#include <Poco/Logger.h>
#include <Common/JSONBuilder.h>
#include <Common/logger_useful.h>
namespace
{
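/// Requires a non-empty parts_with_ranges in which parts belonging to the same partition are adjacent,
/// since it only counts partition_id changes between neighbouring parts.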
size_t countPartitions(DB::RangesInDataParts & parts_with_ranges)
{
std::string cur_partition_id = parts_with_ranges[0].data_part->info.partition_id;
size_t unique_partitions = 1;
for (size_t i = 1; i < parts_with_ranges.size(); ++i)
{
if (parts_with_ranges[i].data_part->info.partition_id != cur_partition_id)
{
++unique_partitions;
cur_partition_id = parts_with_ranges[i].data_part->info.partition_id;
}
}
return unique_partitions;
}
}
namespace ProfileEvents
{
@ -242,7 +261,6 @@ Pipe ReadFromMergeTree::readFromPool(
if (i == 0 && !client_info.collaborate_with_initiator)
source->addTotalRowsApprox(total_rows);
pipes.emplace_back(std::move(source));
}
@ -335,8 +353,12 @@ Pipe ReadFromMergeTree::read(
size_t max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache)
{
if (read_type == ReadType::Default && max_streams > 1)
return readFromPool(parts_with_range, required_columns, max_streams,
min_marks_for_concurrent_read, use_uncompressed_cache);
{
Pipe pipe = readFromPool(parts_with_range, required_columns, max_streams, min_marks_for_concurrent_read, use_uncompressed_cache);
if (output_each_partition_through_separate_port)
pipe.resize(1);
return pipe;
}
auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache, 0);
@ -418,9 +440,8 @@ struct PartRangesReadInfo
}
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,
const Names & column_names)
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsImpl(
RangesInDataParts && parts_with_ranges, const Names & column_names, size_t num_streams)
{
const auto & settings = context->getSettingsRef();
const auto data_settings = data.getSettings();
@ -430,16 +451,67 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
if (0 == info.sum_marks)
return {};
size_t num_streams = requested_num_streams;
if (num_streams > 1)
{
/// Reduce the number of num_streams if the data is small.
if (info.sum_marks < num_streams * info.min_marks_for_concurrent_read && parts_with_ranges.size() < num_streams)
num_streams = std::max((info.sum_marks + info.min_marks_for_concurrent_read - 1) / info.min_marks_for_concurrent_read, parts_with_ranges.size());
num_streams = std::max(
(info.sum_marks + info.min_marks_for_concurrent_read - 1) / info.min_marks_for_concurrent_read, parts_with_ranges.size());
}
return read(std::move(parts_with_ranges), column_names, ReadType::Default,
num_streams, info.min_marks_for_concurrent_read, info.use_uncompressed_cache);
return read(
std::move(parts_with_ranges),
column_names,
ReadType::Default,
num_streams,
info.min_marks_for_concurrent_read,
info.use_uncompressed_cache);
}
Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(
RangesInDataParts && parts_with_ranges,
const Names & column_names)
{
if (parts_with_ranges.empty())
return {};
size_t num_streams = requested_num_streams;
if (!output_each_partition_through_separate_port)
{
return spreadMarkRangesAmongStreamsImpl(std::move(parts_with_ranges), column_names, num_streams);
}
else
{
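/// Split the requested streams between partitions: e.g. 16 requested streams over 4 partitions
/// leave up to 4 streams per partition (this may be reduced further for small partitions).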
num_streams = std::max<size_t>(1, num_streams / countPartitions(parts_with_ranges));
LOG_DEBUG(
&Poco::Logger::get("debug"),
"spreadMarkRangesAmongStreams {} {} {}",
parts_with_ranges.size(),
requested_num_streams,
countPartitions(parts_with_ranges));
Pipes pipes;
for (auto begin = parts_with_ranges.begin(); begin != parts_with_ranges.end();)
{
const auto end = std::find_if(
begin,
parts_with_ranges.end(),
[&begin](auto & part) { return begin->data_part->info.partition_id != part.data_part->info.partition_id; });
LOG_DEBUG(&Poco::Logger::get("debug"), "spreadMarkRangesAmongStreams {} {}", begin->data_part->info.partition_id, end - begin);
RangesInDataParts partition_parts;
partition_parts.insert(partition_parts.end(), std::make_move_iterator(begin), std::make_move_iterator(end));
pipes.emplace_back(spreadMarkRangesAmongStreamsImpl(std::move(partition_parts), column_names, num_streams));
begin = end;
}
return Pipe::unitePipes(std::move(pipes));
}
}
static ActionsDAGPtr createProjection(const Block & header)
@ -1197,6 +1269,18 @@ void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
output_stream->sort_description = std::move(sort_description);
output_stream->sort_scope = DataStream::SortScope::Stream;
}
/// Reading in order is currently not supported together with this optimisation, so disable it.
output_each_partition_through_separate_port = false;
}
bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort()
{
if (isQueryWithFinal() || query_info.getInputOrderInfo())
return false;
output_each_partition_through_separate_port = true;
return true;
}
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
@ -1208,6 +1292,15 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
return std::get<ReadFromMergeTree::AnalysisResult>(result_ptr->result);
}
bool ReadFromMergeTree::isQueryWithFinal() const
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
if (query_info.table_expression_modifiers)
return query_info.table_expression_modifiers->hasFinal();
else
return select.final();
}
void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto result = getAnalysisResult();
@ -1243,12 +1336,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
ActionsDAGPtr result_projection;
Names column_names_to_read = std::move(result.column_names_to_read);
const auto & select = query_info.query->as<ASTSelectQuery &>();
bool final = false;
if (query_info.table_expression_modifiers)
final = query_info.table_expression_modifiers->hasFinal();
else
final = select.final();
bool final = isQueryWithFinal();
if (!final && result.sampling.use_sampling)
{
@ -1267,6 +1355,9 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
if (final)
{
if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used for queries with final");
/// Add columns needed to calculate the sorting expression and the sign.
std::vector<String> add_columns = metadata_for_reading->getColumnsRequiredForSortingKey();
column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
@ -1286,6 +1377,9 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
}
else if (input_order_info)
{
if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used when reading in order is used");
pipe = spreadMarkRangesAmongStreamsWithOrder(
std::move(result.parts_with_ranges),
column_names_to_read,

View File

@ -154,6 +154,10 @@ public:
void requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
/// Returns true if the optimisation is applicable (and enables it in that case).
bool requestOutputEachPartitionThroughSeparatePort();
bool willOutputEachPartitionThroughSeparatePort() const { return output_each_partition_through_separate_port; }
private:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl(
MergeTreeData::DataPartsVector parts,
@ -168,6 +172,8 @@ private:
bool sample_factor_column_queried,
Poco::Logger * log);
bool isQueryWithFinal() const;
int getSortDirection() const
{
const InputOrderInfoPtr & order_info = query_info.getInputOrderInfo();
@ -203,6 +209,9 @@ private:
const size_t preferred_max_column_in_block_size_bytes;
const bool sample_factor_column_queried;
/// Used for aggregation optimisation (see DB::QueryPlanOptimizations::tryAggregateEachPartitionIndependently).
bool output_each_partition_through_separate_port = false;
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read;
Poco::Logger * log;
@ -221,6 +230,8 @@ private:
RangesInDataParts && parts_with_ranges,
const Names & column_names);
Pipe spreadMarkRangesAmongStreamsImpl(RangesInDataParts && parts_with_ranges, const Names & column_names, size_t num_streams);
Pipe spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts_with_ranges,
const Names & column_names,

View File

@ -81,7 +81,7 @@ namespace
/// Worker which merges buckets for two-level aggregation.
/// Atomically increments bucket counter and returns merged result.
class ConvertingAggregatedToChunksSource : public ISource
class ConvertingAggregatedToChunksWithMergingSource : public ISource
{
public:
static constexpr UInt32 NUM_BUCKETS = 256;
@ -101,19 +101,17 @@ public:
using SharedDataPtr = std::shared_ptr<SharedData>;
ConvertingAggregatedToChunksSource(
AggregatingTransformParamsPtr params_,
ManyAggregatedDataVariantsPtr data_,
SharedDataPtr shared_data_,
Arena * arena_)
ConvertingAggregatedToChunksWithMergingSource(
AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, SharedDataPtr shared_data_, Arena * arena_)
: ISource(params_->getHeader(), false)
, params(std::move(params_))
, data(std::move(data_))
, shared_data(std::move(shared_data_))
, arena(arena_)
{}
{
}
String getName() const override { return "ConvertingAggregatedToChunksSource"; }
String getName() const override { return "ConvertingAggregatedToChunksWithMergingSource"; }
protected:
Chunk generate() override
@ -138,21 +136,131 @@ private:
Arena * arena;
};
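/// Converts a single hash table (one AggregatedDataVariants) to chunks on its own, without merging it with other variants.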
class ConvertingAggregatedToChunksSource : public ISource
{
public:
ConvertingAggregatedToChunksSource(AggregatingTransformParamsPtr params_, AggregatedDataVariantsPtr variant_)
: ISource(params_->getHeader(), false), params(params_), variant(variant_)
{
}
String getName() const override { return "ConvertingAggregatedToChunksSource"; }
protected:
Chunk generate() override
{
if (!converted)
{
blocks = params->aggregator.convertToBlocks(*variant, params->final, 1 /* max_threads */);
converted = true;
}
if (blocks.empty())
return {};
auto res = convertToChunk(blocks.front());
blocks.pop_front();
return res;
}
private:
AggregatingTransformParamsPtr params;
AggregatedDataVariantsPtr variant;
bool converted = false;
BlocksList blocks;
};
/// Reads chunks wrapped into ChunksToMerge by GroupingAggregatedTransform, unwraps them and outputs the contained chunks one by one (no merging is performed).
class FlattenChunksToMergeTransform : public IProcessor
{
public:
explicit FlattenChunksToMergeTransform(const Block & input_header, const Block & output_header)
: IProcessor({input_header}, {output_header})
{
}
String getName() const override { return "FlattenChunksToMergeTransform"; }
private:
void work() override
{
}
void process(Chunk chunk)
{
if (chunk.hasChunkInfo())
{
const auto & info = chunk.getChunkInfo();
if (const auto * chunks_to_merge = typeid_cast<const ChunksToMerge *>(info.get()); chunks_to_merge && chunks_to_merge->chunks)
for (auto & cur_chunk : *chunks_to_merge->chunks)
chunks.emplace_back(std::move(cur_chunk));
}
}
Status prepare() override
{
auto & input = inputs.front();
auto & output = outputs.front();
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
if (!chunks.empty())
{
output.push(std::move(chunks.front()));
chunks.pop_front();
}
if (input.isFinished() && chunks.empty())
{
output.finish();
return Status::Finished;
}
if (input.isFinished())
return Status::Ready;
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
Chunk chunk = input.pull(false /* set_not_needed */);
process(std::move(chunk));
/// Now transform.
return Status::Ready;
}
std::list<Chunk> chunks;
};
/// Generates chunks with aggregated data.
/// In single level case, aggregates data itself.
/// In two-level case, creates `ConvertingAggregatedToChunksSource` workers:
/// In two-level case, creates `ConvertingAggregatedToChunksWithMergingSource` workers:
///
/// ConvertingAggregatedToChunksSource ->
/// ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform
/// ConvertingAggregatedToChunksSource ->
/// ConvertingAggregatedToChunksWithMergingSource ->
/// ConvertingAggregatedToChunksWithMergingSource -> ConvertingAggregatedToChunksTransform -> AggregatingTransform
/// ConvertingAggregatedToChunksWithMergingSource ->
///
/// Result chunks guaranteed to be sorted by bucket number.
class ConvertingAggregatedToChunksTransform : public IProcessor
{
public:
ConvertingAggregatedToChunksTransform(AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, size_t num_threads_)
: IProcessor({}, {params_->getHeader()})
, params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_) {}
: IProcessor({}, {params_->getHeader()}), params(std::move(params_)), data(std::move(data_)), num_threads(num_threads_)
{
}
String getName() const override { return "ConvertingAggregatedToChunksTransform"; }
@ -298,7 +406,7 @@ private:
AggregatingTransformParamsPtr params;
ManyAggregatedDataVariantsPtr data;
ConvertingAggregatedToChunksSource::SharedDataPtr shared_data;
ConvertingAggregatedToChunksWithMergingSource::SharedDataPtr shared_data;
size_t num_threads;
@ -368,13 +476,13 @@ private:
void createSources()
{
AggregatedDataVariantsPtr & first = data->at(0);
shared_data = std::make_shared<ConvertingAggregatedToChunksSource::SharedData>();
shared_data = std::make_shared<ConvertingAggregatedToChunksWithMergingSource::SharedData>();
for (size_t thread = 0; thread < num_threads; ++thread)
{
/// Select Arena to avoid race conditions
Arena * arena = first->aggregates_pools.at(thread).get();
auto source = std::make_shared<ConvertingAggregatedToChunksSource>(params, data, shared_data, arena);
auto source = std::make_shared<ConvertingAggregatedToChunksWithMergingSource>(params, data, shared_data, arena);
processors.emplace_back(std::move(source));
}
@ -382,8 +490,7 @@ private:
};
AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformParamsPtr params_)
: AggregatingTransform(std::move(header), std::move(params_)
, std::make_unique<ManyAggregatedData>(1), 0, 1, 1)
: AggregatingTransform(std::move(header), std::move(params_), std::make_unique<ManyAggregatedData>(1), 0, 1, 1, false)
{
}
@ -391,18 +498,22 @@ AggregatingTransform::AggregatingTransform(
Block header,
AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data_,
size_t current_variant,
size_t current_variant_,
size_t max_threads_,
size_t temporary_data_merge_threads_)
size_t temporary_data_merge_threads_,
bool skip_merging_)
: IProcessor({std::move(header)}, {params_->getHeader()})
, params(std::move(params_))
, key_columns(params->params.keys_size)
, aggregate_columns(params->params.aggregates_size)
, many_data(std::move(many_data_))
, variants(*many_data->variants[current_variant])
, variants(*many_data->variants[current_variant_])
, max_threads(std::min(many_data->variants.size(), max_threads_))
, temporary_data_merge_threads(temporary_data_merge_threads_)
, current_variant(current_variant_)
, skip_merging(skip_merging_)
{
(void)current_variant;
}
AggregatingTransform::~AggregatingTransform() = default;
@ -575,12 +686,30 @@ void AggregatingTransform::initGenerate()
if (!params->aggregator.hasTemporaryData())
{
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
processors.emplace_back(std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads));
if (!skip_merging)
{
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
processors.emplace_back(
std::make_shared<ConvertingAggregatedToChunksTransform>(params, std::move(prepared_data_ptr), max_threads));
}
else
{
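/// Merging can be skipped: convert each thread's hash table to chunks independently,
/// then only group the chunks by bucket and flatten them, without merging states across threads.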
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
Pipes pipes;
for (auto & variant : prepared_data)
pipes.emplace_back(std::make_shared<ConvertingAggregatedToChunksSource>(params, variant));
Pipe pipe = Pipe::unitePipes(std::move(pipes));
pipe.addTransform(std::make_shared<GroupingAggregatedTransform>(pipe.getHeader(), pipe.numOutputPorts(), params));
pipe.addTransform(std::make_shared<FlattenChunksToMergeTransform>(pipe.getHeader(), params->getHeader()));
processors = Pipe::detachProcessors(std::move(pipe));
}
}
else
{
if (skip_merging)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Merging is not supposed to be skipped when external aggregation is used");
/// If there are temporary files with partially-aggregated data on the disk,
/// then read and merge them, spending the minimum amount of memory.

View File

@ -147,9 +147,10 @@ public:
Block header,
AggregatingTransformParamsPtr params_,
ManyAggregatedDataPtr many_data,
size_t current_variant,
size_t current_variant_,
size_t max_threads,
size_t temporary_data_merge_threads);
size_t temporary_data_merge_threads,
bool skip_merging_ = false);
~AggregatingTransform() override;
String getName() const override { return "AggregatingTransform"; }
@ -181,6 +182,8 @@ private:
AggregatedDataVariants & variants;
size_t max_threads = 1;
size_t temporary_data_merge_threads = 1;
size_t current_variant;
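/// If true, the per-thread aggregation results are converted to chunks without merging them with each other
/// (used when partitions are aggregated independently).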
bool skip_merging = false;
/// TODO: calculate time only for aggregation.
Stopwatch watch;

View File

@ -86,7 +86,8 @@ private:
bool read_from_all_inputs = false;
std::vector<bool> read_from_input;
bool expect_several_chunks_for_single_bucket_per_source = false;
/// If we aggregate partitioned data, several chunks might be produced for the same bucket: one for each partition.
bool expect_several_chunks_for_single_bucket_per_source = true;
/// Add chunk read from input to chunks_map, overflow_chunks or single_level_chunks according to its chunk info.
void addChunk(Chunk chunk, size_t input);

View File

@ -0,0 +1,13 @@
<test>
<settings>
<!-- <allow_aggregate_each_partition_independently>1</allow_aggregate_each_partition_independently> -->
</settings>
<create_query>create table t(a UInt32) engine=MergeTree order by tuple() partition by a % 16</create_query>
<fill_query>insert into t select * from numbers_mt(5e7)</fill_query>
<query>select a from t group by a format Null</query>
<drop_query>drop table t</drop_query>
</test>

View File

@ -0,0 +1,83 @@
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 4 → 16
AggregatingTransform × 4
(Expression)
ExpressionTransform × 4
(ReadFromMergeTree)
Resize 3 → 1
MergeTreeThread × 3 0 → 1
Resize 3 → 1
MergeTreeThread × 3 0 → 1
Resize 3 → 1
MergeTreeThread × 3 0 → 1
Resize 3 → 1
MergeTreeThread × 3 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 8 → 16
AggregatingTransform × 8
(Expression)
ExpressionTransform × 8
(ReadFromMergeTree)
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
Resize 2 → 1
MergeTreeThread × 2 0 → 1
1000000
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 16 → 16
AggregatingTransform × 16
(Expression)
ExpressionTransform × 16
(ReadFromMergeTree)
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
1000000

View File

@ -0,0 +1,42 @@
set max_threads = 16;
create table t1(a UInt32) engine=MergeTree order by tuple() partition by a % 4;
system stop merges t1;
insert into t1 select number from numbers_mt(1e6);
insert into t1 select number from numbers_mt(1e6);
explain pipeline select a from t1 group by a;
select count() from (select throwIf(count() != 2) from t1 group by a);
drop table t1;
create table t2(a UInt32) engine=MergeTree order by tuple() partition by a % 8;
system stop merges t2;
insert into t2 select number from numbers_mt(1e6);
insert into t2 select number from numbers_mt(1e6);
explain pipeline select a from t2 group by a;
select count() from (select throwIf(count() != 2) from t2 group by a);
drop table t2;
create table t3(a UInt32) engine=MergeTree order by tuple() partition by a % 16;
system stop merges t3;
insert into t3 select number from numbers_mt(1e6);
insert into t3 select number from numbers_mt(1e6);
explain pipeline select a from t3 group by a;
select count() from (select throwIf(count() != 2) from t3 group by a);
select throwIf(count() != 4) from remote('127.0.0.{1,2}', currentDatabase(), t3) group by a format Null;
drop table t3;