Merge pull request #18264 from ucasFL/insert-cluster

Support insert into table function cluster
This commit is contained in:
alexey-milovidov 2021-01-16 13:22:49 +03:00 committed by GitHub
commit a15092eeb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 131 additions and 25 deletions

View File

@ -14,14 +14,16 @@ Allows to access all shards in an existing cluster which configured in `remote_s
Signatures:
``` sql
cluster('cluster_name', db.table)
cluster('cluster_name', db, table)
clusterAllReplicas('cluster_name', db.table)
clusterAllReplicas('cluster_name', db, table)
cluster('cluster_name', db.table[, sharding_key])
cluster('cluster_name', db, table[, sharding_key])
clusterAllReplicas('cluster_name', db.table[, sharding_key])
clusterAllReplicas('cluster_name', db, table[, sharding_key])
```
`cluster_name` - Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
`sharding_key` - When inserting into a cluster table function whose cluster has more than one shard, a sharding key must be provided.
Using the `cluster` and `clusterAllReplicas` table functions is less efficient than creating a `Distributed` table because in this case, the server connection is re-established for every request. When processing a large number of queries, please always create the `Distributed` table ahead of time, and don't use the `cluster` and `clusterAllReplicas` table functions.
The `cluster` and `clusterAllReplicas` table functions can be useful in the following cases:

View File

@ -10,13 +10,14 @@ Allows you to access remote servers without creating a `Distributed` table.
Signatures:
``` sql
remote('addresses_expr', db, table[, 'user'[, 'password']])
remote('addresses_expr', db.table[, 'user'[, 'password']])
remoteSecure('addresses_expr', db, table[, 'user'[, 'password']])
remoteSecure('addresses_expr', db.table[, 'user'[, 'password']])
remote('addresses_expr', db, table[, 'user'[, 'password'], sharding_key])
remote('addresses_expr', db.table[, 'user'[, 'password'], sharding_key])
remoteSecure('addresses_expr', db, table[, 'user'[, 'password'], sharding_key])
remoteSecure('addresses_expr', db.table[, 'user'[, 'password'], sharding_key])
```
`addresses_expr` - An expression that generates addresses of remote servers. This may be just one server address. The server address is `host:port`, or just `host`. The host can be specified as the server name, or as the IPv4 or IPv6 address. An IPv6 address is specified in square brackets. The port is the TCP port on the remote server. If the port is omitted, `tcp_port` from the server's config file is used (by default, 9000).
`sharding_key` - A sharding key can be specified to distribute inserted data across nodes. For example: `insert into remote('127.0.0.1:9000,127.0.0.2', db, table, 'default', rand())`.
!!! important "Important"
The port is required for an IPv6 address.

View File

@ -352,7 +352,10 @@ DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobRep
/// Forward user settings
job.local_context = std::make_unique<Context>(context);
InterpreterInsertQuery interp(query_ast, *job.local_context);
/// InterpreterInsertQuery is modifying the AST, but the same AST is also used to insert to remote shards.
auto copy_query_ast = query_ast->clone();
InterpreterInsertQuery interp(copy_query_ast, *job.local_context);
auto block_io = interp.execute();
job.stream = block_io.out;

View File

@ -37,7 +37,7 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
ASTs & args = args_func.at(0)->children;
const size_t max_args = is_cluster_function ? 3 : 5;
const size_t max_args = is_cluster_function ? 4 : 6;
if (args.size() < 2 || args.size() > max_args)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -50,16 +50,17 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
size_t arg_num = 0;
auto get_string_literal = [](const IAST & node, const char * description)
auto get_string_literal = [](const IAST & node, String & res)
{
const auto * lit = node.as<ASTLiteral>();
if (!lit)
throw Exception(description + String(" must be string literal (in single quotes)."), ErrorCodes::BAD_ARGUMENTS);
return false;
if (lit->value.getType() != Field::Types::String)
throw Exception(description + String(" must be string literal (in single quotes)."), ErrorCodes::BAD_ARGUMENTS);
return false;
return safeGet<const String &>(lit->value);
res = safeGet<const String &>(lit->value);
return true;
};
if (is_cluster_function)
@ -70,7 +71,10 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
else
{
if (!tryGetIdentifierNameInto(args[arg_num], cluster_name))
cluster_description = get_string_literal(*args[arg_num], "Hosts pattern");
{
if (!get_string_literal(*args[arg_num], cluster_description))
throw Exception("Hosts pattern must be string literal (in single quotes).", ErrorCodes::BAD_ARGUMENTS);
}
}
++arg_num;
@ -110,20 +114,38 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
}
}
/// Cluster function may have sharding key for insert
if (is_cluster_function && arg_num < args.size())
{
sharding_key = args[arg_num];
++arg_num;
}
/// Username and password parameters are prohibited in cluster version of the function
if (!is_cluster_function)
{
if (arg_num < args.size())
{
username = get_string_literal(*args[arg_num], "Username");
if (!get_string_literal(*args[arg_num], username))
{
username = "default";
sharding_key = args[arg_num];
}
++arg_num;
}
else
username = "default";
if (arg_num < args.size())
if (arg_num < args.size() && !sharding_key)
{
password = get_string_literal(*args[arg_num], "Password");
if (!get_string_literal(*args[arg_num], password))
{
sharding_key = args[arg_num];
}
++arg_num;
}
if (arg_num < args.size() && !sharding_key)
{
sharding_key = args[arg_num];
++arg_num;
}
}
@ -208,7 +230,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con
remote_table_function_ptr,
String{},
context,
ASTPtr{},
sharding_key,
String{},
String{},
DistributedSettings{},
@ -222,7 +244,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con
remote_table_id.table_name,
String{},
context,
ASTPtr{},
sharding_key,
String{},
String{},
DistributedSettings{},
@ -243,9 +265,12 @@ TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_
: name{name_}, secure{secure_}
{
is_cluster_function = (name == "cluster" || name == "clusterAllReplicas");
help_message = fmt::format("Table function '{}' requires from 2 to {} parameters: "
"<addresses pattern or cluster name>, <name of remote database>, <name of remote table>{}",
name, is_cluster_function ? 3 : 5, is_cluster_function ? "" : ", [username, [password]].");
help_message = fmt::format(
"Table function '{}' requires from 2 to {} parameters: "
"<addresses pattern or cluster name>, <name of remote database>, <name of remote table>{}",
name,
is_cluster_function ? 4 : 6,
is_cluster_function ? " [, sharding_key]" : " [, username[, password], sharding_key]");
}

View File

@ -40,6 +40,7 @@ private:
ClusterPtr cluster;
StorageID remote_table_id = StorageID::createEmpty();
ASTPtr remote_table_function_ptr;
ASTPtr sharding_key = nullptr;
};
}

View File

@ -0,0 +1,50 @@
0
0
0
1
1
1
2
2
2
3
3
3
4
4
4
5
5
5
6
6
6
7
7
7
8
8
8
9
9
9
0
0
1
1
2
2
3
3
4
4
5
5
6
6
7
7
8
8
9
9

View File

@ -0,0 +1,23 @@
-- Functional test: INSERT INTO FUNCTION through the cluster() and remote() table functions,
-- with and without a trailing sharding-key argument.
-- Drop leftovers first so the test starts from empty tables.
DROP TABLE IF EXISTS x;
DROP TABLE IF EXISTS y;
-- Both target tables copy their structure from system.numbers.
CREATE TABLE x AS system.numbers ENGINE = MergeTree ORDER BY number;
CREATE TABLE y AS system.numbers ENGINE = MergeTree ORDER BY number;
-- Just one shard, sharding key isn't necessary
-- (the insert is issued both without and with rand() as the sharding key).
INSERT INTO FUNCTION cluster('test_shard_localhost', currentDatabase(), x) SELECT * FROM numbers(10);
INSERT INTO FUNCTION cluster('test_shard_localhost', currentDatabase(), x, rand()) SELECT * FROM numbers(10);
-- More than one shard, sharding key is necessary
-- The first insert omits the key and is annotated to expect server error 55.
-- NOTE(review): presumably 55 is the "sharding key required" error code - confirm against ErrorCodes.
INSERT INTO FUNCTION cluster('test_cluster_two_shards_localhost', currentDatabase(), x) SELECT * FROM numbers(10); --{ serverError 55 }
INSERT INTO FUNCTION cluster('test_cluster_two_shards_localhost', currentDatabase(), x, rand()) SELECT * FROM numbers(10);
-- Same pair of checks for remote() with two addresses: 'default' is passed in the user position,
-- and rand() follows it as the sharding key in the second statement.
INSERT INTO FUNCTION remote('127.0.0.{1,2}', currentDatabase(), y, 'default') SELECT * FROM numbers(10); -- { serverError 55 }
INSERT INTO FUNCTION remote('127.0.0.{1,2}', currentDatabase(), y, 'default', rand()) SELECT * FROM numbers(10);
-- Dump the results; ORDER BY number fixes the row order of the output.
-- x received three successful inserts of numbers(10); y received one and is read back through both addresses.
SELECT * FROM x ORDER BY number;
SELECT * FROM remote('127.0.0.{1,2}', currentDatabase(), y) ORDER BY number;
-- Clean up the tables created by this test.
DROP TABLE x;
DROP TABLE y;

View File

@ -180,6 +180,7 @@
01558_ttest_scipy
01561_mann_whitney_scipy
01601_custom_tld
01602_insert_into_table_function_cluster
01636_nullable_fuzz2
01639_distributed_sync_insert_zero_rows
01644_distributed_async_insert_fsync_smoke