From e94575e1da25cf598acb2c141e41b947ffaca596 Mon Sep 17 00:00:00 2001 From: dmitriiut Date: Sun, 21 Apr 2019 19:16:25 +0300 Subject: [PATCH] fix progress --- dbms/src/Common/ErrorCodes.cpp | 1 + dbms/src/Common/SharedBlockRowRef.h | 27 ++++++++++ dbms/src/Core/SortDescription.h | 4 +- .../DataStreams/FillingBlockInputStream.cpp | 18 +++++++ .../src/DataStreams/FillingBlockInputStream.h | 29 ++++++++++ .../src/DataStreams/LimitBlockInputStream.cpp | 43 ++------------- .../Interpreters/InterpreterSelectQuery.cpp | 54 ++++++++++++++++++- .../src/Interpreters/InterpreterSelectQuery.h | 1 + 8 files changed, 136 insertions(+), 41 deletions(-) create mode 100644 dbms/src/DataStreams/FillingBlockInputStream.cpp create mode 100644 dbms/src/DataStreams/FillingBlockInputStream.h diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 2468df697bb..ea21db02f6f 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -428,6 +428,7 @@ namespace ErrorCodes extern const int BAD_TTL_FILE = 451; extern const int WITH_TIES_WITHOUT_ORDER_BY = 452; extern const int INVALID_WITH_FILL_EXPRESSION = 453; + extern const int FILL_STEP_ZERO_VALUE = 454; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Common/SharedBlockRowRef.h b/dbms/src/Common/SharedBlockRowRef.h index c73bf5e6400..4fb24c49439 100644 --- a/dbms/src/Common/SharedBlockRowRef.h +++ b/dbms/src/Common/SharedBlockRowRef.h @@ -77,6 +77,33 @@ struct SharedBlockRowRef bool empty() const { return columns == nullptr; } size_t size() const { return empty() ? 0 : columns->size(); } + + /// gets pointers to all columns of block, which were used for ORDER BY + static ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description) + { + size_t size = description.size(); + ColumnRawPtrs res; + res.reserve(size); + + for (size_t i = 0; i < size; ++i) + { + const IColumn * column = !description[i].column_name.empty() + ? block.getByName(description[i].column_name).column.get() + : block.safeGetByPosition(description[i].column_number).column.get(); + res.emplace_back(column); + } + + return res; + } + + + static void setSharedBlockRowRef(SharedBlockRowRef &row_ref, SharedBlockPtr shared_block, ColumnRawPtrs *columns, + size_t row_num) + { + row_ref.row_num = row_num; + row_ref.columns = columns; + row_ref.shared_block = shared_block; + } }; diff --git a/dbms/src/Core/SortDescription.h b/dbms/src/Core/SortDescription.h index 1cccbc2d766..e0c658e61a4 100644 --- a/dbms/src/Core/SortDescription.h +++ b/dbms/src/Core/SortDescription.h @@ -22,8 +22,8 @@ struct SortColumnDescription std::shared_ptr collator; /// Collator for locale-specific comparison of strings bool with_fill; /// If true, all missed values in range [FROM, TO] will be filled /// Range [FROM, TO] respects sorting direction - double fill_from; /// Value >= FROM - double fill_to; /// Value + STEP <= TO + double fill_from; /// Fill value >= FROM + double fill_to; /// Fill value + STEP <= TO double fill_step; /// Default = 1 SortColumnDescription( diff --git a/dbms/src/DataStreams/FillingBlockInputStream.cpp b/dbms/src/DataStreams/FillingBlockInputStream.cpp new file mode 100644 index 00000000000..e8355204dff --- /dev/null +++ b/dbms/src/DataStreams/FillingBlockInputStream.cpp @@ -0,0 +1,18 @@ +#include "FillingBlockInputStream.h" + +FillingBlockInputStream::FillingBlockInputStream( + const BlockInputStreamPtr & input, const SortDescription & description_) + : description(description_) +{ + children.push_back(input); +} + + +Block FillingBlockInputStream::readImpl() +{ + Block res; + UInt64 rows = 0; + + + +} \ No newline at end of file diff --git a/dbms/src/DataStreams/FillingBlockInputStream.h b/dbms/src/DataStreams/FillingBlockInputStream.h new file mode 100644 index 00000000000..6afbeb1eafb --- /dev/null +++ b/dbms/src/DataStreams/FillingBlockInputStream.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/** Implements the WITH FILL part of ORDER BY operation. +*/ +class FillingBlockInputStream : public IBlockInputStream +{ +public: + FillingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & description_); + + String getName() const override { return "With fill"; } + + Block getHeader() const override { return children.at(0)->getHeader(); } + +protected: + Block readImpl() override; + +private: + const SortDescription description; + SharedBlockRowRef last_row_ref; +}; + + +} \ No newline at end of file diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp index f3fb45f27cd..16c5c94f16c 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp @@ -6,39 +6,6 @@ namespace DB { -namespace detail -{ - -/// gets pointers to all columns of block, which were used for ORDER BY -ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description) -{ - size_t size = description.size(); - ColumnRawPtrs res; - res.reserve(size); - - for (size_t i = 0; i < size; ++i) - { - const IColumn * column = !description[i].column_name.empty() - ? block.getByName(description[i].column_name).column.get() - : block.safeGetByPosition(description[i].column_number).column.get(); - res.emplace_back(column); - } - - return res; -} - - -void setSharedBlockRowRef(SharedBlockRowRef &row_ref, SharedBlockPtr shared_block, ColumnRawPtrs *columns, - size_t row_num) -{ - row_ref.row_num = row_num; - row_ref.columns = columns; - row_ref.shared_block = shared_block; -} - -} - - LimitBlockInputStream::LimitBlockInputStream( const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, @@ -69,13 +36,13 @@ Block LimitBlockInputStream::readImpl() pos += rows; SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); - ColumnRawPtrs columns = getBlockColumns(*ptr, description); + ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description); UInt64 len; for (len = 0; len < rows; ++len) { SharedBlockRowRef currentRow; - setSharedBlockRowRef(currentRow, ptr, &columns, len); + SharedBlockRowRef::setSharedBlockRowRef(currentRow, ptr, &columns, len); if (currentRow != ties_row_ref) { ties_row_ref.reset(); @@ -135,13 +102,13 @@ Block LimitBlockInputStream::readImpl() /// check if other rows in current block equals to last one in limit if (with_ties) { - ColumnRawPtrs columns = getBlockColumns(*ptr, description); - setSharedBlockRowRef(ties_row_ref, ptr, &columns, start + length - 1); + ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description); + SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &columns, start + length - 1); for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i) { SharedBlockRowRef current_row; - setSharedBlockRowRef(current_row, ptr, &columns, i); + SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &columns, i); if (current_row == ties_row_ref) ++length; else diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 1b7cb4777e1..7e523015ccc 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -75,6 +76,7 @@ namespace ErrorCodes extern const int ARGUMENT_OUT_OF_BOUND; extern const int INVALID_LIMIT_EXPRESSION; extern const int INVALID_WITH_FILL_EXPRESSION; + extern const int FILL_STEP_ZERO_VALUE; } namespace @@ -715,6 +717,8 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt executeLimitBy(pipeline); } + executeWithFill(pipeline); + if (query.limitLength()) executePreLimit(pipeline); } @@ -840,6 +844,8 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt */ executeExtremes(pipeline); + executeWithFill(pipeline); + executeLimit(pipeline); } } @@ -915,6 +921,10 @@ static std::tuple getWithFillParameters(const ASTOrde fill_to = getWithFillFloatValue(node.fill_to, context); if (node.fill_step) fill_step = getWithFillFloatValue(node.fill_step, context); + + if (!fill_step) + throw Exception("STEP value can not be zero", ErrorCodes::FILL_STEP_ZERO_VALUE); + return {fill_from, fill_to, fill_step}; } @@ -1457,6 +1467,24 @@ SortDescription InterpreterSelectQuery::getSortDescription(const ASTSelectQuery if (order_by_elem.with_fill) { auto[fill_from, fill_to, fill_step] = getWithFillParameters(order_by_elem, context); + + if (order_by_elem.direction == -1) + { + /// if DESC, then STEP < 0, FROM > TO + fill_step = std::min(fill_step, fill_step * -1); + auto from = fill_from; + fill_from = std::max(fill_from, fill_to); + fill_to = std::min(from, fill_to); + } + else + { + /// if ASC, then STEP > 0, FROM < TO + fill_step = std::max(fill_step, fill_step * -1); + auto from = fill_from; + fill_from = std::min(fill_from, fill_to); + fill_to = std::max(from, fill_to); + } + order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator, true, fill_from, fill_to, fill_step); } @@ -1589,7 +1617,7 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline) auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context); pipeline.transform([&, limit = limit_length + limit_offset](auto & stream) { - stream = std::make_shared(stream, limit, 0, false); + stream = std::make_shared(stream, limit, 0, false, query.limit_with_ties, getSortDescription(query)); }); } } @@ -1675,6 +1703,30 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline) } +void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline) +{ + auto & query = getSelectQuery(); + if (query.orderBy()) + { + SortDescription order_descr = getSortDescription(query); + SortDescription fill_descr; + for (auto & desc : order_descr) + { + if (desc.with_fill) + fill_descr.push_back(desc); + } + + if (!fill_descr.size()) + return; + + pipeline.transform([&](auto & stream) + { + stream = std::make_shared(stream, fill_descr); + }); + } +} + + void InterpreterSelectQuery::executeExtremes(Pipeline & pipeline) { if (!context.getSettingsRef().extremes) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index fa53a1f5d21..2e838dbfec5 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -179,6 +179,7 @@ private: void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression); void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression); void executeOrder(Pipeline & pipeline); + void executeWithFill(Pipeline & pipeline); void executeMergeSorted(Pipeline & pipeline); void executePreLimit(Pipeline & pipeline); void executeUnion(Pipeline & pipeline);