Merge pull request #44821 from ClickHouse/fix-race-ddlworker

Fix data race in DDLWorker
This commit is contained in:
Alexey Milovidov 2023-01-02 02:15:13 +03:00 committed by GitHub
commit a2da7ce80a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 1 deletions

View File

@ -29,6 +29,7 @@ namespace ErrorCodes
extern const int DNS_ERROR;
}
HostID HostID::fromString(const String & host_port_str)
{
HostID res;
@ -36,6 +37,7 @@ HostID HostID::fromString(const String & host_port_str)
return res;
}
bool HostID::isLocalAddress(UInt16 clickhouse_port) const
{
try

View File

@ -248,6 +248,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
LOG_TRACE(log, "Don't have unfinished tasks after restarting");
else
LOG_INFO(log, "Have {} unfinished tasks, will check them", current_tasks.size());
assert(current_tasks.size() <= pool_size + (worker_pool != nullptr));
auto task_it = current_tasks.begin();
while (task_it != current_tasks.end())
@ -279,7 +280,9 @@ void DDLWorker::scheduleTasks(bool reinitialized)
task->completely_processed = true;
}
else
{
processTask(*task, zookeeper);
}
++task_it;
}
else
@ -291,6 +294,7 @@ void DDLWorker::scheduleTasks(bool reinitialized)
/// of log entry number (with leading zeros).
if (!first_failed_task_name || task->entry_name < *first_failed_task_name)
first_failed_task_name = task->entry_name;
task_it = current_tasks.erase(task_it);
}
}
@ -416,18 +420,24 @@ void DDLWorker::scheduleTasks(bool reinitialized)
DDLTaskBase & DDLWorker::saveTask(DDLTaskPtr && task)
{
current_tasks.remove_if([](const DDLTaskPtr & t) { return t->completely_processed.load(); });
/// Tasks are scheduled and executed in main thread <==> Parallel execution is disabled
assert((worker_pool != nullptr) == (1 < pool_size));
/// Parallel execution is disabled ==> All previous tasks are failed to start or finished,
/// so current tasks list must be empty when we are ready to process new one.
assert(worker_pool || current_tasks.empty());
/// Parallel execution is enabled ==> Not more than pool_size tasks are currently executing.
/// Note: If current_tasks.size() == pool_size, then all worker threads are busy,
/// so we will wait on worker_pool->scheduleOrThrowOnError(...)
assert(!worker_pool || current_tasks.size() <= pool_size);
current_tasks.emplace_back(std::move(task));
if (first_failed_task_name && *first_failed_task_name == current_tasks.back()->entry_name)
first_failed_task_name.reset();
return *current_tasks.back();
}
@ -660,8 +670,8 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
active_node->setAlreadyRemoved();
task.createSyncedNodeIfNeed(zookeeper);
task.completely_processed = true;
updateMaxDDLEntryID(task.entry_name);
task.completely_processed = true;
}