fix bugs in aggregation by primary key

This commit is contained in:
Anton Popov 2021-03-18 23:17:09 +03:00
parent c821c0c27a
commit 6f7c8894e9
7 changed files with 91 additions and 34 deletions

View File

@ -1260,22 +1260,30 @@ Block Aggregator::prepareBlockAndFill(
return res;
}
void Aggregator::fillAggregateColumnsWithSingleKey(
/// Appends the current single-key aggregation state to the result aggregate columns.
///
/// For each of the `params.aggregates_size` aggregate functions, the address
/// `data_variants.without_key + offsets_of_aggregate_states[i]` is pushed into the data
/// of the i-th column of `final_aggregate_columns`.
/// `data_variants` is taken by const reference and is not modified here; resetting
/// `without_key` and attaching arenas is done in finalizeAggregateColumnsWithSingleKey().
void Aggregator::addToAggregateColumnsWithSingleKey(
const AggregatedDataVariants & data_variants,
MutableColumns & final_aggregate_columns)
{
const auto & data = data_variants.without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
/// The i-th result column is accessed as a ColumnAggregateFunction.
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*final_aggregate_columns[i]);
/// Record where the state of the i-th aggregate function lives inside the shared state buffer.
column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
}
}
/// Finishes building the result aggregate columns for the single-key case.
///
/// For each of the `params.aggregates_size` columns in `final_aggregate_columns`, every arena
/// from `data_variants.aggregates_pools` is registered with the column via addArena().
/// Afterwards `data_variants.without_key` is reset to nullptr.
///
/// NOTE(review): this text looks like a rendered diff with the +/- markers stripped, so lines from
/// the old and the new version of the function appear side by side (see the notes below) — confirm
/// against the real file before relying on the exact statement list.
void Aggregator::finalizeAggregateColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
MutableColumns & final_aggregate_columns)
{
/// Reference to the state pointer, so that assigning to `data` also changes `data_variants.without_key`.
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
/// NOTE(review): the next two lines declare the same name in the same scope; presumably
/// they are the removed and the added variant of one declaration — confirm.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*final_aggregate_columns[i]);
auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*final_aggregate_columns[i]);
/// Register every arena of `data_variants` with the column.
for (auto & pool : data_variants.aggregates_pools)
{
column_aggregate_func.addArena(pool);
}
/// NOTE(review): the same push_back is performed by addToAggregateColumnsWithSingleKey();
/// presumably this line belongs to the removed side of the diff — confirm.
column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
}
/// Both assignments below reset the same pointer (`data` refers to `data_variants.without_key`).
data = nullptr;
data_variants.without_key = nullptr;
}
void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(

View File

@ -1303,7 +1303,11 @@ protected:
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder);
void fillAggregateColumnsWithSingleKey(
/// Appends the current single-key aggregation state (`data_variants.without_key`) to
/// `final_aggregate_columns`, one entry per aggregate function. Does not modify `data_variants`.
void addToAggregateColumnsWithSingleKey(
const AggregatedDataVariants & data_variants,
MutableColumns & final_aggregate_columns);
/// Registers the arenas from `data_variants.aggregates_pools` with every column of
/// `final_aggregate_columns` and resets `data_variants.without_key` to nullptr.
void finalizeAggregateColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
MutableColumns & final_aggregate_columns);

View File

@ -24,11 +24,13 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_)
SortDescription description_,
size_t max_block_size_)
: header(header_)
, num_inputs(num_inputs_)
, params(params_)
, description(std::move(description_))
, max_block_size(max_block_size_)
{
/// Replace column names in description to positions.
for (auto & column_description : description)
@ -56,6 +58,13 @@ void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num
IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
{
if (!inputs_to_update.empty())
{
Status status(inputs_to_update.back());
inputs_to_update.pop_back();
return status;
}
/// Find the input with smallest last row.
std::optional<size_t> best_input;
for (size_t i = 0; i < num_inputs; ++i)
@ -94,16 +103,30 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
states[i].to_row = (it == indices.end() ? states[i].num_rows : *it);
}
Status status(*best_input);
status.chunk = aggregate();
addToAggregation();
/// At least one chunk should be fully aggregated.
assert(!inputs_to_update.empty());
Status status(inputs_to_update.back());
inputs_to_update.pop_back();
/// Do not merge blocks, if there are too few rows.
if (accumulated_rows >= max_block_size)
status.chunk = aggregate();
return status;
}
/// Merges `blocks` with `params->aggregator.mergeBlocks(blocks, false)`, then clears `blocks`,
/// resets `accumulated_rows` to zero and returns a chunk built from the columns and the row
/// count of the merged block.
Chunk FinishAggregatingInOrderAlgorithm::aggregate()
{
/// NOTE(review): a local `blocks` declared here hides the class member of the same name, so the
/// calls below would operate on an empty local list. Presumably this line is the removed side of
/// the diff (the member `blocks` is what addToAggregation() fills) — confirm.
BlocksList blocks;
auto aggregated = params->aggregator.mergeBlocks(blocks, false);
/// Start the next accumulation from an empty state.
blocks.clear();
accumulated_rows = 0;
return {aggregated.getColumns(), aggregated.rows()};
}
void FinishAggregatingInOrderAlgorithm::addToAggregation()
{
for (size_t i = 0; i < num_inputs; ++i)
{
const auto & state = states[i];
@ -112,7 +135,7 @@ Chunk FinishAggregatingInOrderAlgorithm::aggregate()
if (state.to_row - state.current_row == state.num_rows)
{
blocks.emplace_back(header.cloneWithColumns(states[i].all_columns));
blocks.emplace_back(header.cloneWithColumns(state.all_columns));
}
else
{
@ -125,10 +148,11 @@ Chunk FinishAggregatingInOrderAlgorithm::aggregate()
}
states[i].current_row = states[i].to_row;
accumulated_rows += blocks.back().rows();
if (!states[i].isValid())
inputs_to_update.push_back(i);
}
auto aggregated = params->aggregator.mergeBlocks(blocks, false);
return {aggregated.getColumns(), aggregated.rows()};
}
}

View File

@ -37,7 +37,8 @@ public:
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_);
SortDescription description_,
size_t max_block_size_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@ -45,6 +46,7 @@ public:
private:
Chunk aggregate();
void addToAggregation();
struct State
{
@ -66,8 +68,13 @@ private:
size_t num_inputs;
AggregatingTransformParamsPtr params;
SortDescription description;
size_t max_block_size;
Inputs current_inputs;
std::vector<State> states;
std::vector<size_t> inputs_to_update;
BlocksList blocks;
size_t accumulated_rows = 0;
};
}

View File

@ -16,13 +16,15 @@ public:
const Block & header,
size_t num_inputs,
AggregatingTransformParamsPtr params,
SortDescription description)
SortDescription description,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
params,
std::move(description))
std::move(description),
max_block_size)
{
}

View File

@ -100,7 +100,8 @@ void AggregatingStep::transformPipeline(QueryPipeline & pipeline)
pipeline.getHeader(),
pipeline.getNumStreams(),
transform_params,
group_by_sort_description);
group_by_sort_description,
max_block_size);
pipeline.addTransform(std::move(transform));
aggregating_sorted = collector.detachProcessors(1);

View File

@ -58,6 +58,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
LOG_TRACE(log, "Aggregating in order");
is_consume_started = true;
}
src_rows += rows;
src_bytes += chunk.bytes();
@ -82,23 +83,24 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
res_aggregate_columns.resize(params->params.aggregates_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
{
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
}
for (size_t i = 0; i < params->params.aggregates_size; ++i)
{
res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn();
}
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
++cur_block_size;
}
ssize_t mid = 0;
ssize_t high = 0;
ssize_t low = -1;
/// Will split block into segments with the same key
while (key_end != rows)
{
high = rows;
/// Find the first position of new (not current) key in current chunk
while (high - low > 1)
{
@ -108,32 +110,34 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
else
high = mid;
}
key_end = high;
/// Add data to the aggregation state if the interval is not empty. The interval is empty when the current key was not found in the new block.
if (key_begin != key_end)
{
params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
}
low = key_begin = key_end;
/// We finalize last key aggregation state if a new key found.
if (key_begin != rows)
if (key_end != rows)
{
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
params->aggregator.addToAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
/// If res_block_size is reached we have to stop consuming and generate the block. Save the extra rows into new chunk.
if (cur_block_size == res_block_size)
{
Columns source_columns = chunk.detachColumns();
for (auto & source_column : source_columns)
source_column = source_column->cut(key_begin, rows - key_begin);
source_column = source_column->cut(key_end, rows - key_end);
current_chunk = Chunk(source_columns, rows - key_begin);
current_chunk = Chunk(source_columns, rows - key_end);
src_rows -= current_chunk.getNumRows();
block_end_reached = true;
need_generate = true;
cur_block_size = 0;
params->aggregator.finalizeAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
/// Arenas cannot be destroyed here, since later, in FinalizingSimpleTransform
/// there will be finalizeChunk(), but even after
/// finalizeChunk() we cannot destroy arena, since some memory
@ -155,10 +159,14 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
}
/// We create a new state for the new key and update res_key_columns
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_end, res_key_columns);
++cur_block_size;
}
key_begin = key_end;
low = key_end;
}
block_end_reached = false;
}
@ -234,7 +242,10 @@ IProcessor::Status AggregatingInOrderTransform::prepare()
void AggregatingInOrderTransform::generate()
{
if (cur_block_size && is_consume_finished)
params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
{
params->aggregator.addToAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
params->aggregator.finalizeAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
}
Block res = res_header.cloneEmpty();