support insert into cluster function

This commit is contained in:
feng lv 2020-12-20 07:32:44 +00:00
parent fb1221148d
commit 5a6a6991f1
4 changed files with 18 additions and 3 deletions

View File

@ -37,7 +37,7 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
ASTs & args = args_func.at(0)->children;
const size_t max_args = is_cluster_function ? 3 : 5;
const size_t max_args = is_cluster_function ? 4 : 5;
if (args.size() < 2 || args.size() > max_args)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
@ -110,6 +110,13 @@ void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Cont
}
}
/// Cluster function may have sharding key for insert
if (is_cluster_function && arg_num < args.size())
{
sharding_key = args[arg_num];
++arg_num;
}
/// Username and password parameters are prohibited in cluster version of the function
if (!is_cluster_function)
{
@ -208,7 +215,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con
remote_table_function_ptr,
String{},
context,
ASTPtr{},
sharding_key,
String{},
String{},
false,
@ -221,7 +228,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, con
remote_table_id.table_name,
String{},
context,
ASTPtr{},
sharding_key,
String{},
String{},
false,

View File

@ -40,6 +40,7 @@ private:
ClusterPtr cluster;
StorageID remote_table_id = StorageID::createEmpty();
ASTPtr remote_table_function_ptr;
ASTPtr sharding_key{};
};
}

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS default.x;
CREATE TABLE default.x ON CLUSTER test_cluster_two_shards_localhost AS system.numbers ENGINE = Log;
INSERT INTO FUNCTION cluster('test_cluster_two_shards_localhost', default, x, rand()) SELECT * FROM numbers(10);
DROP TABLE default.x ON CLUSTER test_cluster_two_shards_localhost;