@@ -244,7 +244,7 @@ def test_query_parser(start_cluster):
     try:
         with pytest.raises(QueryRuntimeException):
             node1.query("""
-                CREATE TABLE table_with_absent_policy (
+                CREATE TABLE IF NOT EXISTS table_with_absent_policy (
                     d UInt64
                 ) ENGINE = MergeTree()
                 ORDER BY d
@@ -253,7 +253,7 @@ def test_query_parser(start_cluster):
 
         with pytest.raises(QueryRuntimeException):
            node1.query("""
-                CREATE TABLE table_with_absent_policy (
+                CREATE TABLE IF NOT EXISTS table_with_absent_policy (
                    d UInt64
                ) ENGINE = MergeTree()
                ORDER BY d
@@ -261,7 +261,7 @@ def test_query_parser(start_cluster):
             """)
 
         node1.query("""
-            CREATE TABLE table_with_normal_policy (
+            CREATE TABLE IF NOT EXISTS table_with_normal_policy (
                 d UInt64
             ) ENGINE = MergeTree()
             ORDER BY d
@@ -295,8 +295,8 @@ def test_query_parser(start_cluster):
 ])
 def test_alter_policy(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 d UInt64
             ) ENGINE = {engine}
             ORDER BY d
@@ -314,7 +314,7 @@ def test_alter_policy(start_cluster, name, engine):
         assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
             name=name)) == "small_jbod_with_external\n"
 
-        node1.query("""ALTER TABLE {name} MODIFY SETTING storage_policy='jbods_with_external'""".format(name=name))
+        node1.query_with_retry("""ALTER TABLE {name} MODIFY SETTING storage_policy='jbods_with_external'""".format(name=name))
 
         assert node1.query("""SELECT storage_policy FROM system.tables WHERE name = '{name}'""".format(
             name=name)) == "jbods_with_external\n"
@@ -327,7 +327,7 @@ def test_alter_policy(start_cluster, name, engine):
             name=name)) == "jbods_with_external\n"
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 def get_random_string(length):
@@ -349,13 +349,13 @@ def test_no_warning_about_zero_max_data_part_size(start_cluster):
 
     for node in (node1, node2):
         node.query("""
-            CREATE TABLE default.test_warning_table (
+            CREATE TABLE IF NOT EXISTS default.test_warning_table (
                 s String
             ) ENGINE = MergeTree
             ORDER BY tuple()
             SETTINGS storage_policy='small_jbod_with_external'
         """)
-        node.query("DROP TABLE default.test_warning_table SYNC")
+        node.query("DROP TABLE IF EXISTS default.test_warning_table SYNC")
         log = get_log(node)
         assert not re.search("Warning.*Volume.*special_warning_zero_volume", log)
         assert not re.search("Warning.*Volume.*special_warning_default_volume", log)
@@ -369,8 +369,8 @@ def test_no_warning_about_zero_max_data_part_size(start_cluster):
 ])
 def test_round_robin(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 d UInt64
             ) ENGINE = {engine}
             ORDER BY d
@@ -378,17 +378,17 @@ def test_round_robin(start_cluster, name, engine):
         """.format(name=name, engine=engine))
 
         # first should go to the jbod1
-        node1.query("insert into {} select * from numbers(10000)".format(name))
+        node1.query_with_retry("insert into {} select * from numbers(10000)".format(name))
         used_disk = get_used_disks_for_table(node1, name)
         assert len(used_disk) == 1, 'More than one disk used for single insert'
 
-        node1.query("insert into {} select * from numbers(10000, 10000)".format(name))
+        node1.query_with_retry("insert into {} select * from numbers(10000, 10000)".format(name))
         used_disks = get_used_disks_for_table(node1, name)
 
         assert len(used_disks) == 2, 'Two disks should be used for two parts'
         assert used_disks[0] != used_disks[1], "Should write to different disks"
 
-        node1.query("insert into {} select * from numbers(20000, 10000)".format(name))
+        node1.query_with_retry("insert into {} select * from numbers(20000, 10000)".format(name))
         used_disks = get_used_disks_for_table(node1, name)
 
         # jbod1 -> jbod2 -> jbod1 -> jbod2 ... etc
@@ -396,7 +396,7 @@ def test_round_robin(start_cluster, name, engine):
         assert used_disks[0] != used_disks[1]
         assert used_disks[2] == used_disks[0]
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 @pytest.mark.parametrize("name,engine", [
@@ -407,8 +407,8 @@ def test_max_data_part_size(start_cluster, name, engine):
     try:
         assert int(*node1.query("""SELECT max_data_part_size FROM system.storage_policies WHERE policy_name = 'jbods_with_external' AND volume_name = 'main'""").splitlines()) == 10*1024*1024
 
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 s1 String
             ) ENGINE = {engine}
             ORDER BY tuple()
@@ -418,12 +418,12 @@ def test_max_data_part_size(start_cluster, name, engine):
         for i in range(10):
             data.append(get_random_string(1024 * 1024)) # 1MB row
 
-        node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
+        node1.query_with_retry("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
         used_disks = get_used_disks_for_table(node1, name)
         assert len(used_disks) == 1
         assert used_disks[0] == 'external'
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 @pytest.mark.parametrize("name,engine", [
@@ -432,8 +432,8 @@ def test_max_data_part_size(start_cluster, name, engine):
 ])
 def test_jbod_overflow(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 s1 String
             ) ENGINE = {engine}
             ORDER BY tuple()
@@ -447,7 +447,7 @@ def test_jbod_overflow(start_cluster, name, engine):
             data = [] # 5MB in total
             for i in range(5):
                 data.append(get_random_string(1024 * 1024)) # 1MB row
-            node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
+            node1.query_with_retry("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
         assert all(disk == 'jbod1' for disk in used_disks)
@@ -457,7 +457,7 @@ def test_jbod_overflow(start_cluster, name, engine):
         for i in range(10):
             data.append(get_random_string(1024 * 1024)) # 1MB row
 
-        node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
+        node1.query_with_retry("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
 
@@ -466,7 +466,7 @@ def test_jbod_overflow(start_cluster, name, engine):
         node1.query(f"SYSTEM START MERGES {name}")
         time.sleep(1)
 
-        node1.query("OPTIMIZE TABLE {} FINAL".format(name))
+        node1.query_with_retry("OPTIMIZE TABLE {} FINAL".format(name))
         time.sleep(2)
 
         disks_for_merges = node1.query(
@@ -476,7 +476,7 @@ def test_jbod_overflow(start_cluster, name, engine):
         assert all(disk == 'external' for disk in disks_for_merges)
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 @pytest.mark.parametrize("name,engine", [
@@ -485,8 +485,8 @@ def test_jbod_overflow(start_cluster, name, engine):
 ])
 def test_background_move(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 s1 String
             ) ENGINE = {engine}
             ORDER BY tuple()
@@ -500,7 +500,7 @@ def test_background_move(start_cluster, name, engine):
            for i in range(5):
                data.append(get_random_string(1024 * 1024)) # 1MB row
            # small jbod size is 40MB, so lets insert 5MB batch 5 times
-            node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
+            node1.query_with_retry("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
 
@@ -526,7 +526,7 @@ def test_background_move(start_cluster, name, engine):
         node1.query(f"SYSTEM START MERGES {name}")
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 @pytest.mark.parametrize("name,engine", [
@@ -535,16 +535,16 @@ def test_background_move(start_cluster, name, engine):
 ])
 def test_start_stop_moves(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 s1 String
             ) ENGINE = {engine}
             ORDER BY tuple()
             SETTINGS storage_policy='moving_jbod_with_external'
         """.format(name=name, engine=engine))
 
-        node1.query("INSERT INTO {} VALUES ('HELLO')".format(name))
-        node1.query("INSERT INTO {} VALUES ('WORLD')".format(name))
+        node1.query_with_retry("INSERT INTO {} VALUES ('HELLO')".format(name))
+        node1.query_with_retry("INSERT INTO {} VALUES ('WORLD')".format(name))
 
         used_disks = get_used_disks_for_table(node1, name)
         assert all(d == "jbod1" for d in used_disks), "All writes shoud go to jbods"
@@ -571,7 +571,7 @@ def test_start_stop_moves(start_cluster, name, engine):
 
         assert disk == "external"
 
-        node1.query("TRUNCATE TABLE {}".format(name))
+        node1.query_with_retry("TRUNCATE TABLE {}".format(name))
 
         node1.query("SYSTEM STOP MOVES {}".format(name))
         node1.query("SYSTEM STOP MERGES {}".format(name))
@@ -581,7 +581,7 @@ def test_start_stop_moves(start_cluster, name, engine):
            for i in range(5):
                data.append(get_random_string(1024 * 1024)) # 1MB row
            # jbod size is 40MB, so lets insert 5MB batch 7 times
-            node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
+            node1.query_with_retry("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
 
         used_disks = get_used_disks_for_table(node1, name)
 
@@ -613,7 +613,7 @@ def test_start_stop_moves(start_cluster, name, engine):
         assert used_disks[0] == 'external'
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 def get_path_for_part_from_part_log(node, table, part_name):
@@ -640,7 +640,7 @@ def get_paths_for_partition_from_part_log(node, table, partition_id):
 def test_alter_move(start_cluster, name, engine):
     try:
         node1.query("""
-            CREATE TABLE {name} (
+            CREATE TABLE IF NOT EXISTS {name} (
                 EventDate Date,
                 number UInt64
             ) ENGINE = {engine}
@@ -713,7 +713,7 @@ def test_alter_move_half_of_partition(start_cluster, volume_or_disk):
     engine = "MergeTree()"
     try:
         node1.query("""
-            CREATE TABLE {name} (
+            CREATE TABLE IF NOT EXISTS {name} (
                 EventDate Date,
                 number UInt64
             ) ENGINE = {engine}
@@ -762,7 +762,7 @@ def test_alter_double_move_partition(start_cluster, volume_or_disk):
     engine = "MergeTree()"
     try:
         node1.query("""
-            CREATE TABLE {name} (
+            CREATE TABLE IF NOT EXISTS {name} (
                 EventDate Date,
                 number UInt64
             ) ENGINE = {engine}
@@ -833,8 +833,8 @@ def produce_alter_move(node, name):
 ])
 def test_concurrent_alter_move(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 EventDate Date,
                 number UInt64
             ) ENGINE = {engine}
@@ -850,7 +850,7 @@ def test_concurrent_alter_move(start_cluster, name, engine):
             day = random.randint(11, 30)
             value = values.pop()
             month = '0' + str(random.choice([3, 4]))
-            node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
+            node1.query_with_retry("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
 
     def alter_move(num):
         for i in range(num):
@@ -862,7 +862,7 @@ def test_concurrent_alter_move(start_cluster, name, engine):
 
     def optimize_table(num):
         for i in range(num):
-            node1.query("OPTIMIZE TABLE {} FINAL".format(name))
+            node1.query_with_retry("OPTIMIZE TABLE {} FINAL".format(name))
 
     p = Pool(15)
     tasks = []
@@ -889,7 +889,7 @@ def test_concurrent_alter_move(start_cluster, name, engine):
 def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
     try:
         node1.query("""
-            CREATE TABLE {name} (
+            CREATE TABLE IF NOT EXISTS {name} (
                 EventDate Date,
                 number UInt64
             ) ENGINE = {engine}
@@ -905,7 +905,7 @@ def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
             day = random.randint(11, 30)
             value = values.pop()
             month = '0' + str(random.choice([3, 4]))
-            node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
+            node1.query_with_retry("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
 
     def alter_move(num):
         for i in range(num):
@@ -931,7 +931,7 @@ def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
         assert node1.query("SELECT 1") == "1\n"
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 @pytest.mark.parametrize("name,engine", [
@@ -940,8 +940,8 @@ def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
 ])
 def test_detach_attach(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 s1 String
             ) ENGINE = {engine}
             ORDER BY tuple()
@@ -951,7 +951,7 @@ def test_detach_attach(start_cluster, name, engine):
         data = [] # 5MB in total
         for i in range(5):
             data.append(get_random_string(1024 * 1024)) # 1MB row
-        node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
+        node1.query_with_retry("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
 
         node1.query("ALTER TABLE {} DETACH PARTITION tuple()".format(name))
         assert node1.query("SELECT count() FROM {}".format(name)).strip() == "0"
@@ -962,7 +962,7 @@ def test_detach_attach(start_cluster, name, engine):
         assert node1.query("SELECT count() FROM {}".format(name)).strip() == "5"
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 @pytest.mark.parametrize("name,engine", [
@@ -971,8 +971,8 @@ def test_detach_attach(start_cluster, name, engine):
 ])
 def test_mutate_to_another_disk(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 s1 String
             ) ENGINE = {engine}
             ORDER BY tuple()
@@ -983,7 +983,7 @@ def test_mutate_to_another_disk(start_cluster, name, engine):
         data = [] # 5MB in total
         for i in range(5):
             data.append(get_random_string(1024 * 1024)) # 1MB row
-        node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
+        node1.query_with_retry("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
 
         node1.query("ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name))
 
@@ -996,7 +996,7 @@ def test_mutate_to_another_disk(start_cluster, name, engine):
             assert node1.query("SELECT sum(endsWith(s1, 'x')) FROM {}".format(name)) == "25\n"
         else: # mutation failed, let's try on another disk
             print("Mutation failed")
-            node1.query("OPTIMIZE TABLE {} FINAL".format(name))
+            node1.query_with_retry("OPTIMIZE TABLE {} FINAL".format(name))
             node1.query("ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name))
             retry = 20
             while node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != "" and retry > 0:
@@ -1008,7 +1008,7 @@ def test_mutate_to_another_disk(start_cluster, name, engine):
 
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 @pytest.mark.parametrize("name,engine", [
@@ -1017,8 +1017,8 @@ def test_mutate_to_another_disk(start_cluster, name, engine):
 ])
 def test_concurrent_alter_modify(start_cluster, name, engine):
     try:
-        node1.query("""
-            CREATE TABLE {name} (
+        node1.query_with_retry("""
+            CREATE TABLE IF NOT EXISTS {name} (
                 EventDate Date,
                 number UInt64
             ) ENGINE = {engine}
@@ -1034,7 +1034,7 @@ def test_concurrent_alter_modify(start_cluster, name, engine):
             day = random.randint(11, 30)
             value = values.pop()
             month = '0' + str(random.choice([3, 4]))
-            node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
+            node1.query_with_retry("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
 
     def alter_move(num):
         for i in range(num):
@@ -1066,14 +1066,14 @@ def test_concurrent_alter_modify(start_cluster, name, engine):
         assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"
 
     finally:
-        node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
+        node1.query_with_retry(f"DROP TABLE IF EXISTS {name} SYNC")
 
 
 def test_simple_replication_and_moves(start_cluster):
     try:
         for i, node in enumerate([node1, node2]):
-            node.query("""
-                CREATE TABLE replicated_table_for_moves (
+            node.query_with_retry("""
+                CREATE TABLE IF NOT EXISTS replicated_table_for_moves (
                     s1 String
                 ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_moves', '{}')
                 ORDER BY tuple()
@@ -1086,13 +1086,13 @@ def test_simple_replication_and_moves(start_cluster):
                 data = [] # 1MB in total
                 for i in range(2):
                     data.append(get_random_string(512 * 1024)) # 500KB value
-                node.query("INSERT INTO replicated_table_for_moves VALUES {}".format(
+                node.query_with_retry("INSERT INTO replicated_table_for_moves VALUES {}".format(
                     ','.join(["('" + x + "')" for x in data])))
 
         def optimize(num):
             for i in range(num):
                 node = random.choice([node1, node2])
-                node.query("OPTIMIZE TABLE replicated_table_for_moves FINAL")
+                node.query_with_retry("OPTIMIZE TABLE replicated_table_for_moves FINAL")
 
         p = Pool(60)
         tasks = []
@@ -1102,8 +1102,8 @@ def test_simple_replication_and_moves(start_cluster):
         for task in tasks:
             task.get(timeout=60)
 
-        node1.query("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
-        node2.query("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
+        node1.query_with_retry("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
+        node2.query_with_retry("SYSTEM SYNC REPLICA replicated_table_for_moves", timeout=5)
 
         node1.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
         node2.query("SELECT COUNT() FROM replicated_table_for_moves") == "40\n"
@@ -1116,9 +1116,9 @@ def test_simple_replication_and_moves(start_cluster):
         node1.query("SYSTEM STOP MERGES")
         node2.query("SYSTEM STOP MERGES")
 
-        node1.query(
+        node1.query_with_retry(
             "INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
-        node2.query(
+        node2.query_with_retry(
             "INSERT INTO replicated_table_for_moves VALUES {}".format(','.join(["('" + x + "')" for x in data])))
 
         time.sleep(3) # nothing was moved
@@ -1139,8 +1139,8 @@ def test_simple_replication_and_moves(start_cluster):
 def test_download_appropriate_disk(start_cluster):
     try:
         for i, node in enumerate([node1, node2]):
-            node.query("""
-                CREATE TABLE replicated_table_for_download (
+            node.query_with_retry("""
+                CREATE TABLE IF NOT EXISTS replicated_table_for_download (
                     s1 String
                 ) ENGINE = ReplicatedMergeTree('/clickhouse/replicated_table_for_download', '{}')
                 ORDER BY tuple()
@@ -1150,13 +1150,13 @@ def test_download_appropriate_disk(start_cluster):
         data = []
         for i in range(50):
             data.append(get_random_string(1024 * 1024)) # 1MB value
-        node1.query(
+        node1.query_with_retry(
             "INSERT INTO replicated_table_for_download VALUES {}".format(','.join(["('" + x + "')" for x in data])))
 
         for _ in range(10):
             try:
                 print("Syncing replica")
-                node2.query("SYSTEM SYNC REPLICA replicated_table_for_download")
+                node2.query_with_retry("SYSTEM SYNC REPLICA replicated_table_for_download")
                 break
             except:
                 time.sleep(0.5)
@@ -1167,13 +1167,13 @@ def test_download_appropriate_disk(start_cluster):
 
     finally:
         for node in [node1, node2]:
-            node.query("DROP TABLE IF EXISTS replicated_table_for_download SYNC")
+            node.query_with_retry("DROP TABLE IF EXISTS replicated_table_for_download SYNC")
 
 
 def test_rename(start_cluster):
     try:
         node1.query("""
-            CREATE TABLE default.renaming_table (
+            CREATE TABLE IF NOT EXISTS default.renaming_table (
                 s String
             ) ENGINE = MergeTree
             ORDER BY tuple()
@@ -1212,7 +1212,7 @@ def test_rename(start_cluster):
 def test_freeze(start_cluster):
     try:
         node1.query("""
-            CREATE TABLE default.freezing_table (
+            CREATE TABLE IF NOT EXISTS default.freezing_table (
                 d Date,
                 s String
             ) ENGINE = MergeTree
@@ -1249,7 +1249,7 @@ def test_kill_while_insert(start_cluster):
     name = "test_kill_while_insert"
 
     node1.query("""
-        CREATE TABLE {name} (
+        CREATE TABLE IF NOT EXISTS {name} (
             s String
         ) ENGINE = MergeTree
         ORDER BY tuple()
@@ -1294,7 +1294,7 @@ def test_move_while_merge(start_cluster):
     name = "test_move_while_merge"
 
     node1.query("""
-        CREATE TABLE {name} (
+        CREATE TABLE IF NOT EXISTS {name} (
            n Int64
        ) ENGINE = MergeTree
        ORDER BY sleep(2)
@@ -1353,7 +1353,7 @@ def test_move_across_policies_does_not_work(start_cluster):
     name = "test_move_across_policies_does_not_work"
 
     node1.query("""
-        CREATE TABLE {name} (
+        CREATE TABLE IF NOT EXISTS {name} (
            n Int64
        ) ENGINE = MergeTree
        ORDER BY tuple()
@@ -1361,7 +1361,7 @@ def test_move_across_policies_does_not_work(start_cluster):
     """.format(name=name))
 
     node1.query("""
-        CREATE TABLE {name}2 (
+        CREATE TABLE IF NOT EXISTS {name}2 (
            n Int64
        ) ENGINE = MergeTree
        ORDER BY tuple()
@@ -1393,7 +1393,7 @@ def test_move_across_policies_does_not_work(start_cluster):
 def _insert_merge_execute(node, name, policy, parts, cmds, parts_before_cmds, parts_after_cmds):
     try:
         node.query("""
-            CREATE TABLE {name} (
+            CREATE TABLE IF NOT EXISTS {name} (
                 n Int64
             ) ENGINE = MergeTree
            ORDER BY tuple()
@@ -1430,7 +1430,7 @@ def _check_merges_are_working(node, storage_policy, volume, shall_work):
     name = "_check_merges_are_working_{storage_policy}_{volume}".format(storage_policy=storage_policy, volume=volume)
 
     node.query("""
-        CREATE TABLE {name} (
+        CREATE TABLE IF NOT EXISTS {name} (
            n Int64
        ) ENGINE = MergeTree
        ORDER BY tuple()