Merge pull request #43396 from azat/tests/improve-hung-check

RFC: tests: add stacktraces for hung queries
This commit is contained in:
Alexander Tokmakov 2023-01-25 18:35:39 +03:00 committed by GitHub
commit c19110e186
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -232,19 +232,52 @@ def need_retry(args, stdout, stderr, total_time):
) )
def get_processlist_with_stacktraces(args):
    """Return the server processlist as JSON, with a stack trace per query.

    For each running query (excluding the introspection query itself), joins
    system.processes with system.stack_trace and aggregates per-thread,
    symbolized stack traces into a `stacktraces` array column alongside all
    processlist columns. With --replicated-database the query fans out over
    every replica of 'test_cluster_database_replicated'.

    On any failure (e.g. server unreachable) returns an error string instead
    of raising, so hung-check reporting never aborts the test runner.
    """
    try:
        if args.replicated_database:
            return clickhouse_execute_json(
                args,
                """
                SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
                -- NOTE: view() here to do JOIN on shards, instead of initiator
                FROM clusterAllReplicas('test_cluster_database_replicated', view(
                    SELECT
                        groupArray((s.thread_id, arrayStringConcat(arrayMap(
                            x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                            s.trace), '\n') AS stacktrace
                        )) AS stacktraces,
                        p.*
                    FROM system.processes p
                    JOIN system.stack_trace s USING (query_id)
                    WHERE query NOT LIKE '%system.processes%'
                    GROUP BY p.*
                ))
                ORDER BY elapsed DESC
                """,
                settings={
                    # addressToLine/addressToSymbol/demangle require this setting
                    "allow_introspection_functions": 1,
                },
            )
        else:
            return clickhouse_execute_json(
                args,
                """
                SELECT
                    groupArray((s.thread_id, arrayStringConcat(arrayMap(
                        x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                        s.trace), '\n') AS stacktrace
                    )) AS stacktraces,
                    p.*
                FROM system.processes p
                JOIN system.stack_trace s USING (query_id)
                WHERE query NOT LIKE '%system.processes%'
                GROUP BY p.*
                ORDER BY elapsed DESC
                """,
                settings={
                    # addressToLine/addressToSymbol/demangle require this setting
                    "allow_introspection_functions": 1,
                },
            )
    except Exception as e:
        # Best-effort: the hung check must still report something useful
        # even if the introspection query itself fails.
        return "Failed to get processlist: " + str(e)
@ -1223,7 +1256,7 @@ class TestSuite:
line = line.strip() line = line.strip()
if line and not is_shebang(line): if line and not is_shebang(line):
return line return line
return '' return ""
def load_tags_from_file(filepath): def load_tags_from_file(filepath):
comment_sign = get_comment_sign(filepath) comment_sign = get_comment_sign(filepath)
@ -1750,7 +1783,7 @@ def removesuffix(text, *suffixes):
return text return text
def reportCoverageFor(args, what, query, permissive = False): def reportCoverageFor(args, what, query, permissive=False):
value = clickhouse_execute(args, query).decode() value = clickhouse_execute(args, query).decode()
if value != "": if value != "":
@ -1763,7 +1796,8 @@ def reportCoverageFor(args, what, query, permissive = False):
def reportCoverage(args): def reportCoverage(args):
return reportCoverageFor( return (
reportCoverageFor(
args, args,
"functions", "functions",
""" """
@ -1776,8 +1810,9 @@ def reportCoverage(args):
) )
ORDER BY name ORDER BY name
""", """,
True True,
) and reportCoverageFor( )
and reportCoverageFor(
args, args,
"aggregate functions", "aggregate functions",
""" """
@ -1789,8 +1824,9 @@ def reportCoverage(args):
SELECT arrayJoin(used_aggregate_functions) FROM system.query_log WHERE event_date >= yesterday() SELECT arrayJoin(used_aggregate_functions) FROM system.query_log WHERE event_date >= yesterday()
) )
ORDER BY name ORDER BY name
""" """,
) and reportCoverageFor( )
and reportCoverageFor(
args, args,
"aggregate function combinators", "aggregate function combinators",
""" """
@ -1802,8 +1838,9 @@ def reportCoverage(args):
SELECT arrayJoin(used_aggregate_function_combinators) FROM system.query_log WHERE event_date >= yesterday() SELECT arrayJoin(used_aggregate_function_combinators) FROM system.query_log WHERE event_date >= yesterday()
) )
ORDER BY name ORDER BY name
""" """,
) and reportCoverageFor( )
and reportCoverageFor(
args, args,
"data type families", "data type families",
""" """
@ -1815,7 +1852,8 @@ def reportCoverage(args):
SELECT arrayJoin(used_data_type_families) FROM system.query_log WHERE event_date >= yesterday() SELECT arrayJoin(used_data_type_families) FROM system.query_log WHERE event_date >= yesterday()
) )
ORDER BY name ORDER BY name
""" """,
)
) )
def reportLogStats(args): def reportLogStats(args):
@ -1891,7 +1929,9 @@ def main(args):
args, "system", "processes", "is_all_data_sent" args, "system", "processes", "is_all_data_sent"
) )
if args.s3_storage and (BuildFlags.THREAD in args.build_flags or BuildFlags.DEBUG in args.build_flags): if args.s3_storage and (
BuildFlags.THREAD in args.build_flags or BuildFlags.DEBUG in args.build_flags
):
args.no_random_settings = True args.no_random_settings = True
if args.skip: if args.skip:
@ -1963,10 +2003,9 @@ def main(args):
exit_code.value = 1 exit_code.value = 1
if args.hung_check: if args.hung_check:
# Some queries may execute in background for some time after test was finished. This is normal. # Some queries may execute in background for some time after test was finished. This is normal.
for _ in range(1, 60): for _ in range(1, 60):
processlist = get_processlist(args) processlist = get_processlist_with_stacktraces(args)
if not processlist: if not processlist:
break break
sleep(1) sleep(1)
@ -1980,7 +2019,6 @@ def main(args):
print(json.dumps(processlist, indent=4)) print(json.dumps(processlist, indent=4))
print(get_transactions_list(args)) print(get_transactions_list(args))
print_stacktraces()
exit_code.value = 1 exit_code.value = 1
else: else:
print(colored("\nNo queries hung.", args, "green", attrs=["bold"])) print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))