Merge pull request #10279 from ClickHouse/fix-max-rows-to-gruop-by-stuck

Fix max_rows_to_group_by stuck
This commit is contained in:
Nikolai Kochetov 2020-04-15 16:10:54 +03:00 committed by GitHub
commit 8ab9c5b0d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 0 deletions

View File

@ -413,6 +413,9 @@ AggregatingTransform::~AggregatingTransform() = default;
IProcessor::Status AggregatingTransform::prepare()
{
/// There are one or two input ports.
/// The first one is used at aggregation step, the second one - while reading merged data from ConvertingAggregated
auto & output = outputs.front();
/// Last output is current. All other outputs should already be closed.
auto & input = inputs.back();
@ -432,7 +435,12 @@ IProcessor::Status AggregatingTransform::prepare()
/// Finish data processing, prepare to generating.
if (is_consume_finished && !is_generate_initialized)
{
/// Close input port in case max_rows_to_group_by was reached but not all data was read.
inputs.front().close();
return Status::Ready;
}
if (is_generate_initialized && !is_pipeline_created && !processors.empty())
return Status::ExpandPipeline;

View File

@ -49,6 +49,21 @@ struct ManyAggregatedData
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
using ManyAggregatedDataPtr = std::shared_ptr<ManyAggregatedData>;
/** Aggregates the stream of blocks using the specified key columns and aggregate functions.
 * Columns with aggregate functions are added to the end of the block.
* If final = false, the aggregate functions are not finalized, that is, they are not replaced by their value, but contain an intermediate state of calculations.
* This is necessary so that aggregation can continue (for example, by combining streams of partially aggregated data).
*
* For every separate stream of data separate AggregatingTransform is created.
 * Every AggregatingTransform reads data from the first port until it runs out, or until max_rows_to_group_by is reached.
 * When the last AggregatingTransform finishes reading, the results of aggregation need to be merged together.
* This task is performed by ConvertingAggregatedToChunksTransform.
* Last AggregatingTransform expands pipeline and adds second input port, which reads from ConvertingAggregated.
*
* Aggregation data is passed by ManyAggregatedData structure, which is shared between all aggregating transforms.
 * At the aggregation step, every transform uses its own AggregatedDataVariants structure.
* At merging step, all structures pass to ConvertingAggregatedToChunksTransform.
*/
class AggregatingTransform : public IProcessor
{
public:

View File

@ -0,0 +1,33 @@
test2 0
test2 1
test2 2
test2 3
test2 4
test2 5
test2 6
test2 7
test2 8
test2 9
test2 10
test3 0
test3 1
test3 2
test3 3
test3 4
test3 5
test3 6
test3 7
test3 8
test3 9
test3 10
test5 0
test5 1
test5 2
test5 3
test5 4
test5 5
test5 6
test5 7
test5 8
test5 9
test5 10

View File

@ -0,0 +1,17 @@
-- Regression test: max_rows_to_group_by must not cause the query pipeline to get stuck
-- when the limit is reached before all input has been read.
SET max_block_size = 1;
SET max_rows_to_group_by = 10;
SET group_by_overflow_mode = 'throw';
-- 'throw' mode: exceeding the GROUP BY row limit must fail (TOO_MANY_ROWS, error 158).
SELECT 'test1', number FROM system.numbers GROUP BY number; -- { serverError 158 }
SET group_by_overflow_mode = 'break';
-- 'break' mode: reading must stop once the limit is reached; the query must terminate
-- even though system.numbers is unbounded (this is the "stuck" case being fixed).
SELECT 'test2', number FROM system.numbers GROUP BY number ORDER BY number;
SET max_rows_to_read = 500;
-- Same as test2 but with a read limit as well: must still finish before max_rows_to_read trips.
SELECT 'test3', number FROM system.numbers GROUP BY number ORDER BY number;
SET group_by_overflow_mode = 'any';
-- 'any' mode keeps reading all input, so scanning 1000 rows exceeds max_rows_to_read = 500
-- and fails with error 158 (from the read limit, not the GROUP BY limit).
SELECT 'test4', number FROM numbers(1000) GROUP BY number ORDER BY number; -- { serverError 158 }
SET max_rows_to_read = 1000;
-- Raised read limit permits the full scan; the number of GROUP BY keys is still capped
-- by max_rows_to_group_by ('any' mode: extra keys beyond the limit are dropped).
SELECT 'test5', number FROM numbers(1000) GROUP BY number ORDER BY number;