Mirror of https://github.com/ClickHouse/ClickHouse.git
Added MergingAggregatedSimpleTransform.

commit 6cedacb055
parent b85e554c49
@@ -52,6 +52,9 @@ ISimpleTransform::Status ISimpleTransform::prepare()

        current_chunk = input.pull();
        has_input = true;

        if (set_input_not_needed_after_read)
            input.setNotNeeded();
    }

    /// Now transform.
@@ -20,6 +20,11 @@ protected:
    bool transformed = false;
    const bool skip_empty_chunks;

    /// Set input port NotNeeded after chunk was pulled.
    /// Input port will become needed again only after data was transformed.
    /// This allows to avoid caching chunks in the input port, which can lead to uneven data distribution.
    bool set_input_not_needed_after_read = false;

    virtual void transform(Chunk & chunk) = 0;

public:
@@ -30,6 +35,8 @@ public:

    InputPort & getInputPort() { return input; }
    OutputPort & getOutputPort() { return output; }

    void setInputNotNeededAfterRead(bool value) { set_input_not_needed_after_read = value; }
};

}
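Below is a minimal, hypothetical sketch (editor's illustration, not part of this commit) of how a transform could opt into the new flag. The class name PassThroughTransform and its no-op transform() are invented; the constructor form ISimpleTransform(input_header, output_header, skip_empty_chunks) follows the call MergingAggregatedBucketTransform makes later in this commit.

/// Hypothetical example: once the flag is set, the input port is marked NotNeeded
/// right after a chunk is pulled, so chunks are not cached in this port while the
/// processor is busy; the port becomes needed again after transform() has run.
class PassThroughTransform : public ISimpleTransform
{
public:
    explicit PassThroughTransform(const Block & header)
        : ISimpleTransform(header, header, false)
    {
        setInputNotNeededAfterRead(true);
    }

    String getName() const override { return "PassThroughTransform"; }

protected:
    void transform(Chunk &) override {} /// Pass the chunk through unchanged.
};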
@@ -26,7 +26,7 @@ public:
        size_t max_bytes_before_remerge_,
        size_t max_bytes_before_external_sort_, const std::string & tmp_path_);

-    ~MergeSortingTransform();
+    ~MergeSortingTransform() override;

    String getName() const override { return "MergeSortingTransform"; }

@@ -0,0 +1,433 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>

#include <Processors/ISimpleTransform.h>
#include <Interpreters/Aggregator.h>
#include <Processors/ResizeProcessor.h>

namespace DB
{

struct ChunksToMerge : public ChunkInfo
{
    std::unique_ptr<Chunks> chunks;
    Int32 bucket_num = -1;
    bool is_overflows = false;
};

GroupingAggregatedTransform::GroupingAggregatedTransform(
    const Block & header, size_t num_inputs, AggregatingTransformParamsPtr params)
    : IProcessor(InputPorts(num_inputs, header), {header})
    , num_inputs(num_inputs)
    , params(std::move(params))
    , last_bucket_number(num_inputs, -1)
    , read_from_input(num_inputs, false)
{
}

void GroupingAggregatedTransform::readFromAllInputs()
{
    auto in = inputs.begin();
    for (size_t i = 0; i < num_inputs; ++i, ++in)
    {
        if (in->isFinished())
            continue;

        if (read_from_input[i])
            continue;

        in->setNeeded();

        if (!in->hasData())
            return;

        auto chunk = in->pull();
        read_from_input[i] = true;
        addChunk(std::move(chunk), i);
    }

    read_from_all_inputs = true;
}

void GroupingAggregatedTransform::pushData(Chunks chunks, Int32 bucket, bool is_overflows)
{
    auto & output = outputs.front();

    auto info = std::make_shared<ChunksToMerge>();
    info->bucket_num = bucket;
    info->is_overflows = is_overflows;
    info->chunks = std::make_unique<Chunks>(std::move(chunks));

    Chunk chunk;
    chunk.setChunkInfo(std::move(info));
    output.push(std::move(chunk));
}

bool GroupingAggregatedTransform::tryPushTwoLevelData()
{
    auto try_push_by_iter = [&](auto batch_it)
    {
        if (batch_it == chunks.end())
            return false;

        Chunks & cur_chunks = batch_it->second;
        if (cur_chunks.empty())
        {
            chunks.erase(batch_it);
            return false;
        }

        pushData(std::move(cur_chunks), current_bucket, false);
        chunks.erase(batch_it);
        return true;
    };

    if (all_inputs_finished)
    {
        /// Chunks are sorted by bucket.
        while (!chunks.empty())
            if (try_push_by_iter(chunks.begin()))
                return true;
    }
    else
    {
        for (; next_bucket_to_push < current_bucket; ++next_bucket_to_push)
            if (try_push_by_iter(chunks.find(next_bucket_to_push)))
                return true;
    }

    return false;
}

bool GroupingAggregatedTransform::tryPushSingleLevelData()
{
    if (single_level_chunks.empty())
        return false;

    pushData(single_level_chunks, -1, false);
    return true;
}

bool GroupingAggregatedTransform::tryPushOverflowData()
{
    if (overflow_chunks.empty())
        return false;

    pushData(overflow_chunks, -1, true);
    return true;
}

IProcessor::Status GroupingAggregatedTransform::prepare()
{
    /// Check can output.
    auto & output = outputs.front();

    if (output.isFinished())
    {
        for (auto & input : inputs)
            input.close();

        chunks.clear();
        last_bucket_number.clear();
        return Status::Finished;
    }

    /// Read first time from each input to understand if we have two-level aggregation.
    if (!read_from_all_inputs)
    {
        readFromAllInputs();
        if (!read_from_all_inputs)
            return Status::NeedData;
    }

    /// Convert single level to two levels if have two-level input.
    if (has_two_level && !single_level_chunks.empty())
        return Status::Ready;

    /// Check can push (to avoid data caching).
    if (!output.canPush())
    {
        for (auto & input : inputs)
            input.setNotNeeded();

        return Status::PortFull;
    }

    bool pushed_to_output = false;

    /// Output if has data.
    if (has_two_level)
        pushed_to_output = tryPushTwoLevelData();

    auto need_input = [this](size_t input_num)
    {
        if (last_bucket_number[input_num] < current_bucket)
            return true;

        return expect_several_chunks_for_single_bucket_per_source && last_bucket_number[input_num] == current_bucket;
    };

    /// Read next bucket if can.
    for (; ; ++current_bucket)
    {
        bool finished = true;
        bool need_data = false;

        auto in = inputs.begin();
        for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in)
        {
            if (in->isFinished())
                continue;

            finished = false;

            if (!need_input(input_num))
                continue;

            in->setNeeded();

            if (!in->hasData())
            {
                need_data = true;
                continue;
            }

            auto chunk = in->pull();
            addChunk(std::move(chunk), input_num);

            if (has_two_level && !single_level_chunks.empty())
                return Status::Ready;

            if (need_input(input_num))
                need_data = true;
        }

        if (finished)
        {
            all_inputs_finished = true;
            break;
        }

        if (need_data)
            return Status::NeedData;
    }

    if (pushed_to_output)
        return Status::PortFull;

    if (has_two_level)
    {
        if (tryPushTwoLevelData())
            return Status::PortFull;

        /// Sanity check. If new bucket was read, we should be able to push it.
        if (!all_inputs_finished)
            throw Exception("GroupingAggregatedTransform has read new two-level bucket, but couldn't push it.",
                            ErrorCodes::LOGICAL_ERROR);
    }
    else
    {
        if (!all_inputs_finished)
            throw Exception("GroupingAggregatedTransform should have read all chunks for single level aggregation, "
                            "but not all of the inputs are finished.", ErrorCodes::LOGICAL_ERROR);

        if (tryPushSingleLevelData())
            return Status::PortFull;
    }

    /// If we haven't pushed to output, then all data was read. Push overflows if have.
    if (tryPushOverflowData())
        return Status::PortFull;

    output.finish();
    return Status::Finished;
}

void GroupingAggregatedTransform::addChunk(Chunk chunk, size_t input)
{
    auto & info = chunk.getChunkInfo();
    if (!info)
        throw Exception("Chunk info was not set for chunk in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);

    auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
    if (!agg_info)
        throw Exception("Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);

    Int32 bucket = agg_info->bucket_num;
    bool is_overflows = agg_info->is_overflows;

    if (is_overflows)
        overflow_chunks.emplace_back(std::move(chunk));
    else if (bucket < 0)
        single_level_chunks.emplace_back(std::move(chunk));
    else
    {
        chunks[bucket].emplace_back(std::move(chunk));
        has_two_level = true;
        last_bucket_number[input] = bucket;
    }
}

void GroupingAggregatedTransform::work()
{
    if (!single_level_chunks.empty())
    {
        auto & header = getOutputs().front().getHeader();
        auto block = header.cloneWithColumns(single_level_chunks.back().detachColumns());
        single_level_chunks.pop_back();
        auto blocks = params->aggregator.convertBlockToTwoLevel(block);

        for (auto & cur_block : blocks)
        {
            Int32 bucket = cur_block.info.bucket_num;
            chunks[bucket].emplace_back(Chunk(cur_block.getColumns(), cur_block.rows()));
        }
    }
}


MergingAggregatedBucketTransform::MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params)
    : ISimpleTransform({}, params->getHeader(), false), params(std::move(params))
{
    setInputNotNeededAfterRead(true);
}

void MergingAggregatedBucketTransform::transform(Chunk & chunk)
{
    auto & info = chunk.getChunkInfo();
    auto * chunks_to_merge = typeid_cast<const ChunksToMerge *>(info.get());

    if (!chunks_to_merge)
        throw Exception("MergingAggregatedSimpleTransform chunk must have ChunkInfo with type ChunksToMerge.",
                        ErrorCodes::LOGICAL_ERROR);

    BlocksList blocks_list;
    for (auto & cur_chunk : *chunks_to_merge->chunks)
        blocks_list.emplace_back(getInputPort().getHeader().cloneWithColumns(cur_chunk.detachColumns()));

    chunk.setChunkInfo(nullptr);

    auto block = params->aggregator.mergeBlocks(blocks_list, params->final);
    size_t num_rows = block.rows();
    chunk.setColumns(block.getColumns(), num_rows);
}


SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs, AggregatingTransformParamsPtr params)
    : IProcessor(InputPorts(num_inputs, params->getHeader()), {params->getHeader()})
    , num_inputs(num_inputs)
    , params(std::move(params))
    , last_bucket_number(num_inputs, -1)
{
}

bool SortingAggregatedTransform::tryPushChunk()
{
    auto & output = outputs.front();

    UInt32 min_bucket = last_bucket_number[0];
    for (auto & bucket : last_bucket_number)
        min_bucket = std::min<UInt32>(min_bucket, bucket);

    auto it = chunks.find(min_bucket);
    if (it != chunks.end())
    {
        output.push(std::move(it->second));
        return true;
    }

    return false;
}

void SortingAggregatedTransform::addChunk(Chunk chunk)
{
    auto & info = chunk.getChunkInfo();
    if (!info)
        throw Exception("Chunk info was not set for chunk in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);

    auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
    if (!agg_info)
        throw Exception("Chunk should have AggregatedChunkInfo in GroupingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);

    Int32 bucket = agg_info->bucket_num;
    bool is_overflows = agg_info->is_overflows;

    if (is_overflows)
        overflow_chunk = std::move(chunk);
    else
        chunks[bucket] = std::move(chunk);
}

IProcessor::Status SortingAggregatedTransform::prepare()
{
    /// Check can output.
    auto & output = outputs.front();

    if (output.isFinished())
    {
        for (auto & input : inputs)
            input.close();

        chunks.clear();
        last_bucket_number.clear();
        return Status::Finished;
    }

    /// Check can push (to avoid data caching).
    if (!output.canPush())
    {
        for (auto & input : inputs)
            input.setNotNeeded();

        return Status::PortFull;
    }

    /// Push if have min version.
    bool pushed_to_output = tryPushChunk();

    bool need_data = false;
    bool all_finished = true;

    /// Try read anything.
    auto in = inputs.begin();
    for (size_t input_num = 0; input_num < num_inputs; ++input_num, ++in)
    {
        if (in->isFinished())
            continue;

        all_finished = false;

        in->setNeeded();

        if (!in->hasData())
        {
            need_data = true;
            continue;
        }

        auto chunk = in->pull();
        addChunk(std::move(chunk));
    }

    if (pushed_to_output)
        return Status::PortFull;

    if (tryPushChunk())
        return Status::PortFull;

    if (need_data)
        return Status::NeedData;

    if (!all_finished)
        throw Exception("SortingAggregatedTransform has read bucket, but couldn't push it.",
                        ErrorCodes::LOGICAL_ERROR);

    if (overflow_chunk)
    {
        output.push(std::move(overflow_chunk));
        return Status::PortFull;
    }

    return Status::Finished;
}

}
@@ -0,0 +1,134 @@
#include <Processors/IProcessor.h>
#include <Interpreters/Aggregator.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/ResizeProcessor.h>


namespace DB
{

/// Has several inputs and a single output.
/// Reads chunks with partially aggregated data from its inputs, groups them by bucket number
/// and writes the data of a single bucket as a single chunk.
class GroupingAggregatedTransform : public IProcessor
{
public:
    GroupingAggregatedTransform(const Block & header, size_t num_inputs, AggregatingTransformParamsPtr params);

    /// Special setting: for the case when a single source can return several chunks with the same bucket.
    void allowSeveralChunksForSingleBucketPerSource() { expect_several_chunks_for_single_bucket_per_source = true; }

protected:
    Status prepare() override;
    void work() override;

private:
    size_t num_inputs;
    AggregatingTransformParamsPtr params;

    std::vector<Int32> last_bucket_number;
    std::map<Int32, Chunks> chunks;
    Chunks overflow_chunks;
    Chunks single_level_chunks;
    Int32 current_bucket = 0;
    Int32 next_bucket_to_push = 0; /// Always <= current_bucket.
    bool has_two_level = false;

    bool all_inputs_finished = false;
    bool read_from_all_inputs = false;
    std::vector<bool> read_from_input;

    bool expect_several_chunks_for_single_bucket_per_source = false;

    void addChunk(Chunk chunk, size_t input);
    void readFromAllInputs();
    bool tryPushSingleLevelData();
    bool tryPushTwoLevelData();
    bool tryPushOverflowData();
    void pushData(Chunks chunks, Int32 bucket, bool is_overflows);
};
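The "single chunk" per bucket mentioned above carries no columns of its own: the grouped chunks travel in its ChunkInfo. A short sketch (editor's illustration, not part of the commit; the helper name packBucket is invented) mirroring what pushData() does in the .cpp above:

/// Hypothetical helper: pack one bucket's chunks into a single header-less Chunk.
/// ChunksToMerge is the ChunkInfo struct defined in the .cpp above;
/// MergingAggregatedBucketTransform unpacks it again in transform().
Chunk packBucket(Chunks chunks, Int32 bucket, bool is_overflows)
{
    auto info = std::make_shared<ChunksToMerge>();
    info->bucket_num = bucket;
    info->is_overflows = is_overflows;
    info->chunks = std::make_unique<Chunks>(std::move(chunks));

    Chunk chunk; /// No columns: the payload is carried entirely by the info object.
    chunk.setChunkInfo(std::move(info));
    return chunk;
}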

/// Merges aggregated data from a single bucket.
class MergingAggregatedBucketTransform : public ISimpleTransform
{
public:
    explicit MergingAggregatedBucketTransform(AggregatingTransformParamsPtr params);

protected:
    void transform(Chunk & chunk) override;

private:
    AggregatingTransformParamsPtr params;
};

/// Has several inputs and a single output.
/// Reads merged buckets with aggregated data from its inputs, sorts them by bucket number and writes them to the output.
/// Presumption: inputs return chunks with increasing bucket numbers, and there is at most one chunk per bucket.
class SortingAggregatedTransform : public IProcessor
{
public:
    SortingAggregatedTransform(size_t num_inputs, AggregatingTransformParamsPtr params);
    Status prepare() override;

private:
    size_t num_inputs;
    AggregatingTransformParamsPtr params;
    std::vector<Int32> last_bucket_number;
    std::map<Int32, Chunk> chunks;
    Chunk overflow_chunk;

    bool tryPushChunk();
    void addChunk(Chunk chunk);
};

/// Creates a piece of pipeline which performs memory-efficient merging of partially aggregated data from several sources.
/// The first processor has num_inputs inputs, the last one has a single output. You should connect them to create the pipeline.
Processors createMergingAggregatedMemoryEfficientPipe(
    Block header,
    AggregatingTransformParamsPtr params,
    size_t num_inputs,
    size_t num_merging_processors)
{
    Processors processors;
    processors.reserve(num_merging_processors + 2);

    auto grouping = std::make_shared<GroupingAggregatedTransform>(header, num_inputs, params);
    processors.emplace_back(std::move(grouping));

    if (num_merging_processors <= 1)
    {
        /// --> GroupingAggregated --> MergingAggregatedBucket -->
        auto transform = std::make_shared<MergingAggregatedBucketTransform>(params);
        connect(processors.back()->getOutputs().front(), transform->getInputPort());

        processors.emplace_back(std::move(transform));
        return processors;
    }

    /// -->                                        --> MergingAggregatedBucket -->
    /// --> GroupingAggregated --> ResizeProcessor --> MergingAggregatedBucket --> SortingAggregated -->
    /// -->                                        --> MergingAggregatedBucket -->

    auto resize = std::make_shared<ResizeProcessor>(header, 1, num_merging_processors);
    connect(processors.back()->getOutputs().front(), resize->getInputs().front());
    processors.emplace_back(std::move(resize));

    auto sorting = std::make_shared<SortingAggregatedTransform>(num_merging_processors, params);
    auto out = processors.back()->getOutputs().begin();
    auto in = sorting->getInputs().begin();

    for (size_t i = 0; i < num_merging_processors; ++i, ++in, ++out)
    {
        auto transform = std::make_shared<MergingAggregatedBucketTransform>(params);
        connect(*out, transform->getInputPort());
        connect(transform->getOutputPort(), *in);
        processors.emplace_back(std::move(transform));
    }

    processors.emplace_back(std::move(sorting));
    return processors;
}

}
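As a usage illustration (editor's sketch, not part of the commit), a caller with num_inputs upstream aggregating outputs would connect them to the first returned processor and take the single output of the last one. The function buildMergingPipe and the upstream_outputs container are hypothetical; createMergingAggregatedMemoryEfficientPipe, connect() and the port accessors are the ones used in the diff above.

/// Hypothetical wiring of the returned processors into an existing pipeline.
Processors buildMergingPipe(
    Block header,
    AggregatingTransformParamsPtr params,
    std::vector<OutputPort *> upstream_outputs, /// outputs of the partial-aggregation processors
    size_t num_merging_processors)
{
    size_t num_inputs = upstream_outputs.size();
    auto pipe = createMergingAggregatedMemoryEfficientPipe(header, params, num_inputs, num_merging_processors);

    /// The first processor is GroupingAggregatedTransform; it has num_inputs input ports.
    auto input_it = pipe.front()->getInputs().begin();
    for (size_t i = 0; i < num_inputs; ++i, ++input_it)
        connect(*upstream_outputs[i], *input_it);

    /// The last processor (MergingAggregatedBucket or SortingAggregated, depending on
    /// num_merging_processors) holds the single merged output:
    /// pipe.back()->getOutputs().front() is what downstream processors connect to.
    return pipe;
}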