fix rollup and cube modifiers with two-level aggregation

This commit is contained in:
CurtizJ 2019-07-30 19:36:52 +03:00
parent c4f1038efc
commit 218de76285
8 changed files with 110 additions and 60 deletions

View File

@ -36,43 +36,58 @@ Block CubeBlockInputStream::getHeader() const
Block CubeBlockInputStream::readImpl() Block CubeBlockInputStream::readImpl()
{ {
/** After reading a block from input stream, /** After reading all blocks from input stream,
* we will calculate all subsets of columns on next iterations of readImpl * we will calculate all subsets of columns on next iterations of readImpl
* by zeroing columns at positions, where bits are zero in current bitmask. * by zeroing columns at positions, where bits are zero in current bitmask.
*/ */
if (mask)
if (!is_data_read)
{ {
--mask; BlocksList source_blocks;
Block cube_block = source_block; while (auto block = children[0]->read())
for (size_t i = 0; i < keys.size(); ++i) source_blocks.push_back(block);
if (source_blocks.empty())
return {};
is_data_read = true;
mask = (1 << keys.size()) - 1;
if (source_blocks.size() > 1)
source_block = aggregator.mergeBlocks(source_blocks, false);
else
source_block = std::move(source_blocks.front());
zero_block = source_block.cloneEmpty();
for (auto key : keys)
{ {
if (!((mask >> i) & 1)) auto & current = zero_block.getByPosition(key);
{ current.column = current.column->cloneResized(source_block.rows());
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(keys[pos]).column;
}
} }
BlocksList cube_blocks = { cube_block }; auto finalized = source_block;
Block finalized = aggregator.mergeBlocks(cube_blocks, true); finalizeBlock(finalized);
return finalized; return finalized;
} }
source_block = children[0]->read(); if (!mask)
if (!source_block) return {};
return source_block;
zero_block = source_block.cloneEmpty(); --mask;
for (auto key : keys) auto cube_block = source_block;
for (size_t i = 0; i < keys.size(); ++i)
{ {
auto & current = zero_block.getByPosition(key); if (!((mask >> i) & 1))
current.column = current.column->cloneResized(source_block.rows()); {
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(keys[pos]).column;
}
} }
Block finalized = source_block;
finalizeBlock(finalized);
mask = (1 << keys.size()) - 1;
BlocksList cube_blocks = { cube_block };
Block finalized = aggregator.mergeBlocks(cube_blocks, true);
return finalized; return finalized;
} }
} }

View File

@ -36,6 +36,7 @@ private:
UInt32 mask = 0; UInt32 mask = 0;
Block source_block; Block source_block;
Block zero_block; Block zero_block;
bool is_data_read = false;
}; };
} }

View File

@ -33,26 +33,40 @@ Block RollupBlockInputStream::readImpl()
* by zeroing out every column one-by-one and re-merging a block. * by zeroing out every column one-by-one and re-merging a block.
*/ */
if (current_key >= 0) if (!is_data_read)
{ {
auto & current = rollup_block.getByPosition(keys[current_key]); BlocksList source_blocks;
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows()); while (auto block = children[0]->read())
--current_key; source_blocks.push_back(block);
BlocksList rollup_blocks = { rollup_block }; if (source_blocks.empty())
rollup_block = aggregator.mergeBlocks(rollup_blocks, false); return {};
Block finalized = rollup_block; is_data_read = true;
if (source_blocks.size() > 1)
rollup_block = aggregator.mergeBlocks(source_blocks, false);
else
rollup_block = std::move(source_blocks.front());
current_key = keys.size() - 1;
auto finalized = rollup_block;
finalizeBlock(finalized); finalizeBlock(finalized);
return finalized; return finalized;
} }
Block block = children[0]->read(); if (current_key < 0)
current_key = keys.size() - 1; return {};
rollup_block = block; auto & current = rollup_block.getByPosition(keys[current_key]);
finalizeBlock(block); current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
return block; BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
auto finalized = rollup_block;
finalizeBlock(finalized);
return finalized;
} }
} }

View File

@ -35,6 +35,7 @@ private:
ColumnNumbers keys; ColumnNumbers keys;
ssize_t current_key = -1; ssize_t current_key = -1;
Block rollup_block; Block rollup_block;
bool is_data_read = false;
}; };
} }

View File

@ -25,3 +25,13 @@ a 70 4
b 50 4 b 50 4
120 8 120 8
120 8
a 70 4
b 50 4
0 120 8
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2

View File

@ -1,14 +1,9 @@
DROP TABLE IF EXISTS rollup; DROP TABLE IF EXISTS rollup;
CREATE TABLE rollup(a String, b Int32, s Int32) ENGINE = Memory; CREATE TABLE rollup(a String, b Int32, s Int32) ENGINE = Memory;
INSERT INTO rollup VALUES('a', 1, 10); INSERT INTO rollup VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20);
INSERT INTO rollup VALUES('a', 1, 15); INSERT INTO rollup VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5);
INSERT INTO rollup VALUES('a', 2, 20); INSERT INTO rollup VALUES ('b', 2, 20), ('b', 2, 15);
INSERT INTO rollup VALUES('a', 2, 25);
INSERT INTO rollup VALUES('b', 1, 10);
INSERT INTO rollup VALUES('b', 1, 5);
INSERT INTO rollup VALUES('b', 2, 20);
INSERT INTO rollup VALUES('b', 2, 15);
SELECT a, b, sum(s), count() from rollup GROUP BY ROLLUP(a, b) ORDER BY a, b; SELECT a, b, sum(s), count() from rollup GROUP BY ROLLUP(a, b) ORDER BY a, b;
@ -20,4 +15,9 @@ SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP ORDER BY a;
SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP WITH TOTALS ORDER BY a; SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP WITH TOTALS ORDER BY a;
SET group_by_two_level_threshold = 1;
SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP ORDER BY a;
SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH ROLLUP ORDER BY a, b;
DROP TABLE rollup; DROP TABLE rollup;

View File

@ -18,8 +18,8 @@ b 1 15 2
b 2 35 2 b 2 35 2
0 120 8 0 120 8
1 40 4
0 120 8 0 120 8
1 40 4
2 80 4 2 80 4
a 0 70 4 a 0 70 4
a 1 25 2 a 1 25 2
@ -27,8 +27,8 @@ a 2 45 2
b 0 50 4 b 0 50 4
b 1 15 2 b 1 15 2
b 2 35 2 b 2 35 2
1 40 4
0 120 8 0 120 8
1 40 4
2 80 4 2 80 4
a 0 70 4 a 0 70 4
a 1 25 2 a 1 25 2
@ -38,3 +38,12 @@ b 1 15 2
b 2 35 2 b 2 35 2
0 120 8 0 120 8
0 120 8
1 40 4
2 80 4
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2

View File

@ -1,21 +1,21 @@
DROP TABLE IF EXISTS rollup; DROP TABLE IF EXISTS cube;
CREATE TABLE rollup(a String, b Int32, s Int32) ENGINE = Memory; CREATE TABLE cube(a String, b Int32, s Int32) ENGINE = Memory;
INSERT INTO rollup VALUES('a', 1, 10); -- SET experimental_use_processors=1;
INSERT INTO rollup VALUES('a', 1, 15);
INSERT INTO rollup VALUES('a', 2, 20);
INSERT INTO rollup VALUES('a', 2, 25);
INSERT INTO rollup VALUES('b', 1, 10);
INSERT INTO rollup VALUES('b', 1, 5);
INSERT INTO rollup VALUES('b', 2, 20);
INSERT INTO rollup VALUES('b', 2, 15);
SELECT a, b, sum(s), count() from rollup GROUP BY CUBE(a, b) ORDER BY a, b; INSERT INTO cube VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20);
INSERT INTO cube VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5);
INSERT INTO cube VALUES ('b', 2, 20), ('b', 2, 15);
SELECT a, b, sum(s), count() from rollup GROUP BY CUBE(a, b) WITH TOTALS ORDER BY a, b; SELECT a, b, sum(s), count() from cube GROUP BY CUBE(a, b) ORDER BY a, b;
SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH CUBE ORDER BY a; SELECT a, b, sum(s), count() from cube GROUP BY CUBE(a, b) WITH TOTALS ORDER BY a, b;
SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH CUBE WITH TOTALS ORDER BY a; SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE ORDER BY a, b;
DROP TABLE rollup; SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE WITH TOTALS ORDER BY a, b;
SET group_by_two_level_threshold = 1;
SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE ORDER BY a, b;
DROP TABLE cube;