Merge pull request #19924 from amosbird/ddlworker2

Initialize MaxDDLEntryID upon restarting
This commit is contained in:
tavplubix 2021-02-03 13:22:22 +03:00 committed by GitHub
commit 50362840bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 15 deletions

View File

@ -462,6 +462,7 @@ void DDLWorker::scheduleTasks()
else
{
LOG_DEBUG(log, "Task {} ({}) has been already processed", entry_name, task->entry.query);
updateMaxDDLEntryID(*task);
}
saveTask(entry_name);
@ -680,6 +681,26 @@ void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
}
}
}
void DDLWorker::updateMaxDDLEntryID(const DDLTask & task)
{
DB::ReadBufferFromString in(task.entry_name);
DB::assertString("query-", in);
UInt64 id;
readText(id, in);
auto prev_id = max_id.load(std::memory_order_relaxed);
while (prev_id < id)
{
if (max_id.compare_exchange_weak(prev_id, id))
{
CurrentMetrics::set(CurrentMetrics::MaxDDLEntryID, id);
break;
}
}
}
void DDLWorker::processTask(DDLTask & task)
{
auto zookeeper = tryGetZooKeeper();
@ -754,21 +775,7 @@ void DDLWorker::processTask(DDLTask & task)
task.was_executed = true;
}
{
DB::ReadBufferFromString in(task.entry_name);
DB::assertString("query-", in);
UInt64 id;
readText(id, in);
auto prev_id = max_id.load(std::memory_order_relaxed);
while (prev_id < id)
{
if (max_id.compare_exchange_weak(prev_id, id))
{
CurrentMetrics::set(CurrentMetrics::MaxDDLEntryID, id);
break;
}
}
}
updateMaxDDLEntryID(task);
/// FIXME: if server fails right here, the task will be executed twice. We need WAL here.

View File

@ -129,6 +129,7 @@ private:
/// Returns non-empty DDLTaskPtr if entry parsed and the check is passed
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
void updateMaxDDLEntryID(const DDLTask & task);
void enqueueTask(DDLTaskPtr task);
void processTask(DDLTask & task);