remote() table function accepts cluster name identifier as the 1st argument. [#CLICKHOUSE-2]

Add cluster(cluster_name, db, table) table function.
This commit is contained in:
Vitaliy Lyudvichenko 2018-02-12 23:27:14 +03:00 committed by alexey-milovidov
parent d5fd8b9b5f
commit 40ac028e21
4 changed files with 84 additions and 28 deletions

View File

@ -185,18 +185,18 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
{
ASTs & args_func = typeid_cast<ASTFunction &>(*ast_function).children;
const char * err = "Table function 'remote' requires from 2 to 5 parameters: "
"addresses pattern, name of remote database, name of remote table, [username, [password]].";
const size_t max_args = is_cluster_function ? 3 : 5;
if (args_func.size() != 1)
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() < 2 || args.size() > 5)
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (args.size() < 2 || args.size() > max_args)
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String description;
String cluster_name;
String cluster_description;
String remote_database;
String remote_table;
String username;
@ -216,7 +216,18 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
return safeGet<const String &>(lit->value);
};
description = getStringLiteral(*args[arg_num], "Hosts pattern");
if (is_cluster_function)
{
ASTPtr ast_name = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
cluster_name = static_cast<const ASTLiteral &>(*ast_name).value.safeGet<const String &>();
}
else
{
if (auto ast_cluster = typeid_cast<const ASTIdentifier *>(args[arg_num].get()))
cluster_name = ast_cluster->name;
else
cluster_description = getStringLiteral(*args[arg_num], "Hosts pattern");
}
++arg_num;
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
@ -233,29 +244,33 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
else
{
if (arg_num >= args.size())
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
args[arg_num] = evaluateConstantExpressionOrIdentifierAsLiteral(args[arg_num], context);
remote_table = static_cast<const ASTLiteral &>(*args[arg_num]).value.safeGet<String>();
++arg_num;
}
if (arg_num < args.size())
/// Username and password parameters are prohibited in cluster version of the function
if (!is_cluster_function)
{
username = getStringLiteral(*args[arg_num], "Username");
++arg_num;
}
else
username = "default";
if (arg_num < args.size())
{
username = getStringLiteral(*args[arg_num], "Username");
++arg_num;
}
else
username = "default";
if (arg_num < args.size())
{
password = getStringLiteral(*args[arg_num], "Password");
++arg_num;
if (arg_num < args.size())
{
password = getStringLiteral(*args[arg_num], "Password");
++arg_num;
}
}
if (arg_num < args.size())
throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(help_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
/// ExpressionAnalyzer will be created in InterpreterSelectQuery that will meet these `Identifier` when processing the request.
/// We need to mark them as the name of the database or table, because the default value is column.
@ -265,16 +280,26 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses;
std::vector<std::vector<String>> names;
std::vector<String> shards = parseDescription(description, 0, description.size(), ',', max_addresses);
ClusterPtr cluster;
if (!cluster_name.empty())
{
/// Use an existing cluster from the main config
cluster = context.getCluster(cluster_name);
}
else
{
/// Create new cluster from the scratch
std::vector<String> shards = parseDescription(cluster_description, 0, cluster_description.size(), ',', max_addresses);
for (size_t i = 0; i < shards.size(); ++i)
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|', max_addresses));
std::vector<std::vector<String>> names;
for (size_t i = 0; i < shards.size(); ++i)
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|', max_addresses));
if (names.empty())
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
if (names.empty())
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
auto cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, context.getTCPPort());
cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, context.getTCPPort());
}
auto res = StorageDistributed::createWithOwnCluster(
getName(),
@ -288,9 +313,23 @@ StoragePtr TableFunctionRemote::execute(const ASTPtr & ast_function, const Conte
}
TableFunctionRemote::TableFunctionRemote(const std::string & name_)
: name(name_)
{
is_cluster_function = name == "cluster";
std::stringstream ss;
ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters"
<< ": <addresses pattern or cluster name>, <name of remote database>, <name of remote table>"
<< (is_cluster_function ? "" : ", [username, [password]].");
help_message = ss.str();
}
void registerTableFunctionRemote(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionRemote>();
factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote"); });
factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); });
}
}

View File

@ -11,13 +11,23 @@ namespace DB
* For example
* SELECT count() FROM remote('example01-01-1', merge, hits) - go to `example01-01-1`, in the merge database, the hits table.
* An expression that generates a set of shards and replicas can also be specified as the host name - see below.
* Also, there is a cluster version of the function: cluster('existing_cluster_name', 'db', 'table')
*/
class TableFunctionRemote : public ITableFunction
{
public:
static constexpr auto name = "remote";
explicit TableFunctionRemote(const std::string & name_ = "remote");
std::string getName() const override { return name; }
StoragePtr execute(const ASTPtr & ast_function, const Context & context) const override;
private:
std::string name;
bool is_cluster_function;
std::string help_message;
};
}

View File

@ -6,3 +6,6 @@
0
0
0
0
0
0

View File

@ -20,3 +20,7 @@ fi
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote('${CLICKHOUSE_HOST}', system, one);"
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote('${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_TCP}', system, one);"
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote(test_shard_localhost, system, one);"
$CLICKHOUSE_CLIENT -q "SELECT * FROM remote(test_shard_localhost, system, one, 'default', '');"
$CLICKHOUSE_CLIENT -q "SELECT * FROM cluster('test_shard_localhost', system, one);"