diff --git a/src/Common/SystemLogBase.cpp b/src/Common/SystemLogBase.cpp
index 86adcbbd31b..5e9ee9a1e04 100644
--- a/src/Common/SystemLogBase.cpp
+++ b/src/Common/SystemLogBase.cpp
@@ -137,25 +137,9 @@ void SystemLogBase<LogElement>::add(const LogElement & element)
 template <typename LogElement>
 void SystemLogBase<LogElement>::flush(bool force)
 {
-    uint64_t this_thread_requested_offset;
-
-    {
-        std::lock_guard lock(mutex);
-
-        if (is_shutdown)
-            return;
-
-        this_thread_requested_offset = queue_front_index + queue.size();
-
-        // Publish our flush request, taking care not to overwrite the requests
-        // made by other threads.
-        is_force_prepare_tables |= force;
-        requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
-
-        flush_event.notify_all();
-    }
-
-    LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
+    uint64_t this_thread_requested_offset = notifyFlushImpl(force);
+    if (this_thread_requested_offset == uint64_t(-1))
+        return;
 
     // Use an arbitrary timeout to avoid endless waiting. 60s proved to be
     // too fast for our parallel functional tests, probably because they
@@ -174,6 +158,33 @@ void SystemLogBase<LogElement>::flush(bool force)
     }
 }
 
+template <typename LogElement>
+void SystemLogBase<LogElement>::notifyFlush(bool force) { notifyFlushImpl(force); }
+
+template <typename LogElement>
+uint64_t SystemLogBase<LogElement>::notifyFlushImpl(bool force)
+{
+    uint64_t this_thread_requested_offset;
+
+    {
+        std::lock_guard lock(mutex);
+        if (is_shutdown)
+            return uint64_t(-1);
+
+        this_thread_requested_offset = queue_front_index + queue.size();
+
+        // Publish our flush request, taking care not to overwrite the requests
+        // made by other threads.
+        is_force_prepare_tables |= force;
+        requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
+
+        flush_event.notify_all();
+    }
+
+    LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
+    return this_thread_requested_offset;
+}
+
 #define INSTANTIATE_SYSTEM_LOG_BASE(ELEMENT) template class SystemLogBase<ELEMENT>;
 SYSTEM_LOG_ELEMENTS(INSTANTIATE_SYSTEM_LOG_BASE)
 
diff --git a/src/Common/SystemLogBase.h b/src/Common/SystemLogBase.h
index f8febd8b159..92409028c22 100644
--- a/src/Common/SystemLogBase.h
+++ b/src/Common/SystemLogBase.h
@@ -87,9 +87,12 @@ public:
      */
     void add(const LogElement & element);
 
-    /// Flush data in the buffer to disk
+    /// Flush data in the buffer to disk. Blocks the calling thread until the data is stored on disk.
     void flush(bool force) override;
 
+    /// Non-blocking flush of the data in the buffer to disk.
+    void notifyFlush(bool force);
+
     String getName() const override { return LogElement::name(); }
 
     static const char * getDefaultOrderBy() { return "event_date, event_time"; }
@@ -112,6 +115,10 @@ protected:
     uint64_t flushed_up_to = 0;
     // Logged overflow message at this queue front index
     uint64_t logged_queue_full_at_index = -1;
+
+private:
+    uint64_t notifyFlushImpl(bool force);
+
 };
 
 }
diff --git a/src/Daemon/BaseDaemon.cpp b/src/Daemon/BaseDaemon.cpp
index 319d2bc8b5b..3852ec5ada5 100644
--- a/src/Daemon/BaseDaemon.cpp
+++ b/src/Daemon/BaseDaemon.cpp
@@ -173,6 +173,9 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
             /// This coarse method of synchronization is perfectly ok for fatal signals.
             sleepForSeconds(1);
         }
+
+        /// Wait for all log flush operations to finish.
+        sleepForSeconds(3);
 
         call_default_signal_handler(sig);
     }
diff --git a/src/Interpreters/CrashLog.cpp b/src/Interpreters/CrashLog.cpp
index 08c08ffecd1..379c9122cc8 100644
--- a/src/Interpreters/CrashLog.cpp
+++ b/src/Interpreters/CrashLog.cpp
@@ -84,5 +84,8 @@ void collectCrashLog(Int32 signal, UInt64 thread_id, const String & query_id, co
 
         CrashLogElement element{static_cast<time_t>(time / 1000000000), time, signal, thread_id, query_id, trace, trace_full};
         crash_log_owned->add(element);
+        /// Notify savingThreadFunction to start flushing the crash log.
+        /// The crash log is stored in parallel with the signal processing thread.
+        crash_log_owned->notifyFlush(true);
     }
 }
diff --git a/tests/integration/test_crash_log/__init__.py b/tests/integration/test_crash_log/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_crash_log/test.py b/tests/integration/test_crash_log/test.py
new file mode 100644
index 00000000000..9f6eca794b1
--- /dev/null
+++ b/tests/integration/test_crash_log/test.py
@@ -0,0 +1,57 @@
+import os
+import time
+import pytest
+
+import helpers.cluster
+import helpers.test_tools
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+@pytest.fixture(scope="module")
+def started_node():
+    cluster = helpers.cluster.ClickHouseCluster(__file__)
+    try:
+        node = cluster.add_instance("node", stay_alive=True)
+
+        cluster.start()
+        yield node
+    finally:
+        cluster.shutdown()
+
+
+def send_signal(started_node, signal):
+    started_node.exec_in_container(
+        ["bash", "-c", f"pkill -{signal} clickhouse"], user="root"
+    )
+
+
+def wait_for_clickhouse_stop(started_node):
+    result = None
+    for attempt in range(60):
+        time.sleep(1)
+        pid = started_node.get_process_pid("clickhouse")
+        if pid is None:
+            result = "OK"
+            break
+    assert result == "OK", "ClickHouse process is still running"
+
+
+def test_pkill(started_node):
+    if (
+        started_node.is_built_with_thread_sanitizer()
+        or started_node.is_built_with_address_sanitizer()
+        or started_node.is_built_with_memory_sanitizer()
+    ):
+        pytest.skip("doesn't fit in timeouts for stacktrace generation")
+
+    crashes_count = 0
+    for signal in ["SEGV", "4"]:
+        send_signal(started_node, signal)
+        wait_for_clickhouse_stop(started_node)
+        started_node.restart_clickhouse()
+        crashes_count += 1
+        assert (
+            started_node.query("SELECT COUNT(*) FROM system.crash_log")
+            == f"{crashes_count}\n"
+        )
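
The SystemLogBase change above splits the old blocking flush() into a reusable notification step (notifyFlushImpl(), which publishes the requested offset and wakes the saving thread) and the waiting step that only flush() performs. The following standalone sketch illustrates that pattern; it is not ClickHouse code, and FlushQueue together with all of its members are illustrative names only. As an additional simplification, the background writer here reacts only to explicit requests rather than also flushing periodically.

// Standalone sketch of the flush()/notifyFlush() split: notifyFlush() publishes the requested
// offset and wakes the background writer without blocking; flush() reuses the same notification
// and then waits (with a timeout) until the writer has caught up.
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

class FlushQueue
{
public:
    FlushQueue() : saving_thread([this] { savingThread(); }) {}

    void add(std::string entry)
    {
        std::lock_guard<std::mutex> lock(mutex);
        queue.push_back(std::move(entry));
    }

    /// Non-blocking: record how far the writer should get and wake it up.
    uint64_t notifyFlush()
    {
        std::lock_guard<std::mutex> lock(mutex);
        const uint64_t this_thread_requested_offset = queue_front_index + queue.size();
        requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
        flush_event.notify_all();
        return this_thread_requested_offset;
    }

    /// Blocking: issue the same notification, then wait until the writer reports progress.
    bool flush(std::chrono::milliseconds timeout)
    {
        const uint64_t target = notifyFlush();
        std::unique_lock<std::mutex> lock(mutex);
        return flushed_event.wait_for(lock, timeout, [&] { return flushed_up_to >= target; });
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            is_shutdown = true;
            flush_event.notify_all();
        }
        saving_thread.join();
    }

private:
    /// Background writer: waits for a flush request (or shutdown), "persists" the queued
    /// entries, then advances flushed_up_to and wakes any blocked flush() callers.
    void savingThread()
    {
        std::unique_lock<std::mutex> lock(mutex);
        while (!is_shutdown)
        {
            flush_event.wait(lock, [&] { return is_shutdown || requested_flush_up_to > flushed_up_to; });
            while (!queue.empty())
            {
                std::cout << "persisting: " << queue.front() << '\n';
                queue.pop_front();
                ++queue_front_index;
            }
            flushed_up_to = queue_front_index;
            flushed_event.notify_all();
        }
    }

    std::mutex mutex;
    std::condition_variable flush_event;   // wakes the writer
    std::condition_variable flushed_event; // wakes blocked flush() callers
    std::deque<std::string> queue;
    uint64_t queue_front_index = 0;        // global index of queue.front()
    uint64_t requested_flush_up_to = 0;
    uint64_t flushed_up_to = 0;
    bool is_shutdown = false;
    std::thread saving_thread;
};

int main()
{
    FlushQueue log;
    log.add("crash report");
    log.notifyFlush();                          // fire-and-forget, returns immediately
    log.flush(std::chrono::milliseconds(1000)); // blocks until the entry is persisted
    log.stop();
}

The property the crash path relies on is that the notify step only takes the mutex briefly and never waits, so a caller can hand work to the background thread and move on; flush() keeps the old blocking contract for everyone else, with the uint64_t(-1) sentinel preserving the early return on shutdown.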
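The BaseDaemon.cpp and CrashLog.cpp hunks use that split from the signal-processing side: collectCrashLog() adds the element and calls the non-blocking notifyFlush(true), and signalHandler() then sleeps for a few seconds so the saving thread has time to persist the record before the default signal handler re-raises the signal. Below is a minimal sketch of that sequencing only; it is not ClickHouse code and not a real signal handler, and every name in it (writerThread, onFatalSignal, crash_record.txt) is hypothetical.

// Crash-path sequencing sketch: the crash side records the event, nudges a background writer
// (non-blocking, analogous to notifyFlush(true)), then sleeps for a fixed grace period
// (analogous to sleepForSeconds(3)) instead of blocking on the writer.
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <mutex>
#include <optional>
#include <string>
#include <thread>

std::mutex mutex;
std::condition_variable wake_writer;
std::optional<std::string> pending_crash_record;
bool stop_writer = false;

/// Background "saving thread": persists a crash record whenever one is published.
void writerThread()
{
    std::unique_lock<std::mutex> lock(mutex);
    while (!stop_writer)
    {
        wake_writer.wait(lock, [] { return stop_writer || pending_crash_record.has_value(); });
        if (pending_crash_record)
        {
            std::ofstream out("crash_record.txt");
            out << *pending_crash_record << '\n';
            pending_crash_record.reset();
        }
    }
}

/// Crash side: publish the record, notify without waiting, then give the writer a grace period.
void onFatalSignal(const std::string & description)
{
    {
        std::lock_guard<std::mutex> lock(mutex);
        pending_crash_record = description;
    }
    wake_writer.notify_all();
    std::this_thread::sleep_for(std::chrono::seconds(3));
}

int main()
{
    std::thread writer(writerThread);
    onFatalSignal("signal 11 in thread 42");

    {
        std::lock_guard<std::mutex> lock(mutex);
        stop_writer = true;
    }
    wake_writer.notify_all();
    writer.join();
}

A real fatal-signal handler is far more constrained than this sketch (async-signal safety, no allocation); the point here is only the ordering the patch establishes: record, notify, grace period, then fall through to the default handler.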