improve performance of aggregation in order of primary key

This commit is contained in:
Anton Popov 2021-10-18 18:27:07 +03:00
parent e543f720c6
commit 1d9cfc04ef
12 changed files with 70 additions and 101 deletions

View File

@ -82,6 +82,7 @@ public:
MutableColumns cloneEmptyColumns() const;
const ChunkInfoPtr & getChunkInfo() const { return chunk_info; }
bool hasChunkInfo() const { return chunk_info != nullptr; }
void setChunkInfo(ChunkInfoPtr chunk_info_) { chunk_info = std::move(chunk_info_); }
UInt64 getNumRows() const { return num_rows; }

View File

@ -1,4 +1,5 @@
#include <Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Core/SortCursor.h>
@ -25,14 +26,12 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_,
size_t max_block_size_,
size_t merge_threads_)
size_t max_block_size_)
: header(header_)
, num_inputs(num_inputs_)
, params(params_)
, description(std::move(description_))
, max_block_size(max_block_size_)
, pool(merge_threads_)
{
/// Replace column names in description to positions.
for (auto & column_description : description)
@ -60,12 +59,6 @@ void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num
IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
{
if (finished)
{
auto res = popResult();
return Status(std::move(res), results.empty());
}
if (!inputs_to_update.empty())
{
Status status(inputs_to_update.back());
@ -89,13 +82,7 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
}
if (!best_input)
{
aggregate();
pool.wait();
finished = true;
auto res = popResult();
return Status(std::move(res), results.empty());
}
return Status(prepareToMerge(), true);
/// Chunk at best_input will be aggregated entirely.
auto & best_state = states[*best_input];
@ -126,34 +113,20 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
/// Do not merge blocks, if there are too few rows.
if (accumulated_rows >= max_block_size)
aggregate();
status.chunk = prepareToMerge();
status.chunk = popResult();
return status;
}
Chunk FinishAggregatingInOrderAlgorithm::popResult()
{
std::lock_guard lock(results_mutex);
if (results.empty())
return {};
auto res = std::move(results.back());
results.pop_back();
return res;
}
void FinishAggregatingInOrderAlgorithm::aggregate()
Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge()
{
accumulated_rows = 0;
pool.scheduleOrThrowOnError([this, blocks_list = std::move(blocks)]() mutable
{
auto aggregated = params->aggregator.mergeBlocks(blocks_list, false);
auto info = std::make_shared<ChunksToMerge>();
info->chunks = std::make_unique<Chunks>(std::move(chunks));
std::lock_guard lock(results_mutex);
results.emplace_back(aggregated.getColumns(), aggregated.rows());
});
Chunk chunk;
chunk.setChunkInfo(std::move(info));
return chunk;
}
void FinishAggregatingInOrderAlgorithm::addToAggregation()
@ -164,22 +137,25 @@ void FinishAggregatingInOrderAlgorithm::addToAggregation()
if (!state.isValid() || state.current_row == state.to_row)
continue;
if (state.to_row - state.current_row == state.num_rows)
size_t current_rows = state.to_row - state.current_row;
if (current_rows == state.num_rows)
{
blocks.emplace_back(header.cloneWithColumns(state.all_columns));
chunks.emplace_back(state.all_columns, current_rows);
}
else
{
Columns new_columns;
new_columns.reserve(state.all_columns.size());
for (const auto & column : state.all_columns)
new_columns.emplace_back(column->cut(state.current_row, state.to_row - state.current_row));
new_columns.emplace_back(column->cut(state.current_row, current_rows));
blocks.emplace_back(header.cloneWithColumns(new_columns));
chunks.emplace_back(std::move(new_columns), current_rows);
}
chunks.back().setChunkInfo(std::make_shared<AggregatedChunkInfo>());
states[i].current_row = states[i].to_row;
accumulated_rows += blocks.back().rows();
accumulated_rows += current_rows;
if (!states[i].isValid())
inputs_to_update.push_back(i);

View File

@ -39,17 +39,15 @@ public:
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_,
size_t max_block_size_,
size_t merge_threads_);
size_t max_block_size_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
Status merge() override;
private:
void aggregate();
Chunk prepareToMerge();
void addToAggregation();
Chunk popResult();
struct State
{
@ -72,17 +70,14 @@ private:
AggregatingTransformParamsPtr params;
SortDescription description;
size_t max_block_size;
ThreadPool pool;
std::mutex results_mutex;
std::vector<Chunk> results;
Inputs current_inputs;
std::vector<State> states;
std::vector<size_t> inputs_to_update;
BlocksList blocks;
std::vector<Chunk> chunks;
size_t accumulated_rows = 0;
bool finished = false;
};
}

View File

@ -17,16 +17,14 @@ public:
size_t num_inputs,
AggregatingTransformParamsPtr params,
SortDescription description,
size_t max_block_size,
size_t merge_threads)
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
num_inputs, header, {}, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
header,
num_inputs,
params,
std::move(description),
max_block_size,
merge_threads)
max_block_size)
{
}

View File

@ -132,7 +132,7 @@ IProcessor::Status IMergingTransformBase::prepare()
bool is_port_full = !output.canPush();
/// Push if has data.
if (state.output_chunk && !is_port_full)
if ((state.output_chunk || state.output_chunk.hasChunkInfo()) && !is_port_full)
output.push(std::move(state.output_chunk));
if (!is_initialized)

View File

@ -107,7 +107,7 @@ public:
IMergingAlgorithm::Status status = algorithm.merge();
if (status.chunk && status.chunk.hasRows())
if ((status.chunk && status.chunk.hasRows()) || status.chunk.hasChunkInfo())
{
// std::cerr << "Got chunk with " << status.chunk.getNumRows() << " rows" << std::endl;
state.output_chunk = std::move(status.chunk);

View File

@ -2,6 +2,7 @@
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
@ -87,24 +88,21 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
aggregating_in_order = collector.detachProcessors(0);
for (auto & column_description : group_by_sort_description)
{
if (!column_description.column_name.empty())
{
column_description.column_number = pipeline.getHeader().getPositionByName(column_description.column_name);
column_description.column_name.clear();
}
}
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
transform_params,
group_by_sort_description,
max_block_size,
merge_threads);
max_block_size);
pipeline.addTransform(std::move(transform));
pipeline.resize(merge_threads);
pipeline.addSimpleTransform([&](const Block &)
{
return std::make_shared<MergingAggregatedBucketTransform>(transform_params);
});
aggregating_sorted = collector.detachProcessors(1);
}
else
@ -114,14 +112,14 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
return std::make_shared<AggregatingInOrderTransform>(header, transform_params, group_by_sort_description, max_block_size);
});
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinalizeAggregatedTransform>(header, transform_params);
});
aggregating_in_order = collector.detachProcessors(0);
}
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinalizingSimpleTransform>(header, transform_params);
});
finalizing = collector.detachProcessors(2);
return;
}

View File

@ -133,7 +133,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
variants.without_key = nullptr;
/// Arenas cannot be destroyed here, since later, in FinalizingSimpleTransform
/// Arenas cannot be destroyed here, since later, in FinalizeAggregatedTransform
/// there will be finalizeChunk(), but even after
/// finalizeChunk() we cannot destroy arena, since some memory
/// from Arena still in use, so we attach it to the Chunk to

View File

@ -64,10 +64,10 @@ private:
};
class FinalizingSimpleTransform : public ISimpleTransform
class FinalizeAggregatedTransform : public ISimpleTransform
{
public:
FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params_)
FinalizeAggregatedTransform(Block header, AggregatingTransformParamsPtr params_)
: ISimpleTransform({std::move(header)}, {params_->getHeader()}, true)
, params(params_) {}
@ -82,7 +82,7 @@ public:
}
}
String getName() const override { return "FinalizingSimpleTransform"; }
String getName() const override { return "FinalizeAggregatedTransform"; }
private:
AggregatingTransformParamsPtr params;

View File

@ -12,13 +12,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
struct ChunksToMerge : public ChunkInfo
{
std::unique_ptr<Chunks> chunks;
Int32 bucket_num = -1;
bool is_overflows = false;
};
GroupingAggregatedTransform::GroupingAggregatedTransform(
const Block & header_, size_t num_inputs_, AggregatingTransformParamsPtr params_)
: IProcessor(InputPorts(num_inputs_, header_), { Block() })

View File

@ -137,6 +137,13 @@ private:
void addChunk(Chunk chunk, size_t from_input);
};
struct ChunksToMerge : public ChunkInfo
{
std::unique_ptr<Chunks> chunks;
Int32 bucket_num = -1;
bool is_overflows = false;
};
class Pipe;
/// Adds processors to pipe which performs memory efficient merging of partially aggregated data from several sources.

View File

@ -1,17 +1,18 @@
(Expression)
ExpressionTransform
ExpressionTransform × 3
(Aggregating)
FinalizingSimpleTransform
FinishAggregatingInOrderTransform 3 → 1
AggregatingInOrderTransform × 3
(Expression)
ExpressionTransform × 3
(SettingQuotaAndLimits)
(ReadFromMergeTree)
ExpressionTransform × 4
MergeTreeInOrder 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
ExpressionTransform
MergeTreeInOrder 0 → 1
MergingAggregatedBucketTransform × 3
Resize 1 → 3
FinishAggregatingInOrderTransform 3 → 1
AggregatingInOrderTransform × 3
(Expression)
ExpressionTransform × 3
(SettingQuotaAndLimits)
(ReadFromMergeTree)
ExpressionTransform × 4
MergeTreeInOrder 0 → 1
MergingSortedTransform 2 → 1
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
ExpressionTransform
MergeTreeInOrder 0 → 1