From 43ff4e669c7c01a40bb1b843a0a6b7da4287723a Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 11 Mar 2020 15:19:30 +0300 Subject: [PATCH 01/20] Use common read roes counter for all pre-limit streams. --- dbms/src/Interpreters/InterpreterSelectQuery.cpp | 11 ++++++++--- dbms/src/Processors/LimitTransform.cpp | 15 +++++++++------ dbms/src/Processors/LimitTransform.h | 13 +++++++++++-- dbms/src/Storages/System/StorageSystemNumbers.cpp | 2 +- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 8285978c7bd..f1cf699aea3 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -982,7 +982,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS executePreLimit(pipeline); } - bool need_merge_streams = need_second_distinct_pass || query.limitLength() || query.limitBy(); + bool need_merge_streams = need_second_distinct_pass || query.limitBy() + || (!pipeline_with_processors && query.limitLength()); /// Don't merge streams for pre-limit more. 
if constexpr (!pipeline_with_processors) if (pipeline.hasDelayedStream()) @@ -2265,13 +2266,17 @@ void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline) /// If there is LIMIT if (query.limitLength()) { + LimitTransform::LimitStatePtr state; + if (pipeline.getNumStreams() > 1) + state = std::make_shared(); + auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context); pipeline.addSimpleTransform([&, limit = limit_length + limit_offset](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr { if (stream_type == QueryPipeline::StreamType::Totals) return nullptr; - return std::make_shared(header, limit, 0); + return std::make_shared(header, limit, 0, state); }); } } @@ -2474,7 +2479,7 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline) return nullptr; return std::make_shared( - header, limit_length, limit_offset, always_read_till_end, query.limit_with_ties, order_descr); + header, limit_length, limit_offset, nullptr, always_read_till_end, query.limit_with_ties, order_descr); }); } } diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index 5ef56554e3d..b1a100e3559 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -5,14 +5,14 @@ namespace DB { LimitTransform::LimitTransform( - const Block & header_, size_t limit_, size_t offset_, + const Block & header_, size_t limit_, size_t offset_, LimitStatePtr limit_state_, bool always_read_till_end_, bool with_ties_, - const SortDescription & description_) + SortDescription description_) : IProcessor({header_}, {header_}) , input(inputs.front()), output(outputs.front()) - , limit(limit_), offset(offset_) + , limit(limit_), offset(offset_), limit_state(std::move(limit_state_)) , always_read_till_end(always_read_till_end_) - , with_ties(with_ties_), description(description_) + , with_ties(with_ties_), description(std::move(description_)) { for (const auto & desc : 
description) { @@ -113,8 +113,11 @@ LimitTransform::Status LimitTransform::prepare() } /// Process block. - - rows_read += rows; + if (limit_state) + /// Note: maybe memory_order_relaxed is enough. It is needed to be proven. + rows_read = limit_state->total_read_rows.fetch_add(rows, std::memory_order_acq_rel) + rows; + else + rows_read += rows; if (rows_read <= offset) { diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index f37bf8aba15..2472adb14d8 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -8,12 +8,21 @@ namespace DB class LimitTransform : public IProcessor { +public: + struct LimitState + { + std::atomic total_read_rows = 0; + }; + + using LimitStatePtr = std::shared_ptr; + private: InputPort & input; OutputPort & output; size_t limit; size_t offset; + LimitStatePtr limit_state; size_t rows_read = 0; /// including the last read block bool always_read_till_end; @@ -35,9 +44,9 @@ private: public: LimitTransform( - const Block & header_, size_t limit_, size_t offset_, + const Block & header_, size_t limit_, size_t offset_, LimitStatePtr limit_state_ = nullptr, bool always_read_till_end_ = false, bool with_ties_ = false, - const SortDescription & description_ = {}); + SortDescription description_ = {}); String getName() const override { return "Limit"; } diff --git a/dbms/src/Storages/System/StorageSystemNumbers.cpp b/dbms/src/Storages/System/StorageSystemNumbers.cpp index f5a5bb58b72..6e7959f455b 100644 --- a/dbms/src/Storages/System/StorageSystemNumbers.cpp +++ b/dbms/src/Storages/System/StorageSystemNumbers.cpp @@ -167,7 +167,7 @@ Pipes StorageSystemNumbers::read( { /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly. 
res.back().addSimpleTransform(std::make_shared( - res.back().getHeader(), *limit * (i + 1) / num_streams - *limit * i / num_streams, 0, false)); + res.back().getHeader(), *limit * (i + 1) / num_streams - *limit * i / num_streams, 0)); } } From 926724ffd5d57f74531babb321cbdc6c86e1ebdd Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 11 Mar 2020 16:07:38 +0300 Subject: [PATCH 02/20] Fix limit transform. --- dbms/src/Processors/LimitTransform.cpp | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index b1a100e3559..c1d4b0dbded 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -65,6 +65,10 @@ LimitTransform::Status LimitTransform::prepare() block_processed = false; } + /// Update the number of read rows. + if (limit_state) + rows_read = limit_state->total_read_rows.load(std::memory_order_acquire); + /// Check if we are done with pushing. bool pushing_is_finished = (rows_read >= offset + limit) && !previous_row_chunk; if (pushing_is_finished) @@ -95,15 +99,25 @@ LimitTransform::Status LimitTransform::prepare() auto rows = current_chunk.getNumRows(); rows_before_limit_at_least += rows; + if (limit_state) + /// Note: maybe memory_order_relaxed is enough. It is needed to be proven. + rows_read = limit_state->total_read_rows.fetch_add(rows, std::memory_order_acq_rel) + rows; + else + rows_read += rows; + + /// rows_read could be updated after previous load. Recheck flag again. + pushing_is_finished = (rows_read >= offset + limit) && !previous_row_chunk; + /// Skip block (for 'always_read_till_end' case). if (pushing_is_finished) { current_chunk.clear(); has_block = false; - if (input.isFinished()) + if (input.isFinished() || !always_read_till_end) { output.finish(); + input.close(); return Status::Finished; } @@ -113,12 +127,6 @@ LimitTransform::Status LimitTransform::prepare() } /// Process block. 
- if (limit_state) - /// Note: maybe memory_order_relaxed is enough. It is needed to be proven. - rows_read = limit_state->total_read_rows.fetch_add(rows, std::memory_order_acq_rel) + rows; - else - rows_read += rows; - if (rows_read <= offset) { current_chunk.clear(); From 325990caf9a4a77a75d1e75bb5ee49fe99328227 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 11 Mar 2020 16:17:50 +0300 Subject: [PATCH 03/20] Fil limit transform. --- dbms/src/Processors/LimitTransform.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index c1d4b0dbded..ff5a4847ea7 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -70,8 +70,8 @@ LimitTransform::Status LimitTransform::prepare() rows_read = limit_state->total_read_rows.load(std::memory_order_acquire); /// Check if we are done with pushing. - bool pushing_is_finished = (rows_read >= offset + limit) && !previous_row_chunk; - if (pushing_is_finished) + bool is_limit_reached = (rows_read >= offset + limit) && !previous_row_chunk; + if (is_limit_reached) { if (!always_read_till_end) { @@ -105,11 +105,11 @@ LimitTransform::Status LimitTransform::prepare() else rows_read += rows; - /// rows_read could be updated after previous load. Recheck flag again. - pushing_is_finished = (rows_read >= offset + limit) && !previous_row_chunk; + /// rows_read could be updated after previous load. Recalculate flag again. + is_limit_reached = (rows_read >= offset + limit + rows) && !previous_row_chunk; /// Skip block (for 'always_read_till_end' case). - if (pushing_is_finished) + if (is_limit_reached) { current_chunk.clear(); has_block = false; From 969ed1fcd820b6610530995f59445ea1dd458d4c Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 11 Mar 2020 16:39:48 +0300 Subject: [PATCH 04/20] Do not add limit to pipeline if there is prelimit. 
--- dbms/src/Interpreters/InterpreterSelectQuery.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index f1cf699aea3..867e7d62273 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -976,10 +976,12 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT, * limiting the number of rows in each up to `offset + limit`. */ + bool has_prelimit = false; if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.hasLimitBy() && !settings.extremes) { executePreLimit(pipeline); + has_prelimit = true; } bool need_merge_streams = need_second_distinct_pass || query.limitBy() @@ -1019,7 +1021,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS */ executeExtremes(pipeline); - executeLimit(pipeline); + if (!(pipeline_with_processors && has_prelimit)) /// Limit is no longer needed if there is prelimit. + executeLimit(pipeline); } } From 78d350bbaa9db8eacc0015f2d8f7c3f957879bfc Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 11 Mar 2020 21:35:14 +0300 Subject: [PATCH 05/20] Added comment. --- dbms/src/Processors/LimitTransform.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index 2472adb14d8..eea7aa4bf24 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -9,6 +9,8 @@ namespace DB class LimitTransform : public IProcessor { public: + /// Common counter of read rows. It is shared between several streams. + /// Is used for pre-limit. Needed to skip main limit phase and avoid the resizing pipeline to single stream. 
struct LimitState { std::atomic total_read_rows = 0; From aa72aa540cbc0e2d1c149e682d556463a700c2d9 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 11 Mar 2020 21:49:06 +0300 Subject: [PATCH 06/20] Added perftest. --- dbms/tests/performance/pre_limit_no_sorting.xml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 dbms/tests/performance/pre_limit_no_sorting.xml diff --git a/dbms/tests/performance/pre_limit_no_sorting.xml b/dbms/tests/performance/pre_limit_no_sorting.xml new file mode 100644 index 00000000000..e93aef049aa --- /dev/null +++ b/dbms/tests/performance/pre_limit_no_sorting.xml @@ -0,0 +1,15 @@ + + + + 10 + 200 + + + 100 + 1000 + + + + SELECT sum(number) FROM (select number from system.numbers_mt limit 1000000000) + + From 560576e1c9e68866b5c8ed9723cfd27c7ca55386 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 15:49:42 +0300 Subject: [PATCH 07/20] Update LimitTransform. --- dbms/src/Processors/LimitTransform.cpp | 59 ++++++++++++++++++++++++++ dbms/src/Processors/LimitTransform.h | 19 +++++++++ 2 files changed, 78 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index ff5a4847ea7..2113157d48a 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -263,5 +263,64 @@ bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort return true; } + +LimitReachedCheckingTransform::LimitReachedCheckingTransform( + const Block & header_, size_t num_streams, + size_t limit, size_t offset, LimitTransform::LimitStatePtr limit_state_) + : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_)) + , max_total_rows_to_read(limit + offset), limit_state(std::move(limit_state_)) +{ + input_ports.reserve(num_streams); + output_ports.reserve(num_streams); + + for (auto & input : inputs) + input_ports.emplace_back(&input); + + for (auto & output : outputs) + 
output_ports.emplace_back(&output); +} + + +static IProcessor::Status processPair(InputPort * input, OutputPort * output) +{ + /// Check can output. + if (output->isFinished()) + { + input->close(); + return IProcessor::Status::Finished; + } + + if (!output->canPush()) + return IProcessor::Status::PortFull; + + /// Check input has data.. + if (input->isFinished()) + { + output->finish(); + return IProcessor::Status::Finished; + } + + input->setNeeded(); + + if (input->hasData()) + output->push(input->pull(true)); + + /// Input can be finished after pull. check it again. + if (input->isFinished()) + { + output->finish(); + return IProcessor::Status::Finished; + } + + return IProcessor::Status::PortFull; +} + +IProcessor::Status LimitReachedCheckingTransform::prepare( + const PortNumbers & updated_input_ports, + const PortNumbers & updated_output_ports) +{ + +} + } diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index eea7aa4bf24..51d75d00153 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -61,4 +61,23 @@ public: UInt64 getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; } }; +class LimitReachedCheckingTransform : public IProcessor +{ +public: + LimitReachedCheckingTransform( + const Block & header_, size_t num_streams, + size_t limit, size_t offset, LimitTransform::LimitStatePtr limit_state_); + + String getName() const override { return "Limit"; } + + Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override; + +private: + size_t max_total_rows_to_read; + LimitTransform::LimitStatePtr limit_state; + + std::vector input_ports; + std::vector output_ports; +}; + } From c580e428258c716d4edcbad6abf8380fcc216f88 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 18:29:35 +0300 Subject: [PATCH 08/20] Update LimitTransform. 
--- .../Interpreters/InterpreterSelectQuery.cpp | 15 +- dbms/src/Processors/LimitTransform.cpp | 171 +++++++++--------- dbms/src/Processors/LimitTransform.h | 49 ++--- 3 files changed, 96 insertions(+), 139 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 867e7d62273..32512972090 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -2269,18 +2269,9 @@ void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline) /// If there is LIMIT if (query.limitLength()) { - LimitTransform::LimitStatePtr state; - if (pipeline.getNumStreams() > 1) - state = std::make_shared(); - auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context); - pipeline.addSimpleTransform([&, limit = limit_length + limit_offset](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr - { - if (stream_type == QueryPipeline::StreamType::Totals) - return nullptr; - - return std::make_shared(header, limit, 0, state); - }); + auto limit = std::make_shared(pipeline.getHeader(), limit_length, limit_offset, pipeline.getNumStreams()); + pipeline.addPipe({std::move(limit)}); } } @@ -2482,7 +2473,7 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline) return nullptr; return std::make_shared( - header, limit_length, limit_offset, nullptr, always_read_till_end, query.limit_with_ties, order_descr); + header, limit_length, limit_offset, 1, always_read_till_end, query.limit_with_ties, order_descr); }); } } diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index 2113157d48a..beb3da9c6f9 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -5,15 +5,23 @@ namespace DB { LimitTransform::LimitTransform( - const Block & header_, size_t limit_, size_t offset_, LimitStatePtr limit_state_, + const Block & header_, size_t 
limit_, size_t offset_, size_t num_streams, bool always_read_till_end_, bool with_ties_, SortDescription description_) : IProcessor({header_}, {header_}) - , input(inputs.front()), output(outputs.front()) - , limit(limit_), offset(offset_), limit_state(std::move(limit_state_)) + , limit(limit_), offset(offset_) , always_read_till_end(always_read_till_end_) , with_ties(with_ties_), description(std::move(description_)) { + input_ports.reserve(num_streams); + output_ports.reserve(num_streams); + + for (auto & input : inputs) + input_ports.emplace_back(&input); + + for (auto & output : outputs) + output_ports.emplace_back(&output); + for (const auto & desc : description) { if (!desc.column_name.empty()) @@ -37,7 +45,61 @@ Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) } -LimitTransform::Status LimitTransform::prepare() +IProcessor::Status LimitTransform::prepare( + const PortNumbers & updated_input_ports, + const PortNumbers & updated_output_ports) +{ + bool has_full_port = false; + + auto process_pair = [&](size_t pos) + { + auto status = preparePair(*input_ports[pos], *output_ports[pos]); + + switch (status) + { + case IProcessor::Status::Finished: + { + if (!is_port_pair_finished[pos]) + { + is_port_pair_finished[pos] = true; + ++num_finished_port_pairs; + } + + return; + } + case IProcessor::Status::PortFull: + { + has_full_port = true; + return; + } + case IProcessor::Status::NeedData: + return; + default: + throw Exception( + "Unexpected status for LimitTransform::preparePair : " + IProcessor::statusToName(status), + ErrorCodes::LOGICAL_ERROR); + + } + + __builtin_unreachable(); + }; + + for (auto pos : updated_input_ports) + process_pair(pos); + + for (auto pos : updated_output_ports) + process_pair(pos); + + if (num_finished_port_pairs == input_ports.size()) + return Status::Finished; + + if (has_full_port) + return Status::PortFull; + + return Status::NeedData; +} + +LimitTransform::Status LimitTransform::preparePair(InputPort 
& input, OutputPort & output) { /// Check can output. bool output_finished = false; @@ -65,10 +127,6 @@ LimitTransform::Status LimitTransform::prepare() block_processed = false; } - /// Update the number of read rows. - if (limit_state) - rows_read = limit_state->total_read_rows.load(std::memory_order_acquire); - /// Check if we are done with pushing. bool is_limit_reached = (rows_read >= offset + limit) && !previous_row_chunk; if (is_limit_reached) @@ -99,25 +157,15 @@ LimitTransform::Status LimitTransform::prepare() auto rows = current_chunk.getNumRows(); rows_before_limit_at_least += rows; - if (limit_state) - /// Note: maybe memory_order_relaxed is enough. It is needed to be proven. - rows_read = limit_state->total_read_rows.fetch_add(rows, std::memory_order_acq_rel) + rows; - else - rows_read += rows; - - /// rows_read could be updated after previous load. Recalculate flag again. - is_limit_reached = (rows_read >= offset + limit + rows) && !previous_row_chunk; - /// Skip block (for 'always_read_till_end' case). if (is_limit_reached) { current_chunk.clear(); has_block = false; - if (input.isFinished() || !always_read_till_end) + if (input.isFinished()) { output.finish(); - input.close(); return Status::Finished; } @@ -127,6 +175,9 @@ LimitTransform::Status LimitTransform::prepare() } /// Process block. + + rows_read += rows; + if (rows_read <= offset) { current_chunk.clear(); @@ -143,32 +194,33 @@ LimitTransform::Status LimitTransform::prepare() return Status::NeedData; } - /// Return the whole block. + if (output.hasData()) + return Status::PortFull; + if (rows_read >= offset + rows && rows_read <= offset + limit) { - if (output.hasData()) - return Status::PortFull; + /// Return the whole chunk. - /// Save the last row of current block to check if next block begins with the same row (for WITH TIES). + /// Save the last row of current chunk to check if next block begins with the same row (for WITH TIES). 
if (with_ties && rows_read == offset + limit) previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_chunk.getNumRows() - 1); - - output.push(std::move(current_chunk)); - has_block = false; - - return Status::PortFull; } + else + splitChunk(); bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit; /// No more data is needed. if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties) input.close(); - return Status::Ready; + output.push(std::move(current_chunk)); + has_block = false; + + return Status::PortFull; } -void LimitTransform::work() +void LimitTransform::splitChunk() { auto current_chunk_sort_columns = extractSortColumns(current_chunk.getColumns()); size_t num_rows = current_chunk.getNumRows(); @@ -263,64 +315,5 @@ bool LimitTransform::sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort return true; } - -LimitReachedCheckingTransform::LimitReachedCheckingTransform( - const Block & header_, size_t num_streams, - size_t limit, size_t offset, LimitTransform::LimitStatePtr limit_state_) - : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_)) - , max_total_rows_to_read(limit + offset), limit_state(std::move(limit_state_)) -{ - input_ports.reserve(num_streams); - output_ports.reserve(num_streams); - - for (auto & input : inputs) - input_ports.emplace_back(&input); - - for (auto & output : outputs) - output_ports.emplace_back(&output); -} - - -static IProcessor::Status processPair(InputPort * input, OutputPort * output) -{ - /// Check can output. - if (output->isFinished()) - { - input->close(); - return IProcessor::Status::Finished; - } - - if (!output->canPush()) - return IProcessor::Status::PortFull; - - /// Check input has data.. 
- if (input->isFinished()) - { - output->finish(); - return IProcessor::Status::Finished; - } - - input->setNeeded(); - - if (input->hasData()) - output->push(input->pull(true)); - - /// Input can be finished after pull. check it again. - if (input->isFinished()) - { - output->finish(); - return IProcessor::Status::Finished; - } - - return IProcessor::Status::PortFull; -} - -IProcessor::Status LimitReachedCheckingTransform::prepare( - const PortNumbers & updated_input_ports, - const PortNumbers & updated_output_ports) -{ - -} - } diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index 51d75d00153..2e823fc9fc8 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -8,23 +8,10 @@ namespace DB class LimitTransform : public IProcessor { -public: - /// Common counter of read rows. It is shared between several streams. - /// Is used for pre-limit. Needed to skip main limit phase and avoid the resizing pipeline to single stream. 
- struct LimitState - { - std::atomic total_read_rows = 0; - }; - - using LimitStatePtr = std::shared_ptr; - private: - InputPort & input; - OutputPort & output; size_t limit; size_t offset; - LimitStatePtr limit_state; size_t rows_read = 0; /// including the last read block bool always_read_till_end; @@ -40,44 +27,30 @@ private: Chunk previous_row_chunk; /// for WITH TIES, contains only sort columns std::vector sort_column_positions; + + std::vector input_ports; + std::vector output_ports; + + std::vector is_port_pair_finished; + size_t num_finished_port_pairs = 0; + Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const; ColumnRawPtrs extractSortColumns(const Columns & columns) const; bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const; public: LimitTransform( - const Block & header_, size_t limit_, size_t offset_, LimitStatePtr limit_state_ = nullptr, + const Block & header_, size_t limit_, size_t offset_, size_t num_streams = 1, bool always_read_till_end_ = false, bool with_ties_ = false, SortDescription description_ = {}); String getName() const override { return "Limit"; } - Status prepare() override; - void work() override; - - InputPort & getInputPort() { return input; } - OutputPort & getOutputPort() { return output; } + Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override; + Status preparePair(InputPort &, OutputPort &); + void splitChunk(); UInt64 getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; } }; -class LimitReachedCheckingTransform : public IProcessor -{ -public: - LimitReachedCheckingTransform( - const Block & header_, size_t num_streams, - size_t limit, size_t offset, LimitTransform::LimitStatePtr limit_state_); - - String getName() const override { return "Limit"; } - - Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) 
override; - -private: - size_t max_total_rows_to_read; - LimitTransform::LimitStatePtr limit_state; - - std::vector input_ports; - std::vector output_ports; -}; - } From 67518b777fb801f8246411c5b2510ece42a7fc2f Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 18:33:13 +0300 Subject: [PATCH 09/20] Update LimitTransform. --- dbms/src/Processors/LimitTransform.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index beb3da9c6f9..7ece4aec35c 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -22,6 +22,8 @@ LimitTransform::LimitTransform( for (auto & output : outputs) output_ports.emplace_back(&output); + is_port_pair_finished.assign(num_streams, false); + for (const auto & desc : description) { if (!desc.column_name.empty()) From 75185e5b40108fcdc5b3ed8b6bea0d6cd6615dee Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 18:37:21 +0300 Subject: [PATCH 10/20] Update LimitTransform. 
--- dbms/src/Processors/LimitTransform.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index 7ece4aec35c..b7318c055f8 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -8,7 +8,7 @@ LimitTransform::LimitTransform( const Block & header_, size_t limit_, size_t offset_, size_t num_streams, bool always_read_till_end_, bool with_ties_, SortDescription description_) - : IProcessor({header_}, {header_}) + : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_)) , limit(limit_), offset(offset_) , always_read_till_end(always_read_till_end_) , with_ties(with_ties_), description(std::move(description_)) From 4b0cac25d14c02d272375337e3bf554d4e68ae27 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 18:52:27 +0300 Subject: [PATCH 11/20] Update LimitTransform. --- dbms/src/Processors/LimitTransform.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index b7318c055f8..a4a084f089d 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -92,9 +92,23 @@ IProcessor::Status LimitTransform::prepare( for (auto pos : updated_output_ports) process_pair(pos); + /// All ports are finished. It may happen even before we reached the limit (has less data then limit). if (num_finished_port_pairs == input_ports.size()) return Status::Finished; + /// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data. 
+ /// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1 + if ((rows_read >= offset + limit) && !previous_row_chunk && !always_read_till_end) + { + for (auto & input : inputs) + input.close(); + + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + if (has_full_port) return Status::PortFull; From b1488952c2c929e5bf6f9a754f069a25d30d3eee Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 18:58:53 +0300 Subject: [PATCH 12/20] Added test. --- dbms/tests/queries/0_stateless/01097_pre_limit.reference | 1 + dbms/tests/queries/0_stateless/01097_pre_limit.sql | 1 + 2 files changed, 2 insertions(+) create mode 100644 dbms/tests/queries/0_stateless/01097_pre_limit.reference create mode 100644 dbms/tests/queries/0_stateless/01097_pre_limit.sql diff --git a/dbms/tests/queries/0_stateless/01097_pre_limit.reference b/dbms/tests/queries/0_stateless/01097_pre_limit.reference new file mode 100644 index 00000000000..749fce669df --- /dev/null +++ b/dbms/tests/queries/0_stateless/01097_pre_limit.reference @@ -0,0 +1 @@ +1000000 diff --git a/dbms/tests/queries/0_stateless/01097_pre_limit.sql b/dbms/tests/queries/0_stateless/01097_pre_limit.sql new file mode 100644 index 00000000000..03b9f441dd7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01097_pre_limit.sql @@ -0,0 +1 @@ +SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1 From 77bf8805ee4cf973e3f89e08a4b106fc5b9f5918 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 19:59:49 +0300 Subject: [PATCH 13/20] Update LimitTransform. 
--- dbms/src/Processors/LimitTransform.cpp | 92 +++++++++++--------------- dbms/src/Processors/LimitTransform.h | 37 +++++++---- 2 files changed, 63 insertions(+), 66 deletions(-) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index a4a084f089d..b0971dcebaf 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -13,16 +13,24 @@ LimitTransform::LimitTransform( , always_read_till_end(always_read_till_end_) , with_ties(with_ties_), description(std::move(description_)) { - input_ports.reserve(num_streams); - output_ports.reserve(num_streams); + if (num_streams != 1 && with_ties) + throw Exception("Cannot use LimitTransform with multiple ports and ties.", ErrorCodes::LOGICAL_ERROR); + ports_data.resize(num_streams); + + size_t cur_stream = 0; for (auto & input : inputs) - input_ports.emplace_back(&input); + { + ports_data[cur_stream].input_port = &input; + ++cur_stream; + } + cur_stream = 0; for (auto & output : outputs) - output_ports.emplace_back(&output); - - is_port_pair_finished.assign(num_streams, false); + { + ports_data[cur_stream].output_port = &output; + ++cur_stream; + } for (const auto & desc : description) { @@ -55,15 +63,15 @@ IProcessor::Status LimitTransform::prepare( auto process_pair = [&](size_t pos) { - auto status = preparePair(*input_ports[pos], *output_ports[pos]); + auto status = preparePair(ports_data[pos]); switch (status) { case IProcessor::Status::Finished: { - if (!is_port_pair_finished[pos]) + if (!ports_data[pos].is_finished) { - is_port_pair_finished[pos] = true; + ports_data[pos].is_finished = true; ++num_finished_port_pairs; } @@ -82,8 +90,6 @@ IProcessor::Status LimitTransform::prepare( ErrorCodes::LOGICAL_ERROR); } - - __builtin_unreachable(); }; for (auto pos : updated_input_ports) @@ -93,7 +99,7 @@ IProcessor::Status LimitTransform::prepare( process_pair(pos); /// All ports are finished. 
It may happen even before we reached the limit (has less data then limit). - if (num_finished_port_pairs == input_ports.size()) + if (num_finished_port_pairs == ports_data.size()) return Status::Finished; /// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data. @@ -115,8 +121,11 @@ IProcessor::Status LimitTransform::prepare( return Status::NeedData; } -LimitTransform::Status LimitTransform::preparePair(InputPort & input, OutputPort & output) +LimitTransform::Status LimitTransform::preparePair(PortsData & data) { + auto & output = *data.output_port; + auto & input = *data.input_port; + /// Check can output. bool output_finished = false; if (output.isFinished()) @@ -135,14 +144,6 @@ LimitTransform::Status LimitTransform::preparePair(InputPort & input, OutputPort return Status::PortFull; } - /// Push block if can. - if (!output_finished && has_block && block_processed) - { - output.push(std::move(current_chunk)); - has_block = false; - block_processed = false; - } - /// Check if we are done with pushing. bool is_limit_reached = (rows_read >= offset + limit) && !previous_row_chunk; if (is_limit_reached) @@ -167,18 +168,15 @@ LimitTransform::Status LimitTransform::preparePair(InputPort & input, OutputPort if (!input.hasData()) return Status::NeedData; - current_chunk = input.pull(true); - has_block = true; + data.current_chunk = input.pull(true); - auto rows = current_chunk.getNumRows(); + auto rows = data.current_chunk.getNumRows(); rows_before_limit_at_least += rows; /// Skip block (for 'always_read_till_end' case). 
- if (is_limit_reached) + if (is_limit_reached || output_finished) { - current_chunk.clear(); - has_block = false; - + data.current_chunk.clear(); if (input.isFinished()) { output.finish(); @@ -196,8 +194,7 @@ LimitTransform::Status LimitTransform::preparePair(InputPort & input, OutputPort if (rows_read <= offset) { - current_chunk.clear(); - has_block = false; + data.current_chunk.clear(); if (input.isFinished()) { @@ -210,37 +207,34 @@ LimitTransform::Status LimitTransform::preparePair(InputPort & input, OutputPort return Status::NeedData; } - if (output.hasData()) - return Status::PortFull; - if (rows_read >= offset + rows && rows_read <= offset + limit) { /// Return the whole chunk. /// Save the last row of current chunk to check if next block begins with the same row (for WITH TIES). if (with_ties && rows_read == offset + limit) - previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_chunk.getNumRows() - 1); + previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, data.current_chunk.getNumRows() - 1); } else - splitChunk(); + /// This function may be heavy to execute in prepare. But it happens no more than twice, and makes the code simpler. + splitChunk(data); bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit; /// No more data is needed. 
if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties) input.close(); - output.push(std::move(current_chunk)); - has_block = false; + output.push(std::move(data.current_chunk)); return Status::PortFull; } -void LimitTransform::splitChunk() +void LimitTransform::splitChunk(PortsData & data) { - auto current_chunk_sort_columns = extractSortColumns(current_chunk.getColumns()); - size_t num_rows = current_chunk.getNumRows(); - size_t num_columns = current_chunk.getNumColumns(); + auto current_chunk_sort_columns = extractSortColumns(data.current_chunk.getColumns()); + size_t num_rows = data.current_chunk.getNumRows(); + size_t num_columns = data.current_chunk.getNumColumns(); if (previous_row_chunk && rows_read >= offset + limit) { @@ -252,7 +246,7 @@ void LimitTransform::splitChunk() break; } - auto columns = current_chunk.detachColumns(); + auto columns = data.current_chunk.detachColumns(); if (current_row_num < num_rows) { @@ -261,8 +255,7 @@ void LimitTransform::splitChunk() columns[i] = columns[i]->cut(0, current_row_num); } - current_chunk.setColumns(std::move(columns), current_row_num); - block_processed = true; + data.current_chunk.setColumns(std::move(columns), current_row_num); return; } @@ -280,7 +273,7 @@ void LimitTransform::splitChunk() if (with_ties && length) { size_t current_row_num = start + length; - previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_row_num - 1); + previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1); for (; current_row_num < num_rows; ++current_row_num) { @@ -295,19 +288,14 @@ void LimitTransform::splitChunk() } if (length == num_rows) - { - block_processed = true; return; - } - auto columns = current_chunk.detachColumns(); + auto columns = data.current_chunk.detachColumns(); for (size_t i = 0; i < num_columns; ++i) columns[i] = columns[i]->cut(start, length); - current_chunk.setColumns(std::move(columns), length); - - block_processed = true; + 
data.current_chunk.setColumns(std::move(columns), length); } ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index 2e823fc9fc8..c1b7e7ed378 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -6,32 +6,41 @@ namespace DB { +/// Implementation for LIMIT N OFFSET M +/// This processor support multiple inputs and outputs (the same number). +/// Each pair of input and output port works independently. +/// The reason to have multiple ports is to be able to stop all sources when limit is reached, in a query like: +/// ELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1 +/// +/// always_read_till_end - read all data from input ports even if limit was reached. +/// with_ties, description - implementation of LIMIT WITH TIES. It works only for single port. class LimitTransform : public IProcessor { private: size_t limit; size_t offset; - size_t rows_read = 0; /// including the last read block bool always_read_till_end; - bool has_block = false; - bool block_processed = false; - Chunk current_chunk; - - UInt64 rows_before_limit_at_least = 0; - bool with_ties; const SortDescription description; Chunk previous_row_chunk; /// for WITH TIES, contains only sort columns - std::vector sort_column_positions; - std::vector input_ports; - std::vector output_ports; + size_t rows_read = 0; /// including the last read block + size_t rows_before_limit_at_least = 0; - std::vector is_port_pair_finished; + struct PortsData + { + Chunk current_chunk; + + InputPort * input_port = nullptr; + OutputPort * output_port = nullptr; + bool is_finished = false; + }; + + std::vector ports_data; size_t num_finished_port_pairs = 0; Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const; @@ -47,10 +56,10 @@ public: String getName() const override { return "Limit"; } Status prepare(const PortNumbers & 
/*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override; - Status preparePair(InputPort &, OutputPort &); - void splitChunk(); + Status preparePair(PortsData & data); + void splitChunk(PortsData & data); - UInt64 getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; } + size_t getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; } }; } From 4a98887e27f1269936d543da89b04e2685524810 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 20:02:12 +0300 Subject: [PATCH 14/20] Update LimitTransform. --- dbms/src/Processors/LimitTransform.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index b0971dcebaf..fd80cd5c1bd 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -4,6 +4,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + LimitTransform::LimitTransform( const Block & header_, size_t limit_, size_t offset_, size_t num_streams, bool always_read_till_end_, bool with_ties_, From 4ea2361f377afc364d238d34b34689ed4e94290e Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 12 Mar 2020 22:04:12 +0300 Subject: [PATCH 15/20] Added comment. --- dbms/src/Processors/LimitTransform.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index c1b7e7ed378..f7a5a8d364d 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -31,6 +31,8 @@ private: size_t rows_read = 0; /// including the last read block size_t rows_before_limit_at_least = 0; + /// State of port's pair. + /// Chunks from different port pairs are not mixed for better cache locality. 
struct PortsData { Chunk current_chunk; From 9caa6b3f1eac87516827e3b881de6d1fd1f759c4 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 13 Mar 2020 16:44:08 +0300 Subject: [PATCH 16/20] Try fix tests. --- .../Interpreters/InterpreterSelectQuery.cpp | 22 ++++++++++++++++--- .../src/Interpreters/InterpreterSelectQuery.h | 2 +- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 32512972090..e04dd5ab13b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -910,7 +910,12 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS } if (query.limitLength()) - executePreLimit(pipeline); + { + if constexpr (pipeline_with_processors) + executePreLimit(pipeline, true); + else + executePreLimit(pipeline); + } } // If there is no global subqueries, we can run subqueries only when receive them on server. @@ -980,7 +985,11 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.hasLimitBy() && !settings.extremes) { - executePreLimit(pipeline); + if constexpr (pipeline_with_processors) + executePreLimit(pipeline, false); + else + executePreLimit(pipeline); + has_prelimit = true; } @@ -2263,13 +2272,20 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline) } /// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined. 
-void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline) +void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset) { auto & query = getSelectQuery(); /// If there is LIMIT if (query.limitLength()) { auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context); + + if (do_not_skip_offset) + { + limit_length += limit_offset; + limit_offset = 0; + } + auto limit = std::make_shared(pipeline.getHeader(), limit_length, limit_offset, pipeline.getNumStreams()); pipeline.addPipe({std::move(limit)}); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 882c5a71e2a..06bca9dbe48 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -191,7 +191,7 @@ private: void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info); void executeWithFill(QueryPipeline & pipeline); void executeMergeSorted(QueryPipeline & pipeline); - void executePreLimit(QueryPipeline & pipeline); + void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset); void executeLimitBy(QueryPipeline & pipeline); void executeLimit(QueryPipeline & pipeline); void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); From 68dc95486b5a2d82efdc25f50cd30b1cdbd5f979 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 13 Mar 2020 16:50:34 +0300 Subject: [PATCH 17/20] Fix build. 
--- dbms/src/Processors/LimitTransform.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index f7a5a8d364d..3398e86b15a 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -61,6 +61,9 @@ public: Status preparePair(PortsData & data); void splitChunk(PortsData & data); + InputPort & getInputPort() { return inputs.front(); } + OutputPort & getOutputPort() { return outputs.front(); } + size_t getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; } }; From 62e06c7813684c26d7def524e1bb9f04d65067cc Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 13 Mar 2020 20:41:39 +0300 Subject: [PATCH 18/20] Try fix tests. --- dbms/src/Processors/LimitTransform.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index 3398e86b15a..fcd432debcd 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -58,6 +58,16 @@ public: String getName() const override { return "Limit"; } Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override; + + /// Compatibility for TreeExecutor. + Status prepare() override + { + if (ports_data.size() != 1) + throw Exception("", ErrorCodes::LOGICAL_ERROR); + + return prepare({0}, {0}); + } + Status preparePair(PortsData & data); void splitChunk(PortsData & data); From 75fa33fa04d096a5c1d2e91037c162f2430c15c3 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 13 Mar 2020 21:43:21 +0300 Subject: [PATCH 19/20] Try fix tests. 
--- dbms/src/Processors/LimitTransform.cpp | 9 +++++++++ dbms/src/Processors/LimitTransform.h | 11 +---------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp index fd80cd5c1bd..219a663198f 100644 --- a/dbms/src/Processors/LimitTransform.cpp +++ b/dbms/src/Processors/LimitTransform.cpp @@ -126,6 +126,15 @@ IProcessor::Status LimitTransform::prepare( return Status::NeedData; } +LimitTransform::Status LimitTransform::prepare() +{ + if (ports_data.size() != 1) + throw Exception("prepare without arguments is not supported for multi-port LimitTransform.", + ErrorCodes::LOGICAL_ERROR); + + return prepare({0}, {0}); +} + LimitTransform::Status LimitTransform::preparePair(PortsData & data) { auto & output = *data.output_port; diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index fcd432debcd..2035ed308f1 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -58,16 +58,7 @@ public: String getName() const override { return "Limit"; } Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override; - - /// Compatibility for TreeExecutor. - Status prepare() override - { - if (ports_data.size() != 1) - throw Exception("", ErrorCodes::LOGICAL_ERROR); - - return prepare({0}, {0}); - } - + Status prepare() override; /// Compatibility for TreeExecutor. Status preparePair(PortsData & data); void splitChunk(PortsData & data); From 95c83ec95a3211d75801671d952de3a20a83c157 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Sat, 14 Mar 2020 17:51:32 +0300 Subject: [PATCH 20/20] Fix typo. 
--- dbms/src/Processors/LimitTransform.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h index 2035ed308f1..1e7ec3bf322 100644 --- a/dbms/src/Processors/LimitTransform.h +++ b/dbms/src/Processors/LimitTransform.h @@ -10,7 +10,7 @@ namespace DB /// This processor support multiple inputs and outputs (the same number). /// Each pair of input and output port works independently. /// The reason to have multiple ports is to be able to stop all sources when limit is reached, in a query like: -/// ELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1 +/// SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1 /// /// always_read_till_end - read all data from input ports even if limit was reached. /// with_ties, description - implementation of LIMIT WITH TIES. It works only for single port.