added table function for clusterAll replicas

This commit is contained in:
Kiran 2020-01-01 13:09:10 +05:30
parent 49cfe326e3
commit f51412a2a2

View File

@ -14,6 +14,7 @@
#include <Common/parseRemoteDescription.h> #include <Common/parseRemoteDescription.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Core/Defines.h> #include <Core/Defines.h>
#include <ext/range.h>
#include "registerTableFunctions.h" #include "registerTableFunctions.h"
@ -137,11 +138,30 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
setIdentifierSpecial(ast); setIdentifierSpecial(ast);
ClusterPtr cluster; ClusterPtr cluster;
if (!cluster_name.empty()) if (!cluster_name.empty() && name == "cluster")
{ {
/// Use an existing cluster from the main config /// Use an existing cluster from the main config
cluster = context.getCluster(cluster_name); cluster = context.getCluster(cluster_name);
} }
else if(!cluster_name.empty() && name == "clusterAll") {
std::vector<std::vector<String>> clusterNodes;
cluster = context.getCluster(cluster_name);
// creating a new topology for clusterAll
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
auto maybe_secure_port = context.getTCPPortSecure();
for (size_t shard_index : ext::range(0, shards_info.size()))
{
const auto & replicas = addresses_with_failovers[shard_index];
for (size_t replica_index : ext::range(0, replicas.size()))
{
std::vector<String> newNode={replicas[replica_index].host_name};
clusterNodes.push_back(newNode);
}
}
cluster = std::make_shared<Cluster>(context.getSettings(), clusterNodes, username, password, (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), false, secure);
}
else else
{ {
/// Create new cluster from the scratch /// Create new cluster from the scratch
@ -198,7 +218,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_) TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_)
: name{name_}, secure{secure_} : name{name_}, secure{secure_}
{ {
is_cluster_function = name == "cluster"; is_cluster_function = (name == "cluster" || name == "clusterAll");
std::stringstream ss; std::stringstream ss;
ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters" ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters"
@ -213,6 +233,7 @@ void registerTableFunctionRemote(TableFunctionFactory & factory)
factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote"); }); factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote"); });
factory.registerFunction("remoteSecure", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote", /* secure = */ true); }); factory.registerFunction("remoteSecure", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote", /* secure = */ true); });
factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); }); factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); });
factory.registerFunction("clusterAll", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("clusterAll"); });
} }
} }