mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Fix flaky integration test test_async_backups_to_same_destination.
This commit is contained in:
parent
14135927fb
commit
dc25f18f13
@ -3121,7 +3121,69 @@ class ClickHouseInstance:
|
||||
params=None,
|
||||
user=None,
|
||||
password=None,
|
||||
expect_fail_and_get_error=False,
|
||||
port=8123,
|
||||
timeout=None,
|
||||
retry_strategy=None,
|
||||
):
|
||||
output, error = self.http_query_and_get_answer_with_error(
|
||||
sql,
|
||||
data=data,
|
||||
method=method,
|
||||
params=params,
|
||||
user=user,
|
||||
password=password,
|
||||
port=port,
|
||||
timeout=timeout,
|
||||
retry_strategy=retry_strategy,
|
||||
)
|
||||
|
||||
if error:
|
||||
raise Exception("ClickHouse HTTP server returned " + error)
|
||||
|
||||
return output
|
||||
|
||||
# Connects to the instance via HTTP interface, sends a query, expects an error and return the error message
def http_query_and_get_error(
    self,
    sql,
    data=None,
    method=None,
    params=None,
    user=None,
    password=None,
    port=8123,
    timeout=None,
    retry_strategy=None,
):
    """Run *sql* over HTTP and return the server's error message.

    The query is expected to fail; if the server instead answers
    successfully, an Exception carrying the unexpected output is raised.
    All keyword arguments are forwarded unchanged to
    http_query_and_get_answer_with_error().
    """
    # Delegate the actual HTTP round-trip; this wrapper only interprets
    # the (output, error) outcome.
    output, error = self.http_query_and_get_answer_with_error(
        sql,
        data=data,
        method=method,
        params=params,
        user=user,
        password=password,
        port=port,
        timeout=timeout,
        retry_strategy=retry_strategy,
    )

    # Failure is the expected outcome here — hand the error text back.
    if error:
        return error

    raise Exception(
        "ClickHouse HTTP server is expected to fail, but succeeded: " + output
    )
|
||||
|
||||
# Connects to the instance via HTTP interface, sends a query and returns both the answer and the error message
|
||||
# as a tuple (output, error).
|
||||
def http_query_and_get_answer_with_error(
|
||||
self,
|
||||
sql,
|
||||
data=None,
|
||||
method=None,
|
||||
params=None,
|
||||
user=None,
|
||||
password=None,
|
||||
port=8123,
|
||||
timeout=None,
|
||||
retry_strategy=None,
|
||||
@ -3155,23 +3217,11 @@ class ClickHouseInstance:
|
||||
|
||||
r = requester.request(method, url, data=data, auth=auth, timeout=timeout)
|
||||
|
||||
def http_code_and_message():
|
||||
code = r.status_code
|
||||
return str(code) + " " + http.client.responses[code] + ": " + r.text
|
||||
if r.ok:
|
||||
return (r.text, None)
|
||||
|
||||
if expect_fail_and_get_error:
|
||||
if r.ok:
|
||||
raise Exception(
|
||||
"ClickHouse HTTP server is expected to fail, but succeeded: "
|
||||
+ r.text
|
||||
)
|
||||
return http_code_and_message()
|
||||
else:
|
||||
if not r.ok:
|
||||
raise Exception(
|
||||
"ClickHouse HTTP server returned " + http_code_and_message()
|
||||
)
|
||||
return r.text
|
||||
code = r.status_code
|
||||
return (None, str(code) + " " + http.client.responses[code] + ": " + r.text)
|
||||
|
||||
# Connects to the instance via HTTP interface, sends a query and returns the answer
|
||||
def http_request(self, url, method="GET", params=None, data=None, headers=None):
|
||||
@ -3181,20 +3231,6 @@ class ClickHouseInstance:
|
||||
method=method, url=url, params=params, data=data, headers=headers
|
||||
)
|
||||
|
||||
# Connects to the instance via HTTP interface, sends a query, expects an error and return the error message
|
||||
def http_query_and_get_error(
|
||||
self, sql, data=None, params=None, user=None, password=None
|
||||
):
|
||||
logging.debug(f"Executing query {sql} on {self.name} via HTTP interface")
|
||||
return self.http_query(
|
||||
sql=sql,
|
||||
data=data,
|
||||
params=params,
|
||||
user=user,
|
||||
password=password,
|
||||
expect_fail_and_get_error=True,
|
||||
)
|
||||
|
||||
def stop_clickhouse(self, stop_wait_sec=30, kill=False):
|
||||
if not self.stay_alive:
|
||||
raise Exception(
|
||||
|
@ -335,31 +335,51 @@ def test_async_backups_to_same_destination(interface):
|
||||
create_and_fill_table()
|
||||
backup_name = new_backup_name()
|
||||
|
||||
ids = []
|
||||
for _ in range(2):
|
||||
if interface == "http":
|
||||
res = instance.http_query(f"BACKUP TABLE test.table TO {backup_name} ASYNC")
|
||||
else:
|
||||
res = instance.query(f"BACKUP TABLE test.table TO {backup_name} ASYNC")
|
||||
ids.append(res.split("\t")[0])
|
||||
# The first backup.
|
||||
if interface == "http":
|
||||
res = instance.http_query(f"BACKUP TABLE test.table TO {backup_name} ASYNC")
|
||||
else:
|
||||
res = instance.query(f"BACKUP TABLE test.table TO {backup_name} ASYNC")
|
||||
id1 = res.split("\t")[0]
|
||||
|
||||
[id1, id2] = ids
|
||||
# The second backup to the same destination.
|
||||
if interface == "http":
|
||||
res, err = instance.http_query_and_get_answer_with_error(
|
||||
f"BACKUP TABLE test.table TO {backup_name} ASYNC"
|
||||
)
|
||||
else:
|
||||
res, err = instance.query_and_get_answer_with_error(
|
||||
f"BACKUP TABLE test.table TO {backup_name} ASYNC"
|
||||
)
|
||||
|
||||
# The second backup to the same destination is expected to fail. It can either fail immediately or after a while.
|
||||
# If it fails immediately we won't even get its ID.
|
||||
id2 = None if err else res.split("\t")[0]
|
||||
|
||||
ids = [id1]
|
||||
if id2:
|
||||
ids.append(id2)
|
||||
ids_for_query = "[" + ", ".join(f"'{id}'" for id in ids) + "]"
|
||||
|
||||
assert_eq_with_retry(
|
||||
instance,
|
||||
f"SELECT status FROM system.backups WHERE id IN ['{id1}', '{id2}'] AND status == 'CREATING_BACKUP'",
|
||||
f"SELECT status FROM system.backups WHERE id IN {ids_for_query} AND status == 'CREATING_BACKUP'",
|
||||
"",
|
||||
)
|
||||
|
||||
# The first backup should succeed.
|
||||
assert instance.query(
|
||||
f"SELECT status, error FROM system.backups WHERE id='{id1}'"
|
||||
) == TSV([["BACKUP_CREATED", ""]])
|
||||
|
||||
assert (
|
||||
instance.query(f"SELECT status FROM system.backups WHERE id='{id2}'")
|
||||
== "BACKUP_FAILED\n"
|
||||
)
|
||||
if id2:
|
||||
# The second backup should fail.
|
||||
assert (
|
||||
instance.query(f"SELECT status FROM system.backups WHERE id='{id2}'")
|
||||
== "BACKUP_FAILED\n"
|
||||
)
|
||||
|
||||
# Check that the first backup is all right.
|
||||
instance.query("DROP TABLE test.table")
|
||||
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
|
||||
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
|
||||
|
@ -439,56 +439,73 @@ def test_async_backups_to_same_destination(interface, on_cluster):
|
||||
"ORDER BY x"
|
||||
)
|
||||
|
||||
nodes = [node1, node2]
|
||||
node1.query("INSERT INTO tbl VALUES (1)")
|
||||
|
||||
backup_name = new_backup_name()
|
||||
|
||||
ids = []
|
||||
nodes = [node1, node2]
|
||||
on_cluster_part = "ON CLUSTER 'cluster'" if on_cluster else ""
|
||||
|
||||
# Multiple backups to the same destination.
|
||||
ids = []
|
||||
for node in nodes:
|
||||
if interface == "http":
|
||||
res = node.http_query(
|
||||
res, err = node.http_query_and_get_answer_with_error(
|
||||
f"BACKUP TABLE tbl {on_cluster_part} TO {backup_name} ASYNC"
|
||||
)
|
||||
else:
|
||||
res = node.query(
|
||||
res, err = node.query_and_get_answer_with_error(
|
||||
f"BACKUP TABLE tbl {on_cluster_part} TO {backup_name} ASYNC"
|
||||
)
|
||||
ids.append(res.split("\t")[0])
|
||||
|
||||
[id1, id2] = ids
|
||||
# The second backup to the same destination is expected to fail. It can either fail immediately or after a while.
|
||||
# If it fails immediately we won't even get its ID.
|
||||
if not err:
|
||||
ids.append(res.split("\t")[0])
|
||||
|
||||
for i in range(len(nodes)):
|
||||
ids_for_query = "[" + ", ".join(f"'{id}'" for id in ids) + "]"
|
||||
|
||||
for node in nodes:
|
||||
assert_eq_with_retry(
|
||||
nodes[i],
|
||||
f"SELECT status FROM system.backups WHERE id='{ids[i]}' AND status == 'CREATING_BACKUP'",
|
||||
node,
|
||||
f"SELECT status FROM system.backups WHERE id IN {ids_for_query} AND status == 'CREATING_BACKUP'",
|
||||
"",
|
||||
)
|
||||
|
||||
num_completed_backups = sum(
|
||||
num_created_backups = sum(
|
||||
[
|
||||
int(
|
||||
nodes[i]
|
||||
.query(
|
||||
f"SELECT count() FROM system.backups WHERE id='{ids[i]}' AND status == 'BACKUP_CREATED'"
|
||||
)
|
||||
.strip()
|
||||
node.query(
|
||||
f"SELECT count() FROM system.backups WHERE id IN {ids_for_query} AND status == 'BACKUP_CREATED'"
|
||||
).strip()
|
||||
)
|
||||
for i in range(len(nodes))
|
||||
for node in nodes
|
||||
]
|
||||
)
|
||||
|
||||
if num_completed_backups != 1:
|
||||
for i in range(len(nodes)):
|
||||
num_failed_backups = sum(
|
||||
[
|
||||
int(
|
||||
node.query(
|
||||
f"SELECT count() FROM system.backups WHERE id IN {ids_for_query} AND status == 'BACKUP_FAILED'"
|
||||
).strip()
|
||||
)
|
||||
for node in nodes
|
||||
]
|
||||
)
|
||||
|
||||
# Only one backup should succeed.
|
||||
if (num_created_backups != 1) or (num_failed_backups != len(ids) - 1):
|
||||
for node in nodes:
|
||||
print(
|
||||
nodes[i].query(
|
||||
f"SELECT status, error FROM system.backups WHERE id='{ids[i]}'"
|
||||
node.query(
|
||||
f"SELECT status, error FROM system.backups WHERE id IN {ids_for_query}"
|
||||
)
|
||||
)
|
||||
|
||||
assert num_completed_backups == 1
|
||||
assert num_created_backups == 1
|
||||
assert num_failed_backups == len(ids) - 1
|
||||
|
||||
# Check that the succeeded backup is all right.
|
||||
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY")
|
||||
node1.query(f"RESTORE TABLE tbl FROM {backup_name}")
|
||||
assert node1.query("SELECT * FROM tbl") == "1\n"
|
||||
|
Loading…
Reference in New Issue
Block a user