add sharding_key for remote table function

update docs

fix
This commit is contained in:
feng lv 2020-12-20 14:59:21 +00:00
parent c520d0efe3
commit 8ec695f024
5 changed files with 66 additions and 23 deletions

View File

@ -14,14 +14,10 @@ Allows to access all shards in an existing cluster which configured in `remote_s
Signatures:
``` sql
cluster('cluster_name', db.table)
cluster('cluster_name', db, table)
cluster('cluster_name', db.table, sharding_key)
cluster('cluster_name', db, table, sharding_key)
clusterAllReplicas('cluster_name', db.table)
clusterAllReplicas('cluster_name', db, table)
clusterAllReplicas('cluster_name', db.table, sharding_key)
clusterAllReplicas('cluster_name', db, table, sharding_key)
cluster('cluster_name', db.table[, sharding_key])
cluster('cluster_name', db, table[, sharding_key])
clusterAllReplicas('cluster_name', db.table[, sharding_key])
clusterAllReplicas('cluster_name', db, table[, sharding_key])
```
`cluster_name` Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.

View File

@ -10,13 +10,14 @@ Allows you to access remote servers without creating a `Distributed` table.
Signatures:
``` sql
remote('addresses_expr', db, table[, 'user'[, 'password']])
remote('addresses_expr', db.table[, 'user'[, 'password']])
remoteSecure('addresses_expr', db, table[, 'user'[, 'password']])
remoteSecure('addresses_expr', db.table[, 'user'[, 'password']])
remote('addresses_expr', db, table[, 'user'[, 'password'], sharding_key])
remote('addresses_expr', db.table[, 'user'[, 'password'], sharding_key])
remoteSecure('addresses_expr', db, table[, 'user'[, 'password'], sharding_key])
remoteSecure('addresses_expr', db.table[, 'user'[, 'password'], sharding_key])
```
`addresses_expr` An expression that generates addresses of remote servers. This may be just one server address. The server address is `host:port`, or just `host`. The host can be specified as the server name, or as the IPv4 or IPv6 address. An IPv6 address is specified in square brackets. The port is the TCP port on the remote server. If the port is omitted, it uses `tcp_port` from the servers config file (by default, 9000).
`sharding_key` - We can specify sharding key to support support distributing data across nodes. For example: `insert into remote('127.0.0.1:9000,127.0.0.2', db, table, 'default', rand())`.
!!! important "Important"
The port is required for an IPv6 address.

View File

@ -37,7 +37,7 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
ASTs & args = args_func.at(0)->children;
const size_t max_args = is_cluster_function ? 4 : 5;
const size_t max_args = is_cluster_function ? 4 : 6;
if (args.size() < 2 || args.size() > max_args)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -122,15 +122,34 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
{
if (arg_num < args.size())
{
username = get_string_literal(*args[arg_num], "Username");
try
{
username = get_string_literal(*args[arg_num], "Username");
}
catch (const Exception & _ [[maybe_unused]])
{
username = "default";
sharding_key = args[arg_num];
}
++arg_num;
}
else
username = "default";
if (arg_num < args.size())
if (arg_num < args.size() && !sharding_key)
{
password = get_string_literal(*args[arg_num], "Password");
try
{
password = get_string_literal(*args[arg_num], "Password");
}
catch (const Exception & _ [[maybe_unused]])
{
sharding_key = args[arg_num];
}
++arg_num;
}
if (arg_num < args.size() && !sharding_key)
{
sharding_key = args[arg_num];
++arg_num;
}
}
@ -248,9 +267,12 @@ TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_
: name{name_}, secure{secure_}
{
is_cluster_function = (name == "cluster" || name == "clusterAllReplicas");
help_message = fmt::format("Table function '{}' requires from 2 to {} parameters: "
"<addresses pattern or cluster name>, <name of remote database>, <name of remote table>{}",
name, is_cluster_function ? 3 : 5, is_cluster_function ? "" : ", [username, [password]].");
help_message = fmt::format(
"Table function '{}' requires from 2 to {} parameters: "
"<addresses pattern or cluster name>, <name of remote database>, <name of remote table>{}",
name,
is_cluster_function ? 4 : 6,
is_cluster_function ? ", [sharding_key]" : ", [username, [password], sharding_key]");
}

View File

@ -1,20 +1,40 @@
0
0
0
0
1
1
1
1
2
2
2
2
3
3
3
3
4
4
4
4
5
5
5
5
6
6
6
6
7
7
7
7
8
8
8
8
9
9
9
9

View File

@ -1,11 +1,15 @@
DROP TABLE IF EXISTS default.x;
CREATE TABLE default.x ON CLUSTER test_shard_localhost AS system.numbers ENGINE = Log;
CREATE TABLE default.x AS system.numbers ENGINE = Log;
INSERT INTO FUNCTION cluster('test_shard_localhost', default, x) SELECT * FROM numbers(10);
-- In fact, in this case(just one shard), sharding key is not required
INSERT INTO FUNCTION cluster('test_shard_localhost', default, x, rand()) SELECT * FROM numbers(10);
INSERT INTO FUNCTION remote('localhost:59000', default, x, rand()) SELECT * FROM numbers(10);
INSERT INTO FUNCTION remote('localhost:59000', default, x, 'default', rand()) SELECT * FROM numbers(10);
SELECT * FROM default.x ORDER BY number;
DROP TABLE default.x ON CLUSTER test_shard_localhost;
DROP TABLE default.x;