Fix constraints for DEFAULT fields; fix constraints for temporary tables; improve performance of INSERT for small blocks

This commit is contained in:
Alexey Milovidov 2020-05-29 05:08:48 +03:00
parent 04bdffd9d7
commit 23d7947549
9 changed files with 62 additions and 16 deletions

View File

@ -164,7 +164,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
/// Create table
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns});
auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns}, {});
auto storage = temporary_table.getTable();
context.addExternalTable(data->table_name, std::move(temporary_table));
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);

View File

@ -58,13 +58,17 @@ TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
}
TemporaryTableHolder::TemporaryTableHolder(const Context & context_, const ColumnsDescription & columns, const ASTPtr & query)
TemporaryTableHolder::TemporaryTableHolder(
const Context & context_,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
const ASTPtr & query)
: TemporaryTableHolder
(
context_,
[&](const StorageID & table_id)
{
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{});
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{constraints});
},
query
)

View File

@ -21,6 +21,7 @@ class Context;
class IDatabase;
class Exception;
class ColumnsDescription;
struct ConstraintsDescription;
using DatabasePtr = std::shared_ptr<IDatabase>;
using DatabaseAndTable = std::pair<DatabasePtr, StoragePtr>;
@ -71,7 +72,11 @@ struct TemporaryTableHolder : boost::noncopyable
TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {});
/// Creates temporary table with Engine=Memory
TemporaryTableHolder(const Context & context, const ColumnsDescription & columns, const ASTPtr & query = {});
TemporaryTableHolder(
const Context & context,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
const ASTPtr & query = {});
TemporaryTableHolder(TemporaryTableHolder && rhs);
TemporaryTableHolder & operator = (TemporaryTableHolder && rhs);

View File

@ -103,7 +103,7 @@ public:
Block sample = interpreter->getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns});
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns}, ConstraintsDescription{});
StoragePtr external_storage = external_storage_holder->getTable();
/** We replace the subquery with the name of the temporary table.

View File

@ -637,7 +637,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
if (create.if_not_exists && context.tryResolveStorageID({"", table_name}, Context::ResolveExternal))
return false;
auto temporary_table = TemporaryTableHolder(context, properties.columns, query_ptr);
auto temporary_table = TemporaryTableHolder(context, properties.columns, properties.constraints, query_ptr);
context.getSessionContext().addExternalTable(table_name, std::move(temporary_table));
return true;
}

View File

@ -233,6 +233,21 @@ BlockIO InterpreterInsertQuery::execute()
else
out = std::make_shared<PushingToViewsBlockOutputStream>(table, context, query_ptr, no_destination);
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(
query.table_id, out, out->getHeader(), table->getConstraints(), context);
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficient if block size is small.
/// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side.
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch)
@ -244,15 +259,6 @@ BlockIO InterpreterInsertQuery::execute()
context.getSettingsRef().min_insert_block_size_bytes);
}
/// Actually we don't know structure of input blocks from query/table,
/// because some clients break insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(
query.table_id, out, query_sample_block, table->getConstraints(), context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
out_wrapper->setProcessListElement(context.getProcessListElement());
out = std::move(out_wrapper);

View File

@ -948,7 +948,7 @@ bool TCPHandler::receiveData(bool scalar)
else
{
NamesAndTypesList columns = block.getNamesAndTypesList();
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns});
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}

View File

@ -0,0 +1,2 @@
0
0

View File

@ -0,0 +1,29 @@
DROP TABLE IF EXISTS default_constraints;
CREATE TABLE default_constraints
(
x UInt8,
y UInt8 DEFAULT x + 1,
CONSTRAINT c CHECK y < 5
) ENGINE = Memory;
INSERT INTO default_constraints (x) SELECT number FROM system.numbers LIMIT 5; -- { serverError 469 }
INSERT INTO default_constraints (x) VALUES (0),(1),(2),(3),(4); -- { serverError 469 }
SELECT y, throwIf(NOT y < 5) FROM default_constraints;
SELECT count() FROM default_constraints;
DROP TABLE default_constraints;
CREATE TEMPORARY TABLE default_constraints
(
x UInt8,
y UInt8 DEFAULT x + 1,
CONSTRAINT c CHECK y < 5
);
INSERT INTO default_constraints (x) SELECT number FROM system.numbers LIMIT 5; -- { serverError 469 }
INSERT INTO default_constraints (x) VALUES (0),(1),(2),(3),(4); -- { serverError 469 }
SELECT y, throwIf(NOT y < 5) FROM default_constraints;
SELECT count() FROM default_constraints;