Merge pull request #9886 from ClickHouse/add_sort_check_to_mutations

Remove order by from mutations interpreter and add check
This commit is contained in:
alesapin 2020-03-27 16:23:33 +03:00 committed by GitHub
commit 655ce5b6a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 338 additions and 19 deletions

View File

@ -0,0 +1,93 @@
#include <DataStreams/CheckSortedBlockInputStream.h>
#include <Core/SortDescription.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
CheckSortedBlockInputStream::CheckSortedBlockInputStream(
const BlockInputStreamPtr & input_,
const SortDescription & sort_description_)
: header(input_->getHeader())
, sort_description_map(addPositionsToSortDescriptions(sort_description_))
{
children.push_back(input_);
}
SortDescriptionsWithPositions
CheckSortedBlockInputStream::addPositionsToSortDescriptions(const SortDescription & sort_description)
{
SortDescriptionsWithPositions result;
result.reserve(sort_description.size());
for (SortColumnDescription description_copy : sort_description)
{
if (!description_copy.column_name.empty())
description_copy.column_number = header.getPositionByName(description_copy.column_name);
result.push_back(description_copy);
}
return result;
}
/// Compares values in columns. Columns must have equal types.
struct SortingLessOrEqualComparator
{
const SortDescriptionsWithPositions & sort_description;
explicit SortingLessOrEqualComparator(const SortDescriptionsWithPositions & sort_description_)
: sort_description(sort_description_) {}
bool operator()(const Columns & left, size_t left_index, const Columns & right, size_t right_index) const
{
for (const auto & elem : sort_description)
{
size_t column_number = elem.column_number;
const IColumn * left_col = left[column_number].get();
const IColumn * right_col = right[column_number].get();
int res = elem.direction * left_col->compareAt(left_index, right_index, *right_col, elem.nulls_direction);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return true;
}
};
Block CheckSortedBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block || block.rows() == 0)
return block;
SortingLessOrEqualComparator less(sort_description_map);
auto block_columns = block.getColumns();
if (!last_row.empty() && !less(last_row, 0, block_columns, 0))
throw Exception("Sort order of blocks violated", ErrorCodes::LOGICAL_ERROR);
size_t rows = block.rows();
for (size_t i = 1; i < rows; ++i)
if (!less(block_columns, i - 1, block_columns, i))
throw Exception("Sort order of blocks violated", ErrorCodes::LOGICAL_ERROR);
last_row.clear();
for (size_t i = 0; i < block.columns(); ++i)
{
auto column = block_columns[i]->cloneEmpty();
column->insertFrom(*block_columns[i], rows - 1);
last_row.emplace_back(std::move(column));
}
return block;
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Core/SortDescription.h>
#include <Columns/IColumn.h>
namespace DB
{
using SortDescriptionsWithPositions = std::vector<SortColumnDescription>;
/// Streams checks that flow of blocks is sorted in the sort_description order
/// Othrewise throws exception in readImpl function.
class CheckSortedBlockInputStream : public IBlockInputStream
{
public:
CheckSortedBlockInputStream(
const BlockInputStreamPtr & input_,
const SortDescription & sort_description_);
String getName() const override { return "CheckingSorted"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Block header;
SortDescriptionsWithPositions sort_description_map;
Columns last_row;
private:
/// Just checks, that all sort_descriptions has column_number
SortDescriptionsWithPositions addPositionsToSortDescriptions(const SortDescription & sort_description);
};
}

View File

@ -0,0 +1,181 @@
#include <Core/Block.h>
#include <gtest/gtest.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/BlocksListBlockInputStream.h>
#include <DataStreams/CheckSortedBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
using namespace DB;
static SortDescription getSortDescription(const std::vector<std::string> & column_names)
{
SortDescription descr;
for (const auto & column : column_names)
{
descr.emplace_back(column, 1, 1);
}
return descr;
}
static Block getSortedBlockWithSize(
const std::vector<std::string> & columns,
size_t rows, size_t stride, size_t start)
{
ColumnsWithTypeAndName cols;
size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64);
for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++)
{
auto column = ColumnUInt64::create(rows, 0);
for (size_t j = 0; j < rows; ++j)
{
column->getElement(j) = start;
start += stride;
}
cols.emplace_back(std::move(column), std::make_shared<DataTypeUInt64>(), columns[i]);
}
return Block(cols);
}
static Block getUnSortedBlockWithSize(const std::vector<std::string> & columns, size_t rows, size_t stride, size_t start, size_t bad_row, size_t bad_column, size_t bad_value)
{
ColumnsWithTypeAndName cols;
size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64);
for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++)
{
auto column = ColumnUInt64::create(rows, 0);
for (size_t j = 0; j < rows; ++j)
{
if (bad_row == j && bad_column == i)
column->getElement(j) = bad_value;
else if (i < bad_column)
column->getElement(j) = 0;
else
column->getElement(j) = start;
start += stride;
}
cols.emplace_back(std::move(column), std::make_shared<DataTypeUInt64>(), columns[i]);
}
return Block(cols);
}
static Block getEqualValuesBlockWithSize(
const std::vector<std::string> & columns, size_t rows)
{
ColumnsWithTypeAndName cols;
size_t size_of_row_in_bytes = columns.size() * sizeof(UInt64);
for (size_t i = 0; i * sizeof(UInt64) < size_of_row_in_bytes; i++)
{
auto column = ColumnUInt64::create(rows, 0);
for (size_t j = 0; j < rows; ++j)
column->getElement(j) = 0;
cols.emplace_back(std::move(column), std::make_shared<DataTypeUInt64>(), columns[i]);
}
return Block(cols);
}
TEST(CheckSortedBlockInputStream, CheckGoodCase)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
BlocksList blocks;
for (size_t i = 0; i < 3; ++i)
blocks.push_back(getSortedBlockWithSize(key_columns, 10, 1, i * 10));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
CheckSortedBlockInputStream sorted(stream, sort_description);
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_EQ(sorted.read(), Block());
}
TEST(CheckSortedBlockInputStream, CheckBadLastRow)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
BlocksList blocks;
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 100));
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 200));
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 0));
blocks.push_back(getSortedBlockWithSize(key_columns, 100, 1, 300));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
CheckSortedBlockInputStream sorted(stream, sort_description);
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_THROW(sorted.read(), DB::Exception);
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock1)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 5, 1, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
CheckSortedBlockInputStream sorted(stream, sort_description);
EXPECT_THROW(sorted.read(), DB::Exception);
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock2)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 99, 2, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
CheckSortedBlockInputStream sorted(stream, sort_description);
EXPECT_THROW(sorted.read(), DB::Exception);
}
TEST(CheckSortedBlockInputStream, CheckUnsortedBlock3)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
BlocksList blocks;
blocks.push_back(getUnSortedBlockWithSize(key_columns, 100, 1, 0, 50, 0, 77));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
CheckSortedBlockInputStream sorted(stream, sort_description);
EXPECT_THROW(sorted.read(), DB::Exception);
}
TEST(CheckSortedBlockInputStream, CheckEqualBlock)
{
std::vector<std::string> key_columns{"K1", "K2", "K3"};
auto sort_description = getSortDescription(key_columns);
BlocksList blocks;
blocks.push_back(getEqualValuesBlockWithSize(key_columns, 100));
blocks.push_back(getEqualValuesBlockWithSize(key_columns, 10));
blocks.push_back(getEqualValuesBlockWithSize(key_columns, 1));
BlockInputStreamPtr stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
CheckSortedBlockInputStream sorted(stream, sort_description);
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
EXPECT_NO_THROW(sorted.read());
}

View File

@ -12,13 +12,13 @@
#include <DataStreams/CreatingSetsBlockInputStream.h> #include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h> #include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h> #include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/CheckSortedBlockInputStream.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
#include <Parsers/ASTOrderByElement.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -620,24 +620,6 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
} }
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
} }
/// We have to execute select in order of primary key
/// because we don't sort results additionaly and don't have
/// any guarantees on data order without ORDER BY. It's almost free, because we
/// have optimization for data read in primary key order.
if (ASTPtr key_expr = storage->getSortingKeyAST(); key_expr && !key_expr->children.empty())
{
ASTPtr dummy;
auto res = std::make_shared<ASTExpressionList>();
for (const auto & key_part : key_expr->children)
{
auto order_by_expr = std::make_shared<ASTOrderByElement>(1, 1, false, dummy, false, dummy, dummy, dummy);
order_by_expr->children.push_back(key_part);
res->children.push_back(order_by_expr);
}
select->setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(res));
}
return select; return select;
} }
@ -702,9 +684,17 @@ BlockInputStreamPtr MutationsInterpreter::execute(TableStructureReadLockHolder &
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
BlockInputStreamPtr in = select_interpreter->execute().in; BlockInputStreamPtr in = select_interpreter->execute().in;
auto result_stream = addStreamsForLaterStages(stages, in); auto result_stream = addStreamsForLaterStages(stages, in);
/// Sometimes we update just part of columns (for example UPDATE mutation)
/// in this case we don't read sorting key, so just we don't check anything.
if (auto sort_desc = getStorageSortDescriptionIfPossible(result_stream->getHeader()))
result_stream = std::make_shared<CheckSortedBlockInputStream>(result_stream, *sort_desc);
if (!updated_header) if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader()); updated_header = std::make_unique<Block>(result_stream->getHeader());
return result_stream; return result_stream;
} }
@ -723,4 +713,22 @@ size_t MutationsInterpreter::evaluateCommandsSize()
return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size()); return std::max(prepareQueryAffectedAST(commands)->size(), mutation_ast->size());
} }
std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const
{
Names sort_columns = storage->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
{
if (header.has(sort_columns[i]))
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
else
return {};
}
return sort_description;
}
} }

View File

@ -43,6 +43,8 @@ private:
ASTPtr prepareInterpreterSelectQuery(std::vector<Stage> &prepared_stages, bool dry_run); ASTPtr prepareInterpreterSelectQuery(std::vector<Stage> &prepared_stages, bool dry_run);
BlockInputStreamPtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const; BlockInputStreamPtr addStreamsForLaterStages(const std::vector<Stage> & prepared_stages, BlockInputStreamPtr in) const;
std::optional<SortDescription> getStorageSortDescriptionIfPossible(const Block & header) const;
StoragePtr storage; StoragePtr storage;
MutationCommands commands; MutationCommands commands;
const Context & context; const Context & context;