From ee22a3ad30952b0c523ebbf6e160315bf424fd8a Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 21 May 2020 23:34:53 +0300 Subject: [PATCH] Adding Info in FinalizingSimpleTransform --- src/Interpreters/InterpreterSelectQuery.cpp | 11 +++------ .../AggregatingInOrderTransform.cpp | 24 +++++++++---------- .../Transforms/AggregatingInOrderTransform.h | 16 ++++++++++--- .../Transforms/AggregatingTransform.h | 3 +-- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index a2ccf7d3a3a..283767ac500 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1749,7 +1749,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const /// Forget about current totals and extremes. They will be calculated again after aggregation if needed. pipeline.dropTotalsAndExtremes(); - /// TODO better case determination if (group_by_info && settings.optimize_aggregation_in_order) { auto & query = getSelectQuery(); @@ -1771,7 +1770,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const return std::make_shared(header, transform_params, group_by_descr, settings.max_block_size, many_data, counter++); }); - /// TODO remove code duplication for (auto & column_description : group_by_descr) { if (!column_description.column_name.empty()) @@ -1797,13 +1795,10 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const }); } - if (final) + pipeline.addSimpleTransform([&](const Block & header) { - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, transform_params); - }); - } + return std::make_shared(header, transform_params); + }); pipeline.enableQuotaForCurrentStreams(); return; diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index 5ebfb740c6b..58620fd3355 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -16,7 +16,7 @@ AggregatingInOrderTransform::AggregatingInOrderTransform( Block header, AggregatingTransformParamsPtr params_, const SortDescription & group_by_description_, size_t res_block_size_, ManyAggregatedDataPtr many_data_, size_t current_variant) - : IProcessor({std::move(header)}, {params_->getHeader(false)}) + : IProcessor({std::move(header)}, {params_->getCustomHeader(false)}) , res_block_size(res_block_size_) , params(std::move(params_)) , group_by_description(group_by_description_) @@ -24,7 +24,8 @@ AggregatingInOrderTransform::AggregatingInOrderTransform( , many_data(std::move(many_data_)) , variants(*many_data->variants[current_variant]) { - res_header = params->getHeader(false); + /// We won't finalize states in order to merge same states (generated due to multi-thread execution) in AggregatingSortedTransform + res_header = params->getCustomHeader(false); /// Replace column names to column position in description_sorted. for (auto & column_description : group_by_description) @@ -56,7 +57,6 @@ static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size void AggregatingInOrderTransform::consume(Chunk chunk) { - /// Find the position of last already read key in current chunk. size_t rows = chunk.getNumRows(); if (rows == 0) return; @@ -75,7 +75,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk) size_t key_end = 0; size_t key_begin = 0; - + /// If we don't have a block we create it and fill with first key if (!cur_block_size) { res_key_columns.resize(params->params.keys_size); @@ -85,7 +85,6 @@ void AggregatingInOrderTransform::consume(Chunk chunk) { res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn(); } - for (size_t i = 0; i < params->params.aggregates_size; ++i) { res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn(); @@ -96,11 +95,11 @@ void AggregatingInOrderTransform::consume(Chunk chunk) size_t mid = 0; size_t high = 0; size_t low = -1; - + /// Will split block into segments with the same key while (key_end != rows) { high = rows; - /// Find the first position of new key in current chunk + /// Find the first position of new (not current) key in current chunk while (high - low > 1) { mid = (low + high) / 2; @@ -110,20 +109,18 @@ void AggregatingInOrderTransform::consume(Chunk chunk) high = mid; } key_end = high; - + /// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block. if (key_begin != key_end) { - /// Add data to the state if segment is not empty (Empty when we were looking for last key in new block and haven't found it) params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool); } low = key_begin = key_end; - + /// We finalize last key aggregation state if a new key found. if (key_begin != rows) { - /// We finalize last key aggregation states if a new key found (Not found if high == rows) params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns); - + /// If res_block_size is reached we have to stop consuming and generate the block. Save the extra rows into new chunk. if (cur_block_size == res_block_size) { Columns source_columns = chunk.detachColumns(); @@ -159,7 +156,7 @@ void AggregatingInOrderTransform::work() } } -/// TODO simplify prepare + IProcessor::Status AggregatingInOrderTransform::prepare() { auto & output = outputs.front(); @@ -196,6 +193,7 @@ IProcessor::Status AggregatingInOrderTransform::prepare() { output.push(std::move(to_push_chunk)); output.finish(); + LOG_TRACE(log, "Aggregated"); return Status::Finished; } if (input.isFinished()) diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.h b/src/Processors/Transforms/AggregatingInOrderTransform.h index 00e6f666ed7..08737d02706 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.h +++ b/src/Processors/Transforms/AggregatingInOrderTransform.h @@ -63,15 +63,25 @@ private: class FinalizingSimpleTransform : public ISimpleTransform { public: - FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params) - : ISimpleTransform({std::move(header)}, {params->getHeader(true)}, true) {} + FinalizingSimpleTransform(Block header, AggregatingTransformParamsPtr params_) + : ISimpleTransform({std::move(header)}, {params_->getHeader()}, true) + , params(params_) {} void transform(Chunk & chunk) override { - finalizeChunk(chunk); + if (!chunk.getChunkInfo()) + { + auto info = std::make_shared(); + chunk.setChunkInfo(std::move(info)); + } + if (params->final) + finalizeChunk(chunk); } String getName() const override { return "FinalizingSimpleTransform"; } + +private: + AggregatingTransformParamsPtr params; }; diff --git a/src/Processors/Transforms/AggregatingTransform.h b/src/Processors/Transforms/AggregatingTransform.h index a14067a8e18..006e7c6226d 100644 --- a/src/Processors/Transforms/AggregatingTransform.h +++ b/src/Processors/Transforms/AggregatingTransform.h @@ -29,8 +29,7 @@ struct AggregatingTransformParams Block getHeader() const { return aggregator.getHeader(final); } - /// TODO remove that logic - Block getHeader(bool final_) const { return aggregator.getHeader(final_); } + Block getCustomHeader(bool final_) const { return aggregator.getHeader(final_); } }; struct ManyAggregatedData