diff --git a/dbms/src/DataStreams/CheckSortedBlockInputStream.cpp b/dbms/src/DataStreams/CheckSortedBlockInputStream.cpp new file mode 100644 index 00000000000..a56738a9a9f --- /dev/null +++ b/dbms/src/DataStreams/CheckSortedBlockInputStream.cpp @@ -0,0 +1,93 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +} + +CheckSortedBlockInputStream::CheckSortedBlockInputStream( + const BlockInputStreamPtr & input_, + const SortDescription & sort_description_) + : header(input_->getHeader()) + , sort_description_map(addPositionsToSortDescriptions(sort_description_)) +{ + children.push_back(input_); +} + +SortDescriptionsWithPositions +CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescription & sort_description) +{ + SortDescriptionsWithPositions result; + result.reserve(sort_description.size()); + + for (SortColumnDescription description_copy : sort_description) + { + if (!description_copy.column_name.empty()) + description_copy.column_number = header.getPositionByName(description_copy.column_name); + + result.push_back(description_copy); + } + + return result; +} + +/// Compares values in columns. Columns must have equal types. +struct SortingLessOrEqualComparator +{ + const SortDescriptionsWithPositions & sort_description; + + explicit SortingLessOrEqualComparator(const SortDescriptionsWithPositions & sort_description_) + : sort_description(sort_description_) {} + + bool operator()(const Columns & left, size_t left_index, const Columns & right, size_t right_index) const + { + for (const auto & elem : sort_description) + { + size_t column_number = elem.column_number; + + const IColumn * left_col = left[column_number].get(); + const IColumn * right_col = right[column_number].get(); + + int res = elem.direction * left_col->compareAt(left_index, right_index, *right_col, elem.nulls_direction); + if (res < 0) + return true; + else if (res > 0) + return false; + } + return true; + } +}; + +Block CheckSortedBlockInputStream::readImpl() +{ + Block block = children.back()->read(); + if (!block || block.rows() == 0) + return block; + + SortingLessOrEqualComparator less(sort_description_map); + + auto block_columns = block.getColumns(); + if (!last_row.empty() && !less(last_row, 0, block_columns, 0)) + throw Exception("Sort order of blocks violated", ErrorCodes::LOGICAL_ERROR); + + size_t rows = block.rows(); + for (size_t i = 1; i < rows; ++i) + if (!less(block_columns, i - 1, block_columns, i)) + throw Exception("Sort order of blocks violated", ErrorCodes::LOGICAL_ERROR); + + last_row.clear(); + for (size_t i = 0; i < block.columns(); ++i) + { + auto column = block_columns[i]->cloneEmpty(); + column->insertFrom(*block_columns[i], rows - 1); + last_row.emplace_back(std::move(column)); + } + + return block; +} + +} diff --git a/dbms/src/DataStreams/CheckSortedBlockInputStream.h b/dbms/src/DataStreams/CheckSortedBlockInputStream.h new file mode 100644 index 00000000000..42060befeeb --- /dev/null +++ b/dbms/src/DataStreams/CheckSortedBlockInputStream.h @@ -0,0 +1,35 @@ +#pragma once +#include +#include +#include + +namespace DB +{ +using SortDescriptionsWithPositions = std::vector; + +/// Streams checks that flow of blocks is sorted in the sort_description order +/// Othrewise throws exception in readImpl function. +class CheckSortedBlockInputStream : public IBlockInputStream +{ +public: + CheckSortedBlockInputStream( + const BlockInputStreamPtr & input_, + const SortDescription & sort_description_); + + String getName() const override { return "CheckingSorted"; } + + Block getHeader() const override { return header; } + +protected: + Block readImpl() override; + +private: + Block header; + SortDescriptionsWithPositions sort_description_map; + Columns last_row; + +private: + /// Just checks, that all sort_descriptions has column_number + SortDescriptionsWithPositions addPositionsToSortDescriptions(const SortDescription & sort_description); +}; +} diff --git a/dbms/src/DataStreams/tests/gtest_check_sorted_stream.cpp b/dbms/src/DataStreams/tests/gtest_check_sorted_stream.cpp new file mode 100644 index 00000000000..0c5cc6d58e1 --- /dev/null +++ b/dbms/src/DataStreams/tests/gtest_check_sorted_stream.cpp @@ -0,0 +1,181 @@ +#include +#include + +#include +#include +#include +#include + + +using namespace DB; + + +static SortDescription getSortDescription(const std::vector & column_names) +{ + SortDescription descr; + for (const auto & column : column_names) + { + descr.emplace_back(column, 1, 1); + } + return descr; +} + +static Block getSortedBlockWithSize( + const std::vector & columns, + size_t rows, size_t stride, size_t start) +{ + ColumnsWithTypeAndName cols; + size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64); + for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++) + { + auto column = ColumnUInt64::create(rows, 0); + for (size_t j = 0; j < rows; ++j) + { + column->getElement(j) = start; + start += stride; + } + cols.emplace_back(std::move(column), std::make_shared(), columns[i]); + } + return Block(cols); +} + + +static Block getUnSortedBlockWithSize(const std::vector & columns, size_t rows, size_t stride, size_t start, size_t bad_row, size_t bad_column, size_t bad_value) +{ + ColumnsWithTypeAndName cols; + size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64); + for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++) + { + auto column = ColumnUInt64::create(rows, 0); + for (size_t j = 0; j < rows; ++j) + { + if (bad_row == j && bad_column == i) + column->getElement(j) = bad_value; + else if (i < bad_column) + column->getElement(j) = 0; + else + column->getElement(j) = start; + + start += stride; + } + cols.emplace_back(std::move(column), std::make_shared(), columns[i]); + } + return Block(cols); +} + +static Block getEqualValuesBlockWithSize( + const std::vector & columns, size_t rows) +{ + ColumnsWithTypeAndName cols; + size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64); + for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++) + { + auto column = ColumnUInt64::create(rows, 0); + for (size_t j = 0; j < rows; ++j) + column->getElement(j) = 0; + + cols.emplace_back(std::move(column), std::make_shared(), columns[i]); + } + return Block(cols); +} + + +TEST(CheckSortedBlockInputStream, CheckGoodCase) +{ + std::vector key_columns{"K1", "K2", "K3"}; + auto sort_description = getSortDescription(key_columns); + + BlocksList blocks; + for (size_t i = 0; i < 3; ++i) + blocks.push_back(getSortedBlockWithSize(key_columns, 10, 1, i * 10)); + + BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + + CheckSortedBlockInputStream sorted(stream, sort_description); + + EXPECT_NO_THROW(sorted.read()); + EXPECT_NO_THROW(sorted.read()); + EXPECT_NO_THROW(sorted.read()); + EXPECT_EQ(sorted.read(), Block()); +} + +TEST(CheckSortedBlockInputStream, CheckBadLastRow) +{ + std::vector key_columns{"K1", "K2", "K3"}; + auto sort_description = getSortDescription(key_columns); + BlocksList blocks; + blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 100)); + blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 200)); + blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 0)); + blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 300)); + + BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + + CheckSortedBlockInputStream sorted(stream, sort_description); + + + EXPECT_NO_THROW(sorted.read()); + EXPECT_NO_THROW(sorted.read()); + EXPECT_THROW(sorted.read(), DB::Exception); +} + + +TEST(CheckSortedBlockInputStream, CheckUnsortedBlock1) +{ + std::vector key_columns{"K1", "K2", "K3"}; + auto sort_description = getSortDescription(key_columns); + BlocksList blocks; + blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 5, 1, 77)); + + BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + + CheckSortedBlockInputStream sorted(stream, sort_description); + + EXPECT_THROW(sorted.read(), DB::Exception); +} + +TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2) +{ + std::vector key_columns{"K1", "K2", "K3"}; + auto sort_description = getSortDescription(key_columns); + BlocksList blocks; + blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 99, 2, 77)); + + BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + + CheckSortedBlockInputStream sorted(stream, sort_description); + + EXPECT_THROW(sorted.read(), DB::Exception); +} + +TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3) +{ + std::vector key_columns{"K1", "K2", "K3"}; + auto sort_description = getSortDescription(key_columns); + BlocksList blocks; + blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 50, 0, 77)); + + BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + + CheckSortedBlockInputStream sorted(stream, sort_description); + + EXPECT_THROW(sorted.read(), DB::Exception); +} + +TEST(CheckSortedBlockInputStream, CheckEqualBlock) +{ + std::vector key_columns{"K1", "K2", "K3"}; + auto sort_description = getSortDescription(key_columns); + BlocksList blocks; + blocks.push_back(getEqualValuesBlockWithSize(key_columns, 100)); + blocks.push_back(getEqualValuesBlockWithSize(key_columns, 10)); + blocks.push_back(getEqualValuesBlockWithSize(key_columns, 1)); + + BlockInputStreamPtr stream = std::make_shared(std::move(blocks)); + + CheckSortedBlockInputStream sorted(stream, sort_description); + + EXPECT_NO_THROW(sorted.read()); + EXPECT_NO_THROW(sorted.read()); + EXPECT_NO_THROW(sorted.read()); +} diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 9ff7d2bf126..311165066c9 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -12,13 +12,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include #include @@ -620,24 +620,6 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & } select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); } - /// We have to execute select in order of primary key - /// because we don't sort results additionaly and don't have - /// any guarantees on data order without ORDER BY. It's almost free, because we - /// have optimization for data read in primary key order. - if (ASTPtr key_expr = storage->getSortingKeyAST(); key_expr && !key_expr->children.empty()) - { - ASTPtr dummy; - auto res = std::make_shared(); - for (const auto & key_part : key_expr->children) - { - auto order_by_expr = std::make_shared(1, 1, false, dummy, false, dummy, dummy, dummy); - order_by_expr->children.push_back(key_part); - - res->children.push_back(order_by_expr); - } - - select->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(res)); - } return select; } @@ -702,9 +684,17 @@ BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder & throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR); BlockInputStreamPtr in = select_interpreter->execute().in; + auto result_stream = addStreamsForLaterStages(stages, in); + + /// Sometimes we update just part of columns (for example UPDATE mutation) + /// in this case we don't read sorting key, so just we don't check anything. + if (auto sort_desc = getStorageSortDescriptionIfPossible(result_stream->getHeader())) + result_stream = std::make_shared(result_stream, *sort_desc); + if (!updated_header) updated_header = std::make_unique(result_stream->getHeader()); + return result_stream; } @@ -723,4 +713,22 @@ size_t MutationsInterpreter::evaluateCommandsSize() return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size()); } +std::optional MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const +{ + Names sort_columns = storage->getSortingKeyColumns(); + SortDescription sort_description; + size_t sort_columns_size = sort_columns.size(); + sort_description.reserve(sort_columns_size); + + for (size_t i = 0; i < sort_columns_size; ++i) + { + if (header.has(sort_columns[i])) + sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); + else + return {}; + } + + return sort_description; +} + } diff --git a/dbms/src/Interpreters/MutationsInterpreter.h b/dbms/src/Interpreters/MutationsInterpreter.h index c69216dfa58..a36430e201b 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.h +++ b/dbms/src/Interpreters/MutationsInterpreter.h @@ -43,6 +43,8 @@ private: ASTPtr prepareInterpreterSelectQuery(std::vector &prepared_stages, bool dry_run); BlockInputStreamPtr addStreamsForLaterStages(const std::vector & prepared_stages, BlockInputStreamPtr in) const; + std::optional getStorageSortDescriptionIfPossible(const Block & header) const; + StoragePtr storage; MutationCommands commands; const Context & context;