mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Merge pull request #53155 from ClickHouse/vdimir/test_quorum_inserts
Attempt to fix test_insert_quorum by adding sync second replica
This commit is contained in:
commit
891bf7e164
@ -147,12 +147,16 @@ def test_drop_replica_and_achieve_quorum(started_cluster):
|
|||||||
|
|
||||||
@pytest.mark.parametrize(("add_new_data"), [False, True])
|
@pytest.mark.parametrize(("add_new_data"), [False, True])
|
||||||
def test_insert_quorum_with_drop_partition(started_cluster, add_new_data):
|
def test_insert_quorum_with_drop_partition(started_cluster, add_new_data):
|
||||||
zero.query(
|
# use different table names for easier disambiguation in logs between runs (you may also check uuid though, but not always convenient)
|
||||||
"DROP TABLE IF EXISTS test_quorum_insert_with_drop_partition ON CLUSTER cluster"
|
table_name = (
|
||||||
|
"test_quorum_insert_with_drop_partition_new_data"
|
||||||
|
if add_new_data
|
||||||
|
else "test_quorum_insert_with_drop_partition"
|
||||||
)
|
)
|
||||||
|
zero.query(f"DROP TABLE IF EXISTS {table_name} ON CLUSTER cluster")
|
||||||
|
|
||||||
create_query = (
|
create_query = (
|
||||||
"CREATE TABLE test_quorum_insert_with_drop_partition ON CLUSTER cluster "
|
f"CREATE TABLE {table_name} ON CLUSTER cluster "
|
||||||
"(a Int8, d Date) "
|
"(a Int8, d Date) "
|
||||||
"Engine = ReplicatedMergeTree "
|
"Engine = ReplicatedMergeTree "
|
||||||
"PARTITION BY d ORDER BY a "
|
"PARTITION BY d ORDER BY a "
|
||||||
@ -161,78 +165,74 @@ def test_insert_quorum_with_drop_partition(started_cluster, add_new_data):
|
|||||||
print("Create Replicated table with three replicas")
|
print("Create Replicated table with three replicas")
|
||||||
zero.query(create_query)
|
zero.query(create_query)
|
||||||
|
|
||||||
print("Stop fetches for test_quorum_insert_with_drop_partition at first replica.")
|
print(f"Stop fetches for {table_name} at first replica.")
|
||||||
first.query("SYSTEM STOP FETCHES test_quorum_insert_with_drop_partition")
|
first.query(f"SYSTEM STOP FETCHES {table_name}")
|
||||||
|
|
||||||
print("Insert with quorum. (zero and second)")
|
print("Insert with quorum. (zero and second)")
|
||||||
zero.query(
|
zero.query(f"INSERT INTO {table_name}(a,d) VALUES(1, '2011-01-01')")
|
||||||
"INSERT INTO test_quorum_insert_with_drop_partition(a,d) VALUES(1, '2011-01-01')"
|
|
||||||
)
|
|
||||||
|
|
||||||
print("Drop partition.")
|
print("Drop partition.")
|
||||||
zero.query(
|
zero.query(f"ALTER TABLE {table_name} DROP PARTITION '2011-01-01'")
|
||||||
"ALTER TABLE test_quorum_insert_with_drop_partition DROP PARTITION '2011-01-01'"
|
|
||||||
)
|
|
||||||
|
|
||||||
if add_new_data:
|
if add_new_data:
|
||||||
print("Insert to deleted partition")
|
print("Insert to deleted partition")
|
||||||
zero.query(
|
zero.query(f"INSERT INTO {table_name}(a,d) VALUES(2, '2011-01-01')")
|
||||||
"INSERT INTO test_quorum_insert_with_drop_partition(a,d) VALUES(2, '2011-01-01')"
|
|
||||||
)
|
|
||||||
|
|
||||||
print("Resume fetches for test_quorum_insert_with_drop_partition at first replica.")
|
print(f"Resume fetches for {table_name} at first replica.")
|
||||||
first.query("SYSTEM START FETCHES test_quorum_insert_with_drop_partition")
|
first.query(f"SYSTEM START FETCHES {table_name}")
|
||||||
|
|
||||||
print("Sync first replica with others.")
|
print("Sync first replica with others.")
|
||||||
first.query("SYSTEM SYNC REPLICA test_quorum_insert_with_drop_partition")
|
first.query(f"SYSTEM SYNC REPLICA {table_name}")
|
||||||
|
|
||||||
assert "20110101" not in first.query(
|
assert "20110101" not in first.query(
|
||||||
"""
|
f"""
|
||||||
WITH (SELECT toString(uuid) FROM system.tables WHERE name = 'test_quorum_insert_with_drop_partition') AS uuid,
|
WITH (SELECT toString(uuid) FROM system.tables WHERE name = '{table_name}') AS uuid,
|
||||||
'/clickhouse/tables/' || uuid || '/0/quorum/last_part' AS p
|
'/clickhouse/tables/' || uuid || '/0/quorum/last_part' AS p
|
||||||
SELECT * FROM system.zookeeper WHERE path = p FORMAT Vertical
|
SELECT * FROM system.zookeeper WHERE path = p FORMAT Vertical
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Sync second replica not to have `REPLICA_IS_NOT_IN_QUORUM` error
|
||||||
|
second.query(f"SYSTEM SYNC REPLICA {table_name}")
|
||||||
|
|
||||||
print("Select from updated partition.")
|
print("Select from updated partition.")
|
||||||
if add_new_data:
|
if add_new_data:
|
||||||
|
assert TSV("2\t2011-01-01\n") == TSV(zero.query(f"SELECT * FROM {table_name}"))
|
||||||
assert TSV("2\t2011-01-01\n") == TSV(
|
assert TSV("2\t2011-01-01\n") == TSV(
|
||||||
zero.query("SELECT * FROM test_quorum_insert_with_drop_partition")
|
second.query(f"SELECT * FROM {table_name}")
|
||||||
)
|
|
||||||
assert TSV("2\t2011-01-01\n") == TSV(
|
|
||||||
second.query("SELECT * FROM test_quorum_insert_with_drop_partition")
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
assert TSV("") == TSV(
|
assert TSV("") == TSV(zero.query(f"SELECT * FROM {table_name}"))
|
||||||
zero.query("SELECT * FROM test_quorum_insert_with_drop_partition")
|
assert TSV("") == TSV(second.query(f"SELECT * FROM {table_name}"))
|
||||||
)
|
|
||||||
assert TSV("") == TSV(
|
|
||||||
second.query("SELECT * FROM test_quorum_insert_with_drop_partition")
|
|
||||||
)
|
|
||||||
|
|
||||||
zero.query(
|
zero.query(f"DROP TABLE IF EXISTS {table_name} ON CLUSTER cluster")
|
||||||
"DROP TABLE IF EXISTS test_quorum_insert_with_drop_partition ON CLUSTER cluster"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(("add_new_data"), [False, True])
|
@pytest.mark.parametrize(("add_new_data"), [False, True])
|
||||||
def test_insert_quorum_with_move_partition(started_cluster, add_new_data):
|
def test_insert_quorum_with_move_partition(started_cluster, add_new_data):
|
||||||
zero.query(
|
# use different table names for easier disambiguation in logs between runs (you may also check uuid though, but not always convenient)
|
||||||
"DROP TABLE IF EXISTS test_insert_quorum_with_move_partition_source ON CLUSTER cluster"
|
source_table_name = (
|
||||||
|
"test_insert_quorum_with_move_partition_source_new_data"
|
||||||
|
if add_new_data
|
||||||
|
else "test_insert_quorum_with_move_partition_source"
|
||||||
)
|
)
|
||||||
zero.query(
|
destination_table_name = (
|
||||||
"DROP TABLE IF EXISTS test_insert_quorum_with_move_partition_destination ON CLUSTER cluster"
|
"test_insert_quorum_with_move_partition_destination_new_data"
|
||||||
|
if add_new_data
|
||||||
|
else "test_insert_quorum_with_move_partition_destination"
|
||||||
)
|
)
|
||||||
|
zero.query(f"DROP TABLE IF EXISTS {source_table_name} ON CLUSTER cluster")
|
||||||
|
zero.query(f"DROP TABLE IF EXISTS {destination_table_name} ON CLUSTER cluster")
|
||||||
|
|
||||||
create_source = (
|
create_source = (
|
||||||
"CREATE TABLE test_insert_quorum_with_move_partition_source ON CLUSTER cluster "
|
f"CREATE TABLE {source_table_name} ON CLUSTER cluster "
|
||||||
"(a Int8, d Date) "
|
"(a Int8, d Date) "
|
||||||
"Engine = ReplicatedMergeTree "
|
"Engine = ReplicatedMergeTree "
|
||||||
"PARTITION BY d ORDER BY a "
|
"PARTITION BY d ORDER BY a "
|
||||||
)
|
)
|
||||||
|
|
||||||
create_destination = (
|
create_destination = (
|
||||||
"CREATE TABLE test_insert_quorum_with_move_partition_destination ON CLUSTER cluster "
|
f"CREATE TABLE {destination_table_name} ON CLUSTER cluster "
|
||||||
"(a Int8, d Date) "
|
"(a Int8, d Date) "
|
||||||
"Engine = ReplicatedMergeTree "
|
"Engine = ReplicatedMergeTree "
|
||||||
"PARTITION BY d ORDER BY a "
|
"PARTITION BY d ORDER BY a "
|
||||||
@ -244,65 +244,52 @@ def test_insert_quorum_with_move_partition(started_cluster, add_new_data):
|
|||||||
print("Create destination Replicated table with three replicas")
|
print("Create destination Replicated table with three replicas")
|
||||||
zero.query(create_destination)
|
zero.query(create_destination)
|
||||||
|
|
||||||
print(
|
print(f"Stop fetches for {source_table_name} at first replica.")
|
||||||
"Stop fetches for test_insert_quorum_with_move_partition_source at first replica."
|
first.query(f"SYSTEM STOP FETCHES {source_table_name}")
|
||||||
)
|
|
||||||
first.query("SYSTEM STOP FETCHES test_insert_quorum_with_move_partition_source")
|
|
||||||
|
|
||||||
print("Insert with quorum. (zero and second)")
|
print("Insert with quorum. (zero and second)")
|
||||||
zero.query(
|
zero.query(f"INSERT INTO {source_table_name}(a,d) VALUES(1, '2011-01-01')")
|
||||||
"INSERT INTO test_insert_quorum_with_move_partition_source(a,d) VALUES(1, '2011-01-01')"
|
|
||||||
)
|
|
||||||
|
|
||||||
print("Drop partition.")
|
print("Drop partition.")
|
||||||
zero.query(
|
zero.query(
|
||||||
"ALTER TABLE test_insert_quorum_with_move_partition_source MOVE PARTITION '2011-01-01' TO TABLE test_insert_quorum_with_move_partition_destination"
|
f"ALTER TABLE {source_table_name} MOVE PARTITION '2011-01-01' TO TABLE {destination_table_name}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if add_new_data:
|
if add_new_data:
|
||||||
print("Insert to deleted partition")
|
print("Insert to deleted partition")
|
||||||
zero.query(
|
zero.query(f"INSERT INTO {source_table_name}(a,d) VALUES(2, '2011-01-01')")
|
||||||
"INSERT INTO test_insert_quorum_with_move_partition_source(a,d) VALUES(2, '2011-01-01')"
|
|
||||||
)
|
|
||||||
|
|
||||||
print(
|
print(f"Resume fetches for {source_table_name} at first replica.")
|
||||||
"Resume fetches for test_insert_quorum_with_move_partition_source at first replica."
|
first.query(f"SYSTEM START FETCHES {source_table_name}")
|
||||||
)
|
|
||||||
first.query("SYSTEM START FETCHES test_insert_quorum_with_move_partition_source")
|
|
||||||
|
|
||||||
print("Sync first replica with others.")
|
print("Sync first replica with others.")
|
||||||
first.query("SYSTEM SYNC REPLICA test_insert_quorum_with_move_partition_source")
|
first.query(f"SYSTEM SYNC REPLICA {source_table_name}")
|
||||||
|
|
||||||
assert "20110101" not in first.query(
|
assert "20110101" not in first.query(
|
||||||
"""
|
f"""
|
||||||
WITH (SELECT toString(uuid) FROM system.tables WHERE name = 'test_insert_quorum_with_move_partition_source') AS uuid,
|
WITH (SELECT toString(uuid) FROM system.tables WHERE name = '{source_table_name}') AS uuid,
|
||||||
'/clickhouse/tables/' || uuid || '/0/quorum/last_part' AS p
|
'/clickhouse/tables/' || uuid || '/0/quorum/last_part' AS p
|
||||||
SELECT * FROM system.zookeeper WHERE path = p FORMAT Vertical
|
SELECT * FROM system.zookeeper WHERE path = p FORMAT Vertical
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Sync second replica not to have `REPLICA_IS_NOT_IN_QUORUM` error
|
||||||
|
second.query(f"SYSTEM SYNC REPLICA {source_table_name}")
|
||||||
|
|
||||||
print("Select from updated partition.")
|
print("Select from updated partition.")
|
||||||
if add_new_data:
|
if add_new_data:
|
||||||
assert TSV("2\t2011-01-01\n") == TSV(
|
assert TSV("2\t2011-01-01\n") == TSV(
|
||||||
zero.query("SELECT * FROM test_insert_quorum_with_move_partition_source")
|
zero.query(f"SELECT * FROM {source_table_name}")
|
||||||
)
|
)
|
||||||
assert TSV("2\t2011-01-01\n") == TSV(
|
assert TSV("2\t2011-01-01\n") == TSV(
|
||||||
second.query("SELECT * FROM test_insert_quorum_with_move_partition_source")
|
second.query(f"SELECT * FROM {source_table_name}")
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
assert TSV("") == TSV(
|
assert TSV("") == TSV(zero.query(f"SELECT * FROM {source_table_name}"))
|
||||||
zero.query("SELECT * FROM test_insert_quorum_with_move_partition_source")
|
assert TSV("") == TSV(second.query(f"SELECT * FROM {source_table_name}"))
|
||||||
)
|
|
||||||
assert TSV("") == TSV(
|
|
||||||
second.query("SELECT * FROM test_insert_quorum_with_move_partition_source")
|
|
||||||
)
|
|
||||||
|
|
||||||
zero.query(
|
zero.query(f"DROP TABLE IF EXISTS {source_table_name} ON CLUSTER cluster")
|
||||||
"DROP TABLE IF EXISTS test_insert_quorum_with_move_partition_source ON CLUSTER cluster"
|
zero.query(f"DROP TABLE IF EXISTS {destination_table_name} ON CLUSTER cluster")
|
||||||
)
|
|
||||||
zero.query(
|
|
||||||
"DROP TABLE IF EXISTS test_insert_quorum_with_move_partition_destination ON CLUSTER cluster"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_insert_quorum_with_ttl(started_cluster):
|
def test_insert_quorum_with_ttl(started_cluster):
|
||||||
|
Loading…
Reference in New Issue
Block a user