mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge pull request #47152 from ClickHouse/fix_flaky_test_replicated_database
Use unique names and paths in `test_replicated_database`
This commit is contained in:
commit
5be681c5f6
@ -80,15 +80,15 @@ def started_cluster():
|
|||||||
|
|
||||||
def test_create_replicated_table(started_cluster):
|
def test_create_replicated_table(started_cluster):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica' || '1');"
|
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica' || '1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE create_replicated_table ENGINE = Replicated('/test/create_replicated_table', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
assert (
|
assert (
|
||||||
"Explicit zookeeper_path and replica_name are specified"
|
"Explicit zookeeper_path and replica_name are specified"
|
||||||
in main_node.query_and_get_error(
|
in main_node.query_and_get_error(
|
||||||
"CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) "
|
"CREATE TABLE create_replicated_table.replicated_table (d Date, k UInt64, i32 Int32) "
|
||||||
"ENGINE=ReplicatedMergeTree('/test/tmp', 'r') ORDER BY k PARTITION BY toYYYYMM(d);"
|
"ENGINE=ReplicatedMergeTree('/test/tmp', 'r') ORDER BY k PARTITION BY toYYYYMM(d);"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -96,7 +96,7 @@ def test_create_replicated_table(started_cluster):
|
|||||||
assert (
|
assert (
|
||||||
"Explicit zookeeper_path and replica_name are specified"
|
"Explicit zookeeper_path and replica_name are specified"
|
||||||
in main_node.query_and_get_error(
|
in main_node.query_and_get_error(
|
||||||
"CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) "
|
"CREATE TABLE create_replicated_table.replicated_table (d Date, k UInt64, i32 Int32) "
|
||||||
"ENGINE=ReplicatedMergeTree('/test/tmp', 'r') ORDER BY k PARTITION BY toYYYYMM(d);"
|
"ENGINE=ReplicatedMergeTree('/test/tmp', 'r') ORDER BY k PARTITION BY toYYYYMM(d);"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -104,39 +104,41 @@ def test_create_replicated_table(started_cluster):
|
|||||||
assert (
|
assert (
|
||||||
"This syntax for *MergeTree engine is deprecated"
|
"This syntax for *MergeTree engine is deprecated"
|
||||||
in main_node.query_and_get_error(
|
in main_node.query_and_get_error(
|
||||||
"CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) "
|
"CREATE TABLE create_replicated_table.replicated_table (d Date, k UInt64, i32 Int32) "
|
||||||
"ENGINE=ReplicatedMergeTree('/test/tmp/{shard}', '{replica}', d, k, 8192);"
|
"ENGINE=ReplicatedMergeTree('/test/tmp/{shard}', '{replica}', d, k, 8192);"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);"
|
"CREATE TABLE create_replicated_table.replicated_table (d Date, k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);"
|
||||||
)
|
)
|
||||||
|
|
||||||
expected = (
|
expected = (
|
||||||
"CREATE TABLE testdb.replicated_table\\n(\\n `d` Date,\\n `k` UInt64,\\n `i32` Int32\\n)\\n"
|
"CREATE TABLE create_replicated_table.replicated_table\\n(\\n `d` Date,\\n `k` UInt64,\\n `i32` Int32\\n)\\n"
|
||||||
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\n"
|
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\n"
|
||||||
"PARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"
|
"PARTITION BY toYYYYMM(d)\\nORDER BY k\\nSETTINGS index_granularity = 8192"
|
||||||
)
|
)
|
||||||
assert_create_query([main_node, dummy_node], "testdb.replicated_table", expected)
|
assert_create_query(
|
||||||
# assert without replacing uuid
|
[main_node, dummy_node], "create_replicated_table.replicated_table", expected
|
||||||
assert main_node.query("show create testdb.replicated_table") == dummy_node.query(
|
|
||||||
"show create testdb.replicated_table"
|
|
||||||
)
|
)
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
# assert without replacing uuid
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
assert main_node.query(
|
||||||
|
"show create create_replicated_table.replicated_table"
|
||||||
|
) == dummy_node.query("show create create_replicated_table.replicated_table")
|
||||||
|
main_node.query("DROP DATABASE create_replicated_table SYNC")
|
||||||
|
dummy_node.query("DROP DATABASE create_replicated_table SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||||
def test_simple_alter_table(started_cluster, engine):
|
def test_simple_alter_table(started_cluster, engine):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE test_simple_alter_table ENGINE = Replicated('/test/simple_alter_table', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE test_simple_alter_table ENGINE = Replicated('/test/simple_alter_table', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
# test_simple_alter_table
|
# test_simple_alter_table
|
||||||
name = "testdb.alter_test_{}".format(engine)
|
name = "test_simple_alter_table.alter_test_{}".format(engine)
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE {} "
|
"CREATE TABLE {} "
|
||||||
"(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
|
"(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
|
||||||
@ -184,10 +186,10 @@ def test_simple_alter_table(started_cluster, engine):
|
|||||||
|
|
||||||
# test_create_replica_after_delay
|
# test_create_replica_after_delay
|
||||||
competing_node.query(
|
competing_node.query(
|
||||||
"CREATE DATABASE IF NOT EXISTS testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica3');"
|
"CREATE DATABASE IF NOT EXISTS test_simple_alter_table ENGINE = Replicated('/test/simple_alter_table', 'shard1', 'replica3');"
|
||||||
)
|
)
|
||||||
|
|
||||||
name = "testdb.alter_test_{}".format(engine)
|
name = "test_simple_alter_table.alter_test_{}".format(engine)
|
||||||
main_node.query("ALTER TABLE {} ADD COLUMN Added3 UInt32;".format(name))
|
main_node.query("ALTER TABLE {} ADD COLUMN Added3 UInt32;".format(name))
|
||||||
main_node.query("ALTER TABLE {} DROP COLUMN AddedNested1;".format(name))
|
main_node.query("ALTER TABLE {} DROP COLUMN AddedNested1;".format(name))
|
||||||
main_node.query("ALTER TABLE {} RENAME COLUMN Added1 TO AddedNested1;".format(name))
|
main_node.query("ALTER TABLE {} RENAME COLUMN Added1 TO AddedNested1;".format(name))
|
||||||
@ -207,21 +209,21 @@ def test_simple_alter_table(started_cluster, engine):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert_create_query([main_node, dummy_node, competing_node], name, expected)
|
assert_create_query([main_node, dummy_node, competing_node], name, expected)
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE test_simple_alter_table SYNC")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query("DROP DATABASE test_simple_alter_table SYNC")
|
||||||
competing_node.query("DROP DATABASE testdb SYNC")
|
competing_node.query("DROP DATABASE test_simple_alter_table SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||||
def test_delete_from_table(started_cluster, engine):
|
def test_delete_from_table(started_cluster, engine):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE delete_from_table ENGINE = Replicated('/test/simple_alter_table', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard2', 'replica1');"
|
"CREATE DATABASE delete_from_table ENGINE = Replicated('/test/simple_alter_table', 'shard2', 'replica1');"
|
||||||
)
|
)
|
||||||
|
|
||||||
name = "testdb.delete_test_{}".format(engine)
|
name = "delete_from_table.delete_test_{}".format(engine)
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE {} "
|
"CREATE TABLE {} "
|
||||||
"(id UInt64, value String) "
|
"(id UInt64, value String) "
|
||||||
@ -238,7 +240,7 @@ def test_delete_from_table(started_cluster, engine):
|
|||||||
|
|
||||||
table_for_select = name
|
table_for_select = name
|
||||||
if not "Replicated" in engine:
|
if not "Replicated" in engine:
|
||||||
table_for_select = "cluster('testdb', {})".format(name)
|
table_for_select = "cluster('delete_from_table', {})".format(name)
|
||||||
for node in [main_node, dummy_node]:
|
for node in [main_node, dummy_node]:
|
||||||
assert_eq_with_retry(
|
assert_eq_with_retry(
|
||||||
node,
|
node,
|
||||||
@ -246,8 +248,8 @@ def test_delete_from_table(started_cluster, engine):
|
|||||||
expected,
|
expected,
|
||||||
)
|
)
|
||||||
|
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE delete_from_table SYNC")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query("DROP DATABASE delete_from_table SYNC")
|
||||||
|
|
||||||
|
|
||||||
def get_table_uuid(database, name):
|
def get_table_uuid(database, name):
|
||||||
@ -276,17 +278,17 @@ def fixture_attachable_part(started_cluster):
|
|||||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||||
def test_alter_attach(started_cluster, attachable_part, engine):
|
def test_alter_attach(started_cluster, attachable_part, engine):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE alter_attach ENGINE = Replicated('/test/alter_attach', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE alter_attach ENGINE = Replicated('/test/alter_attach', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
|
|
||||||
name = "alter_attach_test_{}".format(engine)
|
name = "alter_attach_test_{}".format(engine)
|
||||||
main_node.query(
|
main_node.query(
|
||||||
f"CREATE TABLE testdb.{name} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
f"CREATE TABLE alter_attach.{name} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
||||||
)
|
)
|
||||||
table_uuid = get_table_uuid("testdb", name)
|
table_uuid = get_table_uuid("alter_attach", name)
|
||||||
# Provide and attach a part to the main node
|
# Provide and attach a part to the main node
|
||||||
shutil.copytree(
|
shutil.copytree(
|
||||||
attachable_part,
|
attachable_part,
|
||||||
@ -295,113 +297,122 @@ def test_alter_attach(started_cluster, attachable_part, engine):
|
|||||||
f"database/store/{table_uuid[:3]}/{table_uuid}/detached/all_1_1_0",
|
f"database/store/{table_uuid[:3]}/{table_uuid}/detached/all_1_1_0",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
main_node.query(f"ALTER TABLE testdb.{name} ATTACH PART 'all_1_1_0'")
|
main_node.query(f"ALTER TABLE alter_attach.{name} ATTACH PART 'all_1_1_0'")
|
||||||
# On the main node, data is attached
|
# On the main node, data is attached
|
||||||
assert main_node.query(f"SELECT CounterID FROM testdb.{name}") == "123\n"
|
assert main_node.query(f"SELECT CounterID FROM alter_attach.{name}") == "123\n"
|
||||||
# On the other node, data is replicated only if using a Replicated table engine
|
# On the other node, data is replicated only if using a Replicated table engine
|
||||||
if engine == "ReplicatedMergeTree":
|
if engine == "ReplicatedMergeTree":
|
||||||
assert dummy_node.query(f"SELECT CounterID FROM testdb.{name}") == "123\n"
|
assert dummy_node.query(f"SELECT CounterID FROM alter_attach.{name}") == "123\n"
|
||||||
else:
|
else:
|
||||||
assert dummy_node.query(f"SELECT CounterID FROM testdb.{name}") == ""
|
assert dummy_node.query(f"SELECT CounterID FROM alter_attach.{name}") == ""
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE alter_attach SYNC")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query("DROP DATABASE alter_attach SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||||
def test_alter_drop_part(started_cluster, engine):
|
def test_alter_drop_part(started_cluster, engine):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE alter_drop_part ENGINE = Replicated('/test/alter_drop_part', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE alter_drop_part ENGINE = Replicated('/test/alter_drop_part', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
|
|
||||||
table = f"alter_drop_{engine}"
|
table = f"alter_drop_{engine}"
|
||||||
part_name = "all_0_0_0" if engine == "ReplicatedMergeTree" else "all_1_1_0"
|
part_name = "all_0_0_0" if engine == "ReplicatedMergeTree" else "all_1_1_0"
|
||||||
main_node.query(
|
main_node.query(
|
||||||
f"CREATE TABLE testdb.{table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
f"CREATE TABLE alter_drop_part.{table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
||||||
)
|
)
|
||||||
main_node.query(f"INSERT INTO testdb.{table} VALUES (123)")
|
main_node.query(f"INSERT INTO alter_drop_part.{table} VALUES (123)")
|
||||||
if engine == "MergeTree":
|
if engine == "MergeTree":
|
||||||
dummy_node.query(f"INSERT INTO testdb.{table} VALUES (456)")
|
dummy_node.query(f"INSERT INTO alter_drop_part.{table} VALUES (456)")
|
||||||
main_node.query(f"ALTER TABLE testdb.{table} DROP PART '{part_name}'")
|
main_node.query(f"ALTER TABLE alter_drop_part.{table} DROP PART '{part_name}'")
|
||||||
assert main_node.query(f"SELECT CounterID FROM testdb.{table}") == ""
|
assert main_node.query(f"SELECT CounterID FROM alter_drop_part.{table}") == ""
|
||||||
if engine == "ReplicatedMergeTree":
|
if engine == "ReplicatedMergeTree":
|
||||||
# The DROP operation is still replicated at the table engine level
|
# The DROP operation is still replicated at the table engine level
|
||||||
assert dummy_node.query(f"SELECT CounterID FROM testdb.{table}") == ""
|
assert dummy_node.query(f"SELECT CounterID FROM alter_drop_part.{table}") == ""
|
||||||
else:
|
else:
|
||||||
assert dummy_node.query(f"SELECT CounterID FROM testdb.{table}") == "456\n"
|
assert (
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query(f"SELECT CounterID FROM alter_drop_part.{table}")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
== "456\n"
|
||||||
|
)
|
||||||
|
main_node.query("DROP DATABASE alter_drop_part SYNC")
|
||||||
|
dummy_node.query("DROP DATABASE alter_drop_part SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||||
def test_alter_detach_part(started_cluster, engine):
|
def test_alter_detach_part(started_cluster, engine):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE alter_detach_part ENGINE = Replicated('/test/alter_detach_part', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE alter_detach_part ENGINE = Replicated('/test/alter_detach_part', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
|
|
||||||
table = f"alter_detach_{engine}"
|
table = f"alter_detach_{engine}"
|
||||||
part_name = "all_0_0_0" if engine == "ReplicatedMergeTree" else "all_1_1_0"
|
part_name = "all_0_0_0" if engine == "ReplicatedMergeTree" else "all_1_1_0"
|
||||||
main_node.query(
|
main_node.query(
|
||||||
f"CREATE TABLE testdb.{table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
f"CREATE TABLE alter_detach_part.{table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
||||||
)
|
)
|
||||||
main_node.query(f"INSERT INTO testdb.{table} VALUES (123)")
|
main_node.query(f"INSERT INTO alter_detach_part.{table} VALUES (123)")
|
||||||
if engine == "MergeTree":
|
if engine == "MergeTree":
|
||||||
dummy_node.query(f"INSERT INTO testdb.{table} VALUES (456)")
|
dummy_node.query(f"INSERT INTO alter_detach_part.{table} VALUES (456)")
|
||||||
main_node.query(f"ALTER TABLE testdb.{table} DETACH PART '{part_name}'")
|
main_node.query(f"ALTER TABLE alter_detach_part.{table} DETACH PART '{part_name}'")
|
||||||
detached_parts_query = f"SELECT name FROM system.detached_parts WHERE database='testdb' AND table='{table}'"
|
detached_parts_query = f"SELECT name FROM system.detached_parts WHERE database='alter_detach_part' AND table='{table}'"
|
||||||
assert main_node.query(detached_parts_query) == f"{part_name}\n"
|
assert main_node.query(detached_parts_query) == f"{part_name}\n"
|
||||||
if engine == "ReplicatedMergeTree":
|
if engine == "ReplicatedMergeTree":
|
||||||
# The detach operation is still replicated at the table engine level
|
# The detach operation is still replicated at the table engine level
|
||||||
assert dummy_node.query(detached_parts_query) == f"{part_name}\n"
|
assert dummy_node.query(detached_parts_query) == f"{part_name}\n"
|
||||||
else:
|
else:
|
||||||
assert dummy_node.query(detached_parts_query) == ""
|
assert dummy_node.query(detached_parts_query) == ""
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE alter_detach_part SYNC")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query("DROP DATABASE alter_detach_part SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||||
def test_alter_drop_detached_part(started_cluster, engine):
|
def test_alter_drop_detached_part(started_cluster, engine):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE alter_drop_detached_part ENGINE = Replicated('/test/alter_drop_detached_part', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE alter_drop_detached_part ENGINE = Replicated('/test/alter_drop_detached_part', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
|
|
||||||
table = f"alter_drop_detached_{engine}"
|
table = f"alter_drop_detached_{engine}"
|
||||||
part_name = "all_0_0_0" if engine == "ReplicatedMergeTree" else "all_1_1_0"
|
part_name = "all_0_0_0" if engine == "ReplicatedMergeTree" else "all_1_1_0"
|
||||||
main_node.query(
|
main_node.query(
|
||||||
f"CREATE TABLE testdb.{table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
f"CREATE TABLE alter_drop_detached_part.{table} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)"
|
||||||
|
)
|
||||||
|
main_node.query(f"INSERT INTO alter_drop_detached_part.{table} VALUES (123)")
|
||||||
|
main_node.query(
|
||||||
|
f"ALTER TABLE alter_drop_detached_part.{table} DETACH PART '{part_name}'"
|
||||||
)
|
)
|
||||||
main_node.query(f"INSERT INTO testdb.{table} VALUES (123)")
|
|
||||||
main_node.query(f"ALTER TABLE testdb.{table} DETACH PART '{part_name}'")
|
|
||||||
if engine == "MergeTree":
|
if engine == "MergeTree":
|
||||||
dummy_node.query(f"INSERT INTO testdb.{table} VALUES (456)")
|
dummy_node.query(f"INSERT INTO alter_drop_detached_part.{table} VALUES (456)")
|
||||||
dummy_node.query(f"ALTER TABLE testdb.{table} DETACH PART '{part_name}'")
|
dummy_node.query(
|
||||||
main_node.query(f"ALTER TABLE testdb.{table} DROP DETACHED PART '{part_name}'")
|
f"ALTER TABLE alter_drop_detached_part.{table} DETACH PART '{part_name}'"
|
||||||
detached_parts_query = f"SELECT name FROM system.detached_parts WHERE database='testdb' AND table='{table}'"
|
)
|
||||||
|
main_node.query(
|
||||||
|
f"ALTER TABLE alter_drop_detached_part.{table} DROP DETACHED PART '{part_name}'"
|
||||||
|
)
|
||||||
|
detached_parts_query = f"SELECT name FROM system.detached_parts WHERE database='alter_drop_detached_part' AND table='{table}'"
|
||||||
assert main_node.query(detached_parts_query) == ""
|
assert main_node.query(detached_parts_query) == ""
|
||||||
assert dummy_node.query(detached_parts_query) == f"{part_name}\n"
|
assert dummy_node.query(detached_parts_query) == f"{part_name}\n"
|
||||||
|
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE alter_drop_detached_part SYNC")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query("DROP DATABASE alter_drop_detached_part SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
|
||||||
def test_alter_drop_partition(started_cluster, engine):
|
def test_alter_drop_partition(started_cluster, engine):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard1', 'replica1');"
|
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/test/alter_drop_partition', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard1', 'replica2');"
|
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/test/alter_drop_partition', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
snapshotting_node.query(
|
snapshotting_node.query(
|
||||||
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/clickhouse/databases/test_alter_drop_partition', 'shard2', 'replica1');"
|
"CREATE DATABASE alter_drop_partition ENGINE = Replicated('/test/alter_drop_partition', 'shard2', 'replica1');"
|
||||||
)
|
)
|
||||||
|
|
||||||
table = f"alter_drop_partition.alter_drop_{engine}"
|
table = f"alter_drop_partition.alter_drop_{engine}"
|
||||||
@ -430,52 +441,52 @@ def test_alter_drop_partition(started_cluster, engine):
|
|||||||
|
|
||||||
def test_alter_fetch(started_cluster):
|
def test_alter_fetch(started_cluster):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE alter_fetch ENGINE = Replicated('/test/alter_fetch', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE alter_fetch ENGINE = Replicated('/test/alter_fetch', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
|
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE testdb.fetch_source (CounterID UInt32) ENGINE = ReplicatedMergeTree ORDER BY (CounterID)"
|
"CREATE TABLE alter_fetch.fetch_source (CounterID UInt32) ENGINE = ReplicatedMergeTree ORDER BY (CounterID)"
|
||||||
)
|
)
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE testdb.fetch_target (CounterID UInt32) ENGINE = ReplicatedMergeTree ORDER BY (CounterID)"
|
"CREATE TABLE alter_fetch.fetch_target (CounterID UInt32) ENGINE = ReplicatedMergeTree ORDER BY (CounterID)"
|
||||||
)
|
)
|
||||||
main_node.query("INSERT INTO testdb.fetch_source VALUES (123)")
|
main_node.query("INSERT INTO alter_fetch.fetch_source VALUES (123)")
|
||||||
table_uuid = get_table_uuid("testdb", "fetch_source")
|
table_uuid = get_table_uuid("alter_fetch", "fetch_source")
|
||||||
main_node.query(
|
main_node.query(
|
||||||
f"ALTER TABLE testdb.fetch_target FETCH PART 'all_0_0_0' FROM '/clickhouse/tables/{table_uuid}/{{shard}}' "
|
f"ALTER TABLE alter_fetch.fetch_target FETCH PART 'all_0_0_0' FROM '/clickhouse/tables/{table_uuid}/{{shard}}' "
|
||||||
)
|
)
|
||||||
detached_parts_query = "SELECT name FROM system.detached_parts WHERE database='testdb' AND table='fetch_target'"
|
detached_parts_query = "SELECT name FROM system.detached_parts WHERE database='alter_fetch' AND table='fetch_target'"
|
||||||
assert main_node.query(detached_parts_query) == "all_0_0_0\n"
|
assert main_node.query(detached_parts_query) == "all_0_0_0\n"
|
||||||
assert dummy_node.query(detached_parts_query) == ""
|
assert dummy_node.query(detached_parts_query) == ""
|
||||||
|
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE alter_fetch SYNC")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query("DROP DATABASE alter_fetch SYNC")
|
||||||
|
|
||||||
|
|
||||||
def test_alters_from_different_replicas(started_cluster):
|
def test_alters_from_different_replicas(started_cluster):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE alters_from_different_replicas ENGINE = Replicated('/test/alters_from_different_replicas', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE alters_from_different_replicas ENGINE = Replicated('/test/alters_from_different_replicas', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
|
|
||||||
# test_alters_from_different_replicas
|
# test_alters_from_different_replicas
|
||||||
competing_node.query(
|
competing_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica3');"
|
"CREATE DATABASE alters_from_different_replicas ENGINE = Replicated('/test/alters_from_different_replicas', 'shard1', 'replica3');"
|
||||||
)
|
)
|
||||||
|
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE testdb.concurrent_test "
|
"CREATE TABLE alters_from_different_replicas.concurrent_test "
|
||||||
"(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
|
"(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
|
||||||
"ENGINE = MergeTree PARTITION BY toYYYYMM(StartDate) ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID);"
|
"ENGINE = MergeTree PARTITION BY toYYYYMM(StartDate) ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID);"
|
||||||
)
|
)
|
||||||
|
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE testdb.dist AS testdb.concurrent_test ENGINE = Distributed(testdb, testdb, concurrent_test, CounterID)"
|
"CREATE TABLE alters_from_different_replicas.dist AS alters_from_different_replicas.concurrent_test ENGINE = Distributed(alters_from_different_replicas, alters_from_different_replicas, concurrent_test, CounterID)"
|
||||||
)
|
)
|
||||||
|
|
||||||
dummy_node.stop_clickhouse(kill=True)
|
dummy_node.stop_clickhouse(kill=True)
|
||||||
@ -484,7 +495,7 @@ def test_alters_from_different_replicas(started_cluster):
|
|||||||
assert (
|
assert (
|
||||||
"There are 1 unfinished hosts (0 of them are currently active)"
|
"There are 1 unfinished hosts (0 of them are currently active)"
|
||||||
in competing_node.query_and_get_error(
|
in competing_node.query_and_get_error(
|
||||||
"ALTER TABLE testdb.concurrent_test ADD COLUMN Added0 UInt32;",
|
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;",
|
||||||
settings=settings,
|
settings=settings,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -493,7 +504,7 @@ def test_alters_from_different_replicas(started_cluster):
|
|||||||
"distributed_ddl_output_mode": "null_status_on_timeout",
|
"distributed_ddl_output_mode": "null_status_on_timeout",
|
||||||
}
|
}
|
||||||
assert "shard1\treplica2\tQUEUED\t" in main_node.query(
|
assert "shard1\treplica2\tQUEUED\t" in main_node.query(
|
||||||
"ALTER TABLE testdb.concurrent_test ADD COLUMN Added2 UInt32;",
|
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added2 UInt32;",
|
||||||
settings=settings,
|
settings=settings,
|
||||||
)
|
)
|
||||||
settings = {
|
settings = {
|
||||||
@ -501,22 +512,22 @@ def test_alters_from_different_replicas(started_cluster):
|
|||||||
"distributed_ddl_output_mode": "never_throw",
|
"distributed_ddl_output_mode": "never_throw",
|
||||||
}
|
}
|
||||||
assert "shard1\treplica2\tQUEUED\t" in competing_node.query(
|
assert "shard1\treplica2\tQUEUED\t" in competing_node.query(
|
||||||
"ALTER TABLE testdb.concurrent_test ADD COLUMN Added1 UInt32 AFTER Added0;",
|
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added1 UInt32 AFTER Added0;",
|
||||||
settings=settings,
|
settings=settings,
|
||||||
)
|
)
|
||||||
dummy_node.start_clickhouse()
|
dummy_node.start_clickhouse()
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;"
|
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;"
|
||||||
)
|
)
|
||||||
competing_node.query(
|
competing_node.query(
|
||||||
"ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;"
|
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;"
|
||||||
)
|
)
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;"
|
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;"
|
||||||
)
|
)
|
||||||
|
|
||||||
expected = (
|
expected = (
|
||||||
"CREATE TABLE testdb.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
"CREATE TABLE alters_from_different_replicas.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
||||||
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32,\\n"
|
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32,\\n"
|
||||||
" `Added0` UInt32,\\n `Added1` UInt32,\\n `Added2` UInt32,\\n `AddedNested1.A` Array(UInt32),\\n"
|
" `Added0` UInt32,\\n `Added1` UInt32,\\n `Added2` UInt32,\\n `AddedNested1.A` Array(UInt32),\\n"
|
||||||
" `AddedNested1.B` Array(UInt64),\\n `AddedNested1.C` Array(String),\\n `AddedNested2.A` Array(UInt32),\\n"
|
" `AddedNested1.B` Array(UInt64),\\n `AddedNested1.C` Array(String),\\n `AddedNested2.A` Array(UInt32),\\n"
|
||||||
@ -524,51 +535,63 @@ def test_alters_from_different_replicas(started_cluster):
|
|||||||
"ENGINE = MergeTree\\nPARTITION BY toYYYYMM(StartDate)\\nORDER BY (CounterID, StartDate, intHash32(UserID), VisitID)\\nSETTINGS index_granularity = 8192"
|
"ENGINE = MergeTree\\nPARTITION BY toYYYYMM(StartDate)\\nORDER BY (CounterID, StartDate, intHash32(UserID), VisitID)\\nSETTINGS index_granularity = 8192"
|
||||||
)
|
)
|
||||||
|
|
||||||
assert_create_query([main_node, competing_node], "testdb.concurrent_test", expected)
|
assert_create_query(
|
||||||
|
[main_node, competing_node],
|
||||||
|
"alters_from_different_replicas.concurrent_test",
|
||||||
|
expected,
|
||||||
|
)
|
||||||
|
|
||||||
# test_create_replica_after_delay
|
# test_create_replica_after_delay
|
||||||
main_node.query("DROP TABLE testdb.concurrent_test SYNC")
|
main_node.query("DROP TABLE alters_from_different_replicas.concurrent_test SYNC")
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE TABLE testdb.concurrent_test "
|
"CREATE TABLE alters_from_different_replicas.concurrent_test "
|
||||||
"(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
|
"(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
|
||||||
"ENGINE = ReplicatedMergeTree ORDER BY CounterID;"
|
"ENGINE = ReplicatedMergeTree ORDER BY CounterID;"
|
||||||
)
|
)
|
||||||
|
|
||||||
expected = (
|
expected = (
|
||||||
"CREATE TABLE testdb.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
"CREATE TABLE alters_from_different_replicas.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
||||||
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32\\n)\\n"
|
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32\\n)\\n"
|
||||||
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nORDER BY CounterID\\nSETTINGS index_granularity = 8192"
|
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nORDER BY CounterID\\nSETTINGS index_granularity = 8192"
|
||||||
)
|
)
|
||||||
|
|
||||||
assert_create_query([main_node, competing_node], "testdb.concurrent_test", expected)
|
assert_create_query(
|
||||||
|
[main_node, competing_node],
|
||||||
|
"alters_from_different_replicas.concurrent_test",
|
||||||
|
expected,
|
||||||
|
)
|
||||||
|
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"INSERT INTO testdb.dist (CounterID, StartDate, UserID) SELECT number, addDays(toDate('2020-02-02'), number), intHash32(number) FROM numbers(10)"
|
"INSERT INTO alters_from_different_replicas.dist (CounterID, StartDate, UserID) SELECT number, addDays(toDate('2020-02-02'), number), intHash32(number) FROM numbers(10)"
|
||||||
)
|
)
|
||||||
|
|
||||||
# test_replica_restart
|
# test_replica_restart
|
||||||
main_node.restart_clickhouse()
|
main_node.restart_clickhouse()
|
||||||
|
|
||||||
expected = (
|
expected = (
|
||||||
"CREATE TABLE testdb.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
"CREATE TABLE alters_from_different_replicas.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
||||||
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32\\n)\\n"
|
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32\\n)\\n"
|
||||||
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nORDER BY CounterID\\nSETTINGS index_granularity = 8192"
|
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nORDER BY CounterID\\nSETTINGS index_granularity = 8192"
|
||||||
)
|
)
|
||||||
|
|
||||||
# test_snapshot_and_snapshot_recover
|
# test_snapshot_and_snapshot_recover
|
||||||
snapshotting_node.query(
|
snapshotting_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard2', 'replica1');"
|
"CREATE DATABASE alters_from_different_replicas ENGINE = Replicated('/test/alters_from_different_replicas', 'shard2', 'replica1');"
|
||||||
)
|
)
|
||||||
snapshot_recovering_node.query(
|
snapshot_recovering_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard2', 'replica2');"
|
"CREATE DATABASE alters_from_different_replicas ENGINE = Replicated('/test/alters_from_different_replicas', 'shard2', 'replica2');"
|
||||||
|
)
|
||||||
|
assert_create_query(
|
||||||
|
all_nodes, "alters_from_different_replicas.concurrent_test", expected
|
||||||
)
|
)
|
||||||
assert_create_query(all_nodes, "testdb.concurrent_test", expected)
|
|
||||||
|
|
||||||
main_node.query("SYSTEM FLUSH DISTRIBUTED testdb.dist")
|
main_node.query("SYSTEM FLUSH DISTRIBUTED alters_from_different_replicas.dist")
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"ALTER TABLE testdb.concurrent_test UPDATE StartDate = addYears(StartDate, 1) WHERE 1"
|
"ALTER TABLE alters_from_different_replicas.concurrent_test UPDATE StartDate = addYears(StartDate, 1) WHERE 1"
|
||||||
|
)
|
||||||
|
res = main_node.query(
|
||||||
|
"ALTER TABLE alters_from_different_replicas.concurrent_test DELETE WHERE UserID % 2"
|
||||||
)
|
)
|
||||||
res = main_node.query("ALTER TABLE testdb.concurrent_test DELETE WHERE UserID % 2")
|
|
||||||
assert (
|
assert (
|
||||||
"shard1\treplica1\tOK" in res
|
"shard1\treplica1\tOK" in res
|
||||||
and "shard1\treplica2\tOK" in res
|
and "shard1\treplica2\tOK" in res
|
||||||
@ -585,28 +608,34 @@ def test_alters_from_different_replicas(started_cluster):
|
|||||||
)
|
)
|
||||||
assert (
|
assert (
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"SELECT shard_num, replica_num, host_name FROM system.clusters WHERE cluster='testdb'"
|
"SELECT shard_num, replica_num, host_name FROM system.clusters WHERE cluster='alters_from_different_replicas'"
|
||||||
)
|
)
|
||||||
== expected
|
== expected
|
||||||
)
|
)
|
||||||
|
|
||||||
# test_drop_and_create_replica
|
# test_drop_and_create_replica
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE alters_from_different_replicas SYNC")
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE alters_from_different_replicas ENGINE = Replicated('/test/alters_from_different_replicas', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
|
|
||||||
expected = (
|
expected = (
|
||||||
"CREATE TABLE testdb.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
"CREATE TABLE alters_from_different_replicas.concurrent_test\\n(\\n `CounterID` UInt32,\\n `StartDate` Date,\\n `UserID` UInt32,\\n"
|
||||||
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32\\n)\\n"
|
" `VisitID` UInt32,\\n `NestedColumn.A` Array(UInt8),\\n `NestedColumn.S` Array(String),\\n `ToDrop` UInt32\\n)\\n"
|
||||||
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nORDER BY CounterID\\nSETTINGS index_granularity = 8192"
|
"ENGINE = ReplicatedMergeTree(\\'/clickhouse/tables/{uuid}/{shard}\\', \\'{replica}\\')\\nORDER BY CounterID\\nSETTINGS index_granularity = 8192"
|
||||||
)
|
)
|
||||||
|
|
||||||
assert_create_query([main_node, competing_node], "testdb.concurrent_test", expected)
|
assert_create_query(
|
||||||
assert_create_query(all_nodes, "testdb.concurrent_test", expected)
|
[main_node, competing_node],
|
||||||
|
"alters_from_different_replicas.concurrent_test",
|
||||||
|
expected,
|
||||||
|
)
|
||||||
|
assert_create_query(
|
||||||
|
all_nodes, "alters_from_different_replicas.concurrent_test", expected
|
||||||
|
)
|
||||||
|
|
||||||
for node in all_nodes:
|
for node in all_nodes:
|
||||||
node.query("SYSTEM SYNC REPLICA testdb.concurrent_test")
|
node.query("SYSTEM SYNC REPLICA alters_from_different_replicas.concurrent_test")
|
||||||
|
|
||||||
expected = (
|
expected = (
|
||||||
"0\t2021-02-02\t4249604106\n"
|
"0\t2021-02-02\t4249604106\n"
|
||||||
@ -618,14 +647,14 @@ def test_alters_from_different_replicas(started_cluster):
|
|||||||
|
|
||||||
assert_eq_with_retry(
|
assert_eq_with_retry(
|
||||||
dummy_node,
|
dummy_node,
|
||||||
"SELECT CounterID, StartDate, UserID FROM testdb.dist ORDER BY CounterID",
|
"SELECT CounterID, StartDate, UserID FROM alters_from_different_replicas.dist ORDER BY CounterID",
|
||||||
expected,
|
expected,
|
||||||
)
|
)
|
||||||
main_node.query("DROP DATABASE testdb SYNC")
|
main_node.query("DROP DATABASE alters_from_different_replicas SYNC")
|
||||||
dummy_node.query("DROP DATABASE testdb SYNC")
|
dummy_node.query("DROP DATABASE alters_from_different_replicas SYNC")
|
||||||
competing_node.query("DROP DATABASE testdb SYNC")
|
competing_node.query("DROP DATABASE alters_from_different_replicas SYNC")
|
||||||
snapshotting_node.query("DROP DATABASE testdb SYNC")
|
snapshotting_node.query("DROP DATABASE alters_from_different_replicas SYNC")
|
||||||
snapshot_recovering_node.query("DROP DATABASE testdb SYNC")
|
snapshot_recovering_node.query("DROP DATABASE alters_from_different_replicas SYNC")
|
||||||
|
|
||||||
|
|
||||||
def create_some_tables(db):
|
def create_some_tables(db):
|
||||||
@ -1063,10 +1092,10 @@ def test_server_uuid(started_cluster):
|
|||||||
|
|
||||||
def test_sync_replica(started_cluster):
|
def test_sync_replica(started_cluster):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
"CREATE DATABASE test_sync_database ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');"
|
"CREATE DATABASE test_sync_database ENGINE = Replicated('/test/sync_replica', 'shard1', 'replica1');"
|
||||||
)
|
)
|
||||||
dummy_node.query(
|
dummy_node.query(
|
||||||
"CREATE DATABASE test_sync_database ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');"
|
"CREATE DATABASE test_sync_database ENGINE = Replicated('/test/sync_replica', 'shard1', 'replica2');"
|
||||||
)
|
)
|
||||||
|
|
||||||
number_of_tables = 1000
|
number_of_tables = 1000
|
||||||
@ -1113,17 +1142,20 @@ def test_sync_replica(started_cluster):
|
|||||||
)
|
)
|
||||||
|
|
||||||
lp1 = main_node.query(
|
lp1 = main_node.query(
|
||||||
"select value from system.zookeeper where path='/clickhouse/databases/test1/replicas/shard1|replica1' and name='log_ptr'"
|
"select value from system.zookeeper where path='/test/sync_replica/replicas/shard1|replica1' and name='log_ptr'"
|
||||||
)
|
)
|
||||||
lp2 = main_node.query(
|
lp2 = main_node.query(
|
||||||
"select value from system.zookeeper where path='/clickhouse/databases/test1/replicas/shard1|replica2' and name='log_ptr'"
|
"select value from system.zookeeper where path='/test/sync_replica/replicas/shard1|replica2' and name='log_ptr'"
|
||||||
)
|
)
|
||||||
max_lp = main_node.query(
|
max_lp = main_node.query(
|
||||||
"select value from system.zookeeper where path='/clickhouse/databases/test1/' and name='max_log_ptr'"
|
"select value from system.zookeeper where path='/test/sync_replica/' and name='max_log_ptr'"
|
||||||
)
|
)
|
||||||
assert lp1 == max_lp
|
assert lp1 == max_lp
|
||||||
assert lp2 == max_lp
|
assert lp2 == max_lp
|
||||||
|
|
||||||
|
main_node.query("DROP DATABASE test_sync_database SYNC")
|
||||||
|
dummy_node.query("DROP DATABASE test_sync_database SYNC")
|
||||||
|
|
||||||
|
|
||||||
def test_force_synchronous_settings(started_cluster):
|
def test_force_synchronous_settings(started_cluster):
|
||||||
main_node.query(
|
main_node.query(
|
||||||
|
Loading…
Reference in New Issue
Block a user