Commit support use_nulls for GS

This commit is contained in:
Dmitry Novik 2022-06-30 15:14:26 +00:00
parent 98e9bc84d5
commit 33f601ec0a
5 changed files with 99 additions and 13 deletions

View File

@ -2295,6 +2295,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
merge_threads,
temporary_data_merge_threads,
storage_has_evenly_distributed_read,
settings.group_by_use_nulls,
std::move(group_by_info),
std::move(group_by_sort_description));
query_plan.addStep(std::move(aggregating_step));

View File

@ -16,6 +16,8 @@
#include <Columns/ColumnFixedString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Poco/Logger.h>
#include "Common/logger_useful.h"
#include "Core/ColumnNumbers.h"
#include "DataTypes/IDataType.h"
@ -49,12 +51,11 @@ Block appendGroupingSetColumn(Block header)
return res;
}
Block generateOutputHeader(const Block & input_header)
static inline void convertToNullable(Block & header, const ColumnNumbers & keys)
{
auto header = appendGroupingSetColumn(input_header);
for (size_t i = 1; i < header.columns(); ++i)
for (auto key : keys)
{
auto & column = header.getByPosition(i);
auto & column = header.getByPosition(key);
if (!isAggregateFunction(column.type))
{
@ -62,7 +63,6 @@ Block generateOutputHeader(const Block & input_header)
column.column = makeNullable(column.column);
}
}
return header;
}
Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls)
@ -84,12 +84,12 @@ Block generateOutputHeader(const Block & input_header, const ColumnNumbers & key
return header;
}
static Block appendGroupingColumn(Block block, const GroupingSetsParamsList & params)
static Block appendGroupingColumn(Block block, const ColumnNumbers & keys, const GroupingSetsParamsList & params, bool use_nulls)
{
if (params.empty())
return block;
return generateOutputHeader(block);
return generateOutputHeader(block, keys, use_nulls);
}
AggregatingStep::AggregatingStep(
@ -102,9 +102,10 @@ AggregatingStep::AggregatingStep(
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool storage_has_evenly_distributed_read_,
bool group_by_use_nulls_,
InputOrderInfoPtr group_by_info_,
SortDescription group_by_sort_description_)
: ITransformingStep(input_stream_, appendGroupingColumn(params_.getHeader(final_), grouping_sets_params_), getTraits(), false)
: ITransformingStep(input_stream_, appendGroupingColumn(params_.getHeader(final_), params_.keys, grouping_sets_params_, group_by_use_nulls_), getTraits(), false)
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(std::move(final_))
@ -113,6 +114,7 @@ AggregatingStep::AggregatingStep(
, merge_threads(merge_threads_)
, temporary_data_merge_threads(temporary_data_merge_threads_)
, storage_has_evenly_distributed_read(storage_has_evenly_distributed_read_)
, group_by_use_nulls(group_by_use_nulls_)
, group_by_info(std::move(group_by_info_))
, group_by_sort_description(std::move(group_by_sort_description_))
{
@ -243,6 +245,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
assert(ports.size() == grouping_sets_size);
auto output_header = transform_params->getHeader();
if (group_by_use_nulls)
convertToNullable(output_header, params.keys);
for (size_t set_counter = 0; set_counter < grouping_sets_size; ++set_counter)
{
@ -279,16 +283,22 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
else
{
const auto * column_node = dag->getIndex()[header.getPositionByName(col.name)];
// index.push_back(dag->getIndex()[header.getPositionByName(col.name)]);
const auto * node = &dag->addFunction(FunctionFactory::instance().get("toNullable", nullptr), { column_node }, col.name);
index.push_back(node);
if (isAggregateFunction(column_node->result_type) || !group_by_use_nulls)
{
index.push_back(column_node);
}
else
{
const auto * node = &dag->addFunction(FunctionFactory::instance().get("toNullable", nullptr), { column_node }, col.name);
index.push_back(node);
}
}
}
dag->getIndex().swap(index);
auto expression = std::make_shared<ExpressionActions>(dag, settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);
LOG_DEBUG(&Poco::Logger::get("AggregatingStep"), "Header for GROUPING SET #{}: {}", set_counter, transform->getOutputPort().getHeader().dumpStructure());
connect(*ports[set_counter], transform->getInputPort());
processors.emplace_back(std::move(transform));

View File

@ -27,7 +27,6 @@ struct GroupingSetsParams
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header);
Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls);
/// Aggregation. See AggregatingTransform.
@ -44,6 +43,7 @@ public:
size_t merge_threads_,
size_t temporary_data_merge_threads_,
bool storage_has_evenly_distributed_read_,
bool group_by_use_nulls_,
InputOrderInfoPtr group_by_info_,
SortDescription group_by_sort_description_);
@ -68,6 +68,7 @@ private:
size_t temporary_data_merge_threads;
bool storage_has_evenly_distributed_read;
bool group_by_use_nulls;
InputOrderInfoPtr group_by_info;
SortDescription group_by_sort_description;

View File

@ -107,3 +107,51 @@ SETTINGS group_by_use_nulls=0;
8 0 8
9 0 9
9 1 9
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 1;
0 \N 0
1 \N 1
2 \N 2
3 \N 3
4 \N 4
5 \N 5
6 \N 6
7 \N 7
8 \N 8
9 \N 9
\N 0 20
\N 1 25
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 0;
0 0 0
0 0 20
0 1 25
1 0 1
2 0 2
3 0 3
4 0 4
5 0 5
6 0 6
7 0 7
8 0 8
9 0 9

View File

@ -22,3 +22,29 @@ FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 1;
SELECT
number,
number % 2,
sum(number) AS val
FROM numbers(10)
GROUP BY
GROUPING SETS (
(number),
(number % 2)
)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls = 0;