Merge pull request #58007 from ClickHouse/chesema-stateless-run-timeout

more messages in ci
This commit is contained in:
Sema Checherinda 2023-12-20 13:50:12 +01:00 committed by GitHub
commit 291567a5b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 40 deletions

View File

@ -216,11 +216,11 @@ export -f run_tests
if [ "$NUM_TRIES" -gt "1" ]; then
# We don't run tests with Ordinary database in PRs, only in master.
# So run new/changed tests with Ordinary at least once in flaky check.
timeout "$MAX_RUN_TIME" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
timeout_with_logging "$MAX_RUN_TIME" bash -c 'NUM_TRIES=1; USE_DATABASE_ORDINARY=1; run_tests' \
| sed 's/All tests have finished//' | sed 's/No tests were run//' ||:
fi
timeout "$MAX_RUN_TIME" bash -c run_tests ||:
timeout_with_logging "$MAX_RUN_TIME" bash -c run_tests ||:
echo "Files in current directory"
ls -la ./

View File

@ -35,4 +35,17 @@ function fn_exists() {
declare -F "$1" > /dev/null;
}
# Run a command under timeout(1) and print a notice when it reports status 124.
#
# Arguments: passed through unchanged to timeout(1), i.e. the duration
#            followed by the command and its arguments.
# Outputs:   one line on stdout when timeout(1) exits with status 124
#            (the status it uses to signal that the time limit was hit).
# Returns:   the exit status of timeout(1), unmodified.
function timeout_with_logging() {
    local status

    # Capture the status inside an `if` so a failure never trips `set -e`.
    if timeout "$@"; then
        status=0
    else
        status=$?
    fi

    case "${status}" in
        124)
            echo "The command 'timeout ${*}' has been killed by timeout"
            ;;
    esac

    return "${status}"
}
# vi: ft=bash

View File

@ -32,7 +32,7 @@ from typing import Tuple, Union, Optional, Dict, Set, List
import subprocess
from subprocess import Popen
from subprocess import PIPE
from datetime import datetime
from datetime import datetime, timedelta
from time import time, sleep
from errno import ESRCH
@ -279,36 +279,42 @@ def need_retry(args, stdout, stderr, total_time):
)
def get_processlist_with_stacktraces(args):
try:
if args.replicated_database:
return clickhouse_execute(
def get_processlist_size(args):
    """Return the number of queries currently listed in system.processes.

    Queries whose text mentions ``system.processes`` (which includes this
    probe itself) are excluded from the count.

    When ``args.replicated_database`` is set, the count is taken across all
    replicas of the ``test_cluster_database_replicated`` cluster, matching
    what ``get_processlist_with_stacktraces`` inspects. Otherwise only the
    server the query is sent to is counted.

    Any exception raised by ``clickhouse_execute`` propagates to the caller,
    as does ``ValueError`` if the response cannot be parsed as an integer.
    """
    if args.replicated_database:
        # The previous query had a duplicated FROM keyword (a syntax error)
        # and only looked at the local server; query every replica instead.
        processes_source = (
            "clusterAllReplicas('test_cluster_database_replicated', system.processes)"
        )
    else:
        processes_source = "system.processes"

    response = clickhouse_execute(
        args,
        f"""
        SELECT
            count()
        FROM {processes_source}
        WHERE query NOT LIKE '%system.processes%'
        """,
    )
    # The server answers with the number followed by a newline.
    return int(response.strip())
def get_processlist_with_stacktraces(args):
if args.replicated_database:
return clickhouse_execute(
args,
"""
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
-- NOTE: view() here to do JOIN on shards, instead of initiator
FROM clusterAllReplicas('test_cluster_database_replicated', view(
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
@ -319,14 +325,35 @@ def get_processlist_with_stacktraces(args):
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
)
except Exception as e:
return "Failed to get processlist: " + str(e)
))
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
timeout=120,
)
else:
return clickhouse_execute(
args,
"""
SELECT
p.*,
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
s.trace), '\n') AS stacktrace
)) AS stacktraces
FROM system.processes p
JOIN system.stack_trace s USING (query_id)
WHERE query NOT LIKE '%system.processes%'
GROUP BY p.*
ORDER BY elapsed DESC FORMAT Vertical
""",
settings={
"allow_introspection_functions": 1,
},
timeout=120,
)
def get_transactions_list(args):
@ -2427,11 +2454,42 @@ def main(args):
if args.hung_check:
# Some queries may execute in background for some time after test was finished. This is normal.
for _ in range(1, 60):
processlist = get_processlist_with_stacktraces(args)
if not processlist:
break
sleep(1)
print("Checking the hung queries: ", end="")
hung_count = 0
try:
deadline = datetime.now() + timedelta(seconds=90)
while datetime.now() < deadline:
hung_count = get_processlist_size(args)
if hung_count == 0:
print(" done")
break
print(". ", end="")
except Exception as e:
print(
colored(
"\nHung check failed. Failed to get processlist size: " + str(e),
args,
"red",
attrs=["bold"],
)
)
exit_code.value = 1
processlist = ""
if hung_count > 0:
try:
processlist = get_processlist_with_stacktraces(args)
except Exception as e:
print(
colored(
"\nHung check failed. Failed to get processlist with stacktraces: "
+ str(e),
args,
"red",
attrs=["bold"],
)
)
exit_code.value = 1
if processlist:
print(