Improve performance of external aggregation with a lot of temporary files

Maksim Kita 2023-10-11 13:17:12 +03:00
parent 80aa9cad51
commit f9033bdf31
2 changed files with 84 additions and 57 deletions
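The commit ships no description, but the diff makes the intent clear: GroupingAggregatedTransform used to re-poll every input port on every scheduling round (readFromAllInputs()), so with external aggregation spilling into many temporary files each wake-up cost O(num_inputs). The reworked prepare(const PortNumbers & updated_input_ports, const PortNumbers &) overload instead remembers which ports it is still waiting on (wait_input_ports_numbers) and touches only the ports the executor reports as updated. Below is a minimal, self-contained sketch of that pattern; all names are invented for illustration and none of this is ClickHouse code.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <unordered_set>
#include <vector>

struct Port
{
    std::optional<int> chunk; /// At most one pending chunk.
    bool hasData() const { return chunk.has_value(); }
    int pull() { int v = *chunk; chunk.reset(); return v; }
};

class Transform
{
public:
    explicit Transform(size_t num_inputs) : ports(num_inputs)
    {
        for (uint64_t i = 0; i < num_inputs; ++i)
            wait_ports.insert(i);
    }

    /// Old scheme: scan every port on every wake-up, O(num_inputs).
    void prepareScanAll()
    {
        for (size_t i = 0; i < ports.size(); ++i)
            if (ports[i].hasData())
                consume(i, ports[i].pull());
    }

    /// New scheme: visit only the ports the executor reports as updated,
    /// O(|updated|) per wake-up regardless of the total number of inputs.
    void prepareUpdatedOnly(const std::vector<uint64_t> & updated_input_ports)
    {
        for (uint64_t port_number : updated_input_ports)
        {
            if (!ports[port_number].hasData())
            {
                wait_ports.erase(port_number); /// Nothing pending; stop waiting on it.
                continue;
            }
            consume(port_number, ports[port_number].pull());
            wait_ports.erase(port_number);
        }
    }

    std::vector<Port> ports;
    std::unordered_set<uint64_t> wait_ports; /// Ports we still expect data from.

private:
    static void consume(size_t port, int value)
    {
        std::cout << "port " << port << " -> chunk " << value << '\n';
    }
};

int main()
{
    Transform transform(1000); /// e.g. 1000 temporary files being merged
    transform.ports[7].chunk = 42;

    /// Only port 7 woke us up: one lookup instead of a 1000-port scan.
    transform.prepareUpdatedOnly({7});
}

In the actual diff the first prepare() call still scans all inputs once, to seed the wait set and detect two-level aggregation; only subsequent calls take the updated-ports fast path.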

src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.cpp

@@ -20,37 +20,9 @@ GroupingAggregatedTransform::GroupingAggregatedTransform(
     , num_inputs(num_inputs_)
     , params(std::move(params_))
     , last_bucket_number(num_inputs, -1)
-    , read_from_input(num_inputs, false)
 {
 }

-void GroupingAggregatedTransform::readFromAllInputs()
-{
-    auto in = inputs.begin();
-    read_from_all_inputs = true;
-
-    for (size_t i = 0; i < num_inputs; ++i, ++in)
-    {
-        if (in->isFinished())
-            continue;
-
-        if (read_from_input[i])
-            continue;
-
-        in->setNeeded();
-
-        if (!in->hasData())
-        {
-            read_from_all_inputs = false;
-            continue;
-        }
-
-        auto chunk = in->pull();
-        read_from_input[i] = true;
-        addChunk(std::move(chunk), i);
-    }
-}
-
 void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows)
 {
     auto & output = outputs.front();
@@ -119,7 +91,7 @@ bool GroupingAggregatedTransform::tryPushOverflowData()
     return true;
 }

-IProcessor::Status GroupingAggregatedTransform::prepare()
+IProcessor::Status GroupingAggregatedTransform::prepare(const PortNumbers & updated_input_ports, const PortNumbers &)
 {
     /// Check can output.
     auto & output = outputs.front();
@@ -137,30 +109,36 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
     /// Read first time from each input to understand if we have two-level aggregation.
     if (!read_from_all_inputs)
     {
-        readFromAllInputs();
-        if (!read_from_all_inputs)
+        read_from_all_inputs = true;
+        auto in = inputs.begin();
+        index_to_input.resize(num_inputs);
+
+        for (size_t i = 0; i < num_inputs; ++i, ++in)
+        {
+            index_to_input[i] = in;
+
+            if (in->isFinished())
+                continue;
+
+            in->setNeeded();
+
+            if (!in->hasData())
+            {
+                wait_input_ports_numbers.insert(i);
+                continue;
+            }
+
+            auto chunk = in->pull();
+            addChunk(std::move(chunk), i);
+        }
+
+        if (has_two_level && !single_level_chunks.empty())
+            return Status::Ready;
+
+        if (!wait_input_ports_numbers.empty())
             return Status::NeedData;
     }

-    /// Convert single level to two levels if have two-level input.
-    if (has_two_level && !single_level_chunks.empty())
-        return Status::Ready;
-
-    /// Check can push (to avoid data caching).
-    if (!output.canPush())
-    {
-        for (auto & input : inputs)
-            input.setNotNeeded();
-        return Status::PortFull;
-    }
-
-    bool pushed_to_output = false;
-
-    /// Output if has data.
-    if (has_two_level)
-        pushed_to_output = tryPushTwoLevelData();
-
     auto need_input = [this](size_t input_num)
     {
         if (last_bucket_number[input_num] < current_bucket)
@@ -169,6 +147,51 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
         return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
     };

+    if (!wait_input_ports_numbers.empty())
+    {
+        for (const auto & updated_input_port_number : updated_input_ports)
+        {
+            auto & input = index_to_input[updated_input_port_number];
+            // input->setNeeded();
+
+            if (!input->hasData())
+            {
+                wait_input_ports_numbers.erase(updated_input_port_number);
+                continue;
+            }
+
+            auto chunk = input->pull();
+            addChunk(std::move(chunk), updated_input_port_number);
+
+            if (!input->isFinished() && need_input(updated_input_port_number))
+                continue;
+
+            wait_input_ports_numbers.erase(updated_input_port_number);
+        }
+
+        if (!output.canPush())
+            return Status::PortFull;
+
+        if (has_two_level && !single_level_chunks.empty())
+            return Status::Ready;
+
+        if (!wait_input_ports_numbers.empty())
+            return Status::NeedData;
+    }
+
+    if (!output.canPush())
+        return Status::PortFull;
+
+    /// Convert single level to two levels if have two-level input.
+    if (has_two_level && !single_level_chunks.empty())
+        return Status::Ready;
+
+    bool pushed_to_output = false;
+
+    /// Output if has data.
+    if (has_two_level)
+        pushed_to_output = tryPushTwoLevelData();
+
     /// Read next bucket if can.
     for (; ; ++current_bucket)
     {
@@ -190,6 +213,7 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
             if (!in->hasData())
             {
+                wait_input_ports_numbers.insert(input_num);
                 need_data = true;
                 continue;
             }
@@ -197,13 +221,16 @@ IProcessor::Status GroupingAggregatedTransform::prepare()
             auto chunk = in->pull();
             addChunk(std::move(chunk), input_num);

+            if (has_two_level && !single_level_chunks.empty())
+                return Status::Ready;
+
             if (!in->isFinished() && need_input(input_num))
+            {
+                wait_input_ports_numbers.insert(input_num);
                 need_data = true;
+            }
         }

-        if (has_two_level && !single_level_chunks.empty())
-            return Status::Ready;
-
         if (finished)
         {
             all_inputs_finished = true;
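A condensed sketch of the status ladder the reworked prepare() walks after draining the updated ports, in the same order as the hunk above. The types here are invented; returning std::nullopt stands for falling through to the "Read next bucket if can" loop.

#include <cstdint>
#include <iostream>
#include <optional>
#include <unordered_set>

enum class Status { PortFull, Ready, NeedData };

struct State
{
    bool output_can_push = true;                           /// output.canPush()
    bool has_two_level = false;
    bool single_level_chunks_empty = true;
    std::unordered_set<uint64_t> wait_input_ports_numbers; /// Ports still awaited.
};

/// Same order of checks as the diff: backpressure, then ready work, then waiting.
std::optional<Status> earlyStatus(const State & state)
{
    if (!state.output_can_push)
        return Status::PortFull;  /// Downstream is full: stop pulling inputs.
    if (state.has_two_level && !state.single_level_chunks_empty)
        return Status::Ready;     /// Buffered single-level chunks need converting in work().
    if (!state.wait_input_ports_numbers.empty())
        return Status::NeedData;  /// Still waiting on some input ports.
    return std::nullopt;          /// Fall through to the per-bucket reading loop.
}

int main()
{
    State state;
    state.wait_input_ports_numbers.insert(3);
    std::cout << (earlyStatus(state) == Status::NeedData) << '\n'; /// prints 1
}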

src/Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h

@@ -1,4 +1,5 @@
 #pragma once
+
 #include <Core/SortDescription.h>
 #include <Interpreters/Aggregator.h>
 #include <Processors/IProcessor.h>
@@ -67,7 +68,7 @@ public:
     void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; }

 protected:
-    Status prepare() override;
+    Status prepare(const PortNumbers & updated_input_ports, const PortNumbers &) override;
     void work() override;

 private:
@@ -84,15 +85,14 @@ private:
     bool all_inputs_finished = false;

     bool read_from_all_inputs = false;
-    std::vector<bool> read_from_input;
+    std::vector<InputPorts::iterator> index_to_input;
+    std::unordered_set<uint64_t> wait_input_ports_numbers;

     /// If we aggregate partitioned data several chunks might be produced for the same bucket: one for each partition.
     bool expect_several_chunks_for_single_bucket_per_source = true;

     /// Add chunk read from input to chunks_map, overflow_chunks or single_level_chunks according to it's chunk info.
     void addChunk(Chunk chunk, size_t input);

-    /// Read from all inputs first chunk. It is needed to detect if any source has two-level aggregation.
-    void readFromAllInputs();
-
     /// Push chunks if all inputs has single level.
     bool tryPushSingleLevelData();

     /// Push chunks from ready bucket if has one.
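One header detail worth noting: index_to_input caches std::list iterators. In ClickHouse, a processor's ports are stored as InputPorts, an alias for std::list<InputPort> (Processors/Port.h), which cannot be indexed directly; without the cache, locating port N on each wake-up would mean advancing an iterator N steps. A small sketch of the idea, with InputPort stubbed out:

#include <cstddef>
#include <list>
#include <vector>

struct InputPort { int id = 0; };        /// Stub; the real class lives in Processors/Port.h.
using InputPorts = std::list<InputPort>; /// Ports are a list, so no operator[].

int main()
{
    InputPorts inputs(1000);

    /// Built once, on the first prepare() call, then reused.
    std::vector<InputPorts::iterator> index_to_input;
    index_to_input.reserve(inputs.size());
    for (auto it = inputs.begin(); it != inputs.end(); ++it)
        index_to_input.push_back(it);

    /// O(1) lookup of port #7, instead of std::advance(inputs.begin(), 7).
    InputPort & port = *index_to_input[7];
    (void)port;
}

List iterators stay valid as long as ports are not erased, so caching them once is safe here; wait_input_ports_numbers is an unordered_set so the per-port insert/erase in prepare() is O(1) as well.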