From 218de76285281540400c4234ff76b60b5a7e98ef Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Tue, 30 Jul 2019 19:36:52 +0300 Subject: [PATCH] fix rollup and cube modifiers with two-level aggregation --- dbms/src/DataStreams/CubeBlockInputStream.cpp | 61 ++++++++++++------- dbms/src/DataStreams/CubeBlockInputStream.h | 1 + .../DataStreams/RollupBlockInputStream.cpp | 38 ++++++++---- dbms/src/DataStreams/RollupBlockInputStream.h | 1 + .../0_stateless/00701_rollup.reference | 10 +++ .../queries/0_stateless/00701_rollup.sql | 16 ++--- .../0_stateless/00720_with_cube.reference | 13 +++- .../queries/0_stateless/00720_with_cube.sql | 30 ++++----- 8 files changed, 110 insertions(+), 60 deletions(-) diff --git a/dbms/src/DataStreams/CubeBlockInputStream.cpp b/dbms/src/DataStreams/CubeBlockInputStream.cpp index c32378d97e6..50a6c0a970b 100644 --- a/dbms/src/DataStreams/CubeBlockInputStream.cpp +++ b/dbms/src/DataStreams/CubeBlockInputStream.cpp @@ -36,43 +36,58 @@ Block CubeBlockInputStream::getHeader() const Block CubeBlockInputStream::readImpl() { - /** After reading a block from input stream, + /** After reading all blocks from input stream, * we will calculate all subsets of columns on next iterations of readImpl * by zeroing columns at positions, where bits are zero in current bitmask. */ - if (mask) + + if (!is_data_read) { - --mask; - Block cube_block = source_block; - for (size_t i = 0; i < keys.size(); ++i) + BlocksList source_blocks; + while (auto block = children[0]->read()) + source_blocks.push_back(block); + + if (source_blocks.empty()) + return {}; + + is_data_read = true; + mask = (1 << keys.size()) - 1; + + if (source_blocks.size() > 1) + source_block = aggregator.mergeBlocks(source_blocks, false); + else + source_block = std::move(source_blocks.front()); + + zero_block = source_block.cloneEmpty(); + for (auto key : keys) { - if (!((mask >> i) & 1)) - { - size_t pos = keys.size() - i - 1; - auto & current = cube_block.getByPosition(keys[pos]); - current.column = zero_block.getByPosition(keys[pos]).column; - } + auto & current = zero_block.getByPosition(key); + current.column = current.column->cloneResized(source_block.rows()); } - BlocksList cube_blocks = { cube_block }; - Block finalized = aggregator.mergeBlocks(cube_blocks, true); + auto finalized = source_block; + finalizeBlock(finalized); return finalized; } - source_block = children[0]->read(); - if (!source_block) - return source_block; + if (!mask) + return {}; - zero_block = source_block.cloneEmpty(); - for (auto key : keys) + --mask; + auto cube_block = source_block; + + for (size_t i = 0; i < keys.size(); ++i) { - auto & current = zero_block.getByPosition(key); - current.column = current.column->cloneResized(source_block.rows()); + if (!((mask >> i) & 1)) + { + size_t pos = keys.size() - i - 1; + auto & current = cube_block.getByPosition(keys[pos]); + current.column = zero_block.getByPosition(keys[pos]).column; + } } - Block finalized = source_block; - finalizeBlock(finalized); - mask = (1 << keys.size()) - 1; + BlocksList cube_blocks = { cube_block }; + Block finalized = aggregator.mergeBlocks(cube_blocks, true); return finalized; } } diff --git a/dbms/src/DataStreams/CubeBlockInputStream.h b/dbms/src/DataStreams/CubeBlockInputStream.h index 2f435a6031c..7e62950e8ee 100644 --- a/dbms/src/DataStreams/CubeBlockInputStream.h +++ b/dbms/src/DataStreams/CubeBlockInputStream.h @@ -36,6 +36,7 @@ private: UInt32 mask = 0; Block source_block; Block zero_block; + bool is_data_read = false; }; } diff --git a/dbms/src/DataStreams/RollupBlockInputStream.cpp b/dbms/src/DataStreams/RollupBlockInputStream.cpp index e43aa51e617..a913dc727fa 100644 --- a/dbms/src/DataStreams/RollupBlockInputStream.cpp +++ b/dbms/src/DataStreams/RollupBlockInputStream.cpp @@ -33,26 +33,40 @@ Block RollupBlockInputStream::readImpl() * by zeroing out every column one-by-one and re-merging a block. */ - if (current_key >= 0) + if (!is_data_read) { - auto & current = rollup_block.getByPosition(keys[current_key]); - current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows()); - --current_key; + BlocksList source_blocks; + while (auto block = children[0]->read()) + source_blocks.push_back(block); - BlocksList rollup_blocks = { rollup_block }; - rollup_block = aggregator.mergeBlocks(rollup_blocks, false); + if (source_blocks.empty()) + return {}; - Block finalized = rollup_block; + is_data_read = true; + if (source_blocks.size() > 1) + rollup_block = aggregator.mergeBlocks(source_blocks, false); + else + rollup_block = std::move(source_blocks.front()); + + current_key = keys.size() - 1; + + auto finalized = rollup_block; finalizeBlock(finalized); return finalized; } - Block block = children[0]->read(); - current_key = keys.size() - 1; + if (current_key < 0) + return {}; - rollup_block = block; - finalizeBlock(block); + auto & current = rollup_block.getByPosition(keys[current_key]); + current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows()); + --current_key; - return block; + BlocksList rollup_blocks = { rollup_block }; + rollup_block = aggregator.mergeBlocks(rollup_blocks, false); + + auto finalized = rollup_block; + finalizeBlock(finalized); + return finalized; } } diff --git a/dbms/src/DataStreams/RollupBlockInputStream.h b/dbms/src/DataStreams/RollupBlockInputStream.h index 1c1e29e7a13..dabf1e392a3 100644 --- a/dbms/src/DataStreams/RollupBlockInputStream.h +++ b/dbms/src/DataStreams/RollupBlockInputStream.h @@ -35,6 +35,7 @@ private: ColumnNumbers keys; ssize_t current_key = -1; Block rollup_block; + bool is_data_read = false; }; } diff --git a/dbms/tests/queries/0_stateless/00701_rollup.reference b/dbms/tests/queries/0_stateless/00701_rollup.reference index ec07ad52cae..637ae0bcb52 100644 --- a/dbms/tests/queries/0_stateless/00701_rollup.reference +++ b/dbms/tests/queries/0_stateless/00701_rollup.reference @@ -25,3 +25,13 @@ a 70 4 b 50 4 120 8 + 120 8 +a 70 4 +b 50 4 + 0 120 8 +a 0 70 4 +a 1 25 2 +a 2 45 2 +b 0 50 4 +b 1 15 2 +b 2 35 2 diff --git a/dbms/tests/queries/0_stateless/00701_rollup.sql b/dbms/tests/queries/0_stateless/00701_rollup.sql index 3f4df923f90..fa7f3a21657 100644 --- a/dbms/tests/queries/0_stateless/00701_rollup.sql +++ b/dbms/tests/queries/0_stateless/00701_rollup.sql @@ -1,14 +1,9 @@ DROP TABLE IF EXISTS rollup; CREATE TABLE rollup(a String, b Int32, s Int32) ENGINE = Memory; -INSERT INTO rollup VALUES('a', 1, 10); -INSERT INTO rollup VALUES('a', 1, 15); -INSERT INTO rollup VALUES('a', 2, 20); -INSERT INTO rollup VALUES('a', 2, 25); -INSERT INTO rollup VALUES('b', 1, 10); -INSERT INTO rollup VALUES('b', 1, 5); -INSERT INTO rollup VALUES('b', 2, 20); -INSERT INTO rollup VALUES('b', 2, 15); +INSERT INTO rollup VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20); +INSERT INTO rollup VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5); +INSERT INTO rollup VALUES ('b', 2, 20), ('b', 2, 15); SELECT a, b, sum(s), count() from rollup GROUP BY ROLLUP(a, b) ORDER BY a, b; @@ -20,4 +15,9 @@ SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP ORDER BY a; SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP WITH TOTALS ORDER BY a; +SET group_by_two_level_threshold = 1; + +SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP ORDER BY a; +SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH ROLLUP ORDER BY a, b; + DROP TABLE rollup; diff --git a/dbms/tests/queries/0_stateless/00720_with_cube.reference b/dbms/tests/queries/0_stateless/00720_with_cube.reference index a0b951978f9..818e8626dde 100644 --- a/dbms/tests/queries/0_stateless/00720_with_cube.reference +++ b/dbms/tests/queries/0_stateless/00720_with_cube.reference @@ -18,8 +18,8 @@ b 1 15 2 b 2 35 2 0 120 8 - 1 40 4 0 120 8 + 1 40 4 2 80 4 a 0 70 4 a 1 25 2 @@ -27,8 +27,8 @@ a 2 45 2 b 0 50 4 b 1 15 2 b 2 35 2 - 1 40 4 0 120 8 + 1 40 4 2 80 4 a 0 70 4 a 1 25 2 @@ -38,3 +38,12 @@ b 1 15 2 b 2 35 2 0 120 8 + 0 120 8 + 1 40 4 + 2 80 4 +a 0 70 4 +a 1 25 2 +a 2 45 2 +b 0 50 4 +b 1 15 2 +b 2 35 2 diff --git a/dbms/tests/queries/0_stateless/00720_with_cube.sql b/dbms/tests/queries/0_stateless/00720_with_cube.sql index bcde617803e..42b65c8222c 100644 --- a/dbms/tests/queries/0_stateless/00720_with_cube.sql +++ b/dbms/tests/queries/0_stateless/00720_with_cube.sql @@ -1,21 +1,21 @@ -DROP TABLE IF EXISTS rollup; -CREATE TABLE rollup(a String, b Int32, s Int32) ENGINE = Memory; +DROP TABLE IF EXISTS cube; +CREATE TABLE cube(a String, b Int32, s Int32) ENGINE = Memory; -INSERT INTO rollup VALUES('a', 1, 10); -INSERT INTO rollup VALUES('a', 1, 15); -INSERT INTO rollup VALUES('a', 2, 20); -INSERT INTO rollup VALUES('a', 2, 25); -INSERT INTO rollup VALUES('b', 1, 10); -INSERT INTO rollup VALUES('b', 1, 5); -INSERT INTO rollup VALUES('b', 2, 20); -INSERT INTO rollup VALUES('b', 2, 15); +-- SET experimental_use_processors=1; -SELECT a, b, sum(s), count() from rollup GROUP BY CUBE(a, b) ORDER BY a, b; +INSERT INTO cube VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20); +INSERT INTO cube VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5); +INSERT INTO cube VALUES ('b', 2, 20), ('b', 2, 15); -SELECT a, b, sum(s), count() from rollup GROUP BY CUBE(a, b) WITH TOTALS ORDER BY a, b; +SELECT a, b, sum(s), count() from cube GROUP BY CUBE(a, b) ORDER BY a, b; -SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH CUBE ORDER BY a; +SELECT a, b, sum(s), count() from cube GROUP BY CUBE(a, b) WITH TOTALS ORDER BY a, b; -SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH CUBE WITH TOTALS ORDER BY a; +SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE ORDER BY a, b; -DROP TABLE rollup; +SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE WITH TOTALS ORDER BY a, b; + +SET group_by_two_level_threshold = 1; +SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE ORDER BY a, b; + +DROP TABLE cube;