ClickHouse: Add setting prefer_localhost_replica (#2832)

* add setting prefer_localhost_replica

* add prefer_localhost_replica setting

* fix bugs

* update setting comment

* Add test for prefer_localhost_replica

* fix bug
Anton Popov 2018-08-10 04:27:54 +03:00 committed by alexey-milovidov
parent 973bdab77f
commit c019d732c5
6 changed files with 99 additions and 65 deletions

View File: Cluster.cpp

@@ -201,12 +201,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             info.weight = weight;

             if (address.is_local)
-            {
                 info.local_addresses.push_back(address);
-                info.per_replica_pools = {nullptr};
-            }
-            else
-            {
             ConnectionPoolPtr pool = std::make_shared<ConnectionPool>(
                 settings.distributed_connections_pool_size,
                 address.host_name, address.port,
@@ -217,7 +213,6 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             info.pool = std::make_shared<ConnectionPoolWithFailover>(
                 ConnectionPoolPtrs{pool}, settings.load_balancing, settings.connections_with_failover_max_tries);
             info.per_replica_pools = {std::move(pool)};
-            }

             if (weight)
                 slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
@@ -276,19 +271,10 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
             Addresses shard_local_addresses;

             ConnectionPoolPtrs all_replicas_pools;
-            ConnectionPoolPtrs remote_replicas_pools;
             all_replicas_pools.reserve(replica_addresses.size());
-            remote_replicas_pools.reserve(replica_addresses.size());

             for (const auto & replica : replica_addresses)
-            {
-                if (replica.is_local)
-                {
-                    shard_local_addresses.push_back(replica);
-                    all_replicas_pools.emplace_back(nullptr);
-                }
-                else
             {
                 auto replica_pool = std::make_shared<ConnectionPool>(
                     settings.distributed_connections_pool_size,
@@ -297,15 +283,13 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
                     ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
                     "server", replica.compression, replica.secure);

-                remote_replicas_pools.emplace_back(replica_pool);
                 all_replicas_pools.emplace_back(replica_pool);
-            }
+
+                if (replica.is_local)
+                    shard_local_addresses.push_back(replica);
             }

-            ConnectionPoolWithFailoverPtr shard_pool;
-            if (!remote_replicas_pools.empty())
-                shard_pool = std::make_shared<ConnectionPoolWithFailover>(
-                    std::move(remote_replicas_pools), settings.load_balancing, settings.connections_with_failover_max_tries);
+            ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
+                all_replicas_pools, settings.load_balancing, settings.connections_with_failover_max_tries);

             if (weight)
                 slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
@@ -341,18 +325,9 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
         Addresses shard_local_addresses;

         ConnectionPoolPtrs all_replicas;
-        ConnectionPoolPtrs remote_replicas;
         all_replicas.reserve(current.size());
-        remote_replicas.reserve(current.size());

         for (const auto & replica : current)
-        {
-            if (replica.is_local && !treat_local_as_remote)
-            {
-                shard_local_addresses.push_back(replica);
-                all_replicas.emplace_back(nullptr);
-            }
-            else
         {
             auto replica_pool = std::make_shared<ConnectionPool>(
                 settings.distributed_connections_pool_size,
@@ -361,12 +336,12 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
                 ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time),
                 "server", replica.compression, replica.secure);

             all_replicas.emplace_back(replica_pool);
-            remote_replicas.emplace_back(replica_pool);
-        }
+
+            if (replica.is_local && !treat_local_as_remote)
+                shard_local_addresses.push_back(replica);
         }

         ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
-            std::move(remote_replicas), settings.load_balancing, settings.connections_with_failover_max_tries);
+            all_replicas, settings.load_balancing, settings.connections_with_failover_max_tries);

         slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
         shards_info.push_back({{}, current_shard_num, default_weight, std::move(shard_local_addresses), std::move(shard_pool),
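
Note: taken together, the Cluster.cpp hunks replace the old local/remote branching with a single path: a connection pool is now built for every replica, local replicas are additionally recorded in shard_local_addresses, and the failover pool wraps all replicas instead of only the remote ones. A minimal Python sketch of the new shape (illustrative only; the names mirror the C++ but nothing here is ClickHouse code):

# Toy model of the restructured loop in Cluster.cpp (illustrative, not ClickHouse code).
def build_shard(replicas, make_pool):
    shard_local_addresses = []
    all_replicas_pools = []
    for replica in replicas:
        pool = make_pool(replica)      # a pool is created for local and remote replicas alike
        all_replicas_pools.append(pool)
        if replica["is_local"]:
            shard_local_addresses.append(replica)
    # The failover pool now covers every replica, so load_balancing can pick a
    # local or a remote one; before this commit it wrapped only the remote pools.
    shard_pool = ("failover", all_replicas_pools)
    return shard_local_addresses, shard_pool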

View File: Cluster.h

@@ -99,7 +99,7 @@ public:
    {
    public:
        bool isLocal() const { return !local_addresses.empty(); }
-        bool hasRemoteConnections() const { return pool != nullptr; }
+        bool hasRemoteConnections() const { return local_addresses.size() != per_replica_pools.size(); }
        size_t getLocalNodeCount() const { return local_addresses.size(); }
        bool hasInternalReplication() const { return has_internal_replication; }
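
Note: the new hasRemoteConnections() leans on the invariant established in Cluster.cpp: per_replica_pools now holds one pool per replica (local included), while local_addresses lists only the local ones, so the sizes differ exactly when at least one replica is remote. A quick sketch of that invariant (assumed semantics, Python stand-in for the C++ members):

# Sketch of the size-comparison invariant behind hasRemoteConnections()
# (assumed semantics; not ClickHouse code).
def has_remote_connections(replicas):
    local_addresses = [r for r in replicas if r["is_local"]]
    per_replica_pools = [object() for _ in replicas]  # one pool per replica, local or not
    return len(local_addresses) != len(per_replica_pools)

assert has_remote_connections([{"is_local": True}, {"is_local": False}]) is True
assert has_remote_connections([{"is_local": True}]) is False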

View File: ClusterProxy/SelectStreamFactory.cpp

@@ -90,7 +90,9 @@ void SelectStreamFactory::createForShard(
        res.emplace_back(std::move(stream));
    };

-    if (shard_info.isLocal())
+    const auto & settings = context.getSettingsRef();
+
+    if (settings.prefer_localhost_replica && shard_info.isLocal())
    {
        StoragePtr main_table_storage;
@@ -106,23 +108,19 @@ void SelectStreamFactory::createForShard(
        if (!main_table_storage) /// Table is absent on a local server.
        {
            ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
-            if (shard_info.pool)
+            if (shard_info.hasRemoteConnections())
            {
                LOG_WARNING(
                    &Logger::get("ClusterProxy::SelectStreamFactory"),
                    "There is no table " << main_table.database << "." << main_table.table
                    << " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");
                emplace_remote_stream();
-                return;
            }
            else
-            {
-                /// Let it fail the usual way.
-                emplace_local_stream();
+                emplace_local_stream(); /// Let it fail the usual way.

            return;
        }
-        }

        const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());
@@ -133,7 +131,6 @@ void SelectStreamFactory::createForShard(
            return;
        }

-        const Settings & settings = context.getSettingsRef();
        UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;

        if (!max_allowed_delay)
@@ -158,7 +155,7 @@ void SelectStreamFactory::createForShard(
        if (!settings.fallback_to_stale_replicas_for_distributed_queries)
        {
-            if (shard_info.pool)
+            if (shard_info.hasRemoteConnections())
            {
                /// If we cannot fallback, then we cannot use local replica. Try our luck with remote replicas.
                emplace_remote_stream();
@@ -171,7 +168,7 @@ void SelectStreamFactory::createForShard(
                ErrorCodes::ALL_REPLICAS_ARE_STALE);
        }

-        if (!shard_info.pool)
+        if (!shard_info.hasRemoteConnections())
        {
            /// There are no remote replicas but we are allowed to fall back to stale local replica.
            emplace_local_stream();
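
Note: after these hunks, createForShard roughly follows the branching below. This is a hedged Python paraphrase of only what is visible in the diff, not the actual implementation (the real function builds streams rather than returning tags, and may still defer the local/remote choice in the stale-but-fallback-allowed case):

# Hedged paraphrase of the createForShard() branching after this commit.
def create_for_shard(settings, shard_info, local_table_exists, local_replica_is_stale):
    if not (settings["prefer_localhost_replica"] and shard_info["is_local"]):
        return "remote"      # new: the setting can force the usual remote path
    if not local_table_exists:
        # Table absent locally: warn and try remote replicas if there are any,
        # otherwise run locally and let it fail the usual way.
        return "remote" if shard_info["has_remote_connections"] else "local"
    if local_replica_is_stale:
        if not settings["fallback_to_stale_replicas_for_distributed_queries"]:
            if shard_info["has_remote_connections"]:
                return "remote"  # cannot fall back, so try our luck with remote replicas
            raise RuntimeError("ALL_REPLICAS_ARE_STALE")
        if not shard_info["has_remote_connections"]:
            return "local"   # falling back to the stale local replica is allowed
    return "local"           # (the real code may still pick a fresher remote replica here)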

View File: Settings.h

@@ -268,8 +268,10 @@ struct Settings
    M(SettingUInt64, enable_conditional_computation, 0, "Enable conditional computations") \
    \
    M(SettingDateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.") \
+    \
+    M(SettingBool, prefer_localhost_replica, 1, "1 - always send query to local replica, if it exists. 0 - choose replica to send query between local and remote ones according to load_balancing") \
    M(SettingUInt64, max_fetch_partition_retries_count, 5, "Amount of retries while fetching partition from another host.") \


#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
    TYPE NAME {DEFAULT};
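
Note: the setting defaults to 1, which preserves the old always-prefer-local behaviour; setting it to 0 lets the load_balancing policy choose among local and remote replicas. A minimal per-query usage sketch with the third-party clickhouse-driver package (assumed installed; 'ch-node' and the table name are placeholders):

# Minimal sketch: toggling prefer_localhost_replica per query with clickhouse-driver.
from clickhouse_driver import Client

client = Client("ch-node")  # placeholder host

# Default (1): a query over a Distributed table runs on the local replica if one exists.
rows_local = client.execute("SELECT count() FROM distributed")

# 0: replica choice follows load_balancing across local and remote replicas alike.
rows_balanced = client.execute(
    "SELECT count() FROM distributed",
    settings={"prefer_localhost_replica": 0},
)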

View File: configs/remote_servers.xml

@@ -16,5 +16,17 @@
            </replica>
        </shard>
    </test_local_cluster>
+    <shard_with_local_replica>
+        <shard>
+            <replica>
+                <host>node1</host>
+                <port>9000</port>
+            </replica>
+            <replica>
+                <host>node2</host>
+                <port>9000</port>
+            </replica>
+        </shard>
+    </shard_with_local_replica>
</remote_servers>
</yandex>

View File: test.py

@@ -18,6 +18,9 @@ instance_test_inserts_local_cluster = cluster.add_instance(
    'instance_test_inserts_local_cluster',
    main_configs=['configs/remote_servers.xml'])

+node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
+node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
+

@pytest.fixture(scope="module")
def started_cluster():
@@ -39,6 +42,20 @@ CREATE TABLE distributed (d Date, x UInt32) ENGINE = Distributed('test_cluster',
CREATE TABLE distributed_on_local (d Date, x UInt32) ENGINE = Distributed('test_local_cluster', 'default', 'local')
''')

+        node1.query('''
+CREATE TABLE replicated(date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/replicated', 'node1', date, id, 8192)
+''')
+        node2.query('''
+CREATE TABLE replicated(date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/replicated', 'node2', date, id, 8192)
+''')
+
+        node1.query('''
+CREATE TABLE distributed (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica', 'default', 'replicated')
+''')
+        node2.query('''
+CREATE TABLE distributed (date Date, id UInt32) ENGINE = Distributed('shard_with_local_replica', 'default', 'replicated')
+''')

        yield cluster

    finally:
@@ -122,3 +139,34 @@ def test_inserts_local(started_cluster):
    instance.query("INSERT INTO distributed_on_local VALUES ('2000-01-01', 1)")
    time.sleep(0.5)
    assert instance.query("SELECT count(*) FROM local").strip() == '1'
+
+
+def test_prefer_localhost_replica(started_cluster):
+    test_query = "SELECT * FROM distributed ORDER BY id;"
+
+    node1.query("INSERT INTO distributed VALUES (toDate('2017-06-17'), 11)")
+    node2.query("INSERT INTO distributed VALUES (toDate('2017-06-17'), 22)")
+    time.sleep(1.0)
+
+    expected_distributed = '''\
+2017-06-17	11
+2017-06-17	22
+'''
+    assert TSV(node1.query(test_query)) == TSV(expected_distributed)
+    assert TSV(node2.query(test_query)) == TSV(expected_distributed)
+
+    with PartitionManager() as pm:
+        pm.partition_instances(node1, node2, action='REJECT --reject-with tcp-reset')
+
+        node1.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 33)")
+        node2.query("INSERT INTO replicated VALUES (toDate('2017-06-17'), 44)")
+        time.sleep(1.0)
+
+        expected_from_node2 = '''\
+2017-06-17	11
+2017-06-17	22
+2017-06-17	44
+'''
+        # The query is sent to node2, as it is local and prefer_localhost_replica=1.
+        assert TSV(node2.query(test_query)) == TSV(expected_from_node2)
+
+        expected_from_node1 = '''\
+2017-06-17	11
+2017-06-17	22
+2017-06-17	33
+'''
+        # Now the query is sent to node1, as it is listed first and load_balancing='in_order'.
+        assert TSV(node2.query("SET load_balancing='in_order'; SET prefer_localhost_replica=0;" + test_query)) == TSV(expected_from_node1)