improve performance of aggregation in order of sorting key

This commit is contained in:
Anton Popov 2021-01-22 05:34:08 +03:00
parent 4b857c0879
commit 573edbcd11
10 changed files with 253 additions and 40 deletions

View File

@ -366,4 +366,20 @@ private:
}
};
/// Lexicographic "strictly less" comparison of row `i` in `lhs` against row `j` in `rhs`.
/// Columns are visited in the order given by `descr`; each entry's `column_number`
/// is used as the index into both `lhs` and `rhs`, and the per-column comparison result
/// is multiplied by the entry's `direction`.
/// Returns false when the rows compare equal on every described column.
template <typename TLeftColumns, typename TRightColumns>
bool less(const TLeftColumns & lhs, const TRightColumns & rhs, size_t i, size_t j, const SortDescription & descr)
{
    for (const auto & sort_column : descr)
    {
        const size_t position = sort_column.column_number;
        const int cmp = sort_column.direction * lhs[position]->compareAt(i, j, *rhs[position], sort_column.nulls_direction);

        /// The first column that differs decides the result.
        if (cmp != 0)
            return cmp < 0;
    }

    return false;
}
}

View File

@ -0,0 +1,134 @@
#include <Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Core/SortCursor.h>
#include <ext/range.h>
namespace DB
{
/// Builds the per-input state from a freshly received chunk.
/// Stores the row count and the chunk's columns, and collects raw pointers
/// to the columns referenced by `desc` (looked up by `column_number`).
FinishAggregatingInOrderAlgorithm::State::State(
    const Chunk & chunk, const SortDescription & desc)
    : num_rows(chunk.getNumRows())
    , all_columns(chunk.getColumns())
{
    /// A chunk that carries no columns cannot be indexed by column_number;
    /// leave sorting_columns empty instead of reading past the end of all_columns.
    /// NOTE(review): presumably such a chunk comes from an input that produced no data - confirm against the caller.
    if (all_columns.empty())
        return;

    sorting_columns.reserve(desc.size());
    for (const auto & column_desc : desc)
        sorting_columns.emplace_back(all_columns[column_desc.column_number].get());
}
/// Creates the algorithm for `num_inputs_` inputs that share the structure of `header_`.
/// `params_` provides the aggregator used to merge blocks, `description_` is the sort
/// description the inputs are ordered by, and `max_block_size_` is forwarded to the
/// merged-data accumulator.
FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
    const Block & header_,
    size_t num_inputs_,
    AggregatingTransformParamsPtr params_,
    SortDescription description_,
    size_t max_block_size_)
    : merged_data(header_.cloneEmptyColumns(), false, max_block_size_)
    , header(header_)
    , num_inputs(num_inputs_)
    /// Both arguments are taken by value, so move them into place instead of copying again.
    , params(std::move(params_))
    , description(std::move(description_))
{
    /// Replace column names in description to positions.
    for (auto & column_description : description)
    {
        if (!column_description.column_name.empty())
        {
            column_description.column_number = header_.getPositionByName(column_description.column_name);
            column_description.column_name.clear();
        }
    }
}
/// Takes ownership of the initial inputs and creates one State per input
/// from the chunk that input currently holds.
void FinishAggregatingInOrderAlgorithm::initialize(Inputs inputs)
{
    current_inputs = std::move(inputs);

    states.reserve(num_inputs);

    size_t input_idx = 0;
    while (input_idx < num_inputs)
    {
        states.emplace_back(current_inputs[input_idx].chunk, description);
        ++input_idx;
    }
}
/// Replaces the state of input `source_num` with a new one built from the chunk in `input`.
/// The previous state of that input (including its row positions) is discarded.
void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num)
{
    State fresh_state(input.chunk, description);
    states[source_num] = std::move(fresh_state);
}
/// Performs one step of the merge.
///
/// 1. Among inputs that still have unprocessed rows, picks the one whose last row
///    is the smallest according to `description`.
/// 2. Takes that input up to its end, and every other valid input up to the first row
///    that is greater than the chosen last row.
/// 3. Aggregates the selected ranges and appends the result to `merged_data`.
///
/// Returns a status built from the chosen input's index (presumably a request for its
/// next chunk - confirm against IMergingAlgorithm::Status), carrying a chunk when enough
/// rows are accumulated. When no input has rows left, returns the remaining merged data
/// with the second Status argument set to true.
IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
{
    /// `less` indexes its column containers by `column_number`, i.e. by position in the header.
    /// Hence the full column lists are passed here: `sorting_columns` is indexed by position
    /// in the description and would pick a wrong column (or go out of range) whenever
    /// the two numberings differ.
    std::optional<size_t> best_input;
    for (size_t i = 0; i < num_inputs; ++i)
    {
        if (!states[i].isValid())
            continue;

        if (!best_input
            || less(states[i].all_columns, states[*best_input].all_columns,
                    states[i].num_rows - 1, states[*best_input].num_rows - 1, description))
        {
            best_input = i;
        }
    }

    if (!best_input)
        return Status{merged_data.pull(), true};

    auto & best_state = states[*best_input];
    best_state.to_row = best_state.num_rows;

    for (size_t i = 0; i < num_inputs; ++i)
    {
        if (!states[i].isValid() || i == *best_input)
            continue;

        /// Binary search over the unprocessed row numbers of input i for the first row
        /// that is strictly greater than the last row of the best input.
        auto indices = ext::range(states[i].current_row, states[i].num_rows);
        auto it = std::upper_bound(indices.begin(), indices.end(), best_state.num_rows - 1,
            [&](size_t lhs_pos, size_t rhs_pos)
            {
                return less(best_state.all_columns, states[i].all_columns, lhs_pos, rhs_pos, description);
            });

        states[i].to_row = (it == indices.end() ? states[i].num_rows : *it);
    }

    auto aggregated = aggregate();

    /// Evaluate the column list and the row count once instead of once per inserted row.
    const auto & aggregated_columns = aggregated.getColumns();
    const size_t aggregated_rows = aggregated.rows();
    for (size_t i = 0; i < aggregated_rows; ++i)
        merged_data.insertRow(aggregated_columns, i, aggregated_rows);

    Status status(*best_input);
    if (merged_data.hasEnoughRows())
        status.chunk = merged_data.pull();

    return status;
}
/// Collects the row range [current_row, to_row) of every valid input into blocks with
/// the structure of `header`, advances each input's `current_row` to `to_row`, and returns
/// the result of merging those blocks with the aggregator from `params`.
Block FinishAggregatingInOrderAlgorithm::aggregate()
{
    BlocksList blocks;

    for (size_t i = 0; i < num_inputs; ++i)
    {
        auto & state = states[i];

        /// Skip exhausted inputs and inputs that contribute no rows at this step:
        /// there is no point in cutting zero-row columns and merging empty blocks.
        if (!state.isValid() || state.current_row == state.to_row)
            continue;

        if (state.current_row == 0 && state.to_row == state.num_rows)
        {
            /// The whole chunk is taken, so its columns can be reused without cutting.
            blocks.emplace_back(header.cloneWithColumns(state.all_columns));
        }
        else
        {
            const size_t length = state.to_row - state.current_row;

            Columns new_columns;
            new_columns.reserve(state.all_columns.size());
            for (const auto & column : state.all_columns)
                new_columns.emplace_back(column->cut(state.current_row, length));

            blocks.emplace_back(header.cloneWithColumns(new_columns));
        }

        state.current_row = state.to_row;
    }

    return params->aggregator.mergeBlocks(blocks, false);
}
}

View File

@ -0,0 +1,53 @@
#pragma once
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/Algorithms/MergedData.h>
#include <Core/SortDescription.h>
#include <Core/Block.h>
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
/// Merging algorithm that combines several inputs ordered by a sort description
/// and merges the selected row ranges of their chunks with an aggregator.
class FinishAggregatingInOrderAlgorithm final : public IMergingAlgorithm
{
public:
/// header_ - structure shared by the inputs and the result.
/// num_inputs_ - number of inputs to merge.
/// params_ - aggregation parameters; their aggregator is used to merge blocks.
/// description_ - sort description; column names are replaced by positions in header_.
/// max_block_size_ - passed to the MergedData accumulator.
FinishAggregatingInOrderAlgorithm(
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
SortDescription description_,
size_t max_block_size_);
/// Stores the initial inputs and creates a State for each of them.
void initialize(Inputs inputs) override;
/// Replaces the State of input source_num with one built from the new chunk.
void consume(Input & input, size_t source_num) override;
/// Performs one merge step; see the implementation for the selection rules.
Status merge() override;
/// Progress of a single input over its current chunk.
struct State
{
/// Number of rows in the current chunk.
size_t num_rows;
/// All columns of the current chunk.
Columns all_columns;
/// Raw pointers into all_columns for the columns named by the sort description.
ColumnRawPtrs sorting_columns;
/// Number of first row from all_columns to aggregate.
size_t current_row = 0;
/// Number of row (exclusive) up to which the rows are aggregated at the current step.
size_t to_row = 0;
State(const Chunk & chunk, const SortDescription & description);
/// True while the chunk still has rows that were not aggregated.
bool isValid() const { return current_row < num_rows; }
};
private:
/// Merges the rows [current_row, to_row) of every valid state into one block.
Block aggregate();
/// NOTE: members are initialized in declaration order; the constructor relies on it.
MergedData merged_data;
Block header;
size_t num_inputs;
AggregatingTransformParamsPtr params;
SortDescription description;
Inputs current_inputs;
/// One entry per input, indexed by input number.
std::vector<State> states;
};
}

View File

@ -21,7 +21,8 @@ public:
/// Requests a flush by setting the need_flush flag.
/// Pull will be called at next prepare call.
void flush() { need_flush = true; }
void insertRow(const ColumnRawPtrs & raw_columns, size_t row, size_t block_size)
template <typename TColumns>
void insertRow(const TColumns & raw_columns, size_t row, size_t block_size)
{
size_t num_columns = raw_columns.size();
for (size_t i = 0; i < num_columns; ++i)

View File

@ -0,0 +1,34 @@
#pragma once
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h>
namespace DB
{
class ColumnAggregateFunction;
/// Implementation of IMergingTransform via FinishAggregatingInOrderAlgorithm.
class FinishAggregatingInOrderTransform final : public IMergingTransform<FinishAggregatingInOrderAlgorithm>
{
public:
    /// The first four arguments of the base constructor configure the transform itself;
    /// the remaining ones match the parameter list of the FinishAggregatingInOrderAlgorithm
    /// constructor (header, number of inputs, aggregation params, sort description, max block size).
    FinishAggregatingInOrderTransform(
        const Block & header,
        size_t num_inputs,
        AggregatingTransformParamsPtr params,
        SortDescription description,
        size_t max_block_size)
        : IMergingTransform(
            num_inputs, header, header, true,
            header,
            num_inputs,
            std::move(params),
            std::move(description),
            max_block_size)
    {
    }

    /// Report this processor's own name; the previous value was the name of
    /// the unrelated AggregatingSortedTransform.
    String getName() const override { return "FinishAggregatingInOrderTransform"; }
};
}

View File

@ -3,6 +3,7 @@
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
namespace DB
{
@ -95,9 +96,10 @@ void AggregatingStep::transformPipeline(QueryPipeline & pipeline)
}
}
auto transform = std::make_shared<AggregatingSortedTransform>(
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
transform_params,
group_by_sort_description,
max_block_size);

View File

@ -1,5 +1,8 @@
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Core/SortCursor.h>
#include <ext/range.h>
namespace DB
{
@ -46,21 +49,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
AggregatingInOrderTransform::~AggregatingInOrderTransform() = default;
static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size_t j, const SortDescription & descr)
{
for (const auto & elem : descr)
{
size_t ind = elem.column_number;
int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void AggregatingInOrderTransform::consume(Chunk chunk)
{
size_t rows = chunk.getNumRows();

View File

@ -88,5 +88,4 @@ private:
AggregatingTransformParamsPtr params;
};
}

View File

@ -37,20 +37,6 @@ FinishSortingTransform::FinishSortingTransform(
description_sorted.assign(description.begin(), description.begin() + prefix_size);
}
static bool less(const Columns & lhs, const Columns & rhs, size_t i, size_t j, const SortDescription & descr)
{
for (const auto & elem : descr)
{
size_t ind = elem.column_number;
int res = elem.direction * lhs[ind]->compareAt(i, j, *rhs[ind], elem.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void FinishSortingTransform::consume(Chunk chunk)
{
generated_prefix = false;

View File

@ -9,12 +9,12 @@ INSERT INTO pk_order(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 1
-- Order after group by in order is determined
SELECT a, b FROM pk_order GROUP BY a, b;
SELECT a FROM pk_order GROUP BY a;
SELECT a, b FROM pk_order GROUP BY a, b ORDER BY a, b;
SELECT a FROM pk_order GROUP BY a ORDER BY a;
SELECT a, b, sum(c), avg(d) FROM pk_order GROUP BY a, b;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY a;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY -a;
SELECT a, b, sum(c), avg(d) FROM pk_order GROUP BY a, b ORDER BY a, b;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY a ORDER BY a;
SELECT a, sum(c), avg(d) FROM pk_order GROUP BY -a ORDER BY a;
DROP TABLE IF EXISTS pk_order;
@ -26,8 +26,8 @@ INSERT INTO pk_order
set max_block_size = 1;
SELECT d, max(b) FROM pk_order GROUP BY d, a LIMIT 5;
SELECT d, avg(a) FROM pk_order GROUP BY toString(d) LIMIT 5;
SELECT toStartOfHour(d) as d1, min(a), max(b) FROM pk_order GROUP BY d1 LIMIT 5;
SELECT d, max(b) FROM pk_order GROUP BY d, a ORDER BY d, a LIMIT 5;
SELECT d, avg(a) FROM pk_order GROUP BY toString(d) ORDER BY toString(d) LIMIT 5;
SELECT toStartOfHour(d) as d1, min(a), max(b) FROM pk_order GROUP BY d1 ORDER BY d1 LIMIT 5;
DROP TABLE pk_order;