hung_check check with lighter queries

This commit is contained in:
Sema Checherinda 2023-12-18 19:16:50 +01:00
parent a12fe7473c
commit 140f6dafd8

View File

@ -32,7 +32,7 @@ from typing import Tuple, Union, Optional, Dict, Set, List
import subprocess
from subprocess import Popen
from subprocess import PIPE
from datetime import datetime
from datetime import datetime, timedelta
from time import time, sleep
from errno import ESRCH
@ -278,37 +278,44 @@ def need_retry(args, stdout, stderr, total_time):
msg in stderr for msg in MESSAGES_TO_RETRY
)
def get_processlist_size(args):
if args.replicated_database:
return int(
clickhouse_execute(
args,
"""
SELECT
count()
FROM
FROM system.processes
WHERE query NOT LIKE '%system.processes%'
))
FORMAT Vertical
""",
).strip()
)
else:
return int(
clickhouse_execute(
args,
"""
SELECT
count()
FROM system.processes
WHERE query NOT LIKE '%system.processes%'
FORMAT Vertical
""",
).strip()
)
def get_processlist_with_stacktraces(args):
try:
if args.replicated_database:
return clickhouse_execute(
args,
"""
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
-- NOTE: view() here to do JOIN on shards, instead of initiator
FROM clusterAllReplicas('test_cluster_database_replicated', view(
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace
)) AS stacktraces
FROM system.processes p
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
))
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
)
else:
return clickhouse_execute(
args,
"""
if args.replicated_database:
return clickhouse_execute(
args,
"""
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
-- NOTE: view() here to do JOIN on shards, instead of initiator
FROM clusterAllReplicas('test_cluster_database_replicated', view(
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
@ -319,14 +326,36 @@ def get_processlist_with_stacktraces(args):
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
)
except Exception as e:
return "Failed to get processlist: " + str(e)
))
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
timeout=120,
)
else:
return clickhouse_execute(
args,
"""
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace
)) AS stacktraces
FROM system.processes p
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
timeout=120,
)
def get_transactions_list(args):
@ -2420,11 +2449,36 @@ def main(args):
if args.hung_check:
# Some queries may execute in background for some time after test was finished. This is normal.
for _ in range(1, 60):
processlist = get_processlist_with_stacktraces(args)
if not processlist:
break
sleep(1)
print("Checking the hung queries: ", end='')
hung_count = 0
try:
deadline = datetime.now() + timedelta(seconds=90)
while datetime.now() < deadline:
hung_count = get_processlist_size(args)
if hung_count == 0:
print(" done")
break
print(". ", end='')
except Exception as e:
print(
colored(
"\nHung check failed. Failed to get processlist size: " + str(e), args, "red", attrs=["bold"]
)
)
exit_code.value = 1
processlist = ""
if hung_count > 0:
try:
processlist = get_processlist_with_stacktraces(args)
except Exception as e:
print(
colored(
"\nHung check failed, Failed to get processlist with stacktraces: " + str(e), args, "red", attrs=["bold"]
)
)
exit_code.value = 1
if processlist:
print(