Update src/Storages/PostgreSQL/PostgreSQLReplicationHandler.cpp

Co-authored-by: tavplubix <tavplubix@gmail.com>
This commit is contained in:
Kseniia Sumarokova 2021-03-20 13:50:16 +03:00 committed by GitHub
parent 6bb81630e5
commit 4e63b8e5dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -217,26 +217,19 @@ NameSet PostgreSQLReplicationHandler::loadFromSnapshot(std::string & snapshot_na
void PostgreSQLReplicationHandler::consumerFunc() void PostgreSQLReplicationHandler::consumerFunc()
{ {
auto start_time = std::chrono::steady_clock::now();
std::vector<std::pair<Int32, String>> skipped_tables; std::vector<std::pair<Int32, String>> skipped_tables;
while (!stop_synchronization) bool schedule_now = consumer->consume(skipped_tables);
{
bool reschedule = !consumer->consume(skipped_tables);
if (!skipped_tables.empty()) if (!skipped_tables.empty())
consumer->updateSkipList(reloadFromSnapshot(skipped_tables)); consumer->updateSkipList(reloadFromSnapshot(skipped_tables));
if (reschedule) if (stop_synchronization)
break; return;
auto end_time = std::chrono::steady_clock::now(); if (schedule_now)
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time); consumer_task->schedule();
if (duration.count() > max_thread_work_duration_ms) else
break;
}
if (!stop_synchronization)
consumer_task->scheduleAfter(reschedule_ms); consumer_task->scheduleAfter(reschedule_ms);
} }