diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 16f7b75c036..1325d013632 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -656,16 +656,16 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, const Context & context)
     return order_descr;
 }
 
-static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
+static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context, const std::string & expr)
 {
     const auto & [field, type] = evaluateConstantExpression(node, context);
 
     if (!isNativeNumber(type))
-        throw Exception("Illegal type " + type->getName() + " of LIMIT expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
+        throw Exception("Illegal type " + type->getName() + " of " + expr + " expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
 
     Field converted = convertFieldToType(field, DataTypeUInt64());
     if (converted.isNull())
-        throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of LIMIT expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
+        throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of " + expr + " expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
 
     return converted.safeGet<UInt64>();
 }
@@ -677,9 +677,9 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
     UInt64 offset = 0;
 
     if (query.limitLength())
-        length = getLimitUIntValue(query.limitLength(), context);
+        length = getLimitUIntValue(query.limitLength(), context, "LIMIT");
     if (query.limitOffset())
-        offset = getLimitUIntValue(query.limitOffset(), context);
+        offset = getLimitUIntValue(query.limitOffset(), context, "OFFSET");
 
     return {length, offset};
 }
@@ -2336,8 +2336,8 @@ void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline)
     Names columns;
     for (const auto & elem : query.limitBy()->children)
         columns.emplace_back(elem->getColumnName());
-    UInt64 length = getLimitUIntValue(query.limitByLength(), *context);
-    UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0);
+    UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT");
+    UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0);
 
     pipeline.transform([&](auto & stream)
     {
@@ -2355,8 +2355,8 @@ void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline)
     for (const auto & elem : query.limitBy()->children)
         columns.emplace_back(elem->getColumnName());
 
-    UInt64 length = getLimitUIntValue(query.limitByLength(), *context);
-    UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0);
+    UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT");
+    UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0);
 
     pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
     {
@@ -2580,23 +2580,6 @@ void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
     /// If there is LIMIT
     if (!query.limitLength() && query.limitOffset())
     {
-        /** Rare case:
-          * if there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
-          * then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
-          * because if you cancel the query, we will not get `totals` data from the remote server.
-          *
-          * Another case:
-          * if there is WITH TOTALS and there is no ORDER BY, then read the data to the end,
-          * otherwise TOTALS is counted according to incomplete data.
-          */
-        bool always_read_till_end = false;
-
-        if (query.group_by_with_totals && !query.orderBy())
-            always_read_till_end = true;
-
-        if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
-            always_read_till_end = true;
-
         UInt64 limit_length;
         UInt64 limit_offset;
         std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context);
@@ -2615,7 +2598,7 @@ void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
                 return nullptr;
             std::cout << "TRANSFORM" << std::endl;
             return std::make_shared<OffsetTransform>(
-                    header, limit_length, limit_offset, 1, always_read_till_end, query.limit_with_ties, order_descr);
+                    header, limit_offset, 1, true, query.limit_with_ties, order_descr);
         });
     }
 }
diff --git a/src/Parsers/ParserSelectQuery.cpp b/src/Parsers/ParserSelectQuery.cpp
index 313f890d826..02909314199 100644
--- a/src/Parsers/ParserSelectQuery.cpp
+++ b/src/Parsers/ParserSelectQuery.cpp
@@ -247,11 +247,6 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
     {
         if (!exp_elem.parse(pos, limit_offset, expected))
             return false;
-        if (s_with_ties.ignore(pos, expected))
-        {
-            limit_with_ties_occured = true;
-            select_query->limit_with_ties = true;
-        }
     }
 
     /// Because TOP n in totally equals LIMIT n
diff --git a/src/Processors/OffsetTransform.cpp b/src/Processors/OffsetTransform.cpp
index 87574c160a6..dda36aa8323 100644
--- a/src/Processors/OffsetTransform.cpp
+++ b/src/Processors/OffsetTransform.cpp
@@ -10,11 +10,11 @@ namespace ErrorCodes
 }
 
 OffsetTransform::OffsetTransform(
-    const Block & header_, size_t limit_, size_t offset_, size_t num_streams,
+    const Block & header_, size_t offset_, size_t num_streams,
     bool always_read_till_end_, bool with_ties_, SortDescription description_)
     : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
-    , limit(limit_), offset(offset_)
+    , offset(offset_)
     , always_read_till_end(always_read_till_end_)
     , with_ties(with_ties_), description(std::move(description_))
 {
@@ -107,19 +107,6 @@ IProcessor::Status OffsetTransform::prepare(
     if (num_finished_port_pairs == ports_data.size())
         return Status::Finished;
 
-    /// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data.
-    /// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1
-    //  if ((rows_read >= offset) && !previous_row_chunk && !always_read_till_end)
-    //  {
-    //      for (auto & input : inputs)
-    //          input.close();
-
-    //      for (auto & output : outputs)
-    //          output.finish();
-
-    //return Status::Finished;
-    //}
-
     if (has_full_port)
         return Status::PortFull;
 
@@ -145,11 +132,6 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
     if (output.isFinished())
     {
         output_finished = true;
-        if (!always_read_till_end)
-        {
-            input.close();
-            return Status::Finished;
-        }
     }
 
     if (!output_finished && !output.canPush())
@@ -167,8 +149,9 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
     }
 
     input.setNeeded();
-    if (!input.hasData())
+    if (!input.hasData()) {
         return Status::NeedData;
+    }
 
     data.current_chunk = input.pull(true);
 
@@ -177,10 +160,14 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
     if (rows_before_limit_at_least)
         rows_before_limit_at_least->add(rows);
 
-    /// Skip block (for 'always_read_till_end' case).
-    if (output_finished)
+    /// Process block.
+
+    rows_read += rows;
+
+    if (rows_read < offset)
     {
         data.current_chunk.clear();
+
         if (input.isFinished())
         {
             output.finish();
@@ -192,42 +179,9 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
         return Status::NeedData;
     }
 
-    /// Process block.
-
-    rows_read += rows;
-
-    //if (rows_read <= offset)
-    //{
-    //    data.current_chunk.clear();
-    //
-    //    if (input.isFinished())
-    //    {
-    //        output.finish();
-    //        return Status::Finished;
-    //    }
-
-        /// Now, we pulled from input, and it must be empty.
-    //    input.setNeeded();
-    //    return Status::NeedData;
-    //}
-
-    if (rows_read >= offset + rows && rows_read <= offset)
-    {
-        /// Return the whole chunk.
-
-        /// Save the last row of current chunk to check if next block begins with the same row (for WITH TIES).
-        if (with_ties && rows_read == offset + limit)
-            previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, data.current_chunk.getNumRows() - 1);
-    }
-    else
-        /// This function may be heavy to execute in prepare. But it happens no more then twice, and make code simpler.
+    if (!(rows_read >= offset + rows))
         splitChunk(data);
 
-    //bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset;
-    /// No more data is needed.
-    //if (!always_read_till_end && (rows_read >= offset) && !may_need_more_data_for_ties)
-    //    input.close();
-
     output.push(std::move(data.current_chunk));
 
     return Status::PortFull;
@@ -245,32 +199,7 @@ void OffsetTransform::splitChunk(PortsData & data)
         static_cast<Int64>(0),
         static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
 
-    //size_t length = std::min(
-    //    static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
-    //    static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
-
-    size_t length = static_cast<Int64>(num_rows);
-    std::cout << "===========================" << std::endl
-              << start << " " << length << std::endl
-              << static_cast<Int64>(rows_read) << " " << static_cast<Int64>(num_rows) << std::endl
-              << "===========================" << std::endl;
-    /// check if other rows in current block equals to last one in limit
-    if (with_ties && length)
-    {
-        size_t current_row_num = start + length;
-        previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1);
-
-        for (; current_row_num < num_rows; ++current_row_num)
-        {
-            if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))
-            {
-                previous_row_chunk = {};
-                break;
-            }
-        }
-
-        length = current_row_num - start;
-    }
+    size_t length = static_cast<Int64>(rows_read) - static_cast<Int64>(offset);
 
     if (length == num_rows)
         return;
@@ -283,6 +212,7 @@ void OffsetTransform::splitChunk(PortsData & data)
         data.current_chunk.setColumns(std::move(columns), length);
 }
 
+
 ColumnRawPtrs OffsetTransform::extractSortColumns(const Columns & columns) const
 {
     ColumnRawPtrs res;
diff --git a/src/Processors/OffsetTransform.h b/src/Processors/OffsetTransform.h
index 9194960cd20..66361a53684 100644
--- a/src/Processors/OffsetTransform.h
+++ b/src/Processors/OffsetTransform.h
@@ -19,7 +19,6 @@ class OffsetTransform : public IProcessor
 {
 private:
 
-    size_t limit;
     size_t offset;
     bool always_read_till_end;
 
@@ -52,11 +51,11 @@ private:
 
 public:
     OffsetTransform(
-        const Block & header_, size_t limit_, size_t offset_, size_t num_streams = 1,
+        const Block & header_, size_t offset_, size_t num_streams = 1,
         bool always_read_till_end_ = false, bool with_ties_ = false,
         SortDescription description_ = {});
 
-    String getName() const override { return "Limit"; }
+    String getName() const override { return "Offset"; }
 
     Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override;
     Status prepare() override; /// Compatibility for TreeExecutor.
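For reference, the patched transform's per-chunk behavior reduces to pure row skipping: `preparePair` discards any chunk that ends before the OFFSET boundary, forwards untouched any chunk that begins past it, and calls `splitChunk` only for the one chunk straddling the boundary, where `start` and `length` are derived from the running `rows_read` counter. The sketch below is a minimal standalone model of that arithmetic, not ClickHouse code: `Slice` and `sliceAfterOffset` are invented names, and real chunks carry columns rather than bare row counts.

```cpp
#include <cstdint>
#include <iostream>

/// Which slice of the current chunk survives the OFFSET.
struct Slice { uint64_t start = 0, length = 0; };

/// rows_read already includes the current chunk's rows, as in preparePair().
static Slice sliceAfterOffset(uint64_t rows_read, uint64_t num_rows, uint64_t offset)
{
    if (rows_read < offset)              /// mirrors: data.current_chunk.clear()
        return {0, 0};
    if (rows_read >= offset + num_rows)  /// mirrors: push the chunk unchanged
        return {0, num_rows};
    /// mirrors splitChunk(): start = max(0, offset - rows_read + num_rows),
    /// length = rows_read - offset; both stay in range thanks to the branches above.
    return {offset - (rows_read - num_rows), rows_read - offset};
}

int main()
{
    /// OFFSET 5 over three chunks of 4 rows:
    /// chunk 0 is dropped, chunk 1 keeps rows [1, 4), chunk 2 passes through whole.
    uint64_t rows_read = 0;
    for (int chunk = 0; chunk < 3; ++chunk)
    {
        rows_read += 4;
        Slice s = sliceAfterOffset(rows_read, 4, 5);
        std::cout << "chunk " << chunk << ": start=" << s.start << " length=" << s.length << '\n';
    }
}
```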
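Two behavioral notes on the interpreter side, both read directly off the diff: `executeOffset` now hardcodes `always_read_till_end` to `true` when constructing the transform, so inputs keep being drained after the offset is satisfied instead of being closed early (the removed WITH TOTALS logic used to decide this conditionally), and `getLimitUIntValue` now names the failing clause, producing e.g. `Illegal type String of OFFSET expression, must be numeric type` rather than always blaming LIMIT.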