From 0286b60ed6ac26fb8caa5509ee7802d77ecd7526 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Fri, 8 May 2020 22:46:52 +0300 Subject: [PATCH] return multiple blocks --- src/Interpreters/InterpreterSelectQuery.cpp | 2 +- .../AggregatingInOrderTransform.cpp | 195 +++++++++--------- .../Transforms/AggregatingInOrderTransform.h | 10 +- src/Storages/ReadInOrderOptimizer.cpp | 6 +- 4 files changed, 113 insertions(+), 100 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index a5a409d0f1d..8066a4e4c4a 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1752,7 +1752,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const pipeline.resize(1); pipeline.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, transform_params, group_by_descr, group_by_descr); + return std::make_shared(header, transform_params, group_by_descr, group_by_descr, settings.max_block_size); }); pipeline.enableQuotaForCurrentStreams(); diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.cpp b/src/Processors/Transforms/AggregatingInOrderTransform.cpp index c75aff97938..3030fccc431 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.cpp +++ b/src/Processors/Transforms/AggregatingInOrderTransform.cpp @@ -6,8 +6,9 @@ namespace DB AggregatingInOrderTransform::AggregatingInOrderTransform( Block header, AggregatingTransformParamsPtr params_, SortDescription & sort_description_, - SortDescription & group_by_description_) + SortDescription & group_by_description_, size_t max_block_size_) : IProcessor({std::move(header)}, {params_->getHeader()}) + , max_block_size(max_block_size_) , params(std::move(params_)) , sort_description(sort_description_) , group_by_description(group_by_description_) @@ -15,8 +16,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform( , many_data(std::make_shared(1)) , variants(*many_data->variants[0]) { -// std::cerr << "AggregatingInOrderTransform\n"; - Block res_header = params->getHeader(); /// Replace column names to column position in description_sorted. @@ -28,18 +27,6 @@ AggregatingInOrderTransform::AggregatingInOrderTransform( column_description.column_name.clear(); } } - res_key_columns.resize(params->params.keys_size); - res_aggregate_columns.resize(params->params.aggregates_size); - - for (size_t i = 0; i < params->params.keys_size; ++i) - { - res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn(); - } - - for (size_t i = 0; i < params->params.aggregates_size; ++i) - { - res_aggregate_columns[i] = params->aggregator.aggregate_functions[i]->getReturnType()->createColumn(); - } } AggregatingInOrderTransform::~AggregatingInOrderTransform() = default; @@ -58,12 +45,9 @@ static bool less(const MutableColumns & lhs, const Columns & rhs, size_t i, size return false; } -/// TODO maybe move all things inside the Aggregator? void AggregatingInOrderTransform::consume(Chunk chunk) { -// std::cerr << "\nchunk " << x++ << " of size " << chunk.getNumRows() << "\n"; -// sz += chunk.getNumRows(); /// Find the position of last already read key in current chunk. size_t rows = chunk.getNumRows(); @@ -79,15 +63,25 @@ void AggregatingInOrderTransform::consume(Chunk chunk) } Aggregator::AggregateFunctionInstructions aggregate_function_instructions; - params->aggregator.prepareAggregateInstructions(chunk.detachColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions); + params->aggregator.prepareAggregateInstructions(chunk.getColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions); size_t key_end = 0; size_t key_begin = 0; if (!res_block_size) { -// std::cerr << "Creating first state with key " << key_begin << "\n"; - LOG_TRACE(log, "AggregatingInOrder"); + res_key_columns.resize(params->params.keys_size); + res_aggregate_columns.resize(params->params.aggregates_size); + + for (size_t i = 0; i < params->params.keys_size; ++i) + { + res_key_columns[i] = params->getHeader().safeGetByPosition(i).type->createColumn(); + } + + for (size_t i = 0; i < params->params.aggregates_size; ++i) + { + res_aggregate_columns[i] = params->aggregator.aggregate_functions[i]->getReturnType()->createColumn(); + } params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns); ++res_block_size; } @@ -98,27 +92,19 @@ void AggregatingInOrderTransform::consume(Chunk chunk) while (key_end != rows) { high = rows; - /// Find the first position of new key in current chunk while (high - low > 1) { mid = (low + high) / 2; -// std::cerr << "Comparing last key and row " << mid << "\n"; if (!less(res_key_columns, key_columns, res_block_size - 1, mid, group_by_description)) - { low = mid; - } else - { high = mid; - } } - key_end = high; if (key_begin != key_end) { -// std::cerr << "Executing from " << key_begin << " to " << key_end << "\n"; /// Add data to the state if segment is not empty (Empty when we were looking for last key in new block and haven't found it) params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool); } @@ -127,19 +113,98 @@ void AggregatingInOrderTransform::consume(Chunk chunk) if (key_begin != rows) { -// std::cerr << "Finalizing the last state.\n"; /// We finalize last key aggregation states if a new key found (Not found if high == rows) params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns); -// std::cerr << "Creating state with key " << key_begin << "\n"; + if (res_block_size == max_block_size) { + Columns source_columns = chunk.detachColumns(); + + for (auto & source_column : source_columns) + source_column = source_column->cut(key_begin, rows - key_begin); + + current_chunk = Chunk(source_columns, rows - key_begin); + block_end_reached = true; + need_generate = true; + res_block_size = 0; + return; + } + /// We create a new state for the new key and update res_key_columns params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns); ++res_block_size; } } - + block_end_reached = false; } + +void AggregatingInOrderTransform::work() +{ + if (is_consume_finished || need_generate) + { + generate(); + } + else + { + consume(std::move(current_chunk)); + } +} + +/// TODO less complicated +IProcessor::Status AggregatingInOrderTransform::prepare() +{ + auto & output = outputs.front(); + auto & input = inputs.back(); + + /// Check can output. + if (output.isFinished()) + { + input.close(); + return Status::Finished; + } + + if (!output.canPush()) + { + input.setNotNeeded(); + return Status::PortFull; + } + + if (block_end_reached) + { + if (need_generate) + { + return Status::Ready; + } + else + { + output.push(std::move(to_push_chunk)); + return Status::Ready; + } + } + if (!block_end_reached) + { + if (is_consume_finished) + { + output.push(std::move(to_push_chunk)); + output.finish(); + return Status::Finished; + } + if (input.isFinished()) + { + is_consume_finished = true; + return Status::Ready; + } + } + if (!input.hasData()) + { + input.setNeeded(); + return Status::NeedData; + } + current_chunk = input.pull(!is_consume_finished); + return Status::Ready; +} + + /// Convert block to chunk. /// Adds additional info about aggregation. Chunk convertToChunk(const Block & block) @@ -155,71 +220,10 @@ Chunk convertToChunk(const Block & block) return chunk; } -void AggregatingInOrderTransform::work() -{ - if (is_consume_finished) - { - generate(); - } - else - { - consume(std::move(current_chunk)); - } -} - - -IProcessor::Status AggregatingInOrderTransform::prepare() -{ - auto & output = outputs.front(); - - /// Last output is current. All other outputs should already be closed. - auto & input = inputs.back(); - - /// Check can output. - if (output.isFinished()) - { - input.close(); - return Status::Finished; - } - - if (!output.canPush()) - { - input.setNotNeeded(); - return Status::PortFull; - } - - /// Get chunk from input. - if (input.isFinished() && !is_consume_finished) - { - is_consume_finished = true; - return Status::Ready; - } - - if (is_consume_finished) - { - /// TODO many blocks - output.push(std::move(current_chunk)); - output.finish(); - return Status::Finished; - } - - if (!input.hasData()) - { - input.setNeeded(); - return Status::NeedData; - } - - current_chunk = input.pull(); - return Status::Ready; -} - void AggregatingInOrderTransform::generate() { -// std::cerr << sz << "\n"; -// std::cerr << "\nFinalizing the last state in generate().\n"; - - if (res_block_size) + if (res_block_size && is_consume_finished) params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns); LOG_TRACE(log, "Aggregated"); @@ -233,7 +237,8 @@ void AggregatingInOrderTransform::generate() { res.getByPosition(i + res_key_columns.size()).column = std::move(res_aggregate_columns[i]); } - current_chunk = convertToChunk(res); + to_push_chunk = convertToChunk(res); + need_generate = false; } } diff --git a/src/Processors/Transforms/AggregatingInOrderTransform.h b/src/Processors/Transforms/AggregatingInOrderTransform.h index 5928ab97972..9b919c00bd8 100644 --- a/src/Processors/Transforms/AggregatingInOrderTransform.h +++ b/src/Processors/Transforms/AggregatingInOrderTransform.h @@ -11,8 +11,8 @@ class AggregatingInOrderTransform : public IProcessor { public: - AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params, - SortDescription & sort_description, SortDescription & group_by_description); + AggregatingInOrderTransform(Block header, AggregatingTransformParamsPtr params, SortDescription & sort_description, + SortDescription & group_by_description, size_t max_block_size); ~AggregatingInOrderTransform() override; @@ -29,7 +29,8 @@ private: // size_t x = 1; // size_t sz = 0; - size_t res_block_size{}; + size_t max_block_size; + size_t res_block_size = 0; MutableColumns res_key_columns; MutableColumns res_aggregate_columns; @@ -44,9 +45,12 @@ private: ManyAggregatedDataPtr many_data; AggregatedDataVariants & variants; + bool need_generate = false; + bool block_end_reached = false; bool is_consume_finished = false; Chunk current_chunk; + Chunk to_push_chunk; Logger * log = &Logger::get("AggregatingInOrderTransform"); }; diff --git a/src/Storages/ReadInOrderOptimizer.cpp b/src/Storages/ReadInOrderOptimizer.cpp index e164f1928cf..5bbe5be9928 100644 --- a/src/Storages/ReadInOrderOptimizer.cpp +++ b/src/Storages/ReadInOrderOptimizer.cpp @@ -55,6 +55,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora int read_direction = required_sort_description.at(0).direction; size_t prefix_size = std::min(required_sort_description.size(), sorting_key_columns.size()); + for (size_t i = 0; i < prefix_size; ++i) { if (forbidden_columns.count(required_sort_description[i].column_name)) @@ -71,7 +72,6 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora bool found_function = false; for (const auto & action : elements_actions[i]->getActions()) { - std::cerr << action.toString() << "\n"; if (action.type != ExpressionAction::APPLY_FUNCTION) continue; @@ -82,6 +82,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora } else found_function = true; + if (action.argument_names.size() != 1 || action.argument_names.at(0) != sorting_key_columns[i]) { current_direction = 0; @@ -94,6 +95,7 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora current_direction = 0; break; } + auto monotonicity = func.getMonotonicityForRange(*func.getArgumentTypes().at(0), {}, {}); if (!monotonicity.is_monotonic) { @@ -106,8 +108,10 @@ InputSortingInfoPtr ReadInOrderOptimizer::getInputOrder(const StoragePtr & stora if (!found_function) current_direction = 0; + if (!current_direction || (i > 0 && current_direction != read_direction)) break; + if (i == 0) read_direction = current_direction;