Speedup initialization and fixed a bug. [#CLICKHOUSE-2]

This commit is contained in:
Vitaliy Lyudvichenko 2018-04-09 20:25:37 +03:00
parent d25338582d
commit 61705acd69

View File

@ -1043,8 +1043,12 @@ protected:
String workers_path = getWorkersPath();
String current_worker_path = getCurrentWorkerNodePath();
size_t num_bad_version_errors = 0;
while (true)
{
updateConfigIfNeeded();
zkutil::Stat stat;
zookeeper->get(workers_version_path, &stat);
auto version = stat.version;
@ -1054,6 +1058,12 @@ protected:
{
LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
<< ". Postpone processing " << description);
if (unprioritized)
current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
std::this_thread::sleep_for(current_sleep_time);
num_bad_version_errors = 0;
}
else
{
@ -1068,18 +1078,18 @@ protected:
if (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
{
LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
++num_bad_version_errors;
/// Try to make fast retries
if (num_bad_version_errors > 3)
{
LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
std::this_thread::sleep_for(default_sleep_time);
}
}
else
throw zkutil::KeeperException(code);
}
if (unprioritized)
current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
std::this_thread::sleep_for(current_sleep_time);
updateConfigIfNeeded();
}
}
@ -1291,6 +1301,9 @@ protected:
bool tryProcessTable(TaskTable & task_table)
{
/// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
bool previous_shard_is_instantly_finished = false;
/// Process each partition that is present in cluster
for (const String & partition_name : task_table.ordered_partition_names)
{
@ -1302,7 +1315,6 @@ protected:
Stopwatch watch;
TasksShard expected_shards;
size_t num_failed_shards = 0;
bool previous_shard_is_instantly_finished = false;
++cluster_partition.total_tries;
@ -1337,6 +1349,7 @@ protected:
else
{
/// We have already checked that partition, but did not discover it
previous_shard_is_instantly_finished = true;
continue;
}
}