Updated AggregatingTransform.

Nikolai Kochetov 2019-03-07 18:43:39 +03:00
parent 6cedacb055
commit f08a10542e
2 changed files with 182 additions and 54 deletions

File 1 of 2

@@ -3,6 +3,8 @@
#include <Common/ClickHouseRevision.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <Processors/ISource.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
namespace ProfileEvents
{
@@ -12,6 +14,74 @@ namespace ProfileEvents
namespace DB
{
namespace
{
class SourceFromNativeStream : public ISource
{
public:
SourceFromNativeStream(const Block & header, const std::string & path)
: ISource(header), file_in(path), compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header, 0))
{
block_in->readPrefix();
}
String getName() const override { return "SourceFromNativeStream"; }
Chunk generate() override
{
if (!block_in)
return {};
auto block = block_in->read();
if (!block)
{
block_in->readSuffix();
block_in.reset();
return {};
}
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
private:
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
};
class SourceFromInputStream : public ISource
{
public:
SourceFromInputStream(Block header, BlockInputStreamPtr stream)
: ISource(std::move(header)), stream(std::move(stream)) {}
String getName() const override { return "SourceFromInputStream"; }
protected:
Chunk generate() override
{
auto block = stream->read();
if (!block)
return {};
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
}
private:
BlockInputStreamPtr stream;
};
}
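Both helpers follow the ISource contract: generate() returns one Chunk per block read, and an empty Chunk to signal exhaustion. SourceFromInputStream additionally forwards the two-level-aggregation metadata (bucket_num, is_overflows) by attaching an AggregatedChunkInfo to every chunk. A minimal sketch, assuming the Chunk/ChunkInfo API as used in this commit, of how a downstream processor reads that metadata back:

    /// Sketch only: `chunk` is a Chunk produced by SourceFromInputStream above.
    if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk.getChunkInfo().get()))
    {
        Int32 bucket = agg_info->bucket_num;        /// -1 means single-level aggregation data
        bool overflows = agg_info->is_overflows;    /// rows for keys beyond max_rows_to_group_by
    }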
AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformParamsPtr params_)
: AggregatingTransform(std::move(header), std::move(params_)
, std::make_unique<ManyAggregatedData>(1), 0, 1, 1)
@@ -21,7 +91,7 @@ AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformPar
AggregatingTransform::AggregatingTransform(
Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_,
size_t current_variant, size_t temporary_data_merge_threads, size_t max_threads)
: IAccumulatingTransform(std::move(header), params_->getHeader()), params(std::move(params_))
: IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_))
, key(params->params.keys_size)
, key_columns(params->params.keys_size)
, aggregate_columns(params->params.aggregates_size)
@@ -34,6 +104,89 @@ AggregatingTransform::AggregatingTransform(
AggregatingTransform::~AggregatingTransform() = default;
IProcessor::Status AggregatingTransform::prepare()
{
auto & output = outputs.front();
/// Last input is current. All other inputs should already be closed.
auto & input = inputs.back();
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
/// Finish data processing and prepare for generating.
if (is_consume_finished && !is_generate_initialized)
return Status::Ready;
if (is_generate_initialized && !is_pipeline_created)
return Status::ExpandPipeline;
/// Only possible while consuming.
if (read_current_chunk)
return Status::Ready;
/// Get chunk from input.
if (input.isFinished())
{
if (is_consume_finished)
{
output.finish();
return Status::Finished;
}
else
{
/// Finish data processing and create another pipe.
is_consume_finished = true;
return Status::Ready;
}
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
current_chunk = input.pull();
read_current_chunk = true;
if (is_consume_finished)
{
output.push(std::move(current_chunk));
read_current_chunk = false;
return Status::PortFull;
}
return Status::Ready;
}
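prepare() is deliberately cheap: it only inspects port states and flags and returns a Status telling the executor what to do next; the heavy lifting happens in work(). A simplified, hypothetical driver loop illustrating the contract (the real PipelineExecutor schedules many processors concurrently; attach() is an illustrative stand-in for adding processors to the graph):

    while (true)
    {
        switch (transform.prepare())
        {
            case IProcessor::Status::Ready:          transform.work(); break;    /// runs consume() or initGenerate()
            case IProcessor::Status::NeedData:       /* run upstream */ break;
            case IProcessor::Status::PortFull:       /* run downstream */ break;
            case IProcessor::Status::ExpandPipeline: attach(transform.expandPipeline()); break;
            case IProcessor::Status::Finished:       return;
            default: break;
        }
    }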
void AggregatingTransform::work()
{
if (is_consume_finished)
initGenerate();
else
{
consume(std::move(current_chunk));
read_current_chunk = false;
}
}
Processors AggregatingTransform::expandPipeline()
{
auto & out = processors.back()->getOutputs().front();
inputs.emplace_back(out.getHeader(), this);
connect(out, inputs.back());
is_pipeline_created = true;
return std::move(processors);
}
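expandPipeline() is what lets the transform switch from consuming to generating without having been constructed with a second input: it appends a fresh input port, wires it to the tail of the pipeline built in initGenerate(), and surrenders the accumulated processors to the executor. The port layout after expansion (a sketch, not ClickHouse-verbatim):

    /// inputs.front()  -- the original data input, finished by now
    /// inputs.back()   -- output of the generating pipeline built in initGenerate()
    /// outputs.front() -- the only output; prepare() now forwards chunks from
    ///                    inputs.back() to it one-to-one (the is_consume_finished
    ///                    branch in prepare() above)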
void AggregatingTransform::consume(Chunk chunk)
{
LOG_TRACE(log, "Aggregating");
@@ -41,10 +194,10 @@ void AggregatingTransform::consume(Chunk chunk)
src_rows += chunk.getNumRows();
src_bytes += chunk.bytes();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());
if (!params->aggregator.executeOnBlock(block, variants, key_columns, aggregate_columns, key, no_more_keys))
finishConsume();
is_consume_finished = true;
}
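executeOnBlock() returning false is the aggregator asking its caller to stop feeding input early; in this period that happens when max_rows_to_group_by is exceeded under group_by_overflow_mode = 'break'. Raising is_consume_finished makes the next prepare() call go straight into initGenerate() rather than pulling further chunks. A hedged sketch of the condition behind the false return (the real check lives inside Aggregator and also handles the THROW and ANY modes; field names follow Aggregator::Params of this era):

    bool stop_consuming = params.max_rows_to_group_by
        && result_size > params.max_rows_to_group_by
        && params.group_by_overflow_mode == OverflowMode::BREAK;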
void AggregatingTransform::initGenerate()
@@ -55,7 +208,7 @@ void AggregatingTransform::initGenerate()
/// If there was no data, and we aggregate without keys, we must return a single row with the result of the empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)
params->aggregator.executeOnBlock(getInputPort().getHeader(), variants, key_columns, aggregate_columns, key, no_more_keys);
params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, key, no_more_keys);
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = variants.sizeWithoutOverflowRow();
@@ -79,7 +232,8 @@ void AggregatingTransform::initGenerate()
if (!params->aggregator.hasTemporaryFiles())
{
impl = params->aggregator.mergeAndConvertToBlocks(many_data->variants, params->final, max_threads);
auto stream = params->aggregator.mergeAndConvertToBlocks(many_data->variants, params->final, max_threads);
processors.emplace_back(std::make_shared<SourceFromInputStream>(stream->getHeader(), std::move(stream)));
}
else
{
@@ -102,51 +256,28 @@
}
}
auto & header = outputs.front().getHeader();
const auto & files = params->aggregator.getTemporaryFiles();
BlockInputStreams input_streams;
for (const auto & file : files.files)
{
temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
input_streams.emplace_back(temporary_inputs.back()->block_in);
}
processors.emplace_back(std::make_unique<SourceFromNativeStream>(header, file->path()));
LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
<< (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
<< (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");
impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
input_streams, params->params, params->final, temporary_data_merge_threads, temporary_data_merge_threads);
auto pipe = createMergingAggregatedMemoryEfficientPipe(
header, params, files.files.size(), temporary_data_merge_threads);
auto input = pipe.front()->getInputs().begin();
for (auto & processor : processors)
connect(processor->getOutputs().front(), *(input++));
processors.insert(processors.end(), pipe.begin(), pipe.end());
}
is_generate_initialized = true;
}
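initGenerate() thus builds one of two generating pipelines, both ending in the output that expandPipeline() will later connect back to this transform (a sketch):

    /// 1. Everything fit in RAM:
    ///      SourceFromInputStream(mergeAndConvertToBlocks(...)) --> new input port
    ///
    /// 2. Data was spilled to disk during consume():
    ///      SourceFromNativeStream(file_0) --\
    ///      ...                               >-- memory-efficient merging pipe --> new input port
    ///      SourceFromNativeStream(file_N) --/

The memory-efficient pipe merges bucket by bucket, so peak memory during the external merge is bounded by a few in-flight buckets rather than by the whole result set.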
Chunk AggregatingTransform::generate()
{
if (!is_generate_initialized)
initGenerate();
if (!impl)
return {};
auto block = impl->read();
if (!block)
return {};
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
}
AggregatingTransform::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
}

File 2 of 2

@@ -41,10 +41,10 @@ struct ManyAggregatedData
}
};
using AggregatingTransformParamsPtr = std::unique_ptr<AggregatingTransformParams>;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
using ManyAggregatedDataPtr = std::shared_ptr<ManyAggregatedData>;
class AggregatingTransform : public IAccumulatingTransform
class AggregatingTransform : public IProcessor
{
public:
AggregatingTransform(Block header, AggregatingTransformParamsPtr params_);
@@ -56,21 +56,16 @@ public:
~AggregatingTransform() override;
String getName() const override { return "AggregatingTransform"; }
Status prepare() override;
void work() override;
Processors expandPipeline() override;
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
void consume(Chunk chunk);
private:
/// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
explicit TemporaryFileStream(const std::string & path);
};
Processors processors;
AggregatingTransformParamsPtr params;
Logger * log = &Logger::get("AggregatingTransform");
@@ -85,9 +80,6 @@ private:
size_t max_threads = 1;
size_t temporary_data_merge_threads = 1;
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
std::unique_ptr<IBlockInputStream> impl;
/// TODO: calculate time only for aggregation.
Stopwatch watch;
@@ -95,6 +87,11 @@ private:
UInt64 src_bytes = 0;
bool is_generate_initialized = false;
bool is_consume_finished = false;
bool is_pipeline_created = false;
Chunk current_chunk;
bool read_current_chunk = false;
void initGenerate();
};
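Making AggregatingTransformParamsPtr a shared_ptr (and exposing many_data/current_variant in the constructor) is what allows several transforms to aggregate in parallel into one ManyAggregatedData, each writing its own variant. A hedged construction sketch (variable names are illustrative; `params` wraps Aggregator::Params and is built elsewhere):

    size_t num_threads = 4;
    auto many_data = std::make_shared<ManyAggregatedData>(num_threads);

    Processors aggregating;
    for (size_t i = 0; i < num_threads; ++i)
        aggregating.emplace_back(std::make_shared<AggregatingTransform>(
            header, params, many_data, /* current_variant = */ i,
            /* temporary_data_merge_threads = */ num_threads,
            /* max_threads = */ num_threads));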