mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge pull request #11276 from ClickHouse/fix-constraints-and-insert-performance
Fix constraints for DEFAULT fields; fix constraints for temporary tables; improve performance of INSERT for small blocks.
This commit is contained in:
commit
1abaca5174
@ -164,7 +164,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
|
||||
|
||||
/// Create table
|
||||
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
|
||||
auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns});
|
||||
auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns}, {});
|
||||
auto storage = temporary_table.getTable();
|
||||
context.addExternalTable(data->table_name, std::move(temporary_table));
|
||||
BlockOutputStreamPtr output = storage->write(ASTPtr(), context);
|
||||
|
@ -126,7 +126,7 @@ struct Settings : public SettingsCollection<Settings>
|
||||
M(SettingBool, force_optimize_skip_unused_shards_no_nested, false, "Do not apply force_optimize_skip_unused_shards for nested Distributed tables.", 0) \
|
||||
\
|
||||
M(SettingBool, input_format_parallel_parsing, true, "Enable parallel parsing for some data formats.", 0) \
|
||||
M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
|
||||
M(SettingUInt64, min_chunk_bytes_for_parallel_parsing, (10 * 1024 * 1024), "The minimum chunk size in bytes, which each thread will parse in parallel.", 0) \
|
||||
\
|
||||
M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.", 0) \
|
||||
M(SettingUInt64, merge_tree_min_bytes_for_concurrent_read, (24 * 10 * 1024 * 1024), "If at least as many bytes are read from one file, the reading can be parallelized.", 0) \
|
||||
|
@ -58,13 +58,17 @@ TemporaryTableHolder::TemporaryTableHolder(const Context & context_,
|
||||
}
|
||||
|
||||
|
||||
TemporaryTableHolder::TemporaryTableHolder(const Context & context_, const ColumnsDescription & columns, const ASTPtr & query)
|
||||
TemporaryTableHolder::TemporaryTableHolder(
|
||||
const Context & context_,
|
||||
const ColumnsDescription & columns,
|
||||
const ConstraintsDescription & constraints,
|
||||
const ASTPtr & query)
|
||||
: TemporaryTableHolder
|
||||
(
|
||||
context_,
|
||||
[&](const StorageID & table_id)
|
||||
{
|
||||
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{constraints});
|
||||
},
|
||||
query
|
||||
)
|
||||
|
@ -21,6 +21,7 @@ class Context;
|
||||
class IDatabase;
|
||||
class Exception;
|
||||
class ColumnsDescription;
|
||||
struct ConstraintsDescription;
|
||||
|
||||
using DatabasePtr = std::shared_ptr<IDatabase>;
|
||||
using DatabaseAndTable = std::pair<DatabasePtr, StoragePtr>;
|
||||
@ -71,7 +72,11 @@ struct TemporaryTableHolder : boost::noncopyable
|
||||
TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {});
|
||||
|
||||
/// Creates temporary table with Engine=Memory
|
||||
TemporaryTableHolder(const Context & context, const ColumnsDescription & columns, const ASTPtr & query = {});
|
||||
TemporaryTableHolder(
|
||||
const Context & context,
|
||||
const ColumnsDescription & columns,
|
||||
const ConstraintsDescription & constraints,
|
||||
const ASTPtr & query = {});
|
||||
|
||||
TemporaryTableHolder(TemporaryTableHolder && rhs);
|
||||
TemporaryTableHolder & operator = (TemporaryTableHolder && rhs);
|
||||
|
@ -103,7 +103,7 @@ public:
|
||||
Block sample = interpreter->getSampleBlock();
|
||||
NamesAndTypesList columns = sample.getNamesAndTypesList();
|
||||
|
||||
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns});
|
||||
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(context, ColumnsDescription{columns}, ConstraintsDescription{});
|
||||
StoragePtr external_storage = external_storage_holder->getTable();
|
||||
|
||||
/** We replace the subquery with the name of the temporary table.
|
||||
|
@ -637,7 +637,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
if (create.if_not_exists && context.tryResolveStorageID({"", table_name}, Context::ResolveExternal))
|
||||
return false;
|
||||
|
||||
auto temporary_table = TemporaryTableHolder(context, properties.columns, query_ptr);
|
||||
auto temporary_table = TemporaryTableHolder(context, properties.columns, properties.constraints, query_ptr);
|
||||
context.getSessionContext().addExternalTable(table_name, std::move(temporary_table));
|
||||
return true;
|
||||
}
|
||||
|
@ -233,6 +233,21 @@ BlockIO InterpreterInsertQuery::execute()
|
||||
else
|
||||
out = std::make_shared<PushingToViewsBlockOutputStream>(table, context, query_ptr, no_destination);
|
||||
|
||||
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
|
||||
|
||||
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
|
||||
if (const auto & constraints = table->getConstraints(); !constraints.empty())
|
||||
out = std::make_shared<CheckConstraintsBlockOutputStream>(
|
||||
query.table_id, out, out->getHeader(), table->getConstraints(), context);
|
||||
|
||||
/// Actually we don't know structure of input blocks from query/table,
|
||||
/// because some clients break insertion protocol (columns != header)
|
||||
out = std::make_shared<AddingDefaultBlockOutputStream>(
|
||||
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
|
||||
|
||||
/// It's important to squash blocks as early as possible (before other transforms),
|
||||
/// because other transforms may work inefficiently if block size is small.
|
||||
|
||||
/// Do not squash blocks if it is a sync INSERT into Distributed, since it leads to double bufferization on client and server side.
|
||||
/// Client-side bufferization might cause excessive timeouts (especially in case of big blocks).
|
||||
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch)
|
||||
@ -244,15 +259,6 @@ BlockIO InterpreterInsertQuery::execute()
|
||||
context.getSettingsRef().min_insert_block_size_bytes);
|
||||
}
|
||||
|
||||
/// Actually we don't know structure of input blocks from query/table,
|
||||
/// because some clients break insertion protocol (columns != header)
|
||||
out = std::make_shared<AddingDefaultBlockOutputStream>(
|
||||
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
|
||||
|
||||
if (const auto & constraints = table->getConstraints(); !constraints.empty())
|
||||
out = std::make_shared<CheckConstraintsBlockOutputStream>(
|
||||
query.table_id, out, query_sample_block, table->getConstraints(), context);
|
||||
|
||||
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
|
||||
out_wrapper->setProcessListElement(context.getProcessListElement());
|
||||
out = std::move(out_wrapper);
|
||||
|
@ -948,7 +948,7 @@ bool TCPHandler::receiveData(bool scalar)
|
||||
else
|
||||
{
|
||||
NamesAndTypesList columns = block.getNamesAndTypesList();
|
||||
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns});
|
||||
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
|
||||
storage = temporary_table.getTable();
|
||||
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
|
||||
}
|
||||
|
38
tests/performance/insert_select_default_small_block.xml
Normal file
38
tests/performance/insert_select_default_small_block.xml
Normal file
@ -0,0 +1,38 @@
|
||||
<test>
|
||||
<settings><max_block_size>1</max_block_size></settings>
|
||||
|
||||
<create_query>
|
||||
CREATE TABLE insert_small_block_performance
|
||||
(
|
||||
`x` String,
|
||||
`a` DEFAULT SHA256(x),
|
||||
`b` DEFAULT SHA256(toString(a)),
|
||||
`c` DEFAULT SHA256(toString(b)),
|
||||
`d` DEFAULT SHA256(toString(c)),
|
||||
`e` DEFAULT SHA256(toString(d)),
|
||||
`f` DEFAULT SHA256(toString(e)),
|
||||
`g` DEFAULT SHA256(toString(f)),
|
||||
`h` DEFAULT SHA256(toString(g)),
|
||||
`i` DEFAULT SHA256(toString(h)),
|
||||
`j` DEFAULT SHA256(toString(i)),
|
||||
`k` DEFAULT SHA256(toString(j)),
|
||||
`l` DEFAULT SHA256(toString(k)),
|
||||
`m` DEFAULT SHA256(toString(l)),
|
||||
`n` DEFAULT SHA256(toString(m)),
|
||||
`o` DEFAULT SHA256(toString(n)),
|
||||
`p` DEFAULT SHA256(toString(o)),
|
||||
`q` DEFAULT SHA256(toString(p)),
|
||||
`r` DEFAULT SHA256(toString(q)),
|
||||
`s` DEFAULT SHA256(toString(r)),
|
||||
`t` DEFAULT SHA256(toString(s)),
|
||||
`u` DEFAULT SHA256(toString(t)),
|
||||
`v` DEFAULT SHA256(toString(u)),
|
||||
`w` DEFAULT SHA256(toString(v))
|
||||
)
|
||||
ENGINE = Null;
|
||||
</create_query>
|
||||
|
||||
<query>INSERT INTO insert_small_block_performance (x) SELECT toString(number) FROM numbers(10000);</query>
|
||||
|
||||
<drop_query>DROP TABLE IF EXISTS insert_small_block_performance</drop_query>
|
||||
</test>
|
@ -0,0 +1,2 @@
|
||||
0
|
||||
0
|
29
tests/queries/0_stateless/01286_constraints_on_default.sql
Normal file
29
tests/queries/0_stateless/01286_constraints_on_default.sql
Normal file
@ -0,0 +1,29 @@
|
||||
DROP TABLE IF EXISTS default_constraints;
|
||||
CREATE TABLE default_constraints
|
||||
(
|
||||
x UInt8,
|
||||
y UInt8 DEFAULT x + 1,
|
||||
CONSTRAINT c CHECK y < 5
|
||||
) ENGINE = Memory;
|
||||
|
||||
INSERT INTO default_constraints (x) SELECT number FROM system.numbers LIMIT 5; -- { serverError 469 }
|
||||
INSERT INTO default_constraints (x) VALUES (0),(1),(2),(3),(4); -- { serverError 469 }
|
||||
|
||||
SELECT y, throwIf(NOT y < 5) FROM default_constraints;
|
||||
SELECT count() FROM default_constraints;
|
||||
|
||||
DROP TABLE default_constraints;
|
||||
|
||||
|
||||
CREATE TEMPORARY TABLE default_constraints
|
||||
(
|
||||
x UInt8,
|
||||
y UInt8 DEFAULT x + 1,
|
||||
CONSTRAINT c CHECK y < 5
|
||||
);
|
||||
|
||||
INSERT INTO default_constraints (x) SELECT number FROM system.numbers LIMIT 5; -- { serverError 469 }
|
||||
INSERT INTO default_constraints (x) VALUES (0),(1),(2),(3),(4); -- { serverError 469 }
|
||||
|
||||
SELECT y, throwIf(NOT y < 5) FROM default_constraints;
|
||||
SELECT count() FROM default_constraints;
|
Loading…
Reference in New Issue
Block a user