grouping sets development

MaxTheHuman 2021-05-16 22:44:20 +03:00 committed by Dmitry Novik
parent d2258decf5
commit ddd1799743
7 changed files with 52 additions and 26 deletions

View File

@@ -881,7 +881,7 @@ public:
ColumnNumbers keys;
const ColumnNumbersTwoDimension keys_vector;
const AggregateDescriptions aggregates;
const size_t keys_size;
size_t keys_size;
const size_t aggregates_size;
/// The settings of approximate calculation of GROUP BY.
@@ -942,6 +942,7 @@ public:
/// Two-dimensional vector of aggregation keys in params.
Params(
const Block & src_header_,
const ColumnNumbers & keys_,
const ColumnNumbersTwoDimension & keys_vector_, const AggregateDescriptions & aggregates_,
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
@@ -954,6 +955,7 @@ public:
const Block & intermediate_header_ = {})
: src_header(src_header_),
intermediate_header(intermediate_header_),
keys(keys_),
keys_vector(keys_vector_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
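
For context: in ClickHouse, ColumnNumbers is std::vector<size_t>, so the new keys_vector member presumably carries one key list per grouping set alongside the flat union in keys. A minimal sketch of the intended shape (the alias definition and the example positions are assumptions, not shown in this diff):

/// Presumed alias introduced for GROUPING SETS support: one ColumnNumbers per set.
using ColumnNumbersTwoDimension = std::vector<ColumnNumbers>;

/// For GROUP BY GROUPING SETS ((a), (b)), with column a at position 0 and
/// b at position 1 of the source header, Params would presumably receive:
///   keys        = {0, 1}      // union of all grouping keys
///   keys_vector = {{0}, {1}}  // one entry per grouping set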

View File

@@ -390,6 +390,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
if (select_query->group_by_with_grouping_sets)
{
aggregation_keys_list.push_back({key});
aggregation_keys.push_back(key);
LOG_DEBUG(poco_log, "pushed grouping set of 1 column: {}", key.name);
}
else

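The branch above registers each key twice: once in the flat aggregation_keys, and once as a single-column grouping set in aggregation_keys_list. A hypothetical illustration of the resulting state (inferred from the push_back calls, not printed anywhere in the diff):

/// After analyzing GROUP BY GROUPING SETS ((a), (b)), this branch would leave:
///   aggregation_keys      = [a, b]
///   aggregation_keys_list = [[a], [b]]
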
View File

@@ -1049,11 +1049,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
* but there is an ORDER or LIMIT,
* then we will perform the preliminary sorting and LIMIT on the remote server.
*/
LOG_DEBUG(log, "in preliminary_sort()");
if (!expressions.second_stage
&& !expressions.need_aggregate
&& !expressions.hasHaving()
&& !expressions.has_window)
{
LOG_DEBUG(log, "in if in preliminary_sort()");
if (expressions.has_order_by)
executeOrder(
query_plan,
@@ -1225,6 +1227,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
}
LOG_DEBUG(log, "ran executeAggregation, before preliminary_sort()");
preliminary_sort();
@@ -1361,6 +1364,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
bool apply_offset = options.to_stage != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
if (apply_prelimit)
{
LOG_DEBUG(log, "before prelimit");
executePreLimit(query_plan, /* do_not_skip_offset= */!apply_offset);
}
@@ -1372,16 +1376,19 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (!from_aggregation_stage && expressions.hasLimitBy())
{
LOG_DEBUG(log, "before limit by");
executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY");
executeLimitBy(query_plan);
}
LOG_DEBUG(log, "before with fill");
executeWithFill(query_plan);
/// If we have 'WITH TIES', we need to execute limit before projection,
/// because in that case columns from 'ORDER BY' are used.
if (query.limit_with_ties && apply_offset)
{
LOG_DEBUG(log, "before limit");
executeLimit(query_plan);
}
@@ -1390,10 +1397,12 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (!to_aggregation_stage)
{
/// We must do projection after DISTINCT because projection may remove some columns.
LOG_DEBUG(log, "before projection");
executeProjection(query_plan, expressions.final_projection);
}
/// Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
LOG_DEBUG(log, "before extremes");
executeExtremes(query_plan);
bool limit_applied = apply_prelimit || (query.limit_with_ties && apply_offset);
@@ -2056,6 +2065,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
const auto & header_before_aggregation = query_plan.getCurrentDataStream().header;
ColumnNumbers keys;
ColumnNumbers all_keys;
ColumnNumbersTwoDimension keys_vector;
auto & query = getSelectQuery();
if (query.group_by_with_grouping_sets)
@@ -2066,6 +2076,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
for (const auto & key : aggregation_keys)
{
keys.push_back(header_before_aggregation.getPositionByName(key.name));
all_keys.push_back(header_before_aggregation.getPositionByName(key.name));
LOG_DEBUG(
log,
"GroupingSets add key with name {} and number {}",
@@ -2088,37 +2099,22 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
}
}
LOG_DEBUG(log, "GroupingSets debug 1");
AggregateDescriptions aggregates = query_analyzer->aggregates();
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
LOG_DEBUG(log, "GroupingSets debug 2");
const Settings & settings = context->getSettingsRef();
// Aggregator::Params params(
// header_before_aggregation,
// keys,
// aggregates,
// overflow_row,
// settings.max_rows_to_group_by,
// settings.group_by_overflow_mode,
// settings.group_by_two_level_threshold,
// settings.group_by_two_level_threshold_bytes,
// settings.max_bytes_before_external_group_by,
// settings.empty_result_for_aggregation_by_empty_set
// || (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
// && query_analyzer->hasConstAggregationKeys()),
// context->getTemporaryVolume(),
// settings.max_threads,
// settings.min_free_disk_space_for_temporary_data,
// settings.compile_aggregate_expressions,
// settings.min_count_to_compile_aggregate_expression);
LOG_DEBUG(log, "GroupingSets debug 3");
std::shared_ptr<Aggregator::Params> params_ptr;
if (query.group_by_with_grouping_sets)
{
LOG_DEBUG(log, "GroupingSets debug 4");
params_ptr = std::make_shared<Aggregator::Params>(
header_before_aggregation,
all_keys,
keys_vector,
aggregates,
overflow_row,
@@ -2157,7 +2153,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
}
LOG_DEBUG(log, "GroupingSets debug 5");
SortDescription group_by_sort_description;
if (group_by_info && settings.optimize_aggregation_in_order && !query.group_by_with_grouping_sets)
@@ -2171,7 +2167,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
: static_cast<size_t>(settings.max_threads);
bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
LOG_DEBUG(log, "GroupingSets debug 6");
LOG_DEBUG(log, "GroupingSets step header structure: {}", query_plan.getCurrentDataStream().header.dumpStructure());
auto aggregating_step = std::make_unique<AggregatingStep>(
query_plan.getCurrentDataStream(),
*params_ptr,
@@ -2183,8 +2180,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
storage_has_evenly_distributed_read,
std::move(group_by_info),
std::move(group_by_sort_description));
LOG_DEBUG(log, "GroupingSets step header structure: {}", aggregating_step->getOutputStream().header.dumpStructure());
LOG_DEBUG(log, "GroupingSets debug 7");
query_plan.addStep(std::move(aggregating_step));
LOG_DEBUG(log, "GroupingSets debug 8");
}
void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool overflow_row, bool final)

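The visible hunks declare keys_vector but elide where the grouping-sets branch fills it. Presumably each per-set key list collected by the analyzer is resolved to header positions, one ColumnNumbers per set; a sketch under that assumption (the aggregationKeysList() accessor is hypothetical, not confirmed by this diff):

/// Hypothetical: resolve each grouping set to column positions in the header.
for (const auto & set_keys : query_analyzer->aggregationKeysList())
{
    ColumnNumbers current_set;
    for (const auto & key : set_keys)
        current_set.push_back(header_before_aggregation.getPositionByName(key.name));
    keys_vector.push_back(std::move(current_set));
}
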
View File

@@ -501,6 +501,7 @@ void AggregatingTransform::work()
Processors AggregatingTransform::expandPipeline()
{
LOG_DEBUG(log, "in AggregatingTransform::expandPipeline");
auto & out = processors.back()->getOutputs().front();  /// output of the newly added processor chain
inputs.emplace_back(out.getHeader(), this);  /// create a matching input port on this transform
connect(out, inputs.back());

View File

@@ -434,6 +434,8 @@ void Pipe::addTransform(ProcessorPtr transform)
void Pipe::addParallelTransforms(Processors transforms)
{
LOG_DEBUG(log, "Begin addParallelTransforms, have {} transforms", transforms.size());
if (output_ports.empty())
throw Exception("Cannot add parallel transforms to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
@@ -452,6 +454,7 @@ void Pipe::addParallelTransforms(Processors transforms)
throw Exception("Each parallel transform should have one output port", ErrorCodes::LOGICAL_ERROR);
outputs.push_back(current_transform_outputs.front());
LOG_DEBUG(log, "addParallelTransforms, added inputs and outputs for processor {}", transform->getName());
}
if (inputs.size() != output_ports.size())
@@ -464,6 +467,7 @@ void Pipe::addParallelTransforms(Processors transforms)
{
connect(*output_ports[next_output], input);
++next_output;
LOG_DEBUG(log, "addParallelTransforms connect current output to new input {}", next_output);
}
output_ports.clear();
@@ -471,15 +475,22 @@ void Pipe::addParallelTransforms(Processors transforms)
for (auto & output : outputs)
{
LOG_DEBUG(log, "addParallelTransforms change outputs to new output");
LOG_DEBUG(log, "addParallelTransforms is output connected: {}", output.isConnected());
output_ports.emplace_back(&output);
}
/// Do not check output formats because they differ in the case of parallel aggregation.
LOG_DEBUG(log, "addParallelTransforms do not check format");
if (collected_processors)
collected_processors->insert(collected_processors->end(), transforms.begin(), transforms.end());
processors.insert(processors.end(), transforms.begin(), transforms.end());
for (auto & transform_ptr : transforms)
{
processors.emplace_back(std::move(transform_ptr));
}
LOG_DEBUG(log, "addParallelTransforms inserted processors, now processors is of size {}", processors.size());
/// Should not change the number of streams, so this max_parallel_streams update may be unnecessary.
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
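
Taken together, the checks above define the contract of the new method: each parallel transform must have exactly one input and one output, the number of transforms must equal the pipe's current number of output streams, and transform i is connected to stream i. A hedged usage sketch (MyPerSetTransform is a placeholder one-input/one-output processor, not a type from this commit):

/// Hypothetical caller: one transform per existing output stream.
Processors transforms;
for (size_t i = 0; i < pipe.numOutputPorts(); ++i)
    transforms.emplace_back(std::make_shared<MyPerSetTransform>(pipe.getHeader()));
pipe.addParallelTransforms(std::move(transforms));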
@@ -512,6 +523,15 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
size_t next_output = 0;
for (auto & input : inputs)
{
LOG_DEBUG(log, "Pipe: is input connected {}", input.isConnected());
LOG_DEBUG(log, "Pipe: is output connected {}", output_ports[next_output]->isConnected());
if (output_ports[next_output]->isConnected())
{
if (output_ports[next_output]->getHeader())
LOG_DEBUG(log, "output header structure is: {}", output_ports[next_output]->getHeader().dumpStructure());
else
LOG_DEBUG(log, "could not retrieve info about output");
}
connect(*output_ports[next_output], input);
++next_output;
}

View File

@@ -4,6 +4,7 @@
#include <QueryPipeline/PipelineResourcesHolder.h>
#include <QueryPipeline/Chain.h>
#include <QueryPipeline/SizeLimits.h>
#include <base/logger_useful.h>
namespace DB
{
@@ -137,6 +138,8 @@ private:
/// It is needed for debug. See QueryPipelineProcessorsCollector.
Processors * collected_processors = nullptr;
Poco::Logger * log = &Poco::Logger::get("Pipe");
/// These methods are for QueryPipeline. It is allowed to complete the graph only there.
/// So, we may be sure that a Pipe always has output ports if it is not empty.
bool isCompleted() const { return !empty() && output_ports.empty(); }

View File

@@ -67,7 +67,7 @@ public:
/// Transform pipeline in general way.
void transform(const Transformer & transformer);
/// Add transform and connects it to outputs[stream_index] stream
/// Add transforms and connect them to the output streams
void addParallelTransforms(Processors transforms);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform);