feat grouping-sets: initial changes

This commit is contained in:
MaxTheHuman 2021-05-13 17:51:07 +03:00 committed by Dmitry Novik
parent ff0bc94055
commit 2bd07ef338
11 changed files with 167 additions and 7 deletions

View File

@ -8,5 +8,6 @@ namespace DB
{
using ColumnNumbers = std::vector<size_t>;
/// A list of ColumnNumbers lists, i.e. a two-dimensional collection of column positions.
using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;
}

View File

@ -107,6 +107,8 @@ public:
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
};
/// A list whose elements are themselves NamesAndTypesList objects.
using TwoDimensionNamesAndTypesList = std::list<NamesAndTypesList>;
}
namespace std

View File

@ -879,6 +879,7 @@ public:
/// What to count.
const ColumnNumbers keys;
const ColumnNumbersTwoDimension keys_vector;
const AggregateDescriptions aggregates;
const size_t keys_size;
const size_t aggregates_size;
@ -938,6 +939,27 @@ public:
{
}
/// two dimensional vector of aggregating keys in params
/// Overload that takes a list of key sets (`keys_vector_`) instead of a single flat key list;
/// every other argument is stored into the member of the same name.
/// NOTE(review): this constructor does not initialize `keys`, yet `keys_size` is computed from
/// `keys.size()`. Assuming `keys` is the `const ColumnNumbers` member declared ahead of `keys_size`
/// in this class, it is default-constructed (empty) here and `keys_size` ends up 0 regardless of
/// what `keys_vector_` contains. Confirm whether that is intended or whether `keys_size` should be
/// derived from `keys_vector`.
Params(
const Block & src_header_,
const ColumnNumbersTwoDimension & keys_vector_, const AggregateDescriptions & aggregates_,
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
VolumePtr tmp_volume_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys_vector(keys_vector_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_volume(tmp_volume_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)

View File

@ -345,6 +345,12 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
if (getContext()->getSettingsRef().enable_positional_arguments)
replaceForPositionalArguments(group_asts[i], select_query, ASTSelectQuery::Expression::GROUP_BY);
if (select_query->group_by_with_grouping_sets)
{
LOG_DEBUG(poco_log, "analyzeAggregation: detect group by with grouping sets");
/// TODO
}
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
@ -380,6 +386,11 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
/// Aggregation keys are uniqued.
if (!unique_keys.count(key.name))
{
if (select_query->group_by_with_grouping_sets)
{
aggregation_keys_list.push_back({key});
}
unique_keys.insert(key.name);
aggregation_keys.push_back(key);

View File

@ -65,6 +65,7 @@ struct ExpressionAnalyzerData
bool has_aggregation = false;
NamesAndTypesList aggregation_keys;
bool has_const_aggregation_keys = false;
TwoDimensionNamesAndTypesList aggregation_keys_list;
AggregateDescriptions aggregate_descriptions;
WindowDescriptions window_descriptions;
@ -94,6 +95,8 @@ private:
explicit ExtractedSettings(const Settings & settings_);
};
Poco::Logger * poco_log = &Poco::Logger::get("ExpressionAnalyzer");
public:
/// Ctor for non-select queries. Generally its usage is:
/// auto actions = ExpressionAnalyzer(query, syntax, context).getActions();
@ -321,6 +324,7 @@ public:
const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }
/// Read-only access to `aggregation_keys_list` (a list of key lists).
const TwoDimensionNamesAndTypesList & aggregationKeysList() const { return aggregation_keys_list; }
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
const PreparedSets & getPreparedSets() const { return prepared_sets; }

View File

@ -1182,8 +1182,18 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (expressions.need_aggregate)
{
executeAggregation(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
if (query.group_by_with_grouping_sets)
{
LOG_DEBUG(log, "About to run executeGroupingSets");
executeGroupingSets(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
}
else
{
LOG_DEBUG(log, "About to run executeAggregation");
executeAggregation(
query_plan, expressions.before_aggregation, aggregate_overflow_row, aggregate_final, query_info.input_order_info);
}
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
}
@ -1256,8 +1266,6 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
executeRollupOrCube(query_plan, Modificator::ROLLUP);
else if (query.group_by_with_cube)
executeRollupOrCube(query_plan, Modificator::CUBE);
else if (query.group_by_with_grouping_sets)
executeGroupingSets(query_plan);
if ((query.group_by_with_rollup || query.group_by_with_cube || query.group_by_with_grouping_sets) && expressions.hasHaving())
{
@ -2057,7 +2065,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
for (const auto & key : query_analyzer->aggregationKeys())
{
keys.push_back(header_before_aggregation.getPositionByName(key.name));
LOG_DEBUG(log, "executeAggregation pushed back key with name {} and number {}", key.name, header_before_aggregation.getPositionByName(key.name));
}
AggregateDescriptions aggregates = query_analyzer->aggregates();
for (auto & descr : aggregates)
@ -2203,14 +2214,18 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
query_plan.addStep(std::move(step));
}
void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan)
/// Appends the aggregation steps for GROUP BY GROUPING SETS to the query plan.
///
/// Adds the "Before GROUP BY" expression step, resolves every key list returned by
/// `query_analyzer->aggregationKeysList()` and every aggregate argument name to a column
/// position in the header produced by that step, and then adds one AggregatingStep built
/// from an Aggregator::Params that carries the two-dimensional key list.
///
/// @param query_plan     plan the steps are appended to
/// @param expression     actions for the ExpressionStep placed before the aggregation
/// @param overflow_row   forwarded to Aggregator::Params
/// @param final          forwarded to AggregatingStep
/// @param group_by_info  input order info; reset to nullptr unless it is set and
///                       `optimize_aggregation_in_order` is enabled
void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
    /*
    const auto & header_before_transform = query_plan.getCurrentDataStream().header;
    ColumnNumbers keys;
    for (const auto & key : query_analyzer->aggregationKeys())
    {
    keys.push_back(header_before_transform.getPositionByName(key.name));
    LOG_DEBUG(log, "executeGroupingSets pushed back key with name {} and number {}", key.name, header_before_transform.getPositionByName(key.name));
    }
    const Settings & settings = context->getSettingsRef();
    @ -2226,6 +2241,87 @@ void InterpreterSelectQuery::executeGroupingSets(QueryPlan & query_plan)
    step = std::make_unique<GroupingSetsStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
    query_plan.addStep(std::move(step));
    */

    auto expression_before_aggregation = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
    expression_before_aggregation->setStepDescription("Before GROUP BY");
    query_plan.addStep(std::move(expression_before_aggregation));

    /// Header as it looks after the expression step that was just added.
    const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;

    /// One ColumnNumbers entry per key list, in the order the analyzer reports them.
    ColumnNumbersTwoDimension keys_vector;
    for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
    {
        ColumnNumbers set_keys;
        for (const auto & key : aggregation_keys)
        {
            /// Resolve the position once and reuse it for both the key list and the log line.
            const size_t position = header_before_aggregation.getPositionByName(key.name);
            set_keys.push_back(position);
            LOG_DEBUG(log, "executeGroupingSets add key with name {} and number {}", key.name, position);
        }
        keys_vector.push_back(std::move(set_keys));
    }

    /// Fill in argument positions only for aggregates that do not have them yet.
    AggregateDescriptions aggregates = query_analyzer->aggregates();
    for (auto & descr : aggregates)
    {
        if (!descr.arguments.empty())
            continue;

        for (const auto & name : descr.argument_names)
        {
            const size_t position = header_before_aggregation.getPositionByName(name);
            descr.arguments.push_back(position);
            LOG_DEBUG(log, "executeGroupingSets add descr.arguments with name {} and number {}", name, position);
        }
    }

    const Settings & settings = context->getSettingsRef();

    Aggregator::Params params(
        header_before_aggregation,
        keys_vector,
        aggregates,
        overflow_row,
        settings.max_rows_to_group_by,
        settings.group_by_overflow_mode,
        settings.group_by_two_level_threshold,
        settings.group_by_two_level_threshold_bytes,
        settings.max_bytes_before_external_group_by,
        settings.empty_result_for_aggregation_by_empty_set,
        context->getTemporaryVolume(),
        settings.max_threads,
        settings.min_free_disk_space_for_temporary_data);

    /// The sort description is only filled when in-order aggregation is both possible and enabled;
    /// otherwise the order info is dropped so the step does not receive it.
    SortDescription group_by_sort_description;
    if (group_by_info && settings.optimize_aggregation_in_order)
        group_by_sort_description = getSortDescriptionFromGroupBy(getSelectQuery());
    else
        group_by_info = nullptr;

    auto merge_threads = max_streams;
    auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
        ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
        : static_cast<size_t>(settings.max_threads);

    bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();

    auto aggregating_step = std::make_unique<AggregatingStep>(
        query_plan.getCurrentDataStream(),
        params,
        final,
        settings.max_block_size,
        settings.aggregation_in_order_max_block_bytes,
        merge_threads,
        temporary_data_merge_threads,
        storage_has_evenly_distributed_read,
        std::move(group_by_info),
        std::move(group_by_sort_description));

    query_plan.addStep(std::move(aggregating_step));
}
void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description)

View File

@ -14,6 +14,7 @@
#include <Storages/TableLockHolder.h>
#include <Columns/FilterDescription.h>
#include "Interpreters/ActionsDAG.h"
namespace Poco
{
@ -160,7 +161,7 @@ private:
};
void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);
void executeGroupingSets(QueryPlan & query_plan);
void executeGroupingSets(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info);
/** If there is a SETTINGS section in the SELECT query, then apply settings from it.
*

View File

@ -19,6 +19,7 @@ GroupingSetsTransform::GroupingSetsTransform(Block header, AggregatingTransformP
Chunk GroupingSetsTransform::merge(Chunks && chunks, bool final)
{
LOG_DEBUG(log, "merge {} blocks", chunks.size());
BlocksList rollup_blocks;
for (auto & chunk : chunks)
rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
@ -31,12 +32,15 @@ Chunk GroupingSetsTransform::merge(Chunks && chunks, bool final)
/// Stores the incoming chunk in `consumed_chunks` and logs how many chunks are buffered so far.
void GroupingSetsTransform::consume(Chunk chunk)
{
consumed_chunks.emplace_back(std::move(chunk));
/// Logged after the insertion, so the reported size already includes this chunk.
LOG_DEBUG(log, "consumed block, now consumed_chunks size is {}", consumed_chunks.size());
}
Chunk GroupingSetsTransform::generate()
{
LOG_DEBUG(log, "generate start, mask = {}", mask);
if (!consumed_chunks.empty())
{
LOG_DEBUG(log, "consumed_chunks not empty, size is {}", consumed_chunks.size());
if (consumed_chunks.size() > 1)
grouping_sets_chunk = merge(std::move(consumed_chunks), false);
else
@ -46,6 +50,7 @@ Chunk GroupingSetsTransform::generate()
auto num_rows = grouping_sets_chunk.getNumRows();
mask = (UInt64(1) << keys.size());
LOG_DEBUG(log, "changed mask, mask = {}", mask);
current_columns = grouping_sets_chunk.getColumns();
current_zero_columns.clear();
@ -56,9 +61,10 @@ Chunk GroupingSetsTransform::generate()
}
// auto gen_chunk = std::move(cube_chunk);
LOG_DEBUG(log, "before if mask");
if (mask > 1)
{
LOG_DEBUG(log, "in if mask > 1");
mask = mask >> 1;
auto columns = current_columns;
@ -72,6 +78,7 @@ Chunk GroupingSetsTransform::generate()
chunks.emplace_back(std::move(columns), current_columns.front()->size());
grouping_sets_chunk = merge(std::move(chunks), false);
}
LOG_DEBUG(log, "before gen_chunk");
auto gen_chunk = std::move(grouping_sets_chunk);
finalizeChunk(gen_chunk);

View File

@ -29,6 +29,8 @@ private:
UInt64 mask = 0;
Poco::Logger * log = &Poco::Logger::get("GroupingSetsTransform");
Chunk merge(Chunks && chunks, bool final);
};

View File

@ -119,6 +119,7 @@ TotalsHavingTransform::TotalsHavingTransform(
IProcessor::Status TotalsHavingTransform::prepare()
{
LOG_DEBUG(log, "TotalsHavingTransform::prepare()");
if (!finished_transform)
{
auto status = ISimpleTransform::prepare();
@ -143,20 +144,24 @@ IProcessor::Status TotalsHavingTransform::prepare()
totals_output.push(std::move(totals));
totals_output.finish();
LOG_DEBUG(log, "exit TotalsHavingTransform::prepare()");
return Status::Finished;
}
/// Performs one unit of work: delegates to ISimpleTransform::work() while
/// `finished_transform` is not set, and calls prepareTotals() once it is.
/// Entry and exit are traced at debug level.
void TotalsHavingTransform::work()
{
    LOG_DEBUG(log, "TotalsHavingTransform::work()");

    if (!finished_transform)
        ISimpleTransform::work();
    else
        prepareTotals();

    LOG_DEBUG(log, "exit TotalsHavingTransform::work()");
}
void TotalsHavingTransform::transform(Chunk & chunk)
{
/// Block with values not included in `max_rows_to_group_by`. We'll postpone it.
LOG_DEBUG(log, "TotalsHavingTransform::transform()");
if (overflow_row)
{
const auto & info = chunk.getChunkInfo();
@ -249,10 +254,12 @@ void TotalsHavingTransform::transform(Chunk & chunk)
}
passed_keys += chunk.getNumRows();
LOG_DEBUG(log, "exit TotalsHavingTransform::transform()");
}
void TotalsHavingTransform::addToTotals(const Chunk & chunk, const IColumn::Filter * filter)
{
LOG_DEBUG(log, "TotalsHavingTransform::addToTotals()");
auto num_columns = chunk.getNumColumns();
for (size_t col = 0; col < num_columns; ++col)
{
@ -284,10 +291,12 @@ void TotalsHavingTransform::addToTotals(const Chunk & chunk, const IColumn::Filt
}
}
}
LOG_DEBUG(log, "exit TotalsHavingTransform::addToTotals()");
}
void TotalsHavingTransform::prepareTotals()
{
LOG_DEBUG(log, "TotalsHavingTransform::prepareTotals()");
/// If totals_mode == AFTER_HAVING_AUTO, you need to decide whether to add aggregates to TOTALS for strings,
/// not passed max_rows_to_group_by.
if (overflow_aggregates)
@ -312,6 +321,7 @@ void TotalsHavingTransform::prepareTotals()
/// Note: after expression totals may have several rows if `arrayJoin` was used in expression.
totals = Chunk(block.getColumns(), num_rows);
}
LOG_DEBUG(log, "exit TotalsHavingTransform::prepareTotals()");
}
}

View File

@ -2,6 +2,8 @@
#include <Processors/ISimpleTransform.h>
#include <Common/Arena.h>
#include <base/logger_useful.h> // to be removed
// #include <Poco/Util/Application.h>
namespace DB
{
@ -74,6 +76,8 @@ private:
/// Here, total values are accumulated. After the work is finished, they will be placed in totals.
MutableColumns current_totals;
Poco::Logger * log = &Poco::Logger::get("TotalsHavingTransform");
};
void finalizeChunk(Chunk & chunk);