Merge pull request #22761 from ClickHouse/aku/flush-log

add more messages when flushing the logs
This commit is contained in:
Alexander Kuzmenkov 2021-04-08 13:50:17 +03:00 committed by GitHub
commit dc7a2b7f88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -184,12 +184,13 @@ private:
// synchronous log flushing for SYSTEM FLUSH LOGS.
uint64_t queue_front_index = 0;
bool is_shutdown = false;
// A flag that says we must create the tables even if the queue is empty.
bool is_force_prepare_tables = false;
std::condition_variable flush_event;
// Requested to flush logs up to this index, exclusive
uint64_t requested_flush_before = 0;
uint64_t requested_flush_up_to = 0;
// Flushed log up to this index, exclusive
uint64_t flushed_before = 0;
uint64_t flushed_up_to = 0;
// Logged overflow message at this queue front index
uint64_t logged_queue_full_at_index = -1;
@ -267,8 +268,8 @@ void SystemLog<LogElement>::add(const LogElement & element)
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_before < queue_end)
requested_flush_before = queue_end;
if (requested_flush_up_to < queue_end)
requested_flush_up_to = queue_end;
flush_event.notify_all();
}
@ -304,24 +305,34 @@ void SystemLog<LogElement>::add(const LogElement & element)
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
{
std::unique_lock lock(mutex);
uint64_t this_thread_requested_offset;
if (is_shutdown)
return;
const uint64_t queue_end = queue_front_index + queue.size();
is_force_prepare_tables = force;
if (requested_flush_before < queue_end || force)
{
requested_flush_before = queue_end;
std::unique_lock lock(mutex);
if (is_shutdown)
return;
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= force;
requested_flush_up_to = std::max(requested_flush_up_to,
this_thread_requested_offset);
flush_event.notify_all();
}
LOG_DEBUG(log, "Requested flush up to offset {}",
this_thread_requested_offset);
// Use an arbitrary timeout to avoid endless waiting.
const int timeout_seconds = 60;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
[&] { return flushed_before >= queue_end && !is_force_prepare_tables; });
[&] { return flushed_up_to >= this_thread_requested_offset
&& !is_force_prepare_tables; });
if (!result)
{
@ -371,6 +382,8 @@ void SystemLog<LogElement>::savingThreadFunction()
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false;
{
std::unique_lock lock(mutex);
@ -378,7 +391,7 @@ void SystemLog<LogElement>::savingThreadFunction()
std::chrono::milliseconds(flush_interval_milliseconds),
[&] ()
{
return requested_flush_before > flushed_before || is_shutdown || is_force_prepare_tables;
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
}
);
@ -389,18 +402,14 @@ void SystemLog<LogElement>::savingThreadFunction()
to_flush.resize(0);
queue.swap(to_flush);
should_prepare_tables_anyway = is_force_prepare_tables;
exit_this_thread = is_shutdown;
}
if (to_flush.empty())
{
bool force;
{
std::lock_guard lock(mutex);
force = is_force_prepare_tables;
}
if (force)
if (should_prepare_tables_anyway)
{
prepareTable();
LOG_TRACE(log, "Table created (force)");
@ -429,7 +438,8 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
{
try
{
LOG_TRACE(log, "Flushing system log, {} entries to flush", to_flush.size());
LOG_TRACE(log, "Flushing system log, {} entries to flush up to offset {}",
to_flush.size(), to_flush_end);
/// We check for existence of the table and create it as needed at every
/// flush. This is done to allow user to drop the table at any moment
@ -468,12 +478,12 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
{
std::lock_guard lock(mutex);
flushed_before = to_flush_end;
flushed_up_to = to_flush_end;
is_force_prepare_tables = false;
flush_event.notify_all();
}
LOG_TRACE(log, "Flushed system log");
LOG_TRACE(log, "Flushed system log up to offset {}", to_flush_end);
}