Merge pull request #3172 from CurtizJ/CLICKHOUSE-3979

Add modificator CUBE [CLICKHOUSE-3979]
This commit is contained in:
alexey-milovidov 2018-09-20 21:04:56 +03:00 committed by GitHub
commit 4a095d8433
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 223 additions and 15 deletions

View File

@ -0,0 +1,78 @@
#include <DataStreams/CubeBlockInputStream.h>
#include <DataStreams/finalizeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/FilterDescription.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_COLUMNS;
}
CubeBlockInputStream::CubeBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_) : aggregator(params_),
keys(params_.keys)
{
if (keys.size() > 30)
throw Exception("Too many columns for cube", ErrorCodes::TOO_MANY_COLUMNS);
children.push_back(input_);
Aggregator::CancellationHook hook = [this]() { return this->isCancelled(); };
aggregator.setCancellationHook(hook);
}
Block CubeBlockInputStream::getHeader() const
{
Block res = children.at(0)->getHeader();
finalizeBlock(res);
return res;
}
Block CubeBlockInputStream::readImpl()
{
/** After reading a block from input stream,
* we will calculate all subsets of columns on next iterations of readImpl
* by zeroing columns at positions, where bits are zero in current bitmask.
*/
if (mask)
{
--mask;
Block cube_block = source_block;
for (size_t i = 0; i < keys.size(); ++i)
{
if (!((mask >> i) & 1))
{
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(pos).column;
}
}
BlocksList cube_blocks = { cube_block };
Block finalized = aggregator.mergeBlocks(cube_blocks, true);
return finalized;
}
source_block = children[0]->read();
if (!source_block)
return source_block;
zero_block = source_block.cloneEmpty();
for (auto key : keys)
{
auto & current = zero_block.getByPosition(key);
current.column = current.column->cloneResized(source_block.rows());
}
Block finalized = source_block;
finalizeBlock(finalized);
mask = (1 << keys.size()) - 1;
return finalized;
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Common/Arena.h>
#include <Interpreters/Aggregator.h>
#include <Core/ColumnNumbers.h>
namespace DB
{
class ExpressionActions;
/** Takes blocks after grouping, with non-finalized aggregate functions.
* Calculates all subsets of columns and aggreagetes over them.
*/
class CubeBlockInputStream : public IProfilingBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using AggregateColumns = std::vector<ColumnRawPtrs>;
public:
CubeBlockInputStream(
const BlockInputStreamPtr & input_, const Aggregator::Params & params_);
String getName() const override { return "Cube"; }
Block getHeader() const override;
protected:
Block readImpl() override;
private:
Aggregator aggregator;
ColumnNumbers keys;
UInt32 mask = 0;
Block source_block;
Block zero_block;
};
}

View File

@ -19,6 +19,7 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <DataStreams/RollupBlockInputStream.h>
#include <DataStreams/CubeBlockInputStream.h>
#include <DataStreams/ConvertColumnWithDictionaryToFullBlockInputStream.h>
#include <Parsers/ASTSelectQuery.h>
@ -499,7 +500,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
bool aggregate_final =
expressions.need_aggregate &&
to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals && !query.group_by_with_rollup;
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
if (expressions.first_stage)
{
@ -557,10 +558,15 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
if (!aggregate_final)
{
if (query.group_by_with_totals)
executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row, !query.group_by_with_rollup);
{
bool final = !query.group_by_with_rollup && !query.group_by_with_cube;
executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row, final);
}
if (query.group_by_with_rollup)
executeRollup(pipeline);
if (query.group_by_with_rollup)
executeRollupOrCube(pipeline, Modificator::ROLLUP);
else if(query.group_by_with_cube)
executeRollupOrCube(pipeline, Modificator::CUBE);
}
else if (expressions.has_having)
executeHaving(pipeline, expressions.before_having);
@ -575,10 +581,15 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
need_second_distinct_pass = query.distinct && pipeline.hasMoreThanOneStream();
if (query.group_by_with_totals && !aggregate_final)
executeTotalsAndHaving(pipeline, false, nullptr, aggregate_overflow_row, !query.group_by_with_rollup);
{
bool final = !query.group_by_with_rollup && !query.group_by_with_cube;
executeTotalsAndHaving(pipeline, expressions.has_having, expressions.before_having, aggregate_overflow_row, final);
}
if (query.group_by_with_rollup && !aggregate_final)
executeRollup(pipeline);
executeRollupOrCube(pipeline, Modificator::ROLLUP);
else if (query.group_by_with_cube && !aggregate_final)
executeRollupOrCube(pipeline, Modificator::CUBE);
}
if (expressions.has_order_by)
@ -1087,7 +1098,7 @@ void InterpreterSelectQuery::executeTotalsAndHaving(Pipeline & pipeline, bool ha
has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, settings.totals_auto_threshold, final);
}
void InterpreterSelectQuery::executeRollup(Pipeline & pipeline)
void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificator modificator)
{
executeUnion(pipeline);
@ -1111,7 +1122,10 @@ void InterpreterSelectQuery::executeRollup(Pipeline & pipeline)
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath());
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
if (modificator == Modificator::ROLLUP)
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
else
pipeline.firstStream() = std::make_shared<CubeBlockInputStream>(pipeline.firstStream(), params);
}

View File

@ -190,7 +190,14 @@ private:
void executeDistinct(Pipeline & pipeline, bool before_order, Names columns);
void executeExtremes(Pipeline & pipeline);
void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void executeRollup(Pipeline & pipeline);
enum class Modificator
{
ROLLUP = 0,
CUBE = 1
};
void executeRollupOrCube(Pipeline & pipeline, Modificator modificator);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.
*

View File

@ -106,6 +106,9 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
if (group_by_with_rollup)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH ROLLUP" << (s.hilite ? hilite_none : "");
if (group_by_with_cube)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH CUBE" << (s.hilite ? hilite_none : "");
if (group_by_with_totals)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH TOTALS" << (s.hilite ? hilite_none : "");

View File

@ -29,6 +29,7 @@ public:
ASTPtr group_expression_list;
bool group_by_with_totals = false;
bool group_by_with_rollup = false;
bool group_by_with_cube = false;
ASTPtr having_expression;
ASTPtr order_expression_list;
ASTPtr limit_by_value;

View File

@ -40,6 +40,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_settings("SETTINGS");
ParserKeyword s_by("BY");
ParserKeyword s_rollup("ROLLUP");
ParserKeyword s_cube("CUBE");
ParserKeyword s_top("TOP");
ParserKeyword s_offset("OFFSET");
@ -116,24 +117,27 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (s_group_by.ignore(pos, expected))
{
if (s_rollup.ignore(pos, expected))
{
select_query->group_by_with_rollup = true;
if (!open_bracket.ignore(pos, expected))
return false;
}
else if (s_cube.ignore(pos, expected))
select_query->group_by_with_cube = true;
if ((select_query->group_by_with_rollup || select_query->group_by_with_cube) && !open_bracket.ignore(pos, expected))
return false;
if (!exp_list.parse(pos, select_query->group_expression_list, expected))
return false;
if (select_query->group_by_with_rollup && !close_bracket.ignore(pos, expected))
if ((select_query->group_by_with_rollup || select_query->group_by_with_cube) && !close_bracket.ignore(pos, expected))
return false;
}
/// WITH ROLLUP
/// WITH ROLLUP, CUBE or TOTALS
if (s_with.ignore(pos, expected))
{
if (s_rollup.ignore(pos, expected))
select_query->group_by_with_rollup = true;
else if (s_cube.ignore(pos, expected))
select_query->group_by_with_cube = true;
else if (s_totals.ignore(pos, expected))
select_query->group_by_with_totals = true;
else

View File

@ -0,0 +1,40 @@
0 120 8
1 40 4
2 80 4
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2
0 120 8
1 40 4
2 80 4
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2
0 120 8
1 40 4
0 120 8
2 80 4
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2
1 40 4
0 120 8
2 80 4
a 0 70 4
a 1 25 2
a 2 45 2
b 0 50 4
b 1 15 2
b 2 35 2
0 120 8

View File

@ -0,0 +1,19 @@
DROP TABLE IF EXISTS test.rollup;
CREATE TABLE test.rollup(a String, b Int32, s Int32) ENGINE = Memory;
INSERT INTO test.rollup VALUES('a', 1, 10);
INSERT INTO test.rollup VALUES('a', 1, 15);
INSERT INTO test.rollup VALUES('a', 2, 20);
INSERT INTO test.rollup VALUES('a', 2, 25);
INSERT INTO test.rollup VALUES('b', 1, 10);
INSERT INTO test.rollup VALUES('b', 1, 5);
INSERT INTO test.rollup VALUES('b', 2, 20);
INSERT INTO test.rollup VALUES('b', 2, 15);
SELECT a, b, sum(s), count() from test.rollup GROUP BY CUBE(a, b) ORDER BY a, b;
SELECT a, b, sum(s), count() from test.rollup GROUP BY CUBE(a, b) WITH TOTALS ORDER BY a, b;
SELECT a, b, sum(s), count() from test.rollup GROUP BY a, b WITH CUBE ORDER BY a;
SELECT a, b, sum(s), count() from test.rollup GROUP BY a, b WITH CUBE WITH TOTALS ORDER BY a;