Merge pull request #25305 from ClickHouse/improve_replicated_database_tests

Improve Replicated database tests
This commit is contained in:
tavplubix 2021-06-16 15:58:46 +03:00 committed by GitHub
commit 76348edc71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 47 additions and 22 deletions

View File

@ -112,12 +112,15 @@ timeout "$MAX_RUN_TIME" bash -c run_tests ||:
./process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv ./process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz ||: pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz ||:
mv /var/log/clickhouse-server/stderr.log /test_output/ ||: mv /var/log/clickhouse-server/stderr.log /test_output/ ||:
if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
tar -chf /test_output/clickhouse_coverage.tar.gz /profraw ||: tar -chf /test_output/clickhouse_coverage.tar.gz /profraw ||:
fi fi
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||: pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||:
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||: pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||: mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:

View File

@ -103,6 +103,7 @@ timeout "$MAX_RUN_TIME" bash -c run_tests ||:
clickhouse-client -q "system flush logs" ||: clickhouse-client -q "system flush logs" ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz & pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
clickhouse-client -q "select * from system.query_log format TSVWithNamesAndTypes" | pigz > /test_output/query-log.tsv.gz & clickhouse-client -q "select * from system.query_log format TSVWithNamesAndTypes" | pigz > /test_output/query-log.tsv.gz &
clickhouse-client -q "select * from system.query_thread_log format TSVWithNamesAndTypes" | pigz > /test_output/query-thread-log.tsv.gz & clickhouse-client -q "select * from system.query_thread_log format TSVWithNamesAndTypes" | pigz > /test_output/query-thread-log.tsv.gz &
@ -140,6 +141,8 @@ tar -chf /test_output/query_log_dump.tar /var/lib/clickhouse/data/system/query_l
tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||: tar -chf /test_output/coordination.tar /var/lib/clickhouse/coordination ||:
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server1.log ||:
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server2.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||: pigz < /var/log/clickhouse-server/clickhouse-server1.log > /test_output/clickhouse-server1.log.gz ||:
pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||: pigz < /var/log/clickhouse-server/clickhouse-server2.log > /test_output/clickhouse-server2.log.gz ||:
mv /var/log/clickhouse-server/stderr1.log /test_output/ ||: mv /var/log/clickhouse-server/stderr1.log /test_output/ ||:

View File

@ -15,6 +15,7 @@
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataStreams/NullBlockOutputStream.h> #include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <filesystem> #include <filesystem>
@ -175,17 +176,15 @@ BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & en
if (context->getSettingsRef().distributed_ddl_task_timeout == 0) if (context->getSettingsRef().distributed_ddl_task_timeout == 0)
return io; return io;
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context, hosts_to_wait); BlockInputStreamPtr stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context, hosts_to_wait);
if (context->getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE) if (context->getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE)
{ {
/// Wait for query to finish, but ignore output /// Wait for query to finish, but ignore output
NullBlockOutputStream output{Block{}}; auto null_output = std::make_shared<NullBlockOutputStream>(stream->getHeader());
copyData(*stream, output); stream = std::make_shared<NullAndDoCopyBlockInputStream>(std::move(stream), std::move(null_output));
}
else
{
io.in = std::move(stream);
} }
io.in = std::move(stream);
return io; return io;
} }

View File

@ -36,11 +36,15 @@ MESSAGES_TO_RETRY = [
"Coordination::Exception: Session expired", "Coordination::Exception: Session expired",
"Coordination::Exception: Connection loss", "Coordination::Exception: Connection loss",
"Coordination::Exception: Operation timeout", "Coordination::Exception: Operation timeout",
"DB::Exception: Operation timeout",
"Operation timed out", "Operation timed out",
"ConnectionPoolWithFailover: Connection failed at try", "ConnectionPoolWithFailover: Connection failed at try",
"DB::Exception: New table appeared in database being dropped or detached. Try again" "DB::Exception: New table appeared in database being dropped or detached. Try again",
"is executing longer than distributed_ddl_task_timeout (=120)" # FIXME
] ]
MAX_RETRIES = 5
class Terminated(KeyboardInterrupt): class Terminated(KeyboardInterrupt):
pass pass
def signal_handler(sig, frame): def signal_handler(sig, frame):
@ -258,7 +262,8 @@ def get_processlist(args):
query = b"SHOW PROCESSLIST FORMAT Vertical" query = b"SHOW PROCESSLIST FORMAT Vertical"
if args.replicated_database: if args.replicated_database:
query = b"SELECT materialize((hostName(), tcpPort())) as host, * " \ query = b"SELECT materialize((hostName(), tcpPort())) as host, * " \
b"FROM clusterAllReplicas('r', system.processes) WHERE query NOT LIKE '%system.processes%' FORMAT Vertical" b"FROM clusterAllReplicas('test_cluster_database_replicated', system.processes) " \
b"WHERE query NOT LIKE '%system.processes%' FORMAT Vertical"
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE) clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, _) = clickhouse_proc.communicate((query), timeout=20) (stdout, _) = clickhouse_proc.communicate((query), timeout=20)
return False, stdout.decode('utf-8') return False, stdout.decode('utf-8')
@ -279,8 +284,16 @@ def get_stacktraces_from_gdb(server_pid):
# collect server stacktraces from system.stack_trace table # collect server stacktraces from system.stack_trace table
# it does not work in Sandbox # it does not work in Sandbox
def get_stacktraces_from_clickhouse(client): def get_stacktraces_from_clickhouse(client, replicated_database=False):
try: try:
if replicated_database:
return subprocess.check_output("{} --allow_introspection_functions=1 --skip_unavailable_shards=1 --query "
"\"SELECT materialize((hostName(), tcpPort())) as host, thread_id, "
"arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), "
"arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace "
"FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') "
"ORDER BY host, thread_id format Vertical\"".format(client), shell=True, stderr=subprocess.STDOUT).decode('utf-8')
return subprocess.check_output("{} --allow_introspection_functions=1 --query " return subprocess.check_output("{} --allow_introspection_functions=1 --query "
"\"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), " "\"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), arrayMap(x -> addressToLine(x), trace), "
"arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace " "arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace "
@ -289,7 +302,6 @@ def get_stacktraces_from_clickhouse(client):
print("Error occured while receiving stack traces from client: {}".format(str(ex))) print("Error occured while receiving stack traces from client: {}".format(str(ex)))
return None return None
def get_server_pid(server_tcp_port): def get_server_pid(server_tcp_port):
# lsof does not work in stress tests for some reason # lsof does not work in stress tests for some reason
cmd_lsof = "lsof -i tcp:{port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'".format(port=server_tcp_port) cmd_lsof = "lsof -i tcp:{port} -s tcp:LISTEN -Fp | awk '/^p[0-9]+$/{{print substr($0, 2)}}'".format(port=server_tcp_port)
@ -317,7 +329,7 @@ SERVER_DIED = False
exit_code = 0 exit_code = 0
stop_time = None stop_time = None
queue = multiprocessing.Queue(maxsize=1) queue = multiprocessing.Queue(maxsize=1)
restarted_tests = [] # (test, stderr)
# def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total): # def run_tests_array(all_tests, suite, suite_dir, suite_tmp_dir, run_total):
def run_tests_array(all_tests_with_params): def run_tests_array(all_tests_with_params):
@ -458,11 +470,12 @@ def run_tests_array(all_tests_with_params):
else: else:
counter = 1 counter = 1
while need_retry(stderr): while need_retry(stderr):
restarted_tests.append((case_file, stderr))
testcase_args = configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file) testcase_args = configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file)
proc, stdout, stderr, total_time = run_single_test(testcase_args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file) proc, stdout, stderr, total_time = run_single_test(testcase_args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
sleep(2**counter) sleep(2**counter)
counter += 1 counter += 1
if counter > 6: if MAX_RETRIES < counter:
break break
if proc.returncode != 0: if proc.returncode != 0:
@ -917,7 +930,7 @@ def main(args):
clickhouse_tcp_port = os.getenv("CLICKHOUSE_PORT_TCP", '9000') clickhouse_tcp_port = os.getenv("CLICKHOUSE_PORT_TCP", '9000')
server_pid = get_server_pid(clickhouse_tcp_port) server_pid = get_server_pid(clickhouse_tcp_port)
bt = None bt = None
if server_pid: if server_pid and not args.replicated_database:
print("\nLocated ClickHouse server process {} listening at TCP port {}".format(server_pid, clickhouse_tcp_port)) print("\nLocated ClickHouse server process {} listening at TCP port {}".format(server_pid, clickhouse_tcp_port))
print("\nCollecting stacktraces from all running threads with gdb:") print("\nCollecting stacktraces from all running threads with gdb:")
bt = get_stacktraces_from_gdb(server_pid) bt = get_stacktraces_from_gdb(server_pid)
@ -926,7 +939,7 @@ def main(args):
bt = None bt = None
if bt is None: if bt is None:
print("\nCollecting stacktraces from system.stacktraces table:") print("\nCollecting stacktraces from system.stacktraces table:")
bt = get_stacktraces_from_clickhouse(args.client) bt = get_stacktraces_from_clickhouse(args.client, args.replicated_database)
if bt is None: if bt is None:
print( print(
colored( colored(
@ -941,6 +954,13 @@ def main(args):
else: else:
print(colored("\nNo queries hung.", args, "green", attrs=["bold"])) print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))
if len(restarted_tests) > 0:
print("\nSome tests were restarted:\n")
for (test_case, stderr) in restarted_tests:
print(test_case)
print(stderr)
print("\n")
if total_tests_run == 0: if total_tests_run == 0:
print("No tests were run.") print("No tests were run.")
sys.exit(1) sys.exit(1)

View File

@ -3,8 +3,8 @@
<default> <default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated> <allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<distributed_ddl_output_mode>none</distributed_ddl_output_mode> <distributed_ddl_output_mode>none</distributed_ddl_output_mode>
<database_replicated_initial_query_timeout_sec>100</database_replicated_initial_query_timeout_sec> <database_replicated_initial_query_timeout_sec>120</database_replicated_initial_query_timeout_sec>
<distributed_ddl_task_timeout>100</distributed_ddl_task_timeout> <distributed_ddl_task_timeout>120</distributed_ddl_task_timeout>
<database_replicated_always_detach_permanently>1</database_replicated_always_detach_permanently> <database_replicated_always_detach_permanently>1</database_replicated_always_detach_permanently>
<distributed_ddl_entry_format_version>2</distributed_ddl_entry_format_version> <distributed_ddl_entry_format_version>2</distributed_ddl_entry_format_version>
</default> </default>

View File

@ -1,9 +1,9 @@
1 1 1 -1 1 1 1 -1
2 2 2 -1 2 2 2 -1
CREATE TABLE default.table_with_version_replicated_1\n(\n `key` UInt64,\n `value` String,\n `version` UInt8,\n `sign` Int8\n)\nENGINE = ReplicatedVersionedCollapsingMergeTree(\'/clickhouse/default/test_01511/t\', \'1\', sign, version)\nORDER BY key\nSETTINGS index_granularity = 8192 CREATE TABLE default.table_with_version_replicated_1\n(\n `key` UInt64,\n `value` String,\n `version` UInt8,\n `sign` Int8\n)\nENGINE = ReplicatedVersionedCollapsingMergeTree(\'/clickhouse/default/test_01511/{shard}/t\', \'1_{replica}\', sign, version)\nORDER BY key\nSETTINGS index_granularity = 8192
1 1 1 -1 1 1 1 -1
2 2 2 -1 2 2 2 -1
CREATE TABLE default.table_with_version_replicated_1\n(\n `key` UInt64,\n `value` String,\n `version` UInt32,\n `sign` Int8\n)\nENGINE = ReplicatedVersionedCollapsingMergeTree(\'/clickhouse/default/test_01511/t\', \'1\', sign, version)\nORDER BY key\nSETTINGS index_granularity = 8192 CREATE TABLE default.table_with_version_replicated_1\n(\n `key` UInt64,\n `value` String,\n `version` UInt32,\n `sign` Int8\n)\nENGINE = ReplicatedVersionedCollapsingMergeTree(\'/clickhouse/default/test_01511/{shard}/t\', \'1_{replica}\', sign, version)\nORDER BY key\nSETTINGS index_granularity = 8192
1 1 2 1 1 1 2 1
2 2 2 -1 2 2 2 -1
1 1 2 1 1 1 2 1
@ -11,6 +11,6 @@ CREATE TABLE default.table_with_version_replicated_1\n(\n `key` UInt64,\n
3 3 65555 1 3 3 65555 1
1 1 2 1 1 1 2 1
2 2 2 -1 2 2 2 -1
CREATE TABLE default.table_with_version_replicated_2\n(\n `key` UInt64,\n `value` String,\n `version` UInt32,\n `sign` Int8\n)\nENGINE = ReplicatedVersionedCollapsingMergeTree(\'/clickhouse/default/test_01511/t\', \'2\', sign, version)\nORDER BY key\nSETTINGS index_granularity = 8192 CREATE TABLE default.table_with_version_replicated_2\n(\n `key` UInt64,\n `value` String,\n `version` UInt32,\n `sign` Int8\n)\nENGINE = ReplicatedVersionedCollapsingMergeTree(\'/clickhouse/default/test_01511/{shard}/t\', \'2_{replica}\', sign, version)\nORDER BY key\nSETTINGS index_granularity = 8192
1 1 2 1 1 1 2 1
2 2 2 -1 2 2 2 -1

View File

@ -8,7 +8,7 @@ CREATE TABLE table_with_version_replicated_1
version UInt8, version UInt8,
sign Int8 sign Int8
) )
ENGINE ReplicatedVersionedCollapsingMergeTree('/clickhouse/{database}/test_01511/t', '1', sign, version) ENGINE ReplicatedVersionedCollapsingMergeTree('/clickhouse/' || currentDatabase() || '/test_01511/{shard}/t', '1_{replica}', sign, version)
ORDER BY key; ORDER BY key;
CREATE TABLE table_with_version_replicated_2 CREATE TABLE table_with_version_replicated_2
@ -18,7 +18,7 @@ CREATE TABLE table_with_version_replicated_2
version UInt8, version UInt8,
sign Int8 sign Int8
) )
ENGINE ReplicatedVersionedCollapsingMergeTree('/clickhouse/{database}/test_01511/t', '2', sign, version) ENGINE ReplicatedVersionedCollapsingMergeTree('/clickhouse/' || currentDatabase() || '/test_01511/{shard}/t', '2_{replica}', sign, version)
ORDER BY key; ORDER BY key;
INSERT INTO table_with_version_replicated_1 VALUES (1, '1', 1, -1); INSERT INTO table_with_version_replicated_1 VALUES (1, '1', 1, -1);