diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp
index 7deae77ee1a..65c6d21417b 100644
--- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp
+++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp
@@ -3,6 +3,8 @@
 #include <Common/ClickHouseRevision.h>
 #include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
 #include <DataStreams/NativeBlockInputStream.h>
+#include <Processors/ISource.h>
+#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>

 namespace ProfileEvents
 {
@@ -12,6 +14,74 @@ namespace ProfileEvents

 namespace DB
 {

+namespace
+{
+    class SourceFromNativeStream : public ISource
+    {
+    public:
+        SourceFromNativeStream(const Block & header, const std::string & path)
+            : ISource(header), file_in(path), compressed_in(file_in)
+            , block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header, 0))
+        {
+            block_in->readPrefix();
+        }
+
+        String getName() const override { return "SourceFromNativeStream"; }
+
+        Chunk generate() override
+        {
+            if (!block_in)
+                return {};
+
+            auto block = block_in->read();
+            if (!block)
+            {
+                block_in->readSuffix();
+                block_in.reset();
+                return {};
+            }
+
+            UInt64 num_rows = block.rows();
+            return Chunk(block.getColumns(), num_rows);
+        }
+
+    private:
+        ReadBufferFromFile file_in;
+        CompressedReadBuffer compressed_in;
+        BlockInputStreamPtr block_in;
+    };
+
+    class SourceFromInputStream : public ISource
+    {
+    public:
+        SourceFromInputStream(Block header, BlockInputStreamPtr stream)
+            : ISource(std::move(header)), stream(std::move(stream)) {}
+
+        String getName() const override { return "SourceFromInputStream"; }
+
+    protected:
+        Chunk generate() override
+        {
+            auto block = stream->read();
+            if (!block)
+                return {};
+
+            auto info = std::make_shared<AggregatedChunkInfo>();
+            info->bucket_num = block.info.bucket_num;
+            info->is_overflows = block.info.is_overflows;
+
+            UInt64 num_rows = block.rows();
+            Chunk chunk(block.getColumns(), num_rows);
+            chunk.setChunkInfo(std::move(info));
+
+            return chunk;
+        }
+
+    private:
+        BlockInputStreamPtr stream;
+    };
+}
+
 AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformParamsPtr params_)
     : AggregatingTransform(std::move(header), std::move(params_)
     , std::make_unique<ManyAggregatedData>(1), 0, 1, 1)
@@ -21,7 +91,7 @@ AggregatingTransform::AggregatingTransform(Block header, AggregatingTransformPar
 AggregatingTransform::AggregatingTransform(
     Block header, AggregatingTransformParamsPtr params_, ManyAggregatedDataPtr many_data_,
     size_t current_variant, size_t temporary_data_merge_threads, size_t max_threads)
-    : IAccumulatingTransform(std::move(header), params_->getHeader()), params(std::move(params_))
+    : IProcessor({std::move(header)}, {params_->getHeader()}), params(std::move(params_))
     , key(params->params.keys_size)
     , key_columns(params->params.keys_size)
     , aggregate_columns(params->params.aggregates_size)
@@ -34,6 +104,89 @@ AggregatingTransform::AggregatingTransform(

 AggregatingTransform::~AggregatingTransform() = default;

+IProcessor::Status AggregatingTransform::prepare()
+{
+    auto & output = outputs.front();
+    /// Last input is current. All other inputs are already closed.
+    auto & input = inputs.back();
+
+    /// Check if we can push to the output.
+    if (output.isFinished())
+    {
+        input.close();
+        return Status::Finished;
+    }
+
+    if (!output.canPush())
+    {
+        input.setNotNeeded();
+        return Status::PortFull;
+    }
+
+    /// Finish data processing, prepare for generating.
+    if (is_consume_finished && !is_generate_initialized)
+        return Status::Ready;
+
+    if (is_generate_initialized && !is_pipeline_created)
+        return Status::ExpandPipeline;
+
+    /// Only possible while consuming.
+    if (read_current_chunk)
+        return Status::Ready;
+
+    /// Get chunk from input.
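+    /// Note: the input may be exhausted before the aggregator asked us to stop;
+    /// in that case we switch to the generating phase, and initGenerate() runs
+    /// on the next work() call.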
+    if (input.isFinished())
+    {
+        if (is_consume_finished)
+        {
+            output.finish();
+            return Status::Finished;
+        }
+        else
+        {
+            /// Finish data processing and create another pipe.
+            is_consume_finished = true;
+            return Status::Ready;
+        }
+    }
+
+    input.setNeeded();
+    if (!input.hasData())
+        return Status::NeedData;
+
+    current_chunk = input.pull();
+    read_current_chunk = true;
+
+    if (is_consume_finished)
+    {
+        output.push(std::move(current_chunk));
+        read_current_chunk = false;
+        return Status::PortFull;
+    }
+
+    return Status::Ready;
+}
+
+void AggregatingTransform::work()
+{
+    if (is_consume_finished)
+        initGenerate();
+    else
+    {
+        consume(std::move(current_chunk));
+        read_current_chunk = false;
+    }
+}
+
+Processors AggregatingTransform::expandPipeline()
+{
+    auto & out = processors.back()->getOutputs().front();
+    inputs.emplace_back(out.getHeader(), this);
+    connect(out, inputs.back());
+    is_pipeline_created = true;
+    return std::move(processors);
+}
+
 void AggregatingTransform::consume(Chunk chunk)
 {
     LOG_TRACE(log, "Aggregating");
@@ -41,10 +194,10 @@ void AggregatingTransform::consume(Chunk chunk)
     src_rows += chunk.getNumRows();
     src_bytes += chunk.bytes();

-    auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
+    auto block = getInputs().front().getHeader().cloneWithColumns(chunk.detachColumns());

     if (!params->aggregator.executeOnBlock(block, variants, key_columns, aggregate_columns, key, no_more_keys))
-        finishConsume();
+        is_consume_finished = true;
 }

 void AggregatingTransform::initGenerate()
@@ -55,7 +208,7 @@ void AggregatingTransform::initGenerate()
     /// If there was no data, and we aggregate without keys, we must return a single row with the result of empty aggregation.
     /// To do this, we pass a block with zero rows to aggregate.
     if (variants.empty() && params->params.keys_size == 0 && !params->params.empty_result_for_aggregation_by_empty_set)
-        params->aggregator.executeOnBlock(getInputPort().getHeader(), variants, key_columns, aggregate_columns, key, no_more_keys);
+        params->aggregator.executeOnBlock(getInputs().front().getHeader(), variants, key_columns, aggregate_columns, key, no_more_keys);

     double elapsed_seconds = watch.elapsedSeconds();
     size_t rows = variants.sizeWithoutOverflowRow();
@@ -79,7 +232,8 @@ void AggregatingTransform::initGenerate()

     if (!params->aggregator.hasTemporaryFiles())
     {
-        impl = params->aggregator.mergeAndConvertToBlocks(many_data->variants, params->final, max_threads);
+        auto stream = params->aggregator.mergeAndConvertToBlocks(many_data->variants, params->final, max_threads);
+        processors.emplace_back(std::make_shared<SourceFromInputStream>(stream->getHeader(), std::move(stream)));
     }
     else
     {
@@ -102,51 +256,28 @@ void AggregatingTransform::initGenerate()
             }
         }

+        auto & header = outputs.front().getHeader();
+
         const auto & files = params->aggregator.getTemporaryFiles();
-        BlockInputStreams input_streams;
         for (const auto & file : files.files)
-        {
-            temporary_inputs.emplace_back(std::make_unique<TemporaryFileStream>(file->path()));
-            input_streams.emplace_back(temporary_inputs.back()->block_in);
-        }
+            processors.emplace_back(std::make_unique<SourceFromNativeStream>(header, file->path()));

         LOG_TRACE(log, "Will merge " << files.files.size() << " temporary files of size "
             << (files.sum_size_compressed / 1048576.0) << " MiB compressed, "
             << (files.sum_size_uncompressed / 1048576.0) << " MiB uncompressed.");

-        impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(
-            input_streams, params->params, params->final, temporary_data_merge_threads, temporary_data_merge_threads);
+        auto pipe = createMergingAggregatedMemoryEfficientPipe(
+            header, params, files.files.size(), temporary_data_merge_threads);
+
+        auto input = pipe.front()->getInputs().begin();
+
+        for (auto & processor : processors)
+            connect(processor->getOutputs().front(), *(input++));
+
+        processors.insert(processors.end(), pipe.begin(), pipe.end());
     }

     is_generate_initialized = true;
 }

-Chunk AggregatingTransform::generate()
-{
-    if (!is_generate_initialized)
-        initGenerate();
-
-    if (!impl)
-        return {};
-
-    auto block = impl->read();
-    if (!block)
-        return {};
-
-
-    auto info = std::make_shared<AggregatedChunkInfo>();
-    info->bucket_num = block.info.bucket_num;
-    info->is_overflows = block.info.is_overflows;
-
-    UInt64 num_rows = block.rows();
-    Chunk chunk(block.getColumns(), num_rows);
-    chunk.setChunkInfo(std::move(info));
-
-    return chunk;
-}
-
-AggregatingTransform::TemporaryFileStream::TemporaryFileStream(const std::string & path)
-    : file_in(path), compressed_in(file_in),
-      block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
-
 }
diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.h b/dbms/src/Processors/Transforms/AggregatingTransform.h
index d4298285223..bfc370f014f 100644
--- a/dbms/src/Processors/Transforms/AggregatingTransform.h
+++ b/dbms/src/Processors/Transforms/AggregatingTransform.h
@@ -41,10 +41,10 @@ struct ManyAggregatedData
     }
 };

-using AggregatingTransformParamsPtr = std::unique_ptr<AggregatingTransformParams>;
+using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
 using ManyAggregatedDataPtr = std::shared_ptr<ManyAggregatedData>;

-class AggregatingTransform : public IAccumulatingTransform
+class AggregatingTransform : public IProcessor
 {
 public:
     AggregatingTransform(Block header, AggregatingTransformParamsPtr params_);
@@ -56,21 +56,16 @@ public:
     ~AggregatingTransform() override;

     String getName() const override { return "AggregatingTransform"; }
+    Status prepare() override;
+    void work() override;
+    Processors expandPipeline() override;

 protected:
-    void consume(Chunk chunk) override;
-    Chunk generate() override;
+    void consume(Chunk chunk);

 private:
-    /// To read the data that was flushed into the temporary data file.
-    struct TemporaryFileStream
-    {
-        ReadBufferFromFile file_in;
-        CompressedReadBuffer compressed_in;
-        BlockInputStreamPtr block_in;
-
-        explicit TemporaryFileStream(const std::string & path);
-    };
+    /// Processors created in initGenerate() and handed to the executor by expandPipeline().
+    Processors processors;

     AggregatingTransformParamsPtr params;
     Logger * log = &Logger::get("AggregatingTransform");
@@ -85,9 +80,6 @@ private:
     size_t max_threads = 1;
     size_t temporary_data_merge_threads = 1;

-    std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
-    std::unique_ptr<IBlockInputStream> impl;
-
     /// TODO: calculate time only for aggregation.
     Stopwatch watch;

@@ -95,6 +87,11 @@ private:
     UInt64 src_bytes = 0;

     bool is_generate_initialized = false;
+    bool is_consume_finished = false;
+    bool is_pipeline_created = false;
+
+    Chunk current_chunk;
+    bool read_current_chunk = false;

     void initGenerate();
 };
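
Note on the new control flow: AggregatingTransform now implements the full IProcessor
contract (prepare/work/expandPipeline) instead of IAccumulatingTransform. While
consuming, prepare() pulls chunks and work() feeds them to the Aggregator; once the
input is exhausted, initGenerate() builds the generating processors (a
SourceFromInputStream over mergeAndConvertToBlocks(), or one SourceFromNativeStream
per spilled temporary file plus the memory-efficient merging pipe), and a single
expandPipeline() call hands them to the executor. This is also why
AggregatingTransformParamsPtr becomes a shared_ptr: the params are now shared with
the processors of the merging pipe. The sketch below shows a driving loop in
simplified form; it is an illustration only, not ClickHouse's actual
PipelineExecutor, and the runProcessor() helper and its graph parameter are
hypothetical:

    #include <Processors/IProcessor.h>

    using namespace DB;

    /// Simplified, single-threaded driver loop (illustration only; the real
    /// executor schedules a whole graph of processors in parallel).
    void runProcessor(IProcessor & processor, Processors & graph)
    {
        while (true)
        {
            switch (processor.prepare())
            {
                case IProcessor::Status::Ready:
                    /// For AggregatingTransform: consume() one chunk, or run
                    /// initGenerate() once after the input is exhausted.
                    processor.work();
                    break;

                case IProcessor::Status::ExpandPipeline:
                {
                    /// The transform returns the processors it created in
                    /// initGenerate(); the last of them has been connected to a
                    /// freshly added input port of the transform itself.
                    auto new_processors = processor.expandPipeline();
                    graph.insert(graph.end(), new_processors.begin(), new_processors.end());
                    break;
                }

                case IProcessor::Status::NeedData:
                case IProcessor::Status::PortFull:
                case IProcessor::Status::Finished:
                    return; /// Yield to neighbours or stop.

                default:
                    return; /// Async etc. are not used by this transform.
            }
        }
    }

After the pipeline has been expanded, prepare() simply forwards chunks from the new
input to the output (the output.push(std::move(current_chunk)) branch), so the
transform degenerates into a pass-through.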