Add support for custom fetchPart timeouts

This commit is contained in:
Nicolae Vartolomei 2021-02-04 17:25:10 +00:00
parent 813092ff55
commit b153e8c190
4 changed files with 30 additions and 9 deletions

View File

@ -80,6 +80,9 @@ struct Settings;
M(UInt64, replicated_max_parallel_fetches_for_host, DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT, "Limit parallel fetches from endpoint (actually pool size).", 0) \
M(UInt64, replicated_max_parallel_sends, 0, "Limit parallel sends.", 0) \
M(UInt64, replicated_max_parallel_sends_for_table, 0, "Limit parallel sends for one table.", 0) \
M(Seconds, replicated_fetches_http_connection_timeout, 0, "HTTP connection timeout for part fetch requests. Inherited from default profile `http_connection_timeout` if not set explicitly.", 0) \
M(Seconds, replicated_fetches_http_send_timeout, 0, "HTTP send timeout for part fetch/send requests. Inherited from default profile `http_send_timeout` if not set explicitly.", 0) \
M(Seconds, replicated_fetches_http_receive_timeout, 0, "HTTP receive timeout for fetch/send part requests. Inherited from default profile `http_receive_timeout` if not set explicitly.", 0) \
M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
M(Bool, detach_old_local_parts_when_cloning_replica, 1, "Do not remove old local parts when repairing lost replica.", 0) \

View File

@ -2178,7 +2178,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
String source_replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto timeouts = getFetchPartHTTPTimeouts(global_context);
auto [user, password] = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
@ -3111,6 +3112,23 @@ void StorageReplicatedMergeTree::exitLeaderElection()
leader_election = nullptr;
}
/// Builds the HTTP timeouts used for part fetch requests.
/// The base values come from ConnectionTimeouts::getHTTPTimeouts(context); each of the
/// replicated_fetches_http_* settings returned by getSettings() replaces the matching
/// base value only when its `changed` flag is set.
ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(const Context & context)
{
    auto fetch_timeouts = ConnectionTimeouts::getHTTPTimeouts(context);
    const auto storage_settings = getSettings();

    const auto & connection_override = storage_settings->replicated_fetches_http_connection_timeout;
    const auto & send_override = storage_settings->replicated_fetches_http_send_timeout;
    const auto & receive_override = storage_settings->replicated_fetches_http_receive_timeout;

    /// An override that was not changed leaves the inherited value untouched.
    if (connection_override.changed)
        fetch_timeouts.connection_timeout = connection_override;

    if (send_override.changed)
        fetch_timeouts.send_timeout = send_override;

    if (receive_override.changed)
        fetch_timeouts.receive_timeout = receive_override;

    return fetch_timeouts;
}
bool StorageReplicatedMergeTree::checkReplicaHavePart(const String & replica, const String & part_name)
{
auto zookeeper = getZooKeeper();
@ -3520,7 +3538,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
else
{
ReplicatedMergeTreeAddress address(zookeeper->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto timeouts = getFetchPartHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();

View File

@ -488,6 +488,8 @@ private:
/// Exchange parts.
/// HTTP timeouts for part fetch requests: the result of ConnectionTimeouts::getHTTPTimeouts(context),
/// with each replicated_fetches_http_{connection,send,receive}_timeout setting applied on top
/// when that setting's `changed` flag is set.
ConnectionTimeouts getFetchPartHTTPTimeouts(const Context & context);
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);

View File

@ -68,15 +68,12 @@ def test_no_stall(started_cluster):
node2.query("SYSTEM START REPLICATED SENDS")
for _ in range(1000):
print(node3.query("SELECT result_part_name FROM system.replicated_fetches").strip().split())
print()
result = node3.query("SELECT count() FROM system.parts WHERE table = 't'").strip()
print(result)
print()
print()
print('Currently running fetches', node3.query("SELECT result_part_name FROM system.replicated_fetches").strip().split())
parts_fetched = node3.query("SELECT count() FROM system.parts WHERE table = 't'").strip()
print('parts_fetched', parts_fetched)
# Replication done.
if result == "5":
if parts_fetched == "5":
break
time.sleep(3)