diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 6c7a17ffd77..b8dedec15da 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -93,6 +93,7 @@ #include #include #include +#include #include #include #include @@ -659,16 +660,16 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, const Co return order_descr; } -static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context) +static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context, const std::string & expr) { const auto & [field, type] = evaluateConstantExpression(node, context); if (!isNativeNumber(type)) - throw Exception("Illegal type " + type->getName() + " of LIMIT expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION); + throw Exception("Illegal type " + type->getName() + " of " + expr + " expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION); Field converted = convertFieldToType(field, DataTypeUInt64()); if (converted.isNull()) - throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of LIMIT expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION); + throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of " + expr + " expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION); return converted.safeGet(); } @@ -681,11 +682,12 @@ static std::pair getLimitLengthAndOffset(const ASTSelectQuery & if (query.limitLength()) { - length = getLimitUIntValue(query.limitLength(), context); + length = getLimitUIntValue(query.limitLength(), context, "LIMIT"); if (query.limitOffset() && length) - offset = getLimitUIntValue(query.limitOffset(), context); + offset = getLimitUIntValue(query.limitOffset(), context, "OFFSET"); } - + else if (query.limitOffset()) + offset = getLimitUIntValue(query.limitOffset(), context, "OFFSET"); return {length, offset}; } @@ -693,7 +695,7 @@ static std::pair getLimitLengthAndOffset(const ASTSelectQuery & static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context) { /// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY, neither ARRAY JOIN. - if (!query.distinct && !query.limitBy() && !query.limit_with_ties && !query.arrayJoinExpressionList()) + if (!query.distinct && !query.limitBy() && !query.limit_with_ties && !query.arrayJoinExpressionList() && query.limitLength()) { auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context); return limit_length + limit_offset; @@ -1070,6 +1072,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (!(pipeline_with_processors && has_prelimit)) /// Limit is no longer needed if there is prelimit. executeLimit(pipeline); + + executeOffset(pipeline); } } @@ -2345,8 +2349,8 @@ void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline) Names columns; for (const auto & elem : query.limitBy()->children) columns.emplace_back(elem->getColumnName()); - UInt64 length = getLimitUIntValue(query.limitByLength(), *context); - UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0); + UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT"); + UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0); pipeline.transform([&](auto & stream) { @@ -2364,8 +2368,8 @@ void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline) for (const auto & elem : query.limitBy()->children) columns.emplace_back(elem->getColumnName()); - UInt64 length = getLimitUIntValue(query.limitByLength(), *context); - UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0); + UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT"); + UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0); pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr { @@ -2444,6 +2448,7 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline) } } +void InterpreterSelectQuery::executeOffset(Pipeline & /* pipeline */) {} void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline) { @@ -2539,6 +2544,26 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline) } +void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline) +{ + auto & query = getSelectQuery(); + /// If there is not a LIMIT but an offset + if (!query.limitLength() && query.limitOffset()) + { + UInt64 limit_length; + UInt64 limit_offset; + std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context); + + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr + { + if (stream_type != QueryPipeline::StreamType::Main) + return nullptr; + return std::make_shared(header, limit_offset, 1); + }); + } +} + + void InterpreterSelectQuery::executeExtremes(Pipeline & pipeline) { if (!context->getSettingsRef().extremes) diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h index c50f4a2f7b7..1415143dd63 100644 --- a/src/Interpreters/InterpreterSelectQuery.h +++ b/src/Interpreters/InterpreterSelectQuery.h @@ -185,6 +185,7 @@ private: void executeUnion(Pipeline & pipeline, Block header); void executeLimitBy(Pipeline & pipeline); void executeLimit(Pipeline & pipeline); + void executeOffset(Pipeline & pipeline); static void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression); void executeDistinct(Pipeline & pipeline, bool before_order, Names columns); void executeExtremes(Pipeline & pipeline); @@ -203,6 +204,7 @@ private: void executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset); void executeLimitBy(QueryPipeline & pipeline); void executeLimit(QueryPipeline & pipeline); + void executeOffset(QueryPipeline & pipeline); static void executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression); void executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns); void executeExtremes(QueryPipeline & pipeline); diff --git a/src/Parsers/ASTSelectQuery.cpp b/src/Parsers/ASTSelectQuery.cpp index 6a88700e3b8..9e65543babe 100644 --- a/src/Parsers/ASTSelectQuery.cpp +++ b/src/Parsers/ASTSelectQuery.cpp @@ -154,6 +154,11 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F if (limit_with_ties) s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << " WITH TIES" << (s.hilite ? hilite_none : ""); } + else if (limitOffset()) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "OFFSET " << (s.hilite ? hilite_none : ""); + limitOffset()->formatImpl(s, state, frame); + } if (settings()) { diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 89579d07177..70a8b282a72 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1107,6 +1107,7 @@ const char * ParserAlias::restricted_keywords[] = "HAVING", "ORDER", "LIMIT", + "OFFSET", "SETTINGS", "FORMAT", "UNION", diff --git a/src/Parsers/ParserSelectQuery.cpp b/src/Parsers/ParserSelectQuery.cpp index 927ac45001e..d2d7bbf9f21 100644 --- a/src/Parsers/ParserSelectQuery.cpp +++ b/src/Parsers/ParserSelectQuery.cpp @@ -241,6 +241,11 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (top_length && limit_length) throw Exception("Can not use TOP and LIMIT together", ErrorCodes::TOP_AND_LIMIT_TOGETHER); } + else if (s_offset.ignore(pos, expected)) + { + if (!exp_elem.parse(pos, limit_offset, expected)) + return false; + } /// Because TOP n in totally equals LIMIT n if (top_length) diff --git a/src/Processors/OffsetTransform.cpp b/src/Processors/OffsetTransform.cpp new file mode 100644 index 00000000000..f380a5a5159 --- /dev/null +++ b/src/Processors/OffsetTransform.cpp @@ -0,0 +1,186 @@ +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +OffsetTransform::OffsetTransform( + const Block & header_, size_t offset_, size_t num_streams) + : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_)) + , offset(offset_) +{ + ports_data.resize(num_streams); + + size_t cur_stream = 0; + for (auto & input : inputs) + { + ports_data[cur_stream].input_port = &input; + ++cur_stream; + } + + cur_stream = 0; + for (auto & output : outputs) + { + ports_data[cur_stream].output_port = &output; + ++cur_stream; + } +} + + +IProcessor::Status OffsetTransform::prepare( + const PortNumbers & updated_input_ports, + const PortNumbers & updated_output_ports) +{ + bool has_full_port = false; + + auto process_pair = [&](size_t pos) + { + auto status = preparePair(ports_data[pos]); + + switch (status) + { + case IProcessor::Status::Finished: + { + if (!ports_data[pos].is_finished) + { + ports_data[pos].is_finished = true; + ++num_finished_port_pairs; + } + + return; + } + case IProcessor::Status::PortFull: + { + has_full_port = true; + return; + } + case IProcessor::Status::NeedData: + return; + default: + throw Exception( + "Unexpected status for OffsetTransform::preparePair : " + IProcessor::statusToName(status), + ErrorCodes::LOGICAL_ERROR); + + } + }; + + for (auto pos : updated_input_ports) + process_pair(pos); + + for (auto pos : updated_output_ports) + process_pair(pos); + + /// All ports are finished. It may happen even before we reached the limit (has less data then limit). + if (num_finished_port_pairs == ports_data.size()) + return Status::Finished; + + if (has_full_port) + return Status::PortFull; + + return Status::NeedData; +} + +OffsetTransform::Status OffsetTransform::prepare() +{ + if (ports_data.size() != 1) + throw Exception("prepare without arguments is not supported for multi-port OffsetTransform.", + ErrorCodes::LOGICAL_ERROR); + + return prepare({0}, {0}); +} + +OffsetTransform::Status OffsetTransform::preparePair(PortsData & data) +{ + auto & output = *data.output_port; + auto & input = *data.input_port; + + /// Check can output. + bool output_finished = false; + if (output.isFinished()) + { + output_finished = true; + } + + if (!output_finished && !output.canPush()) + { + input.setNotNeeded(); + return Status::PortFull; + } + + /// Check can input. + + if (input.isFinished()) + { + output.finish(); + return Status::Finished; + } + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + data.current_chunk = input.pull(true); + + auto rows = data.current_chunk.getNumRows(); + + if (rows_before_limit_at_least) + rows_before_limit_at_least->add(rows); + + /// Process block. + + rows_read += rows; + + if (rows_read < offset) + { + data.current_chunk.clear(); + + if (input.isFinished()) + { + output.finish(); + return Status::Finished; + } + + /// Now, we pulled from input, and it must be empty. + input.setNeeded(); + return Status::NeedData; + } + + if (!(rows_read >= offset + rows)) + splitChunk(data); + + output.push(std::move(data.current_chunk)); + + return Status::PortFull; +} + + +void OffsetTransform::splitChunk(PortsData & data) const +{ + size_t num_rows = data.current_chunk.getNumRows(); + size_t num_columns = data.current_chunk.getNumColumns(); + + /// return a piece of the block + size_t start = std::max( + static_cast(0), + static_cast(offset) - static_cast(rows_read) + static_cast(num_rows)); + + size_t length = static_cast(rows_read) - static_cast(offset); + + if (length == num_rows) + return; + + auto columns = data.current_chunk.detachColumns(); + + for (size_t i = 0; i < num_columns; ++i) + columns[i] = columns[i]->cut(start, length); + + data.current_chunk.setColumns(std::move(columns), length); +} + +} + diff --git a/src/Processors/OffsetTransform.h b/src/Processors/OffsetTransform.h new file mode 100644 index 00000000000..3fee4e791a5 --- /dev/null +++ b/src/Processors/OffsetTransform.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +/// Implementation for OFFSET N (without limit) +/// This processor support multiple inputs and outputs (the same number). +/// Each pair of input and output port works independently. +class OffsetTransform : public IProcessor +{ +private: + + size_t offset; + + size_t rows_read = 0; /// including the last read block + RowsBeforeLimitCounterPtr rows_before_limit_at_least; + + /// State of port's pair. + /// Chunks from different port pairs are not mixed for berret cache locality. + struct PortsData + { + Chunk current_chunk; + + InputPort * input_port = nullptr; + OutputPort * output_port = nullptr; + bool is_finished = false; + }; + + std::vector ports_data; + size_t num_finished_port_pairs = 0; + +public: + OffsetTransform(const Block & header_, size_t offset_, size_t num_streams = 1); + + String getName() const override { return "Offset"; } + + Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override; + Status prepare() override; /// Compatibility for TreeExecutor. + Status preparePair(PortsData & data); + void splitChunk(PortsData & data) const; + + InputPort & getInputPort() { return inputs.front(); } + OutputPort & getOutputPort() { return outputs.front(); } + + void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); } +}; + +} diff --git a/tests/queries/0_stateless/01272_offset_without_limit.reference b/tests/queries/0_stateless/01272_offset_without_limit.reference new file mode 100644 index 00000000000..780c35c9d70 --- /dev/null +++ b/tests/queries/0_stateless/01272_offset_without_limit.reference @@ -0,0 +1,45 @@ +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 +21 +22 +23 +24 +25 +26 +27 +28 +29 +30 +31 +32 +33 +34 +35 +36 +37 +38 +39 +40 +41 +42 +43 +44 +45 +46 +47 +48 +49 diff --git a/tests/queries/0_stateless/01272_offset_without_limit.sql b/tests/queries/0_stateless/01272_offset_without_limit.sql new file mode 100644 index 00000000000..769808b2edd --- /dev/null +++ b/tests/queries/0_stateless/01272_offset_without_limit.sql @@ -0,0 +1,13 @@ +DROP TABLE IF EXISTS offset_without_limit; + +CREATE TABLE offset_without_limit ( + value UInt32 +) Engine = MergeTree() + PRIMARY KEY value + ORDER BY value; + +INSERT INTO offset_without_limit SELECT * FROM system.numbers LIMIT 50; + +SELECT value FROM offset_without_limit ORDER BY value OFFSET 5; + +DROP TABLE offset_without_limit;