Code cleanup & fix GROUPING() with TOTALS

Dmitry Novik 2022-05-19 16:36:51 +00:00
parent 6356112a76
commit d4c66f4a48
11 changed files with 146 additions and 32 deletions

View File

@@ -1,14 +1,14 @@
#include <Functions/IFunction.h>
#pragma once
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Core/ColumnNumbers.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypesNumber.h>
#include "Core/ColumnNumbers.h"
#include "DataTypes/Serializations/ISerialization.h"
#include "base/types.h"
#include <Functions/IFunction.h>
namespace DB
{

View File

@@ -65,6 +65,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int DUPLICATE_COLUMN;
extern const int LOGICAL_ERROR;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
static NamesAndTypesList::iterator findColumn(const String & name, NamesAndTypesList & cols)
@@ -841,6 +843,11 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
if (node.name == "grouping")
{
size_t arguments_size = node.arguments->children.size();
if (arguments_size == 0)
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Function GROUPING expects at least one argument");
if (arguments_size > 64)
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION, "Function GROUPING can have up to 64 arguments, but {} provided", arguments_size);
ColumnNumbers arguments_indexes;
auto aggregation_keys_number = data.aggregation_keys.size();
for (auto const & arg : node.arguments->children)

View File

@@ -1095,6 +1095,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (query.group_by_with_grouping_sets && query.group_by_with_totals)
throw Exception("WITH TOTALS and GROUPING SETS are not supported together", ErrorCodes::NOT_IMPLEMENTED);
if (expressions.hasHaving() && query.group_by_with_totals && (query.group_by_with_rollup || query.group_by_with_cube))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
query_info.projection->aggregate_overflow_row = aggregate_overflow_row;
@@ -1387,11 +1390,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
executeRollupOrCube(query_plan, Modificator::CUBE);
if ((query.group_by_with_rollup || query.group_by_with_cube || query.group_by_with_grouping_sets) && expressions.hasHaving())
{
if (query.group_by_with_totals)
throw Exception("WITH TOTALS and WITH ROLLUP or CUBE or GROUPING SETS are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED);
executeHaving(query_plan, expressions.before_having, expressions.remove_having_filter);
}
}
else if (expressions.hasHaving())
executeHaving(query_plan, expressions.before_having, expressions.remove_having_filter);

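The executeImpl change moves the HAVING incompatibility check earlier, next to the existing GROUPING SETS check, and narrows it: WITH TOTALS combined with ROLLUP or CUBE plus HAVING is still rejected up front, while the old in-place throw (which also mentioned GROUPING SETS) is dropped, since TOTALS with GROUPING SETS is already rejected unconditionally one check above. A condensed restatement of the resulting rules, with simplified flag names standing in for the real query/expression members:

```cpp
// Simplified sketch of the checks at the top of executeImpl
// (flag names are illustrative, not the InterpreterSelectQuery members).
#include <cstdio>

struct GroupByFlags
{
    bool with_totals = false;
    bool with_rollup = false;
    bool with_cube = false;
    bool with_grouping_sets = false;
    bool has_having = false;
};

const char * rejectUnsupported(const GroupByFlags & f)
{
    if (f.with_grouping_sets && f.with_totals)
        return "WITH TOTALS and GROUPING SETS are not supported together";
    if (f.has_having && f.with_totals && (f.with_rollup || f.with_cube))
        return "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING";
    return nullptr;  // plain ROLLUP/CUBE ... WITH TOTALS (no HAVING) is now allowed
}

int main()
{
    GroupByFlags f;
    f.with_totals = f.with_cube = true;          // CUBE ... WITH TOTALS: allowed
    const char * err = rejectUnsupported(f);
    std::printf("%s\n", err ? err : "ok");
    f.has_having = true;                         // ... plus HAVING: NOT_IMPLEMENTED
    std::printf("%s\n", rejectUnsupported(f));
}
```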
View File

@@ -89,30 +89,28 @@ public:
bool group_by_with_grouping_sets = false;
bool limit_with_ties = false;
bool needGroupingSetColumn() const noexcept { return group_by_with_cube || group_by_with_rollup || group_by_with_grouping_sets; }
ASTPtr & refSelect() { return getExpression(Expression::SELECT); }
ASTPtr & refTables() { return getExpression(Expression::TABLES); }
ASTPtr & refPrewhere() { return getExpression(Expression::PREWHERE); }
ASTPtr & refWhere() { return getExpression(Expression::WHERE); }
ASTPtr & refHaving() { return getExpression(Expression::HAVING); }
const ASTPtr with() const { return getExpression(Expression::WITH); }
const ASTPtr select() const { return getExpression(Expression::SELECT); }
const ASTPtr tables() const { return getExpression(Expression::TABLES); }
const ASTPtr prewhere() const { return getExpression(Expression::PREWHERE); }
const ASTPtr where() const { return getExpression(Expression::WHERE); }
const ASTPtr groupBy() const { return getExpression(Expression::GROUP_BY); }
const ASTPtr having() const { return getExpression(Expression::HAVING); }
const ASTPtr window() const { return getExpression(Expression::WINDOW); }
const ASTPtr orderBy() const { return getExpression(Expression::ORDER_BY); }
const ASTPtr limitByOffset() const { return getExpression(Expression::LIMIT_BY_OFFSET); }
const ASTPtr limitByLength() const { return getExpression(Expression::LIMIT_BY_LENGTH); }
const ASTPtr limitBy() const { return getExpression(Expression::LIMIT_BY); }
const ASTPtr limitOffset() const { return getExpression(Expression::LIMIT_OFFSET); }
const ASTPtr limitLength() const { return getExpression(Expression::LIMIT_LENGTH); }
const ASTPtr settings() const { return getExpression(Expression::SETTINGS); }
const ASTPtr interpolate() const { return getExpression(Expression::INTERPOLATE); }
ASTPtr with() const { return getExpression(Expression::WITH); }
ASTPtr select() const { return getExpression(Expression::SELECT); }
ASTPtr tables() const { return getExpression(Expression::TABLES); }
ASTPtr prewhere() const { return getExpression(Expression::PREWHERE); }
ASTPtr where() const { return getExpression(Expression::WHERE); }
ASTPtr groupBy() const { return getExpression(Expression::GROUP_BY); }
ASTPtr having() const { return getExpression(Expression::HAVING); }
ASTPtr window() const { return getExpression(Expression::WINDOW); }
ASTPtr orderBy() const { return getExpression(Expression::ORDER_BY); }
ASTPtr limitByOffset() const { return getExpression(Expression::LIMIT_BY_OFFSET); }
ASTPtr limitByLength() const { return getExpression(Expression::LIMIT_BY_LENGTH); }
ASTPtr limitBy() const { return getExpression(Expression::LIMIT_BY); }
ASTPtr limitOffset() const { return getExpression(Expression::LIMIT_OFFSET); }
ASTPtr limitLength() const { return getExpression(Expression::LIMIT_LENGTH); }
ASTPtr settings() const { return getExpression(Expression::SETTINGS); }
ASTPtr interpolate() const { return getExpression(Expression::INTERPOLATE); }
bool hasFiltration() const { return where() || prewhere() || having(); }

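The ASTSelectQuery.h part is the pure cleanup half of the commit: these accessors return ASTPtr by value, and a top-level const on a by-value return type does nothing useful for the caller while preventing a move out of the returned temporary (clang-tidy's readability-const-return-type flags exactly this pattern). A standalone illustration with hypothetical names:

```cpp
// Why `const ASTPtr foo() const` became `ASTPtr foo() const`: top-level const
// on a by-value return only gets in the way. Names here are illustrative.
#include <memory>

struct Node {};
using NodePtr = std::shared_ptr<Node>;  // stand-in for ASTPtr

const NodePtr constGetter() { return std::make_shared<Node>(); }  // old style
NodePtr plainGetter()       { return std::make_shared<Node>(); }  // new style

int main()
{
    NodePtr target;
    target = constGetter();  // const rvalue can't bind to NodePtr&&: copy assignment, extra refcount work
    target = plainGetter();  // plain rvalue: move assignment, no atomic refcount increment
}
```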
View File

@@ -223,7 +223,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs index;
index.reserve(output_header.columns() + 2);
index.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag->addColumn(

View File

@@ -1,7 +1,9 @@
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
@@ -24,6 +26,7 @@ static ITransformingStep::Traits getTraits()
CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, appendGroupingSetColumn(params_->getHeader()), getTraits())
, keys_size(params_->params.keys_size)
, params(std::move(params_))
{
/// Aggregation keys are distinct
@@ -31,14 +34,30 @@ CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsP
output_stream->distinct_columns.insert(params->params.src_header.getByPosition(key).name);
}
void CubeStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
ProcessorPtr addGroupingSetForTotals(const Block & header, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
{
auto dag = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
auto grouping_col = ColumnUInt64::create(1, grouping_set_number);
const auto * grouping_node = &dag->addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag->materializeNode(*grouping_node);
auto & index = dag->getIndex();
index.insert(index.begin(), grouping_node);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
return std::make_shared<ExpressionTransform>(header, expression);
}
void CubeStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return nullptr;
return addGroupingSetForTotals(header, settings, (UInt64(1) << keys_size) - 1);
return std::make_shared<CubeTransform>(header, std::move(params));
});

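This is the core of the TOTALS fix for CUBE: previously the totals stream got `return nullptr` and therefore never received the `__grouping_set` column that the rest of the pipeline (and the GROUPING function) expects after CubeStep. The new addGroupingSetForTotals helper builds a one-column ActionsDAG, materializes a `__grouping_set` column carrying the marker value, inserts it at the front of the index, and wraps the totals stream in an ExpressionTransform; the marker used for CUBE is `(1 << keys_size) - 1`. A ClickHouse-free sketch of the effect on a totals row (types and names are illustrative, not the real ActionsDAG/ExpressionTransform API):

```cpp
// Sketch of what addGroupingSetForTotals effectively does to the totals stream:
// prepend a constant __grouping_set marker to every row, mirroring
// index.insert(index.begin(), grouping_node) in the real code.
#include <cstdint>
#include <cstdio>
#include <vector>

using Row = std::vector<uint64_t>;

Row addGroupingSetMarker(Row row, uint64_t grouping_set_number)
{
    row.insert(row.begin(), grouping_set_number);  // __grouping_set becomes the first column
    return row;
}

int main()
{
    const size_t keys_size = 2;
    const uint64_t cube_totals_marker = (uint64_t(1) << keys_size) - 1;  // value used by CubeStep
    Row totals_row = {0, 45};                                            // e.g. the aggregated totals values
    Row tagged = addGroupingSetMarker(totals_row, cube_totals_marker);
    std::printf("%llu %llu %llu\n",
                (unsigned long long) tagged[0],
                (unsigned long long) tagged[1],
                (unsigned long long) tagged[2]);                         // prints: 3 0 45
}
```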
View File

@@ -21,6 +21,7 @@ public:
const Aggregator::Params & getParams() const;
private:
size_t keys_size;
AggregatingTransformParamsPtr params;
};

View File

@@ -25,20 +25,23 @@ static ITransformingStep::Traits getTraits()
RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, appendGroupingSetColumn(params_->getHeader()), getTraits())
, params(std::move(params_))
, keys_size(params->params.keys_size)
{
/// Aggregation keys are distinct
for (auto key : params->params.keys)
output_stream->distinct_columns.insert(params->params.src_header.getByPosition(key).name);
}
void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
ProcessorPtr addGroupingSetForTotals(const Block & header, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number);
void RollupStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
return nullptr;
return addGroupingSetForTotals(header, settings, keys_size);
return std::make_shared<RollupTransform>(header, std::move(params));
});

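RollupStep gets the same treatment and reuses addGroupingSetForTotals through the forward declaration above rather than duplicating the DAG construction; the only difference is the marker value passed for the totals rows: `keys_size` for ROLLUP versus `(1 << keys_size) - 1` for CUBE. Side by side, as a standalone sketch (illustrative names, not the ClickHouse API):

```cpp
// The __grouping_set markers this commit attaches to TOTALS rows, side by side.
#include <cstddef>
#include <cstdint>
#include <cstdio>

uint64_t totalsGroupingSetForRollup(size_t keys_size) { return keys_size; }
uint64_t totalsGroupingSetForCube(size_t keys_size)   { return (uint64_t(1) << keys_size) - 1; }

int main()
{
    // ROLLUP(a, b) WITH TOTALS -> 2, CUBE(a, b) WITH TOTALS -> 3.
    std::printf("%llu %llu\n",
                (unsigned long long) totalsGroupingSetForRollup(2),
                (unsigned long long) totalsGroupingSetForCube(2));
}
```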
View File

@@ -20,6 +20,7 @@ public:
private:
AggregatingTransformParamsPtr params;
size_t keys_size;
};
}

View File

@@ -126,3 +126,51 @@
8 6
9 5
9 6
0 0
0 1
0 1
0 2
0 3
1 2
1 3
2 2
2 3
3 2
3 3
4 2
4 3
5 2
5 3
6 2
6 3
7 2
7 3
8 2
8 3
9 2
9 3
0 0
0 0
0 2
0 3
1 2
1 3
2 2
2 3
3 2
3 3
4 2
4 3
5 2
5 3
6 2
6 3
7 2
7 3
8 2
8 3
9 2
9 3
0 0

View File

@@ -75,3 +75,41 @@ GROUP BY
HAVING grouping(number) != 0
ORDER BY
number, gr;
SELECT
number,
grouping(number, number % 2) as gr
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
CUBE(number, number % 2) WITH TOTALS
HAVING grouping(number) != 0
ORDER BY
number, gr; -- { serverError NOT_IMPLEMENTED }
SELECT
number,
grouping(number, number % 2) as gr
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
CUBE(number, number % 2) WITH TOTALS
ORDER BY
number, gr;
SELECT
number,
grouping(number, number % 2) as gr
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
ROLLUP(number, number % 2) WITH TOTALS
HAVING grouping(number) != 0
ORDER BY
number, gr; -- { serverError NOT_IMPLEMENTED }
SELECT
number,
grouping(number, number % 2) as gr
FROM remote('127.0.0.{2,3}', numbers(10))
GROUP BY
ROLLUP(number, number % 2) WITH TOTALS
ORDER BY
number, gr;