Merge pull request #8617 from ClickHouse/kiransunkari-CLICKHOUSE-7262

Merging #8493
This commit is contained in:
alexey-milovidov 2020-01-11 12:43:07 +03:00 committed by GitHub
commit 5af9eabfdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 9 deletions

View File

@ -10,6 +10,7 @@
#include <IO/ReadHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <ext/range.h>
namespace DB
{
@ -449,18 +450,64 @@ void Cluster::initMisc()
}
}
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings) const
{
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings)};
}
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
{
return std::unique_ptr<Cluster>{ new Cluster(*this, {index}) };
return std::unique_ptr<Cluster>{ new Cluster(SubclusterTag{}, *this, {index}) };
}
std::unique_ptr<Cluster> Cluster::getClusterWithMultipleShards(const std::vector<size_t> & indices) const
{
return std::unique_ptr<Cluster>{ new Cluster(*this, indices) };
return std::unique_ptr<Cluster>{ new Cluster(SubclusterTag{}, *this, indices) };
}
Cluster::Cluster(const Cluster & from, const std::vector<size_t> & indices)
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings)
: shards_info{}, addresses_with_failover{}
{
if (from.addresses_with_failover.empty())
throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR);
std::set<std::pair<String, int>> unique_hosts;
for (size_t shard_index : ext::range(0, from.shards_info.size()))
{
const auto & replicas = from.addresses_with_failover[shard_index];
for (const auto & address : replicas)
{
if (!unique_hosts.emplace(address.host_name, address.port).second)
continue; /// Duplicate host, skip.
ShardInfo info;
if (address.is_local)
info.local_addresses.push_back(address);
ConnectionPoolPtr pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
address.host_name,
address.port,
address.default_database,
address.user,
address.password,
"server",
address.compression,
address.secure);
info.pool = std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, settings.load_balancing);
info.per_replica_pools = {std::move(pool)};
addresses_with_failover.emplace_back(Addresses{address});
shards_info.emplace_back(std::move(info));
}
}
initMisc();
}
Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector<size_t> & indices)
: shards_info{}
{
for (size_t index : indices)

View File

@ -26,7 +26,7 @@ public:
const String & username, const String & password,
UInt16 clickhouse_port, bool treat_local_as_remote, bool secure = false);
Cluster(const Cluster &) = delete;
Cluster(const Cluster &)= delete;
Cluster & operator=(const Cluster &) = delete;
/// is used to set a limit on the size of the timeout
@ -148,6 +148,9 @@ public:
/// Get a subcluster consisting of one or multiple shards - indexes by count (from 0) of the shard of this cluster.
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings) const;
private:
using SlotToShard = std::vector<UInt64>;
SlotToShard slot_to_shard;
@ -159,7 +162,12 @@ private:
void initMisc();
/// For getClusterWithMultipleShards implementation.
Cluster(const Cluster & from, const std::vector<size_t> & indices);
struct SubclusterTag {};
Cluster(SubclusterTag, const Cluster & from, const std::vector<size_t> & indices);
/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings);
String hash_of_addresses;
/// Description of the cluster shards.

View File

@ -14,6 +14,7 @@
#include <Common/parseRemoteDescription.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Core/Defines.h>
#include <ext/range.h>
#include "registerTableFunctions.h"
@ -140,7 +141,10 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
if (!cluster_name.empty())
{
/// Use an existing cluster from the main config
cluster = context.getCluster(cluster_name);
if (name != "clusterAllReplicas")
cluster = context.getCluster(cluster_name);
else
cluster = context.getCluster(cluster_name)->getClusterWithReplicasAsShards(context.getSettings());
}
else
{
@ -164,13 +168,22 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
{
size_t colon = host.find(':');
if (colon == String::npos)
context.getRemoteHostFilter().checkHostAndPort(host, toString((secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort())));
context.getRemoteHostFilter().checkHostAndPort(
host,
toString((secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort())));
else
context.getRemoteHostFilter().checkHostAndPort(host.substr(0, colon), host.substr(colon + 1));
}
}
cluster = std::make_shared<Cluster>(context.getSettings(), names, username, password, (secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()), false, secure);
cluster = std::make_shared<Cluster>(
context.getSettings(),
names,
username,
password,
(secure ? (maybe_secure_port ? *maybe_secure_port : DBMS_DEFAULT_SECURE_PORT) : context.getTCPPort()),
false,
secure);
}
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_database, remote_table, context, remote_table_function_ptr);
@ -198,7 +211,7 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_)
: name{name_}, secure{secure_}
{
is_cluster_function = name == "cluster";
is_cluster_function = (name == "cluster" || name == "clusterAllReplicas");
std::stringstream ss;
ss << "Table function '" << name + "' requires from 2 to " << (is_cluster_function ? 3 : 5) << " parameters"
@ -213,6 +226,7 @@ void registerTableFunctionRemote(TableFunctionFactory & factory)
factory.registerFunction("remote", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote"); });
factory.registerFunction("remoteSecure", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("remote", /* secure = */ true); });
factory.registerFunction("cluster", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("cluster"); });
factory.registerFunction("clusterAllReplicas", [] () -> TableFunctionPtr { return std::make_shared<TableFunctionRemote>("clusterAllReplicas"); });
}
}

View File

@ -0,0 +1,16 @@
<yandex>
<remote_servers>
<two_shards>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</two_shards>
</remote_servers>
</yandex>

View File

@ -0,0 +1,21 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_remote(start_cluster):
assert node1.query('''SELECT hostName() FROM clusterAllReplicas("two_shards", system.one)''') == 'node1\nnode2\n'
assert node1.query('''SELECT hostName() FROM cluster("two_shards", system.one)''') == 'node1\n'