Move tode to to try to make the diff simpler

This commit is contained in:
Dmitry Kardymon 2023-07-20 08:55:22 +00:00
parent ff235e0f30
commit c7ab6e908a

View File

@ -121,6 +121,36 @@ void SystemLogQueue<LogElement>::push(const LogElement & element)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name())); LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
} }
template <typename LogElement>
void SystemLogBase<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset = queue->notifyFlush(force);
if (this_thread_requested_offset == uint64_t(-1))
return;
queue->waitFlush(this_thread_requested_offset);
}
template <typename LogElement>
void SystemLogQueue<LogElement>::waitFlush(uint64_t expected_flushed_up_to)
{
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
{
return flushed_up_to >= expected_flushed_up_to && !is_force_prepare_tables;
});
if (!result)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
toString(timeout_seconds), demangle(typeid(*this).name()));
}
}
template <typename LogElement> template <typename LogElement>
uint64_t SystemLogQueue<LogElement>::notifyFlush(bool should_prepare_tables_anyway) uint64_t SystemLogQueue<LogElement>::notifyFlush(bool should_prepare_tables_anyway)
{ {
@ -145,26 +175,6 @@ uint64_t SystemLogQueue<LogElement>::notifyFlush(bool should_prepare_tables_anyw
return this_thread_requested_offset; return this_thread_requested_offset;
} }
template <typename LogElement>
void SystemLogQueue<LogElement>::waitFlush(uint64_t expected_flushed_up_to)
{
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
{
return flushed_up_to >= expected_flushed_up_to && !is_force_prepare_tables;
});
if (!result)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
toString(timeout_seconds), demangle(typeid(*this).name()));
}
}
template <typename LogElement> template <typename LogElement>
void SystemLogQueue<LogElement>::confirm(uint64_t to_flush_end) void SystemLogQueue<LogElement>::confirm(uint64_t to_flush_end)
{ {
@ -229,16 +239,6 @@ void SystemLogBase<LogElement>::add(const LogElement & element)
queue->push(element); queue->push(element);
} }
template <typename LogElement>
void SystemLogBase<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset = queue->notifyFlush(force);
if (this_thread_requested_offset == uint64_t(-1))
return;
queue->waitFlush(this_thread_requested_offset);
}
template <typename LogElement> template <typename LogElement>
void SystemLogBase<LogElement>::notifyFlush(bool force) { queue->notifyFlush(force); } void SystemLogBase<LogElement>::notifyFlush(bool force) { queue->notifyFlush(force); }