From 120e27a38546058855108365d05558b72b835512 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Tue, 20 Aug 2019 21:00:48 +0300
Subject: [PATCH] Lock parts with dirty hack

---
 .../MergeTree/ReplicatedMergeTreeQueue.cpp    |  19 +-
 .../MergeTree/ReplicatedMergeTreeQueue.h      |   4 +-
 .../Storages/StorageReplicatedMergeTree.cpp   |   9 +-
 .../config.d/storage_configuration.xml        |  99 +++++-----
 .../integration/test_multiple_disks/test.py   | 170 +++++++++++++++---
 5 files changed, 217 insertions(+), 84 deletions(-)

diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
index 161543e86bf..23e8b5e05fa 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -39,7 +39,7 @@ void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts &
 }
 
 
-void ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts)
+Names ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts)
 {
     std::lock_guard lock(state_mutex);
     for (const auto & data_part : data_parts)
@@ -49,8 +49,16 @@ void ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPa
                 + " already assigned to background operation.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
     }
 
+    Names result;
     for (const auto & data_part : data_parts)
-        virtual_parts.add(data_part->name);
+    {
+        MergeTreePartInfo info_copy = data_part->info;
+        info_copy.partition_id = "disabled"; /// Fake name to block this part
+        auto fake_part_name = info_copy.getPartName();
+        virtual_parts.add(fake_part_name);
+        result.emplace_back(fake_part_name);
+    }
+    return result;
 }
 
 
@@ -401,10 +409,15 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
 
 bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info)
 {
-    std::unique_lock lock(state_mutex);
+    std::lock_guard lock(state_mutex);
     return virtual_parts.remove(part_info);
 }
 
+bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const String & part_name)
+{
+    std::lock_guard lock(state_mutex);
+    return virtual_parts.remove(part_name);
+}
 
 void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
 {
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
index 73965d7872b..63bcf10eaa9 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
@@ -253,6 +253,8 @@ public:
 
     bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
 
+    bool removeFromVirtualParts(const String & part_name);
+
     /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
       * If watch_callback is not empty, will call it when new entries appear in the log.
       * If there were new entries, notifies storage.queue_task_handle.
@@ -331,7 +333,7 @@ public:
 
     /// Add part to virtual_parts, which means that part must exist
     /// after processing replication log up to log_pointer.
     /// Throws an exception if any part is already in virtual parts.
-    void disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts);
+    Names disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts);
 
     /// Checks that the part is already in virtual parts
     bool isPartAssignedToBackgroundOperation(const MergeTreeData::DataPartPtr & data_part) const;
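
The "dirty hack" in the hunk above: instead of adding the real part name to virtual_parts (which would make the part look like a product of replication), disableMergesForParts now derives a name from the same block range but with a partition id no real partition can have, so merges over that range are blocked without colliding with genuine part names. A minimal standalone sketch of that naming trick, using simplified stand-in types rather than the real MergeTreePartInfo:

```cpp
#include <cstdint>
#include <iostream>
#include <set>
#include <string>

// Stand-in for MergeTreePartInfo: real parts serialize roughly as
// <partition_id>_<min_block>_<max_block>_<level>.
struct PartInfo
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;
    uint32_t level = 0;

    std::string getPartName() const
    {
        return partition_id + "_" + std::to_string(min_block) + "_"
            + std::to_string(max_block) + "_" + std::to_string(level);
    }
};

int main()
{
    std::set<std::string> virtual_parts;  // stand-in for the queue's virtual_parts set

    PartInfo part{"201908", 1, 10, 2};    // a real part: 201908_1_10_2

    // The trick from the patch: keep the block range, replace the partition
    // id with a value no real partition has, and register the fake name.
    PartInfo info_copy = part;
    info_copy.partition_id = "disabled";
    std::string fake_part_name = info_copy.getPartName();

    virtual_parts.insert(fake_part_name);                     // disableMergesForParts
    std::cout << fake_part_name << '\n';                      // prints: disabled_1_10_2
    bool removed = virtual_parts.erase(fake_part_name) > 0;   // removeFromVirtualParts
    std::cout << removed << '\n';                             // prints: 1
}
```
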
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index 7faf25a17a4..e54ae73b3f8 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2176,6 +2176,8 @@ struct CurrentlyMovingPartsTagger
 
     ReplicatedMergeTreeQueue & queue;
 
+    Names remove_names;
+
 public:
     CurrentlyMovingPartsTagger(MergeTreeMovingParts parts_, ReplicatedMergeTreeQueue & queue_)
         : parts(std::move(parts_))
@@ -2186,14 +2188,15 @@ public:
             data_parts.emplace_back(moving_part.part);
 
         /// Throws an exception if some parts already exist
-        queue.disableMergesForParts(data_parts);
+        remove_names = queue.disableMergesForParts(data_parts);
     }
 
     ~CurrentlyMovingPartsTagger()
     {
-        /// May return false, but we don't care, it's ok.
-        for (auto & part : parts)
-            queue.removeFromVirtualParts(part.part->info);
+        /// Every fake name we registered must still be present; a missing
+        /// one means broken bookkeeping, which we treat as fatal.
+        for (auto & part_name : remove_names)
+            if (!queue.removeFromVirtualParts(part_name))
+                std::terminate();
     }
 };
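
CurrentlyMovingPartsTagger ties the lock to a scope: the constructor registers fake names via disableMergesForParts, and the destructor is now obliged to remove every one of them, terminating the process if any has vanished. A compact standalone sketch of this RAII pattern (Queue here is a simplified stand-in, not the real ReplicatedMergeTreeQueue):

```cpp
#include <exception>
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Stand-in queue with just the two calls the tagger uses.
struct Queue
{
    std::set<std::string> virtual_parts;

    std::vector<std::string> disableMergesForParts(const std::vector<std::string> & parts)
    {
        std::vector<std::string> fake_names;
        for (const auto & part : parts)
        {
            std::string fake = "disabled_" + part;  // simplified fake-name scheme
            virtual_parts.insert(fake);
            fake_names.push_back(fake);
        }
        return fake_names;
    }

    bool removeFromVirtualParts(const std::string & name)
    {
        return virtual_parts.erase(name) > 0;
    }
};

// RAII tagger: constructing it locks the parts, leaving scope unlocks them.
struct MovingPartsTagger
{
    Queue & queue;
    std::vector<std::string> remove_names;

    MovingPartsTagger(Queue & queue_, const std::vector<std::string> & parts)
        : queue(queue_), remove_names(queue.disableMergesForParts(parts)) {}

    ~MovingPartsTagger()
    {
        // As in the patch: a fake name that is already gone indicates broken
        // bookkeeping, so failure to remove it is fatal rather than silent.
        for (const auto & name : remove_names)
            if (!queue.removeFromVirtualParts(name))
                std::terminate();
    }
};

int main()
{
    Queue queue;
    {
        MovingPartsTagger tagger(queue, {"1_10_2"});
        std::cout << queue.virtual_parts.size() << '\n';  // 1: part locked while moving
    }  // tagger destroyed: fake name removed
    std::cout << queue.virtual_parts.size() << '\n';      // 0: lock released
}
```
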
diff --git a/dbms/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml b/dbms/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml
index 3e430e399b4..c9e512c3749 100644
--- a/dbms/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml
+++ b/dbms/tests/integration/test_multiple_disks/configs/config.d/storage_configuration.xml
@@ -2,20 +2,16 @@
     <storage_configuration>
         <disks>
             <default>
-                <keep_free_space_bytes>1000</keep_free_space_bytes>
+                <keep_free_space_bytes>1024</keep_free_space_bytes>
             </default>
             <jbod1>
                 <path>/jbod1/</path>
-                <keep_free_space_bytes>10000000</keep_free_space_bytes>
             </jbod1>
             <jbod2>
                 <path>/jbod2/</path>
-                <keep_free_space_bytes>10000000</keep_free_space_bytes>
+                <keep_free_space_bytes>10485760</keep_free_space_bytes>
             </jbod2>
             <external>
                 <path>/external/</path>
             </external>
         </disks>
@@ -23,63 +19,64 @@
         <policies>
+            <small_jbod_with_external>
+                <volumes>
+                    <main>
+                        <disk>jbod1</disk>
+                    </main>
+                    <external>
+                        <disk>external</disk>
+                    </external>
+                </volumes>
+            </small_jbod_with_external>
+
+            <jbods_with_external>
+                <volumes>
+                    <main>
+                        <disk>jbod1</disk>
+                        <disk>jbod2</disk>
+                        <max_data_part_size_bytes>10485760</max_data_part_size_bytes>
+                    </main>
+                    <external>
+                        <disk>external</disk>
+                    </external>
+                </volumes>
+            </jbods_with_external>
+
+            <moving_jbod_with_external>
+                <volumes>
+                    <main>
+                        <disk>jbod1</disk>
+                    </main>
+                    <external>
+                        <disk>external</disk>
+                    </external>
+                </volumes>
+                <move_factor>0.7</move_factor>
+            </moving_jbod_with_external>
+
             <default_disk_with_external>
                 <volumes>
                     <small>
                         <disk>default</disk>
-                        <max_data_part_size_bytes>2000000</max_data_part_size_bytes>
+                        <max_data_part_size_bytes>2097152</max_data_part_size_bytes>
                     </small>
                     <big>
                         <disk>external</disk>
-                        <max_data_part_size_bytes>20000000</max_data_part_size_bytes>
+                        <max_data_part_size_bytes>20971520</max_data_part_size_bytes>
                     </big>
                 </volumes>
             </default_disk_with_external>
-            <jbod_with_external>
-                <volumes>
-                    <main>
-                        <disk>jbod1</disk>
-                        <disk>jbod2</disk>
-                        <max_data_part_size_bytes>10000000</max_data_part_size_bytes>
-                    </main>
-                    <external>
-                        <disk>external</disk>
-                    </external>
-                </volumes>
-            </jbod_with_external>
         </policies>
     </storage_configuration>
-</yandex>
\ No newline at end of file
+</yandex>
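
Reading the new jbods_with_external policy above: an insert lands on the main volume's disks in round-robin order unless the part would exceed max_data_part_size_bytes (10485760 = 10MB), in which case the next volume, external, takes it; omitting the tag means no cap. A toy sketch of that selection rule (my simplification with stand-in types, not the actual ClickHouse implementation):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct Volume
{
    std::string name;
    std::vector<std::string> disks;
    uint64_t max_data_part_size = UINT64_MAX;  // no cap unless configured
};

// Toy selection rule: volumes are ordered by priority; take the first one
// whose part-size cap admits this part, then round-robin across its disks.
std::string pickDisk(const std::vector<Volume> & volumes, uint64_t part_size, uint64_t insert_no)
{
    for (const auto & volume : volumes)
        if (part_size <= volume.max_data_part_size)
            return volume.disks[insert_no % volume.disks.size()];
    return "";  // no volume can hold the part
}

int main()
{
    // Mirrors jbods_with_external from the config above.
    std::vector<Volume> volumes{
        {"main", {"jbod1", "jbod2"}, 10485760},
        {"external", {"external"}},
    };

    std::cout << pickDisk(volumes, 1 << 20, 0) << '\n';   // jbod1 (1MB fits on main)
    std::cout << pickDisk(volumes, 1 << 20, 1) << '\n';   // jbod2 (round-robin)
    std::cout << pickDisk(volumes, 11 << 20, 2) << '\n';  // external (11MB exceeds the cap)
}
```
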
diff --git a/dbms/tests/integration/test_multiple_disks/test.py b/dbms/tests/integration/test_multiple_disks/test.py
index 4330339d9c5..6e78b279a70 100644
--- a/dbms/tests/integration/test_multiple_disks/test.py
+++ b/dbms/tests/integration/test_multiple_disks/test.py
@@ -1,6 +1,8 @@
 import time
 import pytest
 import os
+import random
+import string
 
 from helpers.cluster import ClickHouseCluster
 
@@ -44,43 +46,159 @@ def start_cluster():
 
 
 # Check that configuration is valid
-def test_config(start_cluster):
+def test_config_parser(start_cluster):
     assert node1.query("select name, path, keep_free_space from system.disks") == "default\t/var/lib/clickhouse/data/\t1000\nexternal\t/external/\t0\njbod1\t/jbod1/\t10000000\njbod2\t/jbod2/\t10000000\n"
 
     assert node2.query("select name, path, keep_free_space from system.disks") == "default\t/var/lib/clickhouse/data/\t1000\nexternal\t/external/\t0\njbod1\t/jbod1/\t10000000\njbod2\t/jbod2/\t10000000\n"
 
     assert node1.query("select * from system.storage_policies") == "" \
         "default\tdefault\t0\t['default']\t18446744073709551615\n" \
         "default_disk_with_external\tsmall\t0\t['default']\t2000000\n" \
         "default_disk_with_external\tbig\t1\t['external']\t20000000\n" \
-        "jbod_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
-        "jbod_with_external\texternal\t1\t['external']\t18446744073709551615\n"
+        "jbods_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
+        "jbods_with_external\texternal\t1\t['external']\t18446744073709551615\n"
 
     assert node2.query("select * from system.storage_policies") == "" \
         "default\tdefault\t0\t['default']\t18446744073709551615\n" \
         "default_disk_with_external\tsmall\t0\t['default']\t2000000\n" \
         "default_disk_with_external\tbig\t1\t['external']\t20000000\n" \
-        "jbod_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
-        "jbod_with_external\texternal\t1\t['external']\t18446744073709551615\n"
+        "jbods_with_external\tmain\t0\t['jbod1','jbod2']\t10000000\n" \
+        "jbods_with_external\texternal\t1\t['external']\t18446744073709551615\n"
 
-def test_write_on_second_volume(start_cluster):
-    node1.query("create table node1_mt ( d UInt64 ) ENGINE = MergeTree() ORDER BY d SETTINGS storage_policy_name='jbod_with_external'")
-    n = 1000
-    flag = True
-    i = 0
-    used_disks = set()
-    retries = 0
-    # Keep insert while external disk do not contain parts
-    while flag and i < 1000:
-        s = ["(" + str(n * i + k) + ")" for k in range(n)]
-        node1.query("insert into node1_mt values " + ', '.join(s))
-        used_disks_ = node1.query("select distinct disk_name from system.parts where table == 'node1_mt'").strip().split("\n")
-        used_disks.update(used_disks_)
-        flag = "external" not in used_disks_
-        i += 1
+def get_random_string(length):
+    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
+
+def get_used_disks_for_table(node, table_name):
+    return node.query("select disk_name from system.parts where table == '{}' order by modification_time".format(table_name)).strip().split('\n')
+
+def test_round_robin(start_cluster):
+    try:
+        node1.query("""
+            CREATE TABLE mt_on_jbod (
+                d UInt64
+            ) ENGINE = MergeTree()
+            ORDER BY d
+            SETTINGS storage_policy_name='jbods_with_external'
+        """)
+
+        # the first insert should go to jbod1
+        node1.query("insert into mt_on_jbod select * from numbers(10000)")
+        used_disk = get_used_disks_for_table(node1, 'mt_on_jbod')
+        assert len(used_disk) == 1, 'More than one disk used for single insert'
+        assert used_disk[0] == 'jbod1', 'First disk should be jbod1'
+
+        node1.query("insert into mt_on_jbod select * from numbers(10000)")
+        used_disks = get_used_disks_for_table(node1, 'mt_on_jbod')
+
+        assert len(used_disks) == 2, 'Two disks should be used for two parts'
+        assert used_disks[0] == 'jbod1'
+        assert used_disks[1] == 'jbod2'
+
+        node1.query("insert into mt_on_jbod select * from numbers(10000)")
+        used_disks = get_used_disks_for_table(node1, 'mt_on_jbod')
+
+        assert len(used_disks) == 3
+        assert used_disks[0] == 'jbod1'
+        assert used_disks[1] == 'jbod2'
+        assert used_disks[2] == 'jbod1'
+    finally:
+        node1.query("DROP TABLE IF EXISTS mt_on_jbod")
+
+def test_max_data_part_size(start_cluster):
+    try:
+        node1.query("""
+            CREATE TABLE mt_with_huge_part (
+                s1 String
+            ) ENGINE = MergeTree()
+            ORDER BY tuple()
+            SETTINGS storage_policy_name='jbods_with_external'
+        """)
+        data = []  # 10MB in total
+        for i in range(10):
+            data.append(get_random_string(1024 * 1024))  # 1MB row
+
+        node1.query("INSERT INTO mt_with_huge_part VALUES {}".format(','.join(["('" + x + "')" for x in data])))
+        used_disks = get_used_disks_for_table(node1, 'mt_with_huge_part')
+        assert len(used_disks) == 1
+        assert used_disks[0] == 'external'
+    finally:
+        node1.query("DROP TABLE IF EXISTS mt_with_huge_part")
+
+def test_jbod_overflow(start_cluster):
+    try:
+        node1.query("""
+            CREATE TABLE mt_with_overflow (
+                s1 String
+            ) ENGINE = MergeTree()
+            ORDER BY tuple()
+            SETTINGS storage_policy_name='small_jbod_with_external'
+        """)
+        data = []  # 5MB in total
+        for i in range(5):
+            data.append(get_random_string(1024 * 1024))  # 1MB row
+
+        node1.query("SYSTEM STOP MERGES")
+
+        # small jbod size is 40MB, so let's insert a 5MB batch 7 times
+        for i in range(7):
+            node1.query("INSERT INTO mt_with_overflow VALUES {}".format(','.join(["('" + x + "')" for x in data])))
+
+        used_disks = get_used_disks_for_table(node1, 'mt_with_overflow')
+        assert all(disk == 'jbod1' for disk in used_disks)
+
+        # should go to the external disk (jbod1 has overflowed)
+        data = []  # 10MB in total
+        for i in range(10):
+            data.append(get_random_string(1024 * 1024))  # 1MB row
+
+        node1.query("INSERT INTO mt_with_overflow VALUES {}".format(','.join(["('" + x + "')" for x in data])))
+
+        used_disks = get_used_disks_for_table(node1, 'mt_with_overflow')
+
+        assert used_disks[-1] == 'external'
+
+        node1.query("SYSTEM START MERGES")
+        node1.query("OPTIMIZE TABLE mt_with_overflow FINAL")
+
+        disks_for_merges = node1.query("SELECT disk_name FROM system.parts WHERE table == 'mt_with_overflow' AND level >= 1 ORDER BY modification_time").strip().split('\n')
+
+        assert all(disk == 'external' for disk in disks_for_merges)
+
+    finally:
+        node1.query("DROP TABLE IF EXISTS mt_with_overflow")
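
The next test exercises moving_jbod_with_external, whose move_factor of 0.7 makes a background thread start relocating the oldest parts to the next volume once free space on the volume drops below 70% of its capacity. A rough illustration of that trigger (my reading of the setting; the 40MB figure comes from the test comments, not a measured value):

```cpp
#include <cstdint>
#include <iostream>

// Toy version of the move trigger implied by move_factor: once free space
// on a volume falls below move_factor of its total size, background moves
// push the oldest parts to the next volume.
bool shouldStartMove(uint64_t free_bytes, uint64_t total_bytes, double move_factor)
{
    return static_cast<double>(free_bytes) < move_factor * static_cast<double>(total_bytes);
}

int main()
{
    const uint64_t total = 40ull << 20;  // 40MB jbod1, as the test comments assume
    const double move_factor = 0.7;      // from moving_jbod_with_external

    std::cout << shouldStartMove(total - (10ull << 20), total, move_factor) << '\n';  // 0: 30MB free
    std::cout << shouldStartMove(total - (25ull << 20), total, move_factor) << '\n';  // 1: 15MB free
}
```
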
node1.query("insert into mt_on_jbod select * from numbers(10000)") + used_disks = get_used_disks_for_table(node1, 'mt_on_jbod') + + assert len(used_disks) == 2, 'Two disks should be used for two parts' + assert used_disks[0] == 'jbod1' + assert used_disks[1] == 'jbod2' + + node1.query("insert into mt_on_jbod select * from numbers(10000)") + used_disks = get_used_disks_for_table(node1, 'mt_on_jbod') + + assert len(used_disks) == 3 + assert used_disks[0] == 'jbod1' + assert used_disks[1] == 'jbod2' + assert used_disks[2] == 'jbod1' + finally: + node1.query("DROP TABLE IF EXISTS mt_ob_jbod") + +def test_max_data_part_size(start_cluster): + try: + node1.query(""" + CREATE TABLE mt_with_huge_part ( + s1 String + ) ENGINE = MergeTree() + ORDER BY tuple() + SETTINGS storage_policy_name='jbods_with_external' + """) + data = [] # 10MB in total + for i in range(10): + data.append(get_random_string(1024 * 1024)) # 1MB row + + node1.query("INSERT INTO mt_with_huge_part VALUES {}".format(','.join(["('" + x + "')" for x in data]))) + used_disks = get_used_disks_for_table(node1, 'mt_with_huge_part') + assert len(used_disks) == 1 + assert used_disks[0] == 'external' + finally: + node1.query("DROP TABLE IF EXISTS mt_with_huge_part") + +def test_jbod_overflow(start_cluster): + try: + node1.query(""" + CREATE TABLE mt_with_overflow ( + s1 String + ) ENGINE = MergeTree() + ORDER BY tuple() + SETTINGS storage_policy_name='small_jbod_with_external' + """) + data = [] # 5MB in total + for i in range(5): + data.append(get_random_string(1024 * 1024)) # 1MB row + + node1.query("SYSTEM STOP MERGES") + + # small jbod size is 40MB, so lets insert 5MB batch 7 times + for i in range(7): + node1.query("INSERT INTO mt_with_overflow VALUES {}".format(','.join(["('" + x + "')" for x in data]))) + + used_disks = get_used_disks_for_table(node1, 'mt_with_overflow') + assert all(disk == 'jbod1' for disk in used_disks) + + # should go to the external disk (jbod is overflown) + data = [] # 10MB in total + for i in range(10): + data.append(get_random_string(1024 * 1024)) # 1MB row + + node1.query("INSERT INTO mt_with_overflow VALUES {}".format(','.join(["('" + x + "')" for x in data]))) + + used_disks = get_used_disks_for_table(node1, 'mt_with_overflow') + + assert used_disks[-1] == 'external' + + node1.query("SYSTEM START MERGES") + node1.query("OPTIMIZE TABLE mt_with_overflow FINAL") + + disks_for_merges = node1.query("SELECT disk_name FROM system.parts WHERE table == 'mt_with_overflow' AND level >= 1 ORDER BY modification_time").strip().split('\n') + + assert all(disk == 'external' for disk in disks_for_merges) + + finally: + node1.query("DROP TABLE IF EXISTS mt_with_overflow") + +@pytest.mark.parametrize("name,engine", [ + ("moving_mt","MergeTree()"), + ("moving_replicated_mt","ReplicatedMergeTree('/clickhouse/sometable', '1')",), +]) +def test_background_move(start_cluster, name, engine): + try: + node1.query(""" + CREATE TABLE {name} ( + s1 String + ) ENGINE = {engine} + ORDER BY tuple() + SETTINGS storage_policy_name='moving_jbod_with_external' + """.format(name=name, engine=engine)) + + for i in range(5): + data = [] # 5MB in total + for i in range(5): + data.append(get_random_string(1024 * 1024)) # 1MB row + # small jbod size is 40MB, so lets insert 5MB batch 2 times (less than 70%) + node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data]))) + + + time.sleep(5) + used_disks = get_used_disks_for_table(node1, name) + + # Maximum two parts on jbod1 + assert sum(1 for x in used_disks if x 
== 'jbod1') <= 2 + + # first (oldest) part was moved to external + assert used_disks[0] == 'external' + + finally: + node1.query("DROP TABLE IF EXISTS {name}".format(name=name)) - # Check if all disks from policy was used - assert used_disks == {'jbod1', 'jbod2', 'external'} - node1.query("drop table node1_mt") - assert node1.query("select distinct disk_name from system.parts where table == 'node1_mt'").strip().split("\n") == [''] def test_default(start_cluster): @@ -182,12 +300,12 @@ select name, data_paths, storage_policy from system.tables where name='table_wit INSERT INTO table_with_storage_policy_default SELECT rand64() FROM numbers(100); CREATE TABLE table_with_storage_policy_default_explicit (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default'; CREATE TABLE table_with_storage_policy_default_disk_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default_disk_with_external'; -CREATE TABLE table_with_storage_policy_jbod_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='jbod_with_external'; +CREATE TABLE table_with_storage_policy_jbod_with_external (id UInt64) Engine=MergeTree() ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='jbods_with_external'; CREATE TABLE replicated_table_with_storage_policy_default (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id); CREATE TABLE replicated_table_with_storage_policy_default_explicit (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default'; CREATE TABLE replicated_table_with_storage_policy_default_disk_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='default_disk_with_external'; -CREATE TABLE replicated_table_with_storage_policy_jbod_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='jbod_with_external'; +CREATE TABLE replicated_table_with_storage_policy_jbod_with_external (id UInt64) Engine=ReplicatedMergeTree('/clickhouse/tables/{database}/{table}', '{replica}') ORDER BY (id) SETTINGS storage_table_with_storage_policy_name='jbods_with_external'; ```