fix tests, rework recreation tables conditions, add log about ignored logs

This commit is contained in:
Sema Checherinda 2024-07-31 23:28:03 +02:00
parent 8e5577ad8f
commit 86267418f9
7 changed files with 105 additions and 78 deletions

View File

@ -164,5 +164,3 @@ template <typename... Args>
constexpr void UNUSED(Args &&... args [[maybe_unused]]) // NOLINT(cppcoreguidelines-missing-std-forward)
{
}
#define LOGICAL_IF_THEN(A, B) (!(A) || !!(B))

View File

@ -1171,6 +1171,8 @@ void BackupsWorker::waitAll()
for (const auto & id : current_operations)
wait(id, /* rethrow_exception= */ false);
backup_log->flush(backup_log->getLastLogIndex());
LOG_INFO(log, "Backups and restores finished");
}

View File

@ -88,31 +88,18 @@ void SystemLogQueue<LogElement>::push(LogElement&& element)
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const auto last_log_index = queue_front_index + queue.size();
requested_flush_index = std::max(requested_flush_index, last_log_index);
flush_event.notify_all();
notifyFlushUnlocked(last_log_index, /* should_prepare_tables_anyway */ false);
}
if (queue.size() >= settings.max_size_rows)
{
chassert(queue.size() == settings.max_size_rows);
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}. max_size_rows {}",
demangle(typeid(*this).name()),
queue_front_index,
settings.max_size_rows);
}
// To the next batch we add a log message about how much we have lost
++ignored_logs;
return;
}
@ -133,15 +120,22 @@ void SystemLogQueue<LogElement>::handleCrash()
}
}
template <typename LogElement>
void SystemLogQueue<LogElement>::notifyFlushUnlocked(Index expected_flushed_index, bool should_prepare_tables_anyway)
{
if (should_prepare_tables_anyway)
requested_prepare_tables = std::max(requested_prepare_tables, expected_flushed_index);
requested_flush_index = std::max(requested_flush_index, expected_flushed_index);
flush_event.notify_all();
}
template <typename LogElement>
void SystemLogQueue<LogElement>::notifyFlush(SystemLogQueue<LogElement>::Index expected_flushed_index, bool should_prepare_tables_anyway)
{
std::unique_lock lock(mutex);
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
force_prepare_tables_requested |= should_prepare_tables_anyway;
requested_flush_index = std::max(requested_flush_index, expected_flushed_index);
flush_event.notify_all();
std::lock_guard lock(mutex);
notifyFlushUnlocked(expected_flushed_index, should_prepare_tables_anyway);
}
template <typename LogElement>
@ -156,18 +150,15 @@ void SystemLogQueue<LogElement>::waitFlush(SystemLogQueue<LogElement>::Index exp
std::unique_lock lock(mutex);
// there is no obligation to call notifyFlush before waitFlush, than we have to be sure that flush_event has been triggered
force_prepare_tables_requested |= should_prepare_tables_anyway;
if (requested_flush_index < expected_flushed_index)
{
requested_flush_index = expected_flushed_index;
flush_event.notify_all();
}
// there is no obligation to call notifyFlush before waitFlush, than we have to be sure that flush_event has been triggered before we wait the result
notifyFlushUnlocked(expected_flushed_index, should_prepare_tables_anyway);
auto result = confirm_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
{
const bool if_should_prepare_then_it_is_done = LOGICAL_IF_THEN(should_prepare_tables_anyway, prepare_tables_done);
return (flushed_index >= expected_flushed_index && if_should_prepare_then_it_is_done) || is_shutdown;
if (should_prepare_tables_anyway)
return (flushed_index >= expected_flushed_index && prepared_tables >= requested_prepare_tables) || is_shutdown;
else
return (flushed_index >= expected_flushed_index) || is_shutdown;
});
if (!result)
@ -194,7 +185,7 @@ template <typename LogElement>
void SystemLogQueue<LogElement>::confirm(SystemLogQueue<LogElement>::Index last_flashed_index)
{
std::lock_guard lock(mutex);
prepare_tables_done = true;
prepared_tables = std::max(prepared_tables, last_flashed_index);
flushed_index = std::max(flushed_index, last_flashed_index);
confirm_event.notify_all();
}
@ -202,25 +193,34 @@ void SystemLogQueue<LogElement>::confirm(SystemLogQueue<LogElement>::Index last_
template <typename LogElement>
typename SystemLogQueue<LogElement>::PopResult SystemLogQueue<LogElement>::pop()
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock, std::chrono::milliseconds(settings.flush_interval_milliseconds), [&] ()
{
const bool if_prepare_requested_and_it_is_not_done = force_prepare_tables_requested && !prepare_tables_done;
return requested_flush_index > flushed_index || if_prepare_requested_and_it_is_not_done || is_shutdown;
});
if (is_shutdown)
return PopResult{.is_shutdown = true};
queue_front_index += queue.size();
PopResult result;
result.logs_index = queue_front_index;
result.logs_elemets.swap(queue);
size_t prev_ignored_logs = 0;
const bool if_prepare_requested_and_it_is_not_done = force_prepare_tables_requested && !prepare_tables_done;
result.create_table_force = if_prepare_requested_and_it_is_not_done;
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock, std::chrono::milliseconds(settings.flush_interval_milliseconds), [&] ()
{
return requested_flush_index > flushed_index || requested_prepare_tables > prepared_tables || is_shutdown;
});
if (is_shutdown)
return PopResult{.is_shutdown = true};
queue_front_index += queue.size();
prev_ignored_logs = ignored_logs;
ignored_logs = 0;
result.last_log_index = queue_front_index;
result.logs.swap(queue);
result.create_table_force = requested_prepare_tables > prepared_tables;
}
if (prev_ignored_logs)
LOG_ERROR(log, "Queue had been full at {}, accepted {} logs, ignored {} logs.",
result.last_log_index - result.logs.size(),
result.logs.size(),
prev_ignored_logs);
return result;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <condition_variable>
#include <limits>
#include <memory>
#include <vector>
#include <base/types.h>
@ -54,7 +55,7 @@ struct StorageID;
class ISystemLog
{
public:
using Index = uint64_t;
using Index = int64_t;
virtual String getName() const = 0;
@ -125,8 +126,8 @@ public:
struct PopResult
{
Index logs_index = 0;
std::vector<LogElement> logs_elemets = {};
Index last_log_index = 0;
std::vector<LogElement> logs = {};
bool create_table_force = false;
bool is_shutdown = false;
};
@ -136,6 +137,8 @@ public:
void confirm(Index last_flashed_index);
private:
void notifyFlushUnlocked(Index expected_flushed_index, bool should_prepare_tables_anyway);
/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;
@ -150,18 +153,21 @@ private:
// synchronous log flushing for SYSTEM FLUSH LOGS.
Index queue_front_index = 0;
// A flag that says we must create the tables even if the queue is empty.
bool force_prepare_tables_requested = false;
bool prepare_tables_done = false;
// Requested to flush logs up to this index, exclusive
Index requested_flush_index = 0;
Index requested_flush_index = std::numeric_limits<Index>::min();
// Flushed log up to this index, exclusive
Index flushed_index = 0;
// Logged overflow message at this queue front index
Index logged_queue_full_at_index = -1;
// The same logic for the prepare tables: if requested_prepar_tables > prepared_tables we need to do prepare
// except that initial prepared_tables is -1
// it is due to the difference: when no logs have been written and we call flush logs
// it becomes in the state: requested_flush_index = 0 and flushed_index = 0 -- we do not want to do anything
// but if we need to prepare tables it becomes requested_prepare_tables = 0 and prepared_tables = -1
// we trigger background thread and do prepare
Index requested_prepare_tables = std::numeric_limits<Index>::min();
Index prepared_tables = -1;
size_t ignored_logs = 0;
bool is_shutdown = false;

View File

@ -475,14 +475,14 @@ void SystemLog<LogElement>::savingThreadFunction()
return;
}
if (!result.logs_elemets.empty())
if (!result.logs.empty())
{
flushImpl(result.logs_elemets, result.logs_index);
flushImpl(result.logs, result.last_log_index);
}
else if (result.create_table_force)
{
prepareTable();
queue->confirm(/* last_flashed_index */ 0);
queue->confirm(result.last_log_index);
}
}
catch (...)
@ -572,9 +572,6 @@ StoragePtr SystemLog<LogElement>::getStorage() const
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
if (is_prepared)
return;
String description = table_id.getNameForLogs();
auto table = getStorage();

View File

@ -4,7 +4,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from helpers.test_tools import assert_eq_with_retry, assert_logs_contain_with_retry, TSV
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
@ -75,6 +75,8 @@ def test_system_suspend():
def test_log_max_size(start_cluster):
# we do misconfiguration here: buffer_size_rows_flush_threshold > max_size_rows, flush_interval_milliseconds is huge
# no auto flush by size not by time has a chance
node.exec_in_container(
[
"bash",
@ -83,6 +85,7 @@ def test_log_max_size(start_cluster):
<clickhouse>
<query_log>
<flush_interval_milliseconds replace=\\"replace\\">1000000</flush_interval_milliseconds>
<buffer_size_rows_flush_threshold replace=\\"replace\\">1000000</buffer_size_rows_flush_threshold>
<max_size_rows replace=\\"replace\\">10</max_size_rows>
<reserved_size_rows replace=\\"replace\\">10</reserved_size_rows>
</query_log>
@ -91,11 +94,23 @@ def test_log_max_size(start_cluster):
""",
]
)
node.restart_clickhouse()
for i in range(10):
node.query(f"select {i}")
assert node.query("select count() >= 10 from system.query_log") == "1\n"
node.query(f"TRUNCATE TABLE IF EXISTS system.query_log")
node.restart_clickhouse()
# all logs records above max_size_rows are lost
# The accepted logs records are never flushed until system flush logs is called by us
for i in range(21):
node.query(f"select {i}")
node.query("system flush logs")
assert_logs_contain_with_retry(
node, "Queue had been full at 0, accepted 10 logs, ignored 34 logs."
)
assert node.query(
"select count() >= 10, count() < 20 from system.query_log"
) == TSV([[1, 1]])
node.exec_in_container(
["rm", f"/etc/clickhouse-server/config.d/yyy-override-query_log.xml"]
)

View File

@ -173,11 +173,20 @@ def test_drop_system_log():
node.query("system flush logs")
node.query("select 2")
node.query("system flush logs")
assert node.query("select count() > 0 from system.query_log") == "1\n"
assert node.query("select count() >= 2 from system.query_log") == "1\n"
node.query("drop table system.query_log sync")
node.query("select 3")
node.query("system flush logs")
assert node.query("select count() > 0 from system.query_log") == "1\n"
assert node.query("select count() >= 1 from system.query_log") == "1\n"
node.query("drop table system.query_log sync")
node.restart_clickhouse()
node.query("system flush logs")
assert (
node.query("select count() >= 0 from system.query_log") == "1\n"
) # we check that query_log just exists
node.exec_in_container(
["rm", f"/etc/clickhouse-server/config.d/yyy-override-query_log.xml"]
)