Merge pull request #51720 from arenadata/ADQM-970

Added a crash log flush to disk after an unexpected crash.
commit f3684f78b7 by Han Fei, 2023-07-13 10:49:07 +02:00 (committed by GitHub)
6 changed files with 101 additions and 20 deletions


@@ -137,25 +137,9 @@ void SystemLogBase<LogElement>::add(const LogElement & element)
 template <typename LogElement>
 void SystemLogBase<LogElement>::flush(bool force)
 {
-    uint64_t this_thread_requested_offset;
-
-    {
-        std::lock_guard lock(mutex);
-
-        if (is_shutdown)
-            return;
-
-        this_thread_requested_offset = queue_front_index + queue.size();
-
-        // Publish our flush request, taking care not to overwrite the requests
-        // made by other threads.
-        is_force_prepare_tables |= force;
-        requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
-
-        flush_event.notify_all();
-    }
-
-    LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
+    uint64_t this_thread_requested_offset = notifyFlushImpl(force);
+    if (this_thread_requested_offset == uint64_t(-1))
+        return;
 
     // Use an arbitrary timeout to avoid endless waiting. 60s proved to be
     // too fast for our parallel functional tests, probably because they
@@ -174,6 +158,33 @@ void SystemLogBase<LogElement>::flush(bool force)
     }
 }
 
+template <typename LogElement>
+void SystemLogBase<LogElement>::notifyFlush(bool force) { notifyFlushImpl(force); }
+
+template <typename LogElement>
+uint64_t SystemLogBase<LogElement>::notifyFlushImpl(bool force)
+{
+    uint64_t this_thread_requested_offset;
+
+    {
+        std::lock_guard lock(mutex);
+
+        if (is_shutdown)
+            return uint64_t(-1);
+
+        this_thread_requested_offset = queue_front_index + queue.size();
+
+        // Publish our flush request, taking care not to overwrite the requests
+        // made by other threads.
+        is_force_prepare_tables |= force;
+        requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
+
+        flush_event.notify_all();
+    }
+
+    LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
+    return this_thread_requested_offset;
+}
+
 #define INSTANTIATE_SYSTEM_LOG_BASE(ELEMENT) template class SystemLogBase<ELEMENT>;
 SYSTEM_LOG_ELEMENTS(INSTANTIATE_SYSTEM_LOG_BASE)
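
The first hunk elides the waiting half of flush(). For orientation, the remainder presumably waits on the same condition variable until the saving thread reports progress past the requested offset; a minimal sketch of that wait, assuming the members visible above (mutex, flush_event, flushed_up_to, is_force_prepare_tables) and an illustrative 180-second timeout that is not taken from this diff:

    std::unique_lock lock(mutex);
    bool flushed = flush_event.wait_for(lock, std::chrono::seconds(180), [&]
    {
        // The saving thread advances flushed_up_to after persisting each batch,
        // then notifies flush_event, so the predicate is re-checked here.
        return flushed_up_to >= this_thread_requested_offset && !is_force_prepare_tables;
    });
    if (!flushed)
        throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded while flushing system log");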


@@ -87,9 +87,12 @@ public:
      */
     void add(const LogElement & element);
 
-    /// Flush data in the buffer to disk
+    /// Flush data in the buffer to disk. Blocks the thread until the data is stored on disk.
     void flush(bool force) override;
 
+    /// Non-blocking flush of the data in the buffer to disk.
+    void notifyFlush(bool force);
+
     String getName() const override { return LogElement::name(); }
 
     static const char * getDefaultOrderBy() { return "event_date, event_time"; }
@@ -112,6 +115,10 @@ protected:
     uint64_t flushed_up_to = 0;
     // Logged overflow message at this queue front index
     uint64_t logged_queue_full_at_index = -1;
+
+private:
+    uint64_t notifyFlushImpl(bool force);
+
 };
 
 }
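
From a caller's point of view, the two public entry points differ only in whether they wait. A hypothetical usage sketch (the log and element variables are illustrative, not names from this diff):

    log->add(element);        // enqueue the record; the saving thread owns persistence
    log->flush(true);         // blocks until the enqueued data is on disk
    log->notifyFlush(true);   // publishes the flush request and returns immediately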


@@ -173,6 +173,9 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
         /// This coarse method of synchronization is perfectly ok for fatal signals.
         sleepForSeconds(1);
     }
 
+    /// Wait for all log flush operations to finish.
+    sleepForSeconds(3);
+
     call_default_signal_handler(sig);
 }
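
call_default_signal_handler itself is unchanged by this diff; conventionally such a helper restores the default disposition and re-raises, so after the three-second grace period the process still terminates with the original signal. A sketch of that convention, not the actual implementation:

    #include <csignal>

    static void call_default_signal_handler(int sig)
    {
        if (SIG_ERR != signal(sig, SIG_DFL))   // restore the default handler
            raise(sig);                        // re-raise so the default action runs
    }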


@@ -84,5 +84,8 @@ void collectCrashLog(Int32 signal, UInt64 thread_id, const String & query_id, co
     CrashLogElement element{static_cast<time_t>(time / 1000000000), time, signal, thread_id, query_id, trace, trace_full};
     crash_log_owned->add(element);
+    /// Notify savingThreadFunction to start flushing the crash log.
+    /// The crash log is stored in parallel with the signal-processing thread.
+    crash_log_owned->notifyFlush(true);
 }
 
 }
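
notifyFlush(true) is sufficient here because the saving thread sleeps on the same flush_event that notifyFlushImpl signals, waking either on its periodic timer or as soon as a request is published. A sketch of that consumer side, assuming the SystemLogBase members shown earlier and a hypothetical flush_interval_milliseconds setting:

    std::unique_lock lock(mutex);
    flush_event.wait_for(lock, std::chrono::milliseconds(flush_interval_milliseconds), [&]
    {
        // Wake early when someone requested a flush or the log is shutting down.
        return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
    });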


@@ -0,0 +1,57 @@
+import os
+import time
+import pytest
+
+import helpers.cluster
+import helpers.test_tools
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
+@pytest.fixture(scope="module")
+def started_node():
+    cluster = helpers.cluster.ClickHouseCluster(__file__)
+    try:
+        node = cluster.add_instance("node", stay_alive=True)
+        cluster.start()
+        yield node
+    finally:
+        cluster.shutdown()
+
+
+def send_signal(started_node, signal):
+    started_node.exec_in_container(
+        ["bash", "-c", f"pkill -{signal} clickhouse"], user="root"
+    )
+
+
+def wait_for_clickhouse_stop(started_node):
+    result = None
+    for attempt in range(60):
+        time.sleep(1)
+        pid = started_node.get_process_pid("clickhouse")
+        if pid is None:
+            result = "OK"
+            break
+    assert result == "OK", "ClickHouse process is still running"
+
+
+def test_pkill(started_node):
+    if (
+        started_node.is_built_with_thread_sanitizer()
+        or started_node.is_built_with_address_sanitizer()
+        or started_node.is_built_with_memory_sanitizer()
+    ):
+        pytest.skip("doesn't fit in timeouts for stacktrace generation")
+
+    crashes_count = 0
+    for signal in ["SEGV", "4"]:
+        send_signal(started_node, signal)
+        wait_for_clickhouse_stop(started_node)
+        started_node.restart_clickhouse()
+        crashes_count += 1
+        assert (
+            started_node.query("SELECT COUNT(*) FROM system.crash_log")
+            == f"{crashes_count}\n"
+        )