From 100fe4c93cb4281d99c590084a52d61ffe4e2b97 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 19 Mar 2020 16:45:19 +0300 Subject: [PATCH] Fix build. --- .../Processors/Executors/PipelineExecutor.cpp | 2 - .../TreeExecutorBlockInputStream.cpp | 66 +++++++++++-------- .../Executors/TreeExecutorBlockInputStream.h | 4 +- 3 files changed, 43 insertions(+), 29 deletions(-) diff --git a/dbms/src/Processors/Executors/PipelineExecutor.cpp b/dbms/src/Processors/Executors/PipelineExecutor.cpp index fc33cbbfb3a..ea7478e3f80 100644 --- a/dbms/src/Processors/Executors/PipelineExecutor.cpp +++ b/dbms/src/Processors/Executors/PipelineExecutor.cpp @@ -488,8 +488,6 @@ void PipelineExecutor::execute(size_t num_threads) if (!all_processors_finished) throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR); - -std::cerr << dumpPipeline() << std::endl; } void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads) diff --git a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp index d08edeaea80..52a77a6eda5 100644 --- a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp +++ b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.cpp @@ -120,6 +120,8 @@ void TreeExecutorBlockInputStream::init() connect(*totals_port, *input_totals_port); input_totals_port->setNeeded(); } + + initRowsBeforeLimit(); } void TreeExecutorBlockInputStream::execute(bool on_totals) @@ -206,42 +208,45 @@ void TreeExecutorBlockInputStream::execute(bool on_totals) } } -void TreeExecutorBlockInputStream::calcRowsBeforeLimit() +void TreeExecutorBlockInputStream::initRowsBeforeLimit() { - std::stack stack; - stack.push(root); + std::vector limits; + std::vector sources; - size_t rows_before_limit = 0; - bool has_limit = false; + struct StackEntry + { + IProcessor * processor; + bool visited_limit; + }; + + std::stack stack; + stack.push({root, false}); while (!stack.empty()) { - auto processor = stack.top(); + auto processor = stack.top().processor; + bool visited_limit = stack.top().visited_limit; stack.pop(); - if (auto * limit = typeid_cast(processor)) + if (!visited_limit) { - has_limit = true; - rows_before_limit += limit->getRowsBeforeLimitAtLeast(); - } - if (auto * source = typeid_cast(processor)) - { - if (auto & stream = source->getStream()) + if (auto * limit = typeid_cast(processor)) { - auto & profile_info = stream->getProfileInfo(); - if (profile_info.hasAppliedLimit()) - { - has_limit = true; - rows_before_limit += profile_info.getRowsBeforeLimit(); - } + visited_limit = true; + limits.emplace_back(limit); } + + if (auto * source = typeid_cast(processor)) + sources.emplace_back(source); } - if (auto * sorting = typeid_cast(processor)) + if (auto * sorting = typeid_cast(processor)) { - rows_before_limit += sorting->getNumReadRows(); - has_limit = true; + if (!rows_before_limit_at_least) + rows_before_limit_at_least = std::make_shared(); + + sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least); /// Don't go to children. Take rows_before_limit from last PartialSortingTransform. continue; @@ -250,12 +255,20 @@ void TreeExecutorBlockInputStream::calcRowsBeforeLimit() for (auto & child_port : processor->getInputs()) { auto * child_processor = &child_port.getOutputPort().getProcessor(); - stack.push(child_processor); + stack.push({child_processor, visited_limit}); } } - if (has_limit) - info.setRowsBeforeLimit(rows_before_limit); + if (!rows_before_limit_at_least && (!limits.empty() && !sources.empty())) + { + rows_before_limit_at_least = std::make_shared(); + + for (auto & limit : limits) + limit->setRowsBeforeLimitCounter(rows_before_limit_at_least); + + for (auto & source : sources) + source->setRowsBeforeLimitCounter(rows_before_limit_at_least); + } } Block TreeExecutorBlockInputStream::readImpl() @@ -271,7 +284,8 @@ Block TreeExecutorBlockInputStream::readImpl() totals = getHeader().cloneWithColumns(input_totals_port->pull().detachColumns()); } - calcRowsBeforeLimit(); + if (rows_before_limit_at_least && rows_before_limit_at_least->hasAppliedLimit()) + info.setRowsBeforeLimit(rows_before_limit_at_least->get()); return {}; } diff --git a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h index 8170d8fdb50..3ab8dde6948 100644 --- a/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h +++ b/dbms/src/Processors/Executors/TreeExecutorBlockInputStream.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include namespace DB { @@ -55,6 +56,7 @@ private: IProcessor * root = nullptr; std::unique_ptr input_port; std::unique_ptr input_totals_port; + RowsBeforeLimitCounterPtr rows_before_limit_at_least; /// Remember sources that support progress. std::vector sources_with_progress; @@ -65,7 +67,7 @@ private: /// Execute tree step-by-step until root returns next chunk or execution is finished. void execute(bool on_totals); - void calcRowsBeforeLimit(); + void initRowsBeforeLimit(); /// Moved from pipe. std::vector> interpreter_context;