mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #33186 from ClickHouse/revert-26869-grouping-sets-dev
Revert "Grouping sets dev"
This commit is contained in:
commit
36608310bf
@ -8,6 +8,5 @@ namespace DB
|
||||
{
|
||||
|
||||
using ColumnNumbers = std::vector<size_t>;
|
||||
using ColumnNumbersList = std::vector<ColumnNumbers>;
|
||||
|
||||
}
|
||||
|
@ -109,8 +109,6 @@ public:
|
||||
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
|
||||
};
|
||||
|
||||
using NamesAndTypesLists = std::vector<NamesAndTypesList>;
|
||||
|
||||
}
|
||||
|
||||
namespace std
|
||||
|
@ -183,7 +183,7 @@ public:
|
||||
ActionsDAGPtr clone() const;
|
||||
|
||||
/// Execute actions for header. Input block must have empty columns.
|
||||
/// Result should be equal to the execution of ExpressionActions built from this DAG.
|
||||
/// Result should be equal to the execution of ExpressionActions build form this DAG.
|
||||
/// Actions are not changed, no expressions are compiled.
|
||||
///
|
||||
/// In addition, check that result constants are constants according to DAG.
|
||||
|
@ -878,10 +878,9 @@ public:
|
||||
Block intermediate_header;
|
||||
|
||||
/// What to count.
|
||||
ColumnNumbers keys;
|
||||
const ColumnNumbersList keys_vector;
|
||||
const ColumnNumbers keys;
|
||||
const AggregateDescriptions aggregates;
|
||||
size_t keys_size;
|
||||
const size_t keys_size;
|
||||
const size_t aggregates_size;
|
||||
|
||||
/// The settings of approximate calculation of GROUP BY.
|
||||
@ -939,46 +938,6 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
/// two dimensional vector of aggregating keys in params
|
||||
Params(
|
||||
const Block & src_header_,
|
||||
const ColumnNumbers & keys_,
|
||||
const ColumnNumbersList & keys_vector_,
|
||||
const AggregateDescriptions & aggregates_,
|
||||
bool overflow_row_,
|
||||
size_t max_rows_to_group_by_,
|
||||
OverflowMode group_by_overflow_mode_,
|
||||
size_t group_by_two_level_threshold_,
|
||||
size_t group_by_two_level_threshold_bytes_,
|
||||
size_t max_bytes_before_external_group_by_,
|
||||
bool empty_result_for_aggregation_by_empty_set_,
|
||||
VolumePtr tmp_volume_,
|
||||
size_t max_threads_,
|
||||
size_t min_free_disk_space_,
|
||||
bool compile_aggregate_expressions_,
|
||||
size_t min_count_to_compile_aggregate_expression_,
|
||||
const Block & intermediate_header_ = {})
|
||||
: src_header(src_header_)
|
||||
, intermediate_header(intermediate_header_)
|
||||
, keys(keys_)
|
||||
, keys_vector(keys_vector_)
|
||||
, aggregates(aggregates_)
|
||||
, keys_size(keys.size())
|
||||
, aggregates_size(aggregates.size())
|
||||
, overflow_row(overflow_row_)
|
||||
, max_rows_to_group_by(max_rows_to_group_by_)
|
||||
, group_by_overflow_mode(group_by_overflow_mode_)
|
||||
, group_by_two_level_threshold(group_by_two_level_threshold_)
|
||||
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
|
||||
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
|
||||
, empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
|
||||
, tmp_volume(tmp_volume_)
|
||||
, max_threads(max_threads_)
|
||||
, min_free_disk_space(min_free_disk_space_)
|
||||
, compile_aggregate_expressions(compile_aggregate_expressions_)
|
||||
, min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
|
||||
{}
|
||||
|
||||
/// Only parameters that matter during merge.
|
||||
Params(const Block & intermediate_header_,
|
||||
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
|
||||
|
@ -40,7 +40,6 @@
|
||||
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include "Core/NamesAndTypes.h"
|
||||
|
||||
#include <DataTypes/DataTypeFactory.h>
|
||||
|
||||
@ -342,101 +341,44 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
|
||||
|
||||
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
|
||||
|
||||
if (select_query->group_by_with_grouping_sets)
|
||||
const auto & column_name = group_asts[i]->getColumnName();
|
||||
|
||||
const auto * node = temp_actions->tryFindInIndex(column_name);
|
||||
if (!node)
|
||||
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
|
||||
|
||||
/// Only removes constant keys if it's an initiator or distributed_group_by_no_merge is enabled.
|
||||
if (getContext()->getClientInfo().distributed_depth == 0 || settings.distributed_group_by_no_merge > 0)
|
||||
{
|
||||
ASTs group_elements_ast;
|
||||
const ASTExpressionList * group_ast_element = group_asts[i]->as<const ASTExpressionList>();
|
||||
if (!group_ast_element)
|
||||
throw Exception("Grouping Sets element " + group_asts[i]->getColumnName() + " should be an expression type", ErrorCodes::UNKNOWN_IDENTIFIER);
|
||||
group_elements_ast = group_ast_element->children;
|
||||
|
||||
NamesAndTypesList grouping_set_list;
|
||||
|
||||
for (ssize_t j = 0; j < ssize_t(group_elements_ast.size()); ++j)
|
||||
/// Constant expressions have non-null column pointer at this stage.
|
||||
if (node->column && isColumnConst(*node->column))
|
||||
{
|
||||
ssize_t group_size = group_elements_ast.size();
|
||||
const auto & column_name = group_elements_ast[j]->getColumnName();
|
||||
const auto * node = temp_actions->tryFindInIndex(column_name);
|
||||
if (!node)
|
||||
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
|
||||
select_query->group_by_with_constant_keys = true;
|
||||
|
||||
/// Only removes constant keys if it's an initiator or distributed_group_by_no_merge is enabled.
|
||||
if (getContext()->getClientInfo().distributed_depth == 0 || settings.distributed_group_by_no_merge > 0)
|
||||
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
|
||||
if (!aggregate_descriptions.empty() || size > 1)
|
||||
{
|
||||
/// Constant expressions have non-null column pointer at this stage.
|
||||
if (node->column && isColumnConst(*node->column))
|
||||
{
|
||||
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
|
||||
if (!aggregate_descriptions.empty() || group_size > 1)
|
||||
{
|
||||
if (j + 1 < static_cast<ssize_t>(group_size))
|
||||
group_elements_ast[j] = std::move(group_elements_ast.back());
|
||||
if (i + 1 < static_cast<ssize_t>(size))
|
||||
group_asts[i] = std::move(group_asts.back());
|
||||
|
||||
group_elements_ast.pop_back();
|
||||
group_asts.pop_back();
|
||||
|
||||
--j;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
NameAndTypePair key{column_name, node->result_type};
|
||||
|
||||
grouping_set_list.push_back(key);
|
||||
|
||||
/// Aggregation keys are unique.
|
||||
if (!unique_keys.count(key.name))
|
||||
{
|
||||
unique_keys.insert(key.name);
|
||||
aggregation_keys.push_back(key);
|
||||
|
||||
/// Key is no longer needed, therefore we can save a little by moving it.
|
||||
aggregated_columns.push_back(std::move(key));
|
||||
--i;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
aggregation_keys_list.push_back(std::move(grouping_set_list));
|
||||
}
|
||||
else
|
||||
|
||||
NameAndTypePair key{column_name, node->result_type};
|
||||
|
||||
/// Aggregation keys are uniqued.
|
||||
if (!unique_keys.count(key.name))
|
||||
{
|
||||
const auto & column_name = group_asts[i]->getColumnName();
|
||||
const auto * node = temp_actions->tryFindInIndex(column_name);
|
||||
if (!node)
|
||||
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
|
||||
unique_keys.insert(key.name);
|
||||
aggregation_keys.push_back(key);
|
||||
|
||||
/// Only removes constant keys if it's an initiator or distributed_group_by_no_merge is enabled.
|
||||
if (getContext()->getClientInfo().distributed_depth == 0 || settings.distributed_group_by_no_merge > 0)
|
||||
{
|
||||
/// Constant expressions have non-null column pointer at this stage.
|
||||
if (node->column && isColumnConst(*node->column))
|
||||
{
|
||||
select_query->group_by_with_constant_keys = true;
|
||||
|
||||
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
|
||||
if (!aggregate_descriptions.empty() || size > 1)
|
||||
{
|
||||
if (i + 1 < static_cast<ssize_t>(size))
|
||||
group_asts[i] = std::move(group_asts.back());
|
||||
|
||||
group_asts.pop_back();
|
||||
|
||||
--i;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
NameAndTypePair key{column_name, node->result_type};
|
||||
|
||||
/// Aggregation keys are uniqued.
|
||||
if (!unique_keys.count(key.name))
|
||||
{
|
||||
unique_keys.insert(key.name);
|
||||
aggregation_keys.push_back(key);
|
||||
|
||||
/// Key is no longer needed, therefore we can save a little by moving it.
|
||||
aggregated_columns.push_back(std::move(key));
|
||||
}
|
||||
/// Key is no longer needed, therefore we can save a little by moving it.
|
||||
aggregated_columns.push_back(std::move(key));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1197,24 +1139,10 @@ bool SelectQueryExpressionAnalyzer::appendGroupBy(ExpressionActionsChain & chain
|
||||
ExpressionActionsChain::Step & step = chain.lastStep(columns_after_join);
|
||||
|
||||
ASTs asts = select_query->groupBy()->children;
|
||||
if (select_query->group_by_with_grouping_sets)
|
||||
for (const auto & ast : asts)
|
||||
{
|
||||
for (const auto & ast : asts)
|
||||
{
|
||||
for (const auto & ast_element : ast->children)
|
||||
{
|
||||
step.addRequiredOutput(ast_element->getColumnName());
|
||||
getRootActions(ast_element, only_types, step.actions());
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (const auto & ast : asts)
|
||||
{
|
||||
step.addRequiredOutput(ast->getColumnName());
|
||||
getRootActions(ast, only_types, step.actions());
|
||||
}
|
||||
step.addRequiredOutput(ast->getColumnName());
|
||||
getRootActions(ast, only_types, step.actions());
|
||||
}
|
||||
|
||||
if (optimize_aggregation_in_order)
|
||||
|
@ -64,7 +64,6 @@ struct ExpressionAnalyzerData
|
||||
|
||||
bool has_aggregation = false;
|
||||
NamesAndTypesList aggregation_keys;
|
||||
NamesAndTypesLists aggregation_keys_list;
|
||||
bool has_const_aggregation_keys = false;
|
||||
AggregateDescriptions aggregate_descriptions;
|
||||
|
||||
@ -95,8 +94,6 @@ private:
|
||||
explicit ExtractedSettings(const Settings & settings_);
|
||||
};
|
||||
|
||||
Poco::Logger * poco_log = &Poco::Logger::get("ExpressionAnalyzer");
|
||||
|
||||
public:
|
||||
/// Ctor for non-select queries. Generally its usage is:
|
||||
/// auto actions = ExpressionAnalyzer(query, syntax, context).getActions();
|
||||
@ -324,7 +321,6 @@ public:
|
||||
|
||||
const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
|
||||
bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }
|
||||
const NamesAndTypesLists & aggregationKeysList() const { return aggregation_keys_list; }
|
||||
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
|
||||
|
||||
const PreparedSets & getPreparedSets() const { return prepared_sets; }
|
||||
|
@ -44,7 +44,6 @@
|
||||
#include <Processors/QueryPlan/ExtremesStep.h>
|
||||
#include <Processors/QueryPlan/FillingStep.h>
|
||||
#include <Processors/QueryPlan/FilterStep.h>
|
||||
#include <Processors/QueryPlan/GroupingSetsStep.h>
|
||||
#include <Processors/QueryPlan/JoinStep.h>
|
||||
#include <Processors/QueryPlan/LimitByStep.h>
|
||||
#include <Processors/QueryPlan/LimitStep.h>
|
||||
@ -961,10 +960,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
bool aggregate_final =
|
||||
expressions.need_aggregate &&
|
||||
options.to_stage > QueryProcessingStage::WithMergeableState &&
|
||||
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube && !query.group_by_with_grouping_sets;
|
||||
|
||||
// if (query.group_by_with_grouping_sets && query.group_by_with_totals)
|
||||
// throw Exception("WITH TOTALS and GROUPING SETS are not supported together", ErrorCodes::NOT_IMPLEMENTED);
|
||||
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
|
||||
|
||||
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
|
||||
{
|
||||
@ -1183,7 +1179,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
{
|
||||
executeAggregation(
|
||||
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
|
||||
|
||||
/// We need to reset input order info, so that executeOrder can't use it
|
||||
query_info.input_order_info.reset();
|
||||
}
|
||||
@ -1247,7 +1242,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
{
|
||||
if (query.group_by_with_totals)
|
||||
{
|
||||
bool final = !query.group_by_with_rollup && !query.group_by_with_cube && !query.group_by_with_grouping_sets;
|
||||
bool final = !query.group_by_with_rollup && !query.group_by_with_cube;
|
||||
executeTotalsAndHaving(
|
||||
query_plan, expressions.hasHaving(), expressions.before_having, expressions.remove_having_filter, aggregate_overflow_row, final);
|
||||
}
|
||||
@ -1256,21 +1251,21 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
executeRollupOrCube(query_plan, Modificator::ROLLUP);
|
||||
else if (query.group_by_with_cube)
|
||||
executeRollupOrCube(query_plan, Modificator::CUBE);
|
||||
else if (query.group_by_with_grouping_sets)
|
||||
executeRollupOrCube(query_plan, Modificator::GROUPING_SETS);
|
||||
|
||||
if ((query.group_by_with_rollup || query.group_by_with_cube || query.group_by_with_grouping_sets) && expressions.hasHaving())
|
||||
if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.hasHaving())
|
||||
{
|
||||
if (query.group_by_with_totals)
|
||||
throw Exception("WITH TOTALS and WITH ROLLUP or CUBE or GROUPING SETS are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED);
|
||||
throw Exception(
|
||||
"WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING",
|
||||
ErrorCodes::NOT_IMPLEMENTED);
|
||||
executeHaving(query_plan, expressions.before_having, expressions.remove_having_filter);
|
||||
}
|
||||
}
|
||||
else if (expressions.hasHaving())
|
||||
executeHaving(query_plan, expressions.before_having, expressions.remove_having_filter);
|
||||
}
|
||||
else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube || query.group_by_with_grouping_sets)
|
||||
throw Exception("WITH TOTALS, ROLLUP, CUBE or GROUPING SETS are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED);
|
||||
else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube)
|
||||
throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
// Now we must execute:
|
||||
// 1) expressions before window functions,
|
||||
@ -2034,6 +2029,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter)
|
||||
{
|
||||
auto where_step = std::make_unique<FilterStep>(
|
||||
@ -2043,79 +2039,6 @@ void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsD
|
||||
query_plan.addStep(std::move(where_step));
|
||||
}
|
||||
|
||||
void InterpreterSelectQuery::initAggregatorParams(
|
||||
const Block & current_data_stream_header,
|
||||
AggregatorParamsPtr & params_ptr,
|
||||
const AggregateDescriptions & aggregates,
|
||||
bool overflow_row, const Settings & settings,
|
||||
size_t group_by_two_level_threshold, size_t group_by_two_level_threshold_bytes)
|
||||
{
|
||||
auto & query = getSelectQuery();
|
||||
if (query.group_by_with_grouping_sets)
|
||||
{
|
||||
ColumnNumbers keys;
|
||||
ColumnNumbers all_keys;
|
||||
ColumnNumbersList keys_vector;
|
||||
std::unordered_set<size_t> keys_set;
|
||||
for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
|
||||
{
|
||||
keys.clear();
|
||||
for (const auto & key : aggregation_keys)
|
||||
{
|
||||
size_t key_name_pos = current_data_stream_header.getPositionByName(key.name);
|
||||
keys_set.insert(key_name_pos);
|
||||
keys.push_back(key_name_pos);
|
||||
}
|
||||
keys_vector.push_back(keys);
|
||||
}
|
||||
all_keys.assign(keys_set.begin(), keys_set.end());
|
||||
|
||||
params_ptr = std::make_unique<Aggregator::Params>(
|
||||
current_data_stream_header,
|
||||
all_keys,
|
||||
keys_vector,
|
||||
aggregates,
|
||||
overflow_row,
|
||||
settings.max_rows_to_group_by,
|
||||
settings.group_by_overflow_mode,
|
||||
group_by_two_level_threshold,
|
||||
group_by_two_level_threshold_bytes,
|
||||
settings.max_bytes_before_external_group_by,
|
||||
settings.empty_result_for_aggregation_by_empty_set
|
||||
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
|
||||
&& query_analyzer->hasConstAggregationKeys()),
|
||||
context->getTemporaryVolume(),
|
||||
settings.max_threads,
|
||||
settings.min_free_disk_space_for_temporary_data,
|
||||
settings.compile_aggregate_expressions,
|
||||
settings.min_count_to_compile_aggregate_expression);
|
||||
}
|
||||
else
|
||||
{
|
||||
ColumnNumbers keys;
|
||||
for (const auto & key : query_analyzer->aggregationKeys())
|
||||
keys.push_back(current_data_stream_header.getPositionByName(key.name));
|
||||
|
||||
params_ptr = std::make_unique<Aggregator::Params>(
|
||||
current_data_stream_header,
|
||||
keys,
|
||||
aggregates,
|
||||
overflow_row,
|
||||
settings.max_rows_to_group_by,
|
||||
settings.group_by_overflow_mode,
|
||||
group_by_two_level_threshold,
|
||||
group_by_two_level_threshold_bytes,
|
||||
settings.max_bytes_before_external_group_by,
|
||||
settings.empty_result_for_aggregation_by_empty_set
|
||||
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
|
||||
&& query_analyzer->hasConstAggregationKeys()),
|
||||
context->getTemporaryVolume(),
|
||||
settings.max_threads,
|
||||
settings.min_free_disk_space_for_temporary_data,
|
||||
settings.compile_aggregate_expressions,
|
||||
settings.min_count_to_compile_aggregate_expression);
|
||||
}
|
||||
}
|
||||
|
||||
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
|
||||
{
|
||||
@ -2127,6 +2050,9 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
return;
|
||||
|
||||
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
|
||||
ColumnNumbers keys;
|
||||
for (const auto & key : query_analyzer->aggregationKeys())
|
||||
keys.push_back(header_before_aggregation.getPositionByName(key.name));
|
||||
|
||||
AggregateDescriptions aggregates = query_analyzer->aggregates();
|
||||
for (auto & descr : aggregates)
|
||||
@ -2136,9 +2062,24 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
|
||||
AggregatorParamsPtr params_ptr;
|
||||
initAggregatorParams(header_before_aggregation, params_ptr, aggregates, overflow_row, settings,
|
||||
settings.group_by_two_level_threshold, settings.group_by_two_level_threshold_bytes);
|
||||
Aggregator::Params params(
|
||||
header_before_aggregation,
|
||||
keys,
|
||||
aggregates,
|
||||
overflow_row,
|
||||
settings.max_rows_to_group_by,
|
||||
settings.group_by_overflow_mode,
|
||||
settings.group_by_two_level_threshold,
|
||||
settings.group_by_two_level_threshold_bytes,
|
||||
settings.max_bytes_before_external_group_by,
|
||||
settings.empty_result_for_aggregation_by_empty_set
|
||||
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
|
||||
&& query_analyzer->hasConstAggregationKeys()),
|
||||
context->getTemporaryVolume(),
|
||||
settings.max_threads,
|
||||
settings.min_free_disk_space_for_temporary_data,
|
||||
settings.compile_aggregate_expressions,
|
||||
settings.min_count_to_compile_aggregate_expression);
|
||||
|
||||
SortDescription group_by_sort_description;
|
||||
|
||||
@ -2156,7 +2097,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
|
||||
auto aggregating_step = std::make_unique<AggregatingStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
*params_ptr,
|
||||
params,
|
||||
final,
|
||||
settings.max_block_size,
|
||||
settings.aggregation_in_order_max_block_bytes,
|
||||
@ -2165,6 +2106,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
|
||||
storage_has_evenly_distributed_read,
|
||||
std::move(group_by_info),
|
||||
std::move(group_by_sort_description));
|
||||
|
||||
query_plan.addStep(std::move(aggregating_step));
|
||||
}
|
||||
|
||||
@ -2216,27 +2158,47 @@ void InterpreterSelectQuery::executeTotalsAndHaving(
|
||||
query_plan.addStep(std::move(totals_having_step));
|
||||
}
|
||||
|
||||
|
||||
void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modificator modificator)
|
||||
{
|
||||
const auto & header_before_transform = query_plan.getCurrentDataStream().header;
|
||||
|
||||
ColumnNumbers keys;
|
||||
|
||||
for (const auto & key : query_analyzer->aggregationKeys())
|
||||
keys.push_back(header_before_transform.getPositionByName(key.name));
|
||||
|
||||
const Settings & settings = context->getSettingsRef();
|
||||
|
||||
AggregatorParamsPtr params_ptr;
|
||||
initAggregatorParams(header_before_transform, params_ptr, query_analyzer->aggregates(), false, settings, 0, 0);
|
||||
auto transform_params = std::make_shared<AggregatingTransformParams>(*params_ptr, true);
|
||||
Aggregator::Params params(
|
||||
header_before_transform,
|
||||
keys,
|
||||
query_analyzer->aggregates(),
|
||||
false,
|
||||
settings.max_rows_to_group_by,
|
||||
settings.group_by_overflow_mode,
|
||||
0,
|
||||
0,
|
||||
settings.max_bytes_before_external_group_by,
|
||||
settings.empty_result_for_aggregation_by_empty_set,
|
||||
context->getTemporaryVolume(),
|
||||
settings.max_threads,
|
||||
settings.min_free_disk_space_for_temporary_data,
|
||||
settings.compile_aggregate_expressions,
|
||||
settings.min_count_to_compile_aggregate_expression);
|
||||
|
||||
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
|
||||
|
||||
QueryPlanStepPtr step;
|
||||
if (modificator == Modificator::ROLLUP)
|
||||
step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
|
||||
else if (modificator == Modificator::CUBE)
|
||||
step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
|
||||
else
|
||||
step = std::make_unique<GroupingSetsStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
|
||||
step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
|
||||
|
||||
query_plan.addStep(std::move(step));
|
||||
}
|
||||
|
||||
|
||||
void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description)
|
||||
{
|
||||
if (!expression)
|
||||
|
@ -14,7 +14,6 @@
|
||||
#include <Storages/TableLockHolder.h>
|
||||
|
||||
#include <Columns/FilterDescription.h>
|
||||
#include "Interpreters/ActionsDAG.h"
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
@ -30,7 +29,6 @@ class QueryPlan;
|
||||
|
||||
struct TreeRewriterResult;
|
||||
using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
|
||||
using AggregatorParamsPtr = std::unique_ptr<Aggregator::Params>;
|
||||
|
||||
|
||||
/** Interprets the SELECT query. Returns the stream of blocks with the results of the query before `to_stage` stage.
|
||||
@ -128,12 +126,6 @@ private:
|
||||
|
||||
/// Different stages of query execution.
|
||||
|
||||
void initAggregatorParams(
|
||||
const Block & current_data_stream_header,
|
||||
AggregatorParamsPtr & params_ptr,
|
||||
const AggregateDescriptions & aggregates,
|
||||
bool overflow_row, const Settings & settings,
|
||||
size_t group_by_two_level_threshold, size_t group_by_two_level_threshold_bytes);
|
||||
void executeFetchColumns(QueryProcessingStage::Enum processing_stage, QueryPlan & query_plan);
|
||||
void executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter);
|
||||
void executeAggregation(
|
||||
@ -164,8 +156,7 @@ private:
|
||||
enum class Modificator
|
||||
{
|
||||
ROLLUP = 0,
|
||||
CUBE = 1,
|
||||
GROUPING_SETS = 2
|
||||
CUBE = 1
|
||||
};
|
||||
|
||||
void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
|
||||
|
@ -201,22 +201,10 @@ GroupByKeysInfo getGroupByKeysInfo(const ASTs & group_by_keys)
|
||||
/// filling set with short names of keys
|
||||
for (const auto & group_key : group_by_keys)
|
||||
{
|
||||
/// for grouping sets case
|
||||
if (group_key->as<ASTExpressionList>())
|
||||
{
|
||||
const auto express_list_ast = group_key->as<const ASTExpressionList &>();
|
||||
for (const auto & group_elem : express_list_ast.children)
|
||||
{
|
||||
data.key_names.insert(group_elem->getColumnName());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (group_key->as<ASTFunction>())
|
||||
data.has_function = true;
|
||||
if (group_key->as<ASTFunction>())
|
||||
data.has_function = true;
|
||||
|
||||
data.key_names.insert(group_key->getColumnName());
|
||||
}
|
||||
data.key_names.insert(group_key->getColumnName());
|
||||
}
|
||||
|
||||
return data;
|
||||
|
@ -26,15 +26,7 @@ void ASTExpressionList::formatImpl(const FormatSettings & settings, FormatState
|
||||
settings.ostr << ' ';
|
||||
}
|
||||
|
||||
if (frame.surround_each_list_element_with_parens)
|
||||
settings.ostr << "(";
|
||||
|
||||
FormatStateStacked frame_nested = frame;
|
||||
frame_nested.surround_each_list_element_with_parens = false;
|
||||
(*it)->formatImpl(settings, state, frame_nested);
|
||||
|
||||
if (frame.surround_each_list_element_with_parens)
|
||||
settings.ostr << ")";
|
||||
(*it)->formatImpl(settings, state, frame);
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,7 +41,6 @@ void ASTExpressionList::formatImplMultiline(const FormatSettings & settings, For
|
||||
}
|
||||
|
||||
++frame.indent;
|
||||
|
||||
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
|
||||
{
|
||||
if (it != children.begin())
|
||||
@ -63,15 +54,7 @@ void ASTExpressionList::formatImplMultiline(const FormatSettings & settings, For
|
||||
|
||||
FormatStateStacked frame_nested = frame;
|
||||
frame_nested.expression_list_always_start_on_new_line = false;
|
||||
frame_nested.surround_each_list_element_with_parens = false;
|
||||
|
||||
if (frame.surround_each_list_element_with_parens)
|
||||
settings.ostr << "(";
|
||||
|
||||
(*it)->formatImpl(settings, state, frame_nested);
|
||||
|
||||
if (frame.surround_each_list_element_with_parens)
|
||||
settings.ostr << ")";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,12 +114,9 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
|
||||
if (groupBy())
|
||||
{
|
||||
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "GROUP BY" << (s.hilite ? hilite_none : "");
|
||||
if (!group_by_with_grouping_sets)
|
||||
{
|
||||
s.one_line
|
||||
s.one_line
|
||||
? groupBy()->formatImpl(s, state, frame)
|
||||
: groupBy()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
|
||||
}
|
||||
}
|
||||
|
||||
if (group_by_with_rollup)
|
||||
@ -128,18 +125,6 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
|
||||
if (group_by_with_cube)
|
||||
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH CUBE" << (s.hilite ? hilite_none : "");
|
||||
|
||||
if (group_by_with_grouping_sets)
|
||||
{
|
||||
frame.surround_each_list_element_with_parens = true;
|
||||
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "GROUPING SETS" << (s.hilite ? hilite_none : "");
|
||||
s.ostr << " (";
|
||||
s.one_line
|
||||
? groupBy()->formatImpl(s, state, frame)
|
||||
: groupBy()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
|
||||
s.ostr << ")";
|
||||
frame.surround_each_list_element_with_parens = false;
|
||||
}
|
||||
|
||||
if (group_by_with_totals)
|
||||
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << (s.one_line ? "" : " ") << "WITH TOTALS" << (s.hilite ? hilite_none : "");
|
||||
|
||||
|
@ -83,7 +83,6 @@ public:
|
||||
bool group_by_with_rollup = false;
|
||||
bool group_by_with_cube = false;
|
||||
bool group_by_with_constant_keys = false;
|
||||
bool group_by_with_grouping_sets = false;
|
||||
bool limit_with_ties = false;
|
||||
|
||||
ASTPtr & refSelect() { return getExpression(Expression::SELECT); }
|
||||
|
@ -749,61 +749,13 @@ bool ParserNotEmptyExpressionList::parseImpl(Pos & pos, ASTPtr & node, Expected
|
||||
return nested_parser.parse(pos, node, expected) && !node->children.empty();
|
||||
}
|
||||
|
||||
|
||||
bool ParserOrderByExpressionList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
return ParserList(std::make_unique<ParserOrderByElement>(), std::make_unique<ParserToken>(TokenType::Comma), false)
|
||||
.parse(pos, node, expected);
|
||||
}
|
||||
|
||||
bool ParserGroupingSetsExpressionListElements::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
auto command_list = std::make_shared<ASTExpressionList>();
|
||||
node = command_list;
|
||||
|
||||
ParserToken s_comma(TokenType::Comma);
|
||||
ParserToken s_open(TokenType::OpeningRoundBracket);
|
||||
ParserToken s_close(TokenType::ClosingRoundBracket);
|
||||
ParserExpressionWithOptionalAlias p_expression(false);
|
||||
ParserList p_command(std::make_unique<ParserExpressionWithOptionalAlias>(false),
|
||||
std::make_unique<ParserToken>(TokenType::Comma), true);
|
||||
|
||||
do
|
||||
{
|
||||
Pos begin = pos;
|
||||
ASTPtr command;
|
||||
if (!s_open.ignore(pos, expected))
|
||||
{
|
||||
pos = begin;
|
||||
if (!p_expression.parse(pos, command, expected))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
auto list = std::make_shared<ASTExpressionList>(',');
|
||||
list->children.push_back(command);
|
||||
command = std::move(list);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!p_command.parse(pos, command, expected))
|
||||
return false;
|
||||
|
||||
if (!s_close.ignore(pos, expected))
|
||||
break;
|
||||
}
|
||||
|
||||
command_list->children.push_back(command);
|
||||
}
|
||||
while (s_comma.ignore(pos, expected));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ParserGroupingSetsExpressionList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
ParserGroupingSetsExpressionListElements grouping_sets_elements;
|
||||
return grouping_sets_elements.parse(pos, node, expected);
|
||||
|
||||
}
|
||||
|
||||
bool ParserTTLExpressionList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
{
|
||||
|
@ -511,20 +511,6 @@ protected:
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
||||
};
|
||||
|
||||
class ParserGroupingSetsExpressionList : public IParserBase
|
||||
{
|
||||
protected:
|
||||
const char * getName() const override { return "grouping sets expression"; }
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
||||
};
|
||||
|
||||
class ParserGroupingSetsExpressionListElements : public IParserBase
|
||||
{
|
||||
protected:
|
||||
const char * getName() const override { return "grouping sets expression elements"; }
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
|
||||
};
|
||||
|
||||
|
||||
/// Parser for key-value pair, where value can be list of pairs.
|
||||
class ParserKeyValuePair : public IParserBase
|
||||
|
@ -224,7 +224,6 @@ public:
|
||||
bool need_parens = false;
|
||||
bool expression_list_always_start_on_new_line = false; /// Line feed and indent before expression list even if it's of single element.
|
||||
bool expression_list_prepend_whitespace = false; /// Prepend whitespace (if it is required)
|
||||
bool surround_each_list_element_with_parens = false;
|
||||
const IAST * current_select = nullptr;
|
||||
};
|
||||
|
||||
|
@ -50,7 +50,6 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
ParserKeyword s_by("BY");
|
||||
ParserKeyword s_rollup("ROLLUP");
|
||||
ParserKeyword s_cube("CUBE");
|
||||
ParserKeyword s_grouping_sets("GROUPING SETS");
|
||||
ParserKeyword s_top("TOP");
|
||||
ParserKeyword s_with_ties("WITH TIES");
|
||||
ParserKeyword s_offset("OFFSET");
|
||||
@ -66,7 +65,6 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
ParserNotEmptyExpressionList exp_list_for_select_clause(true); /// Allows aliases without AS keyword.
|
||||
ParserExpressionWithOptionalAlias exp_elem(false);
|
||||
ParserOrderByExpressionList order_list;
|
||||
ParserGroupingSetsExpressionList grouping_sets_list;
|
||||
|
||||
ParserToken open_bracket(TokenType::OpeningRoundBracket);
|
||||
ParserToken close_bracket(TokenType::ClosingRoundBracket);
|
||||
@ -186,39 +184,24 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
select_query->group_by_with_rollup = true;
|
||||
else if (s_cube.ignore(pos, expected))
|
||||
select_query->group_by_with_cube = true;
|
||||
else if (s_grouping_sets.ignore(pos, expected))
|
||||
select_query->group_by_with_grouping_sets = true;
|
||||
|
||||
if ((select_query->group_by_with_rollup || select_query->group_by_with_cube || select_query->group_by_with_grouping_sets) &&
|
||||
!open_bracket.ignore(pos, expected))
|
||||
if ((select_query->group_by_with_rollup || select_query->group_by_with_cube) && !open_bracket.ignore(pos, expected))
|
||||
return false;
|
||||
|
||||
if (select_query->group_by_with_grouping_sets)
|
||||
{
|
||||
if (!grouping_sets_list.parse(pos, group_expression_list, expected))
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!exp_list.parse(pos, group_expression_list, expected))
|
||||
return false;
|
||||
}
|
||||
if (!exp_list.parse(pos, group_expression_list, expected))
|
||||
return false;
|
||||
|
||||
|
||||
if ((select_query->group_by_with_rollup || select_query->group_by_with_cube || select_query->group_by_with_grouping_sets) &&
|
||||
!close_bracket.ignore(pos, expected))
|
||||
if ((select_query->group_by_with_rollup || select_query->group_by_with_cube) && !close_bracket.ignore(pos, expected))
|
||||
return false;
|
||||
}
|
||||
|
||||
/// WITH ROLLUP, CUBE, GROUPING SETS or TOTALS
|
||||
/// WITH ROLLUP, CUBE or TOTALS
|
||||
if (s_with.ignore(pos, expected))
|
||||
{
|
||||
if (s_rollup.ignore(pos, expected))
|
||||
select_query->group_by_with_rollup = true;
|
||||
else if (s_cube.ignore(pos, expected))
|
||||
select_query->group_by_with_cube = true;
|
||||
else if (s_grouping_sets.ignore(pos, expected))
|
||||
select_query->group_by_with_grouping_sets = true;
|
||||
else if (s_totals.ignore(pos, expected))
|
||||
select_query->group_by_with_totals = true;
|
||||
else
|
||||
|
@ -56,8 +56,6 @@ private:
|
||||
|
||||
Processors aggregating;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("AggregatingStep");
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,46 +0,0 @@
|
||||
#include <Processors/QueryPlan/GroupingSetsStep.h>
|
||||
#include <Processors/Transforms/GroupingSetsTransform.h>
|
||||
#include "QueryPipeline/QueryPipelineBuilder.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Traits of the grouping sets step: it preserves neither distinct columns, the number
/// of streams, sorting nor the number of rows, and it declares a single output stream.
static ITransformingStep::Traits getTraits()
{
    ITransformingStep::Traits traits
    {
        /// Data stream traits.
        {
            .preserves_distinct_columns = false,
            .returns_single_stream = true,
            .preserves_number_of_streams = false,
            .preserves_sorting = false,
        },
        /// Transform traits.
        {
            .preserves_number_of_rows = false,
        }
    };

    return traits;
}
|
||||
|
||||
/// Builds the step on top of `input_stream_`. The output header is taken from `params_`,
/// and the name of every aggregation key column is registered as a distinct column
/// of the output stream.
GroupingSetsStep::GroupingSetsStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
    : ITransformingStep(input_stream_, params_->getHeader(), getTraits())
    , params(std::move(params_))
{
    const auto & aggregator_params = params->params;
    auto & distinct_columns = output_stream->distinct_columns;

    /// Aggregation keys are distinct
    for (auto key_position : aggregator_params.keys)
    {
        const auto & key_column = aggregator_params.src_header.getByPosition(key_position);
        distinct_columns.insert(key_column.name);
    }
}
|
||||
|
||||
/// Resizes the pipeline to a single stream and attaches a GroupingSetsTransform to it.
/// The totals stream is left without a transform.
void GroupingSetsStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    pipeline.resize(1);

    pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
    {
        if (stream_type == QueryPipelineBuilder::StreamType::Totals)
            return nullptr;

        /// Hand over a copy of the shared pointer. The lambda captures the step by reference,
        /// so moving `params` here would leave the member empty and any further invocation
        /// of this callback would pass a null pointer to the transform, whose constructor
        /// dereferences it.
        return std::make_shared<GroupingSetsTransform>(header, params);
    });
}
|
||||
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
#pragma once
|
||||
#include <Processors/QueryPlan/ITransformingStep.h>
|
||||
#include <QueryPipeline/SizeLimits.h>
|
||||
#include "QueryPipeline/QueryPipelineBuilder.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct AggregatingTransformParams;
|
||||
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
|
||||
|
||||
class GroupingSetsStep : public ITransformingStep
|
||||
{
|
||||
public:
|
||||
GroupingSetsStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_);
|
||||
|
||||
String getName() const override { return "GroupingSets"; }
|
||||
|
||||
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
private:
|
||||
AggregatingTransformParamsPtr params;
|
||||
};
|
||||
|
||||
}
|
@ -1,76 +0,0 @@
|
||||
#include <Processors/Transforms/GroupingSetsTransform.h>
|
||||
#include <Processors/Transforms/TotalsHavingTransform.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
GroupingSetsTransform::GroupingSetsTransform(Block header, AggregatingTransformParamsPtr params_)
|
||||
: IAccumulatingTransform(std::move(header), params_->getHeader())
|
||||
, params(std::move(params_))
|
||||
, keys(params->params.keys)
|
||||
, keys_vector(params->params.keys_vector)
|
||||
, keys_vector_idx(0)
|
||||
{
|
||||
}
|
||||
|
||||
/// Stores the incoming chunk; nothing is processed until generate() is called.
void GroupingSetsTransform::consume(Chunk chunk)
{
    consumed_chunks.push_back(std::move(chunk));
}
|
||||
|
||||
/// Converts `chunks` into blocks with the input header, merges them with the aggregator
/// and returns the merged block as a single chunk. `final` is forwarded to mergeBlocks().
/// The columns of every chunk in `chunks` are detached in the process.
Chunk GroupingSetsTransform::merge(Chunks && chunks, bool final)
{
    BlocksList blocks_to_merge;
    for (auto & source_chunk : chunks)
    {
        auto block = getInputPort().getHeader().cloneWithColumns(source_chunk.detachColumns());
        blocks_to_merge.emplace_back(std::move(block));
    }

    auto merged_block = params->aggregator.mergeBlocks(blocks_to_merge, final);
    const auto merged_rows = merged_block.rows();

    return Chunk(merged_block.getColumns(), merged_rows);
}
|
||||
|
||||
/// Produces the result for the next grouping set, one set per call.
///
/// On the first call after data was consumed, the buffered chunks are collapsed into one
/// chunk and, for every key, an empty clone of its column resized to the chunk's row
/// count is prepared. Each call then takes the current grouping set, replaces the columns
/// of all keys that are not part of that set with their prepared replacement, merges the
/// result and advances to the next set. Once all sets are done an empty chunk is returned.
Chunk GroupingSetsTransform::generate()
{
    if (!consumed_chunks.empty())
    {
        /// A single chunk is used as is, several chunks are merged first.
        grouping_sets_chunk = consumed_chunks.size() == 1
            ? std::move(consumed_chunks.front())
            : merge(std::move(consumed_chunks), false);

        consumed_chunks.clear();

        const auto rows_in_chunk = grouping_sets_chunk.getNumRows();

        current_columns = grouping_sets_chunk.getColumns();
        current_zero_columns.clear();

        for (auto key : keys)
        {
            auto replacement = current_columns[key]->cloneEmpty()->cloneResized(rows_in_chunk);
            current_zero_columns.emplace(key, std::move(replacement));
        }
    }

    Chunk result;

    const bool has_data = !current_columns.empty();
    if (has_data && keys_vector_idx < keys_vector.size())
    {
        const auto & current_set = keys_vector[keys_vector_idx];
        const std::set<size_t> keys_in_set(current_set.begin(), current_set.end());

        auto set_columns = current_columns;
        for (auto key : keys)
        {
            if (keys_in_set.contains(key))
                continue;

            /// The key does not belong to the current set.
            set_columns[key] = current_zero_columns[key];
        }

        Chunks to_merge;
        to_merge.emplace_back(std::move(set_columns), current_columns.front()->size());
        result = merge(std::move(to_merge), false);

        ++keys_vector_idx;
    }

    finalizeChunk(result);
    return result;
}
|
||||
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
#pragma once
|
||||
#include <Processors/IAccumulatingTransform.h>
|
||||
#include <Processors/Transforms/AggregatingTransform.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class GroupingSetsTransform : public IAccumulatingTransform
|
||||
{
|
||||
public:
|
||||
GroupingSetsTransform(Block header, AggregatingTransformParamsPtr params);
|
||||
String getName() const override { return "GroupingSetsTransform"; }
|
||||
|
||||
protected:
|
||||
void consume(Chunk chunk) override;
|
||||
Chunk generate() override;
|
||||
|
||||
private:
|
||||
AggregatingTransformParamsPtr params;
|
||||
const ColumnNumbers & keys;
|
||||
const ColumnNumbersList & keys_vector;
|
||||
|
||||
Chunks consumed_chunks;
|
||||
Chunk grouping_sets_chunk;
|
||||
Columns current_columns;
|
||||
std::unordered_map<size_t, ColumnPtr> current_zero_columns;
|
||||
|
||||
UInt64 keys_vector_idx = 0;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("GroupingSetsTransform");
|
||||
|
||||
Chunk merge(Chunks && chunks, bool final);
|
||||
};
|
||||
|
||||
}
|
@ -497,7 +497,7 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
|
||||
for (size_t i = 1; i < output_ports.size(); ++i)
|
||||
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
|
||||
|
||||
// Temporarily skip this check. TotalsHavingTransform may return finalized totals but not finalized data.
|
||||
// Temporarily skip this check. TotalsHavingTransform may return finalized totals but not finalized data.
|
||||
// if (totals_port)
|
||||
// assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
|
||||
|
||||
|
@ -4,7 +4,6 @@
|
||||
#include <QueryPipeline/PipelineResourcesHolder.h>
|
||||
#include <QueryPipeline/Chain.h>
|
||||
#include <QueryPipeline/SizeLimits.h>
|
||||
#include <base/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -137,8 +136,6 @@ private:
|
||||
/// It is needed for debug. See QueryPipelineProcessorsCollector.
|
||||
Processors * collected_processors = nullptr;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("Pipe");
|
||||
|
||||
/// These methods are for QueryPipeline. It is allowed to complete graph only there.
|
||||
/// So, we may be sure that Pipe always has output port if not empty.
|
||||
bool isCompleted() const { return !empty() && output_ports.empty(); }
|
||||
|
@ -1,89 +0,0 @@
|
||||
1 0 1 4500
|
||||
1 0 3 4700
|
||||
1 0 5 4900
|
||||
1 0 7 5100
|
||||
1 0 9 5300
|
||||
1 1 0 4500
|
||||
1 2 0 5100
|
||||
1 3 0 4700
|
||||
1 4 0 5300
|
||||
1 5 0 4900
|
||||
2 0 2 4600
|
||||
2 0 4 4800
|
||||
2 0 6 5000
|
||||
2 0 8 5200
|
||||
2 0 10 5400
|
||||
2 1 0 5000
|
||||
2 2 0 4600
|
||||
2 3 0 5200
|
||||
2 4 0 4800
|
||||
2 5 0 5400
|
||||
0 0 1 1 4500
|
||||
0 0 2 2 4600
|
||||
0 0 3 3 4700
|
||||
0 0 4 4 4800
|
||||
0 0 5 5 4900
|
||||
0 0 6 6 5000
|
||||
0 0 7 7 5100
|
||||
0 0 8 8 5200
|
||||
0 0 9 9 5300
|
||||
0 0 10 10 5400
|
||||
1 1 0 0 4500
|
||||
1 2 0 0 5100
|
||||
1 3 0 0 4700
|
||||
1 4 0 0 5300
|
||||
1 5 0 0 4900
|
||||
2 1 0 0 5000
|
||||
2 2 0 0 4600
|
||||
2 3 0 0 5200
|
||||
2 4 0 0 4800
|
||||
2 5 0 0 5400
|
||||
0 0 0 49500
|
||||
0 0 1 4500
|
||||
0 0 2 4600
|
||||
0 0 3 4700
|
||||
0 0 4 4800
|
||||
0 0 5 4900
|
||||
0 0 6 5000
|
||||
0 0 7 5100
|
||||
0 0 8 5200
|
||||
0 0 9 5300
|
||||
0 0 10 5400
|
||||
1 1 0 4500
|
||||
1 2 0 5100
|
||||
1 3 0 4700
|
||||
1 4 0 5300
|
||||
1 5 0 4900
|
||||
2 1 0 5000
|
||||
2 2 0 4600
|
||||
2 3 0 5200
|
||||
2 4 0 4800
|
||||
2 5 0 5400
|
||||
1 0 24500
|
||||
1 1 4500
|
||||
1 3 4700
|
||||
1 5 4900
|
||||
1 7 5100
|
||||
1 9 5300
|
||||
2 0 25000
|
||||
2 2 4600
|
||||
2 4 4800
|
||||
2 6 5000
|
||||
2 8 5200
|
||||
2 10 5400
|
||||
|
||||
0 0 49500
|
||||
1 0 24500
|
||||
1 1 4500
|
||||
1 3 4700
|
||||
1 5 4900
|
||||
1 7 5100
|
||||
1 9 5300
|
||||
2 0 25000
|
||||
2 2 4600
|
||||
2 4 4800
|
||||
2 6 5000
|
||||
2 8 5200
|
||||
2 10 5400
|
||||
|
||||
0 0 49500
|
@ -1,44 +0,0 @@
|
||||
DROP TABLE IF EXISTS grouping_sets;
|
||||
|
||||
CREATE TABLE grouping_sets(fact_1_id Int32, fact_2_id Int32, fact_3_id Int32, fact_4_id Int32, sales_value Int32) ENGINE = Memory;
|
||||
|
||||
SELECT fact_1_id, fact_3_id, sum(sales_value), count() from grouping_sets GROUP BY GROUPING SETS(fact_1_id, fact_3_id) ORDER BY fact_1_id, fact_3_id;
|
||||
|
||||
INSERT INTO grouping_sets
|
||||
SELECT
|
||||
number % 2 + 1 AS fact_1_id,
|
||||
number % 5 + 1 AS fact_2_id,
|
||||
number % 10 + 1 AS fact_3_id,
|
||||
number % 10 + 1 AS fact_4_id,
|
||||
number % 100 AS sales_value
|
||||
FROM system.numbers limit 1000;
|
||||
|
||||
SELECT fact_1_id, fact_2_id, fact_3_id, SUM(sales_value) AS sales_value from grouping_sets
|
||||
GROUP BY GROUPING SETS ((fact_1_id, fact_2_id), (fact_1_id, fact_3_id))
|
||||
ORDER BY fact_1_id, fact_2_id, fact_3_id;
|
||||
|
||||
SELECT fact_1_id, fact_2_id, fact_3_id, fact_4_id, SUM(sales_value) AS sales_value from grouping_sets
|
||||
GROUP BY GROUPING SETS ((fact_1_id, fact_2_id), (fact_3_id, fact_4_id))
|
||||
ORDER BY fact_1_id, fact_2_id, fact_3_id, fact_4_id;
|
||||
|
||||
SELECT fact_1_id, fact_2_id, fact_3_id, SUM(sales_value) AS sales_value from grouping_sets
|
||||
GROUP BY GROUPING SETS ((fact_1_id, fact_2_id), (fact_3_id), ())
|
||||
ORDER BY fact_1_id, fact_2_id, fact_3_id;
|
||||
|
||||
SELECT
|
||||
fact_1_id,
|
||||
fact_3_id,
|
||||
SUM(sales_value) AS sales_value
|
||||
FROM grouping_sets
|
||||
GROUP BY grouping sets ((fact_1_id), (fact_1_id, fact_3_id)) WITH TOTALS
|
||||
ORDER BY fact_1_id, fact_3_id;
|
||||
|
||||
SELECT
|
||||
fact_1_id,
|
||||
fact_3_id,
|
||||
SUM(sales_value) AS sales_value
|
||||
FROM grouping_sets
|
||||
GROUP BY grouping sets (fact_1_id, (fact_1_id, fact_3_id)) WITH TOTALS
|
||||
ORDER BY fact_1_id, fact_3_id;
|
||||
|
||||
DROP TABLE grouping_sets;
|
Loading…
Reference in New Issue
Block a user