Merge pull request #6225 from CurtizJ/withcube-fix

Fix rollup and cube modifiers with two-level aggregation.
This commit is contained in:
alexey-milovidov 2019-07-31 00:54:35 +03:00 committed by GitHub
commit 8949ef6dd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 110 additions and 60 deletions

View File

@ -36,43 +36,58 @@ Block CubeBlockInputStream::getHeader() const
Block CubeBlockInputStream::readImpl()
{
/** After reading a block from input stream,
/** After reading all blocks from input stream,
* we will calculate all subsets of columns on next iterations of readImpl
* by zeroing columns at positions, where bits are zero in current bitmask.
*/
if (mask)
if (!is_data_read)
{
--mask;
Block cube_block = source_block;
for (size_t i = 0; i < keys.size(); ++i)
BlocksList source_blocks;
while (auto block = children[0]->read())
source_blocks.push_back(block);
if (source_blocks.empty())
return {};
is_data_read = true;
mask = (1 << keys.size()) - 1;
if (source_blocks.size() > 1)
source_block = aggregator.mergeBlocks(source_blocks, false);
else
source_block = std::move(source_blocks.front());
zero_block = source_block.cloneEmpty();
for (auto key : keys)
{
if (!((mask >> i) & 1))
{
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(keys[pos]).column;
}
auto & current = zero_block.getByPosition(key);
current.column = current.column->cloneResized(source_block.rows());
}
BlocksList cube_blocks = { cube_block };
Block finalized = aggregator.mergeBlocks(cube_blocks, true);
auto finalized = source_block;
finalizeBlock(finalized);
return finalized;
}
source_block = children[0]->read();
if (!source_block)
return source_block;
if (!mask)
return {};
zero_block = source_block.cloneEmpty();
for (auto key : keys)
--mask;
auto cube_block = source_block;
for (size_t i = 0; i < keys.size(); ++i)
{
auto & current = zero_block.getByPosition(key);
current.column = current.column->cloneResized(source_block.rows());
if (!((mask >> i) & 1))
{
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(keys[pos]).column;
}
}
Block finalized = source_block;
finalizeBlock(finalized);
mask = (1 << keys.size()) - 1;
BlocksList cube_blocks = { cube_block };
Block finalized = aggregator.mergeBlocks(cube_blocks, true);
return finalized;
}
}

View File

@ -36,6 +36,7 @@ private:
UInt32 mask = 0;
Block source_block;
Block zero_block;
bool is_data_read = false;
};
}

View File

@ -33,26 +33,40 @@ Block RollupBlockInputStream::readImpl()
* by zeroing out every column one-by-one and re-merging a block.
*/
if (current_key >= 0)
if (!is_data_read)
{
auto & current = rollup_block.getByPosition(keys[current_key]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
BlocksList source_blocks;
while (auto block = children[0]->read())
source_blocks.push_back(block);
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
if (source_blocks.empty())
return {};
Block finalized = rollup_block;
is_data_read = true;
if (source_blocks.size() > 1)
rollup_block = aggregator.mergeBlocks(source_blocks, false);
else
rollup_block = std::move(source_blocks.front());
current_key = keys.size() - 1;
auto finalized = rollup_block;
finalizeBlock(finalized);
return finalized;
}
Block block = children[0]->read();
current_key = keys.size() - 1;
if (current_key < 0)
return {};
rollup_block = block;
finalizeBlock(block);
auto & current = rollup_block.getByPosition(keys[current_key]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
return block;
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
auto finalized = rollup_block;
finalizeBlock(finalized);
return finalized;
}
}

View File

@ -35,6 +35,7 @@ private:
ColumnNumbers keys;
ssize_t current_key = -1;
Block rollup_block;
bool is_data_read = false;
};
}

View File

@ -25,3 +25,13 @@ a 70 4
b 50 4
120 8
120 8
a 70 4
b 50 4
0 120 8
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2

View File

@ -1,14 +1,9 @@
DROP TABLE IF EXISTS rollup;
CREATE TABLE rollup(a String, b Int32, s Int32) ENGINE = Memory;
INSERT INTO rollup VALUES('a', 1, 10);
INSERT INTO rollup VALUES('a', 1, 15);
INSERT INTO rollup VALUES('a', 2, 20);
INSERT INTO rollup VALUES('a', 2, 25);
INSERT INTO rollup VALUES('b', 1, 10);
INSERT INTO rollup VALUES('b', 1, 5);
INSERT INTO rollup VALUES('b', 2, 20);
INSERT INTO rollup VALUES('b', 2, 15);
INSERT INTO rollup VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20);
INSERT INTO rollup VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5);
INSERT INTO rollup VALUES ('b', 2, 20), ('b', 2, 15);
SELECT a, b, sum(s), count() from rollup GROUP BY ROLLUP(a, b) ORDER BY a, b;
@ -20,4 +15,9 @@ SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP ORDER BY a;
SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP WITH TOTALS ORDER BY a;
SET group_by_two_level_threshold = 1;
SELECT a, sum(s), count() from rollup GROUP BY a WITH ROLLUP ORDER BY a;
SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH ROLLUP ORDER BY a, b;
DROP TABLE rollup;

View File

@ -18,8 +18,8 @@ b 1 15 2
b 2 35 2
0 120 8
1 40 4
0 120 8
1 40 4
2 80 4
a 0 70 4
a 1 25 2
@ -27,8 +27,8 @@ a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2
1 40 4
0 120 8
1 40 4
2 80 4
a 0 70 4
a 1 25 2
@ -38,3 +38,12 @@ b 1 15 2
b 2 35 2
0 120 8
0 120 8
1 40 4
2 80 4
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2

View File

@ -1,21 +1,21 @@
DROP TABLE IF EXISTS rollup;
CREATE TABLE rollup(a String, b Int32, s Int32) ENGINE = Memory;
DROP TABLE IF EXISTS cube;
CREATE TABLE cube(a String, b Int32, s Int32) ENGINE = Memory;
INSERT INTO rollup VALUES('a', 1, 10);
INSERT INTO rollup VALUES('a', 1, 15);
INSERT INTO rollup VALUES('a', 2, 20);
INSERT INTO rollup VALUES('a', 2, 25);
INSERT INTO rollup VALUES('b', 1, 10);
INSERT INTO rollup VALUES('b', 1, 5);
INSERT INTO rollup VALUES('b', 2, 20);
INSERT INTO rollup VALUES('b', 2, 15);
-- SET experimental_use_processors=1;
SELECT a, b, sum(s), count() from rollup GROUP BY CUBE(a, b) ORDER BY a, b;
INSERT INTO cube VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20);
INSERT INTO cube VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5);
INSERT INTO cube VALUES ('b', 2, 20), ('b', 2, 15);
SELECT a, b, sum(s), count() from rollup GROUP BY CUBE(a, b) WITH TOTALS ORDER BY a, b;
SELECT a, b, sum(s), count() from cube GROUP BY CUBE(a, b) ORDER BY a, b;
SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH CUBE ORDER BY a;
SELECT a, b, sum(s), count() from cube GROUP BY CUBE(a, b) WITH TOTALS ORDER BY a, b;
SELECT a, b, sum(s), count() from rollup GROUP BY a, b WITH CUBE WITH TOTALS ORDER BY a;
SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE ORDER BY a, b;
DROP TABLE rollup;
SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE WITH TOTALS ORDER BY a, b;
SET group_by_two_level_threshold = 1;
SELECT a, b, sum(s), count() from cube GROUP BY a, b WITH CUBE ORDER BY a, b;
DROP TABLE cube;