ClickHouse/tests/integration/test_crash_log/test.py

75 lines
2.2 KiB
Python

import os
import time
import pytest
import helpers.cluster
import helpers.test_tools
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@pytest.fixture(scope="module")
def started_node():
cluster = helpers.cluster.ClickHouseCluster(__file__)
try:
node = cluster.add_instance(
"node", main_configs=["configs/crash_log.xml"], stay_alive=True
)
cluster.start()
yield node
finally:
cluster.shutdown()
def send_signal(started_node, signal):
started_node.exec_in_container(
["bash", "-c", f"pkill -{signal} clickhouse"], user="root"
)
def wait_for_clickhouse_stop(started_node):
result = None
for attempt in range(120):
time.sleep(1)
pid = started_node.get_process_pid("clickhouse")
if pid is None:
result = "OK"
break
assert result == "OK", "ClickHouse process is still running"
def test_pkill(started_node):
if (
started_node.is_built_with_thread_sanitizer()
or started_node.is_built_with_address_sanitizer()
or started_node.is_built_with_memory_sanitizer()
):
pytest.skip("doesn't fit in timeouts for stacktrace generation")
crashes_count = 0
for signal in ["SEGV", "4"]:
send_signal(started_node, signal)
wait_for_clickhouse_stop(started_node)
started_node.restart_clickhouse()
crashes_count += 1
assert (
started_node.query("SELECT COUNT(*) FROM system.crash_log")
== f"{crashes_count}\n"
)
def test_pkill_query_log(started_node):
for signal in ["SEGV", "4"]:
# force create query_log if it was not created
started_node.query("SYSTEM FLUSH LOGS")
started_node.query("TRUNCATE TABLE IF EXISTS system.query_log")
started_node.query("SELECT COUNT(*) FROM system.query_log")
# logs don't flush
assert started_node.query("SELECT COUNT(*) FROM system.query_log") == f"{0}\n"
send_signal(started_node, signal)
wait_for_clickhouse_stop(started_node)
started_node.restart_clickhouse()
assert started_node.query("SELECT COUNT(*) FROM system.query_log") >= f"3\n"