diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index 957c462684a..826f016cce9 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -909,7 +909,12 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
         }
 
         if (query.limitLength())
-            executePreLimit(pipeline);
+        {
+            if constexpr (pipeline_with_processors)
+                executePreLimit(pipeline, true);
+            else
+                executePreLimit(pipeline);
+        }
     }
 
     // If there is no global subqueries, we can run subqueries only when receive them on server.
@@ -975,13 +980,20 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
             /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
              *  limiting the number of rows in each up to `offset + limit`.
              */
+            bool has_prelimit = false;
             if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() &&
                 !query.distinct && !expressions.hasLimitBy() && !settings.extremes)
             {
-                executePreLimit(pipeline);
+                if constexpr (pipeline_with_processors)
+                    executePreLimit(pipeline, false);
+                else
+                    executePreLimit(pipeline);
+
+                has_prelimit = true;
             }
 
-            bool need_merge_streams = need_second_distinct_pass || query.limitLength() || query.limitBy();
+            bool need_merge_streams = need_second_distinct_pass || query.limitBy()
+                || (!pipeline_with_processors && query.limitLength()); /// Don't merge streams for pre-limit anymore.
 
             if constexpr (!pipeline_with_processors)
                 if (pipeline.hasDelayedStream())
@@ -1017,7 +1029,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
              */
             executeExtremes(pipeline);
 
-            executeLimit(pipeline);
+            if (!(pipeline_with_processors && has_prelimit)) /// Limit is no longer needed if there is a prelimit.
+                executeLimit(pipeline);
         }
     }
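The two call sites above use `do_not_skip_offset` differently. The first-stage pre-limit runs while streams may still be merged or processed further, so it must not drop offset rows yet: it folds the offset into the limit and keeps `offset + limit` rows per stream, leaving the skipping to the final LIMIT. The second call site installs a single pre-limit over all streams at the final stage, where the offset can be applied directly, which is why `has_prelimit` then suppresses `executeLimit`. A minimal sketch of the folding rule (the helper name is invented for illustration; the real logic is inline in `executePreLimit` below):

    #include <cstddef>
    #include <utility>

    /// Hypothetical helper, not part of the patch: the {length, offset} pair
    /// that is actually handed to the LimitTransform.
    std::pair<size_t, size_t> preLimitParams(size_t limit_length, size_t limit_offset, bool do_not_skip_offset)
    {
        if (do_not_skip_offset)
            return {limit_length + limit_offset, 0};  /// LIMIT 10 OFFSET 90 -> keep the first 100 rows, skip nothing yet
        return {limit_length, limit_offset};          /// the transform itself skips the first 90 rows
    }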
@@ -2250,20 +2263,22 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
 }
 
 /// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined.
-void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline)
+void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset)
 {
     auto & query = getSelectQuery();
 
     /// If there is LIMIT
     if (query.limitLength())
     {
         auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context);
-        pipeline.addSimpleTransform([&, limit = limit_length + limit_offset](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
-        {
-            if (stream_type == QueryPipeline::StreamType::Totals)
-                return nullptr;
-            return std::make_shared<LimitTransform>(header, limit, 0);
-        });
+        if (do_not_skip_offset)
+        {
+            limit_length += limit_offset;
+            limit_offset = 0;
+        }
+
+        auto limit = std::make_shared<LimitTransform>(pipeline.getHeader(), limit_length, limit_offset, pipeline.getNumStreams());
+        pipeline.addPipe({std::move(limit)});
     }
 }
 
@@ -2465,7 +2480,7 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
                 return nullptr;
 
             return std::make_shared<LimitTransform>(
-                    header, limit_length, limit_offset, always_read_till_end, query.limit_with_ties, order_descr);
+                    header, limit_length, limit_offset, 1, always_read_till_end, query.limit_with_ties, order_descr);
         });
     }
 }
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h
index 94d847ea550..414d18a5a27 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.h
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.h
@@ -191,7 +191,7 @@ private:
     void executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr sorting_info);
     void executeWithFill(QueryPipeline & pipeline);
     void executeMergeSorted(QueryPipeline & pipeline);
-    void executePreLimit(QueryPipeline & pipeline);
+    void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset);
     void executeLimitBy(QueryPipeline & pipeline);
     void executeLimit(QueryPipeline & pipeline);
     void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
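The processors branch of `executePreLimit` above now adds one `LimitTransform` whose port pairs span every stream (`pipeline.getNumStreams()`), instead of one transform per stream. As the `LimitTransform` changes below show, `rows_read` is shared by all port pairs, so the limit and offset are enforced globally without merging the streams first. A standalone toy model of that shared counter (not ClickHouse code):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main()
    {
        size_t limit = 5;
        size_t rows_read = 0;  /// one counter shared by all port pairs (offset 0 for simplicity)

        /// Chunks arriving on three independent streams.
        std::vector<size_t> chunks = {3, 4, 2};

        for (size_t rows : chunks)
        {
            size_t pass = rows_read >= limit ? 0 : std::min(rows, limit - rows_read);
            rows_read += rows;
            std::cout << "pass " << pass << " of " << rows << " rows\n";  /// 3, 2, 0 -> 5 rows total
        }
    }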
diff --git a/dbms/src/Processors/LimitTransform.cpp b/dbms/src/Processors/LimitTransform.cpp
index 5ef56554e3d..219a663198f 100644
--- a/dbms/src/Processors/LimitTransform.cpp
+++ b/dbms/src/Processors/LimitTransform.cpp
@@ -4,16 +4,39 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 LimitTransform::LimitTransform(
-    const Block & header_, size_t limit_, size_t offset_,
+    const Block & header_, size_t limit_, size_t offset_, size_t num_streams,
     bool always_read_till_end_, bool with_ties_,
-    const SortDescription & description_)
-    : IProcessor({header_}, {header_})
-    , input(inputs.front()), output(outputs.front())
+    SortDescription description_)
+    : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
     , limit(limit_), offset(offset_)
     , always_read_till_end(always_read_till_end_)
-    , with_ties(with_ties_), description(description_)
+    , with_ties(with_ties_), description(std::move(description_))
 {
+    if (num_streams != 1 && with_ties)
+        throw Exception("Cannot use LimitTransform with multiple ports and ties.", ErrorCodes::LOGICAL_ERROR);
+
+    ports_data.resize(num_streams);
+
+    size_t cur_stream = 0;
+    for (auto & input : inputs)
+    {
+        ports_data[cur_stream].input_port = &input;
+        ++cur_stream;
+    }
+
+    cur_stream = 0;
+    for (auto & output : outputs)
+    {
+        ports_data[cur_stream].output_port = &output;
+        ++cur_stream;
+    }
+
     for (const auto & desc : description)
     {
         if (!desc.column_name.empty())
@@ -37,8 +60,86 @@ Chunk LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row)
 }
 
+IProcessor::Status LimitTransform::prepare(
+        const PortNumbers & updated_input_ports,
+        const PortNumbers & updated_output_ports)
+{
+    bool has_full_port = false;
+
+    auto process_pair = [&](size_t pos)
+    {
+        auto status = preparePair(ports_data[pos]);
+
+        switch (status)
+        {
+            case IProcessor::Status::Finished:
+            {
+                if (!ports_data[pos].is_finished)
+                {
+                    ports_data[pos].is_finished = true;
+                    ++num_finished_port_pairs;
+                }
+
+                return;
+            }
+            case IProcessor::Status::PortFull:
+            {
+                has_full_port = true;
+                return;
+            }
+            case IProcessor::Status::NeedData:
+                return;
+            default:
+                throw Exception(
+                        "Unexpected status for LimitTransform::preparePair : " + IProcessor::statusToName(status),
+                        ErrorCodes::LOGICAL_ERROR);
+
+        }
+    };
+
+    for (auto pos : updated_input_ports)
+        process_pair(pos);
+
+    for (auto pos : updated_output_ports)
+        process_pair(pos);
+
+    /// All ports are finished. This may happen even before we reach the limit (there is less data than the limit).
+    if (num_finished_port_pairs == ports_data.size())
+        return Status::Finished;
+
+    /// If we reached the limit for some port, close the others. Otherwise some sources may read data infinitely.
+    /// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1
+    if ((rows_read >= offset + limit) && !previous_row_chunk && !always_read_till_end)
+    {
+        for (auto & input : inputs)
+            input.close();
+
+        for (auto & output : outputs)
+            output.finish();
+
+        return Status::Finished;
+    }
+
+    if (has_full_port)
+        return Status::PortFull;
+
+    return Status::NeedData;
+}
+
 LimitTransform::Status LimitTransform::prepare()
 {
+    if (ports_data.size() != 1)
+        throw Exception("prepare without arguments is not supported for multi-port LimitTransform.",
+                        ErrorCodes::LOGICAL_ERROR);
+
+    return prepare({0}, {0});
+}
+
+LimitTransform::Status LimitTransform::preparePair(PortsData & data)
+{
+    auto & output = *data.output_port;
+    auto & input = *data.input_port;
+
     /// Check can output.
     bool output_finished = false;
     if (output.isFinished())
@@ -57,17 +158,9 @@
         return Status::PortFull;
     }
 
-    /// Push block if can.
-    if (!output_finished && has_block && block_processed)
-    {
-        output.push(std::move(current_chunk));
-        has_block = false;
-        block_processed = false;
-    }
-
     /// Check if we are done with pushing.
-    bool pushing_is_finished = (rows_read >= offset + limit) && !previous_row_chunk;
-    if (pushing_is_finished)
+    bool is_limit_reached = (rows_read >= offset + limit) && !previous_row_chunk;
+    if (is_limit_reached)
     {
         if (!always_read_till_end)
         {
@@ -89,18 +182,15 @@
     if (!input.hasData())
         return Status::NeedData;
 
-    current_chunk = input.pull(true);
-    has_block = true;
+    data.current_chunk = input.pull(true);
 
-    auto rows = current_chunk.getNumRows();
+    auto rows = data.current_chunk.getNumRows();
     rows_before_limit_at_least += rows;
 
     /// Skip block (for 'always_read_till_end' case).
-    if (pushing_is_finished)
+    if (is_limit_reached || output_finished)
     {
-        current_chunk.clear();
-        has_block = false;
-
+        data.current_chunk.clear();
         if (input.isFinished())
         {
             output.finish();
@@ -118,8 +208,7 @@
 
     if (rows_read <= offset)
     {
-        current_chunk.clear();
-        has_block = false;
+        data.current_chunk.clear();
 
         if (input.isFinished())
        {
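One subtlety in the multi-port `prepare` above: the same pair can be reported as updated more than once (e.g. through both the input and the output list, or again on a later `prepare` call), so a pair that returns `Finished` repeatedly must only be counted once. That is what the `is_finished` flag guards. A standalone toy of the bookkeeping (not ClickHouse code):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main()
    {
        struct PairState { bool is_finished = false; };
        std::vector<PairState> pairs(3);
        size_t num_finished_port_pairs = 0;

        auto on_finished = [&](size_t pos)
        {
            if (!pairs[pos].is_finished)  /// count each pair at most once
            {
                pairs[pos].is_finished = true;
                ++num_finished_port_pairs;
            }
        };

        on_finished(1);
        on_finished(1);  /// duplicate report for the same pair: ignored
        on_finished(0);
        on_finished(2);

        std::cout << (num_finished_port_pairs == pairs.size() ? "Finished\n" : "NeedData\n");
    }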
@@ -132,36 +221,34 @@
         return Status::NeedData;
     }
 
-    /// Return the whole block.
     if (rows_read >= offset + rows && rows_read <= offset + limit)
     {
-        if (output.hasData())
-            return Status::PortFull;
+        /// Return the whole chunk.
 
-        /// Save the last row of current block to check if next block begins with the same row (for WITH TIES).
+        /// Save the last row of the current chunk to check if the next chunk begins with the same row (for WITH TIES).
         if (with_ties && rows_read == offset + limit)
-            previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_chunk.getNumRows() - 1);
-
-        output.push(std::move(current_chunk));
-        has_block = false;
-
-        return Status::PortFull;
+            previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, data.current_chunk.getNumRows() - 1);
     }
+    else
+        /// This function may be heavy to execute in prepare. But it happens no more than twice, and makes the code simpler.
+        splitChunk(data);
 
     bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit;
     /// No more data is needed.
     if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties)
         input.close();
 
-    return Status::Ready;
+    output.push(std::move(data.current_chunk));
+
+    return Status::PortFull;
 }
 
-void LimitTransform::work()
+void LimitTransform::splitChunk(PortsData & data)
 {
-    auto current_chunk_sort_columns = extractSortColumns(current_chunk.getColumns());
-    size_t num_rows = current_chunk.getNumRows();
-    size_t num_columns = current_chunk.getNumColumns();
+    auto current_chunk_sort_columns = extractSortColumns(data.current_chunk.getColumns());
+    size_t num_rows = data.current_chunk.getNumRows();
+    size_t num_columns = data.current_chunk.getNumColumns();
 
     if (previous_row_chunk && rows_read >= offset + limit)
     {
@@ -173,7 +260,7 @@
             break;
     }
 
-    auto columns = current_chunk.detachColumns();
+    auto columns = data.current_chunk.detachColumns();
 
     if (current_row_num < num_rows)
     {
@@ -182,8 +269,7 @@
             columns[i] = columns[i]->cut(0, current_row_num);
         }
 
-        current_chunk.setColumns(std::move(columns), current_row_num);
-        block_processed = true;
+        data.current_chunk.setColumns(std::move(columns), current_row_num);
         return;
     }
 
@@ -201,7 +287,7 @@
     if (with_ties && length)
     {
         size_t current_row_num = start + length;
-        previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_row_num - 1);
+        previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1);
 
         for (; current_row_num < num_rows; ++current_row_num)
         {
@@ -216,19 +302,14 @@
     }
 
     if (length == num_rows)
-    {
-        block_processed = true;
         return;
-    }
 
-    auto columns = current_chunk.detachColumns();
+    auto columns = data.current_chunk.detachColumns();
 
     for (size_t i = 0; i < num_columns; ++i)
         columns[i] = columns[i]->cut(start, length);
 
-    current_chunk.setColumns(std::move(columns), length);
-
-    block_processed = true;
+    data.current_chunk.setColumns(std::move(columns), length);
 }
 
 ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const
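`splitChunk` cuts the current chunk so that only rows inside the global window `[offset, offset + limit)` survive; the `start`/`length` arithmetic itself lives in unchanged code outside this diff. Under that assumed semantics, here is a standalone check of the numbers for a chunk of 50 rows occupying global positions [70, 120) under LIMIT 10 OFFSET 90:

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    int main()
    {
        size_t offset = 90, limit = 10;
        size_t rows = 50;        /// rows in the current chunk
        size_t rows_read = 120;  /// global row count including this chunk

        size_t chunk_first_row = rows_read - rows;         /// 70: global index of the chunk's first row
        size_t begin = std::max(chunk_first_row, offset);  /// 90: first surviving global row
        size_t end = std::min(rows_read, offset + limit);  /// 100: one past the last surviving global row

        std::cout << "cut(" << (begin - chunk_first_row) << ", " << (end - begin) << ")\n";  /// cut(20, 10)
    }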
diff --git a/dbms/src/Processors/LimitTransform.h b/dbms/src/Processors/LimitTransform.h
index f37bf8aba15..1e7ec3bf322 100644
--- a/dbms/src/Processors/LimitTransform.h
+++ b/dbms/src/Processors/LimitTransform.h
@@ -6,48 +6,66 @@
 namespace DB
 {
 
+/// Implementation for LIMIT N OFFSET M
+/// This processor supports multiple inputs and outputs (the same number).
+/// Each pair of input and output ports works independently.
+/// The reason to have multiple ports is to be able to stop all sources when the limit is reached, in a query like:
+///     SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1
+///
+/// always_read_till_end - read all data from input ports even if the limit was reached.
+/// with_ties, description - implementation of LIMIT WITH TIES. It works only with a single port.
 class LimitTransform : public IProcessor
 {
 private:
-    InputPort & input;
-    OutputPort & output;
-
     size_t limit;
     size_t offset;
-    size_t rows_read = 0; /// including the last read block
 
     bool always_read_till_end;
-
-    bool has_block = false;
-    bool block_processed = false;
-    Chunk current_chunk;
-
-    UInt64 rows_before_limit_at_least = 0;
 
     bool with_ties;
     const SortDescription description;
 
     Chunk previous_row_chunk;  /// for WITH TIES, contains only sort columns
     std::vector<size_t> sort_column_positions;
+
+    size_t rows_read = 0; /// including the last read block
+    size_t rows_before_limit_at_least = 0;
+
+    /// State of a port pair.
+    /// Chunks from different port pairs are not mixed for better cache locality.
+    struct PortsData
+    {
+        Chunk current_chunk;
+
+        InputPort * input_port = nullptr;
+        OutputPort * output_port = nullptr;
+        bool is_finished = false;
+    };
+
+    std::vector<PortsData> ports_data;
+    size_t num_finished_port_pairs = 0;
 
     Chunk makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const;
     ColumnRawPtrs extractSortColumns(const Columns & columns) const;
     bool sortColumnsEqualAt(const ColumnRawPtrs & current_chunk_sort_columns, size_t current_chunk_row_num) const;
 
 public:
     LimitTransform(
-        const Block & header_, size_t limit_, size_t offset_,
+        const Block & header_, size_t limit_, size_t offset_, size_t num_streams = 1,
         bool always_read_till_end_ = false, bool with_ties_ = false,
-        const SortDescription & description_ = {});
+        SortDescription description_ = {});
 
     String getName() const override { return "Limit"; }
 
-    Status prepare() override;
-    void work() override;
+    Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override;
+    Status prepare() override; /// Compatibility for TreeExecutor.
+    Status preparePair(PortsData & data);
+    void splitChunk(PortsData & data);
 
-    InputPort & getInputPort() { return input; }
-    OutputPort & getOutputPort() { return output; }
+    InputPort & getInputPort() { return inputs.front(); }
+    OutputPort & getOutputPort() { return outputs.front(); }
 
-    UInt64 getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; }
+    size_t getRowsBeforeLimitAtLeast() const { return rows_before_limit_at_least; }
 };
 
 }
diff --git a/dbms/src/Storages/System/StorageSystemNumbers.cpp b/dbms/src/Storages/System/StorageSystemNumbers.cpp
index f5a5bb58b72..6e7959f455b 100644
--- a/dbms/src/Storages/System/StorageSystemNumbers.cpp
+++ b/dbms/src/Storages/System/StorageSystemNumbers.cpp
@@ -167,7 +167,7 @@
         {
             /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
             res.back().addSimpleTransform(std::make_shared<LimitTransform>(
-                res.back().getHeader(), *limit * (i + 1) / num_streams - *limit * i / num_streams, 0, false));
+                res.back().getHeader(), *limit * (i + 1) / num_streams - *limit * i / num_streams, 0));
         }
     }
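The split formula kept in `StorageSystemNumbers::read` above is worth a sanity check: `limit * (i + 1) / num_streams - limit * i / num_streams` is a telescoping sum, so the per-stream shares add up to exactly `limit` while differing from each other by at most one row. A standalone check:

    #include <cstdio>

    int main()
    {
        unsigned long long limit = 10, num_streams = 3, total = 0;

        for (unsigned long long i = 0; i < num_streams; ++i)
        {
            unsigned long long share = limit * (i + 1) / num_streams - limit * i / num_streams;
            total += share;
            std::printf("stream %llu -> %llu rows\n", i, share);  /// 3, 3, 4
        }

        std::printf("total = %llu\n", total);  /// exactly `limit`
    }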
diff --git a/dbms/tests/performance/pre_limit_no_sorting.xml b/dbms/tests/performance/pre_limit_no_sorting.xml
new file mode 100644
index 00000000000..e93aef049aa
--- /dev/null
+++ b/dbms/tests/performance/pre_limit_no_sorting.xml
@@ -0,0 +1,15 @@
+<test>
+
+    <stop_conditions>
+        <all_of>
+            <iterations>10</iterations>
+            <min_time_not_changing_for_ms>200</min_time_not_changing_for_ms>
+        </all_of>
+        <any_of>
+            <iterations>100</iterations>
+            <total_time_ms>1000</total_time_ms>
+        </any_of>
+    </stop_conditions>
+
+    <query>SELECT sum(number) FROM (select number from system.numbers_mt limit 1000000000)</query>
+</test>
diff --git a/dbms/tests/queries/0_stateless/01097_pre_limit.reference b/dbms/tests/queries/0_stateless/01097_pre_limit.reference
new file mode 100644
index 00000000000..749fce669df
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/01097_pre_limit.reference
@@ -0,0 +1 @@
+1000000
diff --git a/dbms/tests/queries/0_stateless/01097_pre_limit.sql b/dbms/tests/queries/0_stateless/01097_pre_limit.sql
new file mode 100644
index 00000000000..03b9f441dd7
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/01097_pre_limit.sql
@@ -0,0 +1 @@
+SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1