diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h index aa3dc113e44..e1d7e4d2a79 100644 --- a/src/Interpreters/SystemLog.h +++ b/src/Interpreters/SystemLog.h @@ -184,12 +184,13 @@ private: // synchronous log flushing for SYSTEM FLUSH LOGS. uint64_t queue_front_index = 0; bool is_shutdown = false; + // A flag that says we must create the tables even if the queue is empty. bool is_force_prepare_tables = false; std::condition_variable flush_event; // Requested to flush logs up to this index, exclusive - uint64_t requested_flush_before = 0; + uint64_t requested_flush_up_to = 0; // Flushed log up to this index, exclusive - uint64_t flushed_before = 0; + uint64_t flushed_up_to = 0; // Logged overflow message at this queue front index uint64_t logged_queue_full_at_index = -1; @@ -267,8 +268,8 @@ void SystemLog::add(const LogElement & element) // It is enough to only wake the flushing thread once, after the message // count increases past half available size. const uint64_t queue_end = queue_front_index + queue.size(); - if (requested_flush_before < queue_end) - requested_flush_before = queue_end; + if (requested_flush_up_to < queue_end) + requested_flush_up_to = queue_end; flush_event.notify_all(); } @@ -304,24 +305,34 @@ void SystemLog::add(const LogElement & element) template void SystemLog::flush(bool force) { - std::unique_lock lock(mutex); + uint64_t this_thread_requested_offset; - if (is_shutdown) - return; - - const uint64_t queue_end = queue_front_index + queue.size(); - - is_force_prepare_tables = force; - if (requested_flush_before < queue_end || force) { - requested_flush_before = queue_end; + std::unique_lock lock(mutex); + + if (is_shutdown) + return; + + this_thread_requested_offset = queue_front_index + queue.size(); + + // Publish our flush request, taking care not to overwrite the requests + // made by other threads. + is_force_prepare_tables |= force; + requested_flush_up_to = std::max(requested_flush_up_to, + this_thread_requested_offset); + flush_event.notify_all(); } + LOG_DEBUG(log, "Requested flush up to offset {}", + this_thread_requested_offset); + // Use an arbitrary timeout to avoid endless waiting. const int timeout_seconds = 60; + std::unique_lock lock(mutex); bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), - [&] { return flushed_before >= queue_end && !is_force_prepare_tables; }); + [&] { return flushed_up_to >= this_thread_requested_offset + && !is_force_prepare_tables; }); if (!result) { @@ -371,6 +382,8 @@ void SystemLog::savingThreadFunction() // The end index (exclusive, like std end()) of the messages we are // going to flush. uint64_t to_flush_end = 0; + // Should we prepare table even if there are no new messages. + bool should_prepare_tables_anyway = false; { std::unique_lock lock(mutex); @@ -378,7 +391,7 @@ void SystemLog::savingThreadFunction() std::chrono::milliseconds(flush_interval_milliseconds), [&] () { - return requested_flush_before > flushed_before || is_shutdown || is_force_prepare_tables; + return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables; } ); @@ -389,18 +402,14 @@ void SystemLog::savingThreadFunction() to_flush.resize(0); queue.swap(to_flush); + should_prepare_tables_anyway = is_force_prepare_tables; + exit_this_thread = is_shutdown; } if (to_flush.empty()) { - bool force; - { - std::lock_guard lock(mutex); - force = is_force_prepare_tables; - } - - if (force) + if (should_prepare_tables_anyway) { prepareTable(); LOG_TRACE(log, "Table created (force)"); @@ -429,7 +438,8 @@ void SystemLog::flushImpl(const std::vector & to_flush, { try { - LOG_TRACE(log, "Flushing system log, {} entries to flush", to_flush.size()); + LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}", + to_flush.size(), to_flush_end); /// We check for existence of the table and create it as needed at every /// flush. This is done to allow user to drop the table at any moment @@ -468,12 +478,12 @@ void SystemLog::flushImpl(const std::vector & to_flush, { std::lock_guard lock(mutex); - flushed_before = to_flush_end; + flushed_up_to = to_flush_end; is_force_prepare_tables = false; flush_event.notify_all(); } - LOG_TRACE(log, "Flushed system log"); + LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end); }