From 5cb8605f61e48255986755089ff56c0a68da5953 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Wed, 21 Aug 2019 16:10:33 +0300 Subject: [PATCH 1/3] limit number of mutations in the queue --- .../Storages/MergeTree/MergeTreeSettings.h | 3 +- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 16 ++-- .../MergeTree/ReplicatedMergeTreeQueue.h | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 16 ++-- .../configs/merge_tree_queue.xml | 7 ++ .../test_replicated_mutations/test.py | 83 ++++++++++++++----- 6 files changed, 93 insertions(+), 34 deletions(-) create mode 100644 dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index e670000ecc5..36e82b96961 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -30,7 +30,8 @@ struct MergeTreeSettings : public SettingsCollection /** Merge settings. */ \ M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).") \ M(SettingUInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).") \ - M(SettingUInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.") \ + M(SettingUInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \ + M(SettingUInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \ M(SettingUInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). 
This is to allow small merges to process - not filling the pool with long running merges.") \ M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.") \ M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.") \ diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 665e8c9bd5c..e6251502576 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1154,17 +1154,21 @@ bool ReplicatedMergeTreeQueue::processEntry( } -size_t ReplicatedMergeTreeQueue::countMergesAndPartMutations() const +std::pair<size_t, size_t> ReplicatedMergeTreeQueue::countMergesAndPartMutations() const { std::lock_guard lock(state_mutex); - size_t count = 0; + size_t count_merges = 0; + size_t count_mutations = 0; for (const auto & entry : queue) - if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS - || entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART) - ++count; + { + if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS) + ++count_merges; + else if (entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART) + ++count_mutations; + } - return count; + return std::make_pair(count_merges, count_mutations); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 47d82f4a9a2..5a84cfbb5a6 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -296,7 +296,7 @@ public: bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func); /// Count the number of merges and mutations of single parts in the queue. - size_t countMergesAndPartMutations() const; + std::pair<size_t, size_t> countMergesAndPartMutations() const; /// Count the total number of active mutations. size_t countMutations() const; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 8b32cc32704..028235d9eef 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2193,27 +2193,31 @@ void StorageReplicatedMergeTree::mergeSelectingTask() /// If many merges is already queued, then will queue only small enough merges. /// Otherwise merge queue could be filled with only large merges, /// and in the same time, many small parts could be created and won't be merged.
- size_t merges_and_mutations_queued = queue.countMergesAndPartMutations(); - if (merges_and_mutations_queued >= settings.max_replicated_merges_in_queue) + auto merges_and_mutations_queued = queue.countMergesAndPartMutations(); + size_t merges_and_mutations_sum = merges_and_mutations_queued.first + merges_and_mutations_queued.second; + if (merges_and_mutations_sum >= settings.max_replicated_merges_in_queue) { - LOG_TRACE(log, "Number of queued merges and part mutations (" << merges_and_mutations_queued - << ") is greater than max_replicated_merges_in_queue (" + LOG_TRACE(log, "Number of queued merges (" << merges_and_mutations_queued.first << ") and part mutations (" + << merges_and_mutations_queued.second << ") is greater than max_replicated_merges_in_queue (" << settings.max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate."); } else { UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge( - settings.max_replicated_merges_in_queue, merges_and_mutations_queued); + settings.max_replicated_merges_in_queue, merges_and_mutations_sum); UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation(); FutureMergedMutatedPart future_merged_part; + + /// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts if (max_source_parts_size_for_merge > 0 && merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred)) { success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, force_ttl); } - else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0) + else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0 + && merges_and_mutations_queued.second < settings.max_replicated_mutations_in_queue) { /// Choose a part to mutate. 
DataPartsVector data_parts = getDataPartsVector(); diff --git a/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml new file mode 100644 index 00000000000..ccc63f2eaec --- /dev/null +++ b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml @@ -0,0 +1,7 @@ + + + 50 + 8 + 4 + + \ No newline at end of file diff --git a/dbms/tests/integration/test_replicated_mutations/test.py b/dbms/tests/integration/test_replicated_mutations/test.py index 351ceff3608..dd42a70e280 100644 --- a/dbms/tests/integration/test_replicated_mutations/test.py +++ b/dbms/tests/integration/test_replicated_mutations/test.py @@ -10,21 +10,25 @@ from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) -node1 = cluster.add_instance('node1', with_zookeeper=True) +node1 = cluster.add_instance('node1', macros={'cluster': 'test1'}, with_zookeeper=True) # Check, that limits on max part size for merges doesn`t affect mutations -node2 = cluster.add_instance('node2', main_configs=["configs/merge_tree.xml"], with_zookeeper=True) -nodes = [node1, node2] +node2 = cluster.add_instance('node2', macros={'cluster': 'test1'}, main_configs=["configs/merge_tree.xml"], with_zookeeper=True) + +node3 = cluster.add_instance('node3', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_queue.xml"], with_zookeeper=True) +node4 = cluster.add_instance('node4', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_queue.xml"], with_zookeeper=True) + +all_nodes = [node1, node2, node3, node4] @pytest.fixture(scope="module") def started_cluster(): try: cluster.start() - for node in nodes: + for node in all_nodes: node.query("DROP TABLE IF EXISTS test_mutations") - for node in nodes: - node.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)") + for node in all_nodes: + node.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)") yield cluster @@ -33,7 +37,8 @@ def started_cluster(): class Runner: - def __init__(self): + def __init__(self, nodes): + self.nodes = nodes self.mtx = threading.Lock() self.total_inserted_xs = 0 self.total_inserted_rows = 0 @@ -49,6 +54,8 @@ class Runner: self.stop_ev = threading.Event() + self.exceptions = [] + def do_insert(self, thread_num): self.stop_ev.wait(random.random()) @@ -76,7 +83,7 @@ class Runner: try: print 'thread {}: insert for {}: {}'.format(thread_num, date_str, ','.join(str(x) for x in xs)) - random.choice(nodes).query("INSERT INTO test_mutations FORMAT TSV", payload) + random.choice(self.nodes).query("INSERT INTO test_mutations FORMAT TSV", payload) with self.mtx: for x in xs: @@ -86,6 +93,7 @@ class Runner: except Exception, e: print 'Exception while inserting,', e + self.exceptions.append(e) finally: with self.mtx: for x in xs: @@ -113,7 +121,7 @@ class Runner: try: print 'thread {}: delete {} * {}'.format(thread_num, to_delete_count, x) - random.choice(nodes).query("ALTER TABLE test_mutations DELETE WHERE x = {}".format(x)) + random.choice(self.nodes).query("ALTER TABLE test_mutations DELETE WHERE x = {}".format(x)) with self.mtx: self.total_mutations += 1 @@ -130,10 +138,23 @@ class Runner: self.stop_ev.wait(1.0 + random.random() * 2) +def wait_for_mutations(nodes, 
number_of_mutations): + for i in range(100): # wait for replication 80 seconds max + time.sleep(0.8) + + def get_done_mutations(node): + return int(node.query("SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations'").rstrip()) + + if all([get_done_mutations(n) == number_of_mutations for n in nodes]): + return True + return False + + def test_mutations(started_cluster): DURATION_SECONDS = 30 + nodes = [node1, node2] - runner = Runner() + runner = Runner(nodes) threads = [] for thread_num in range(5): @@ -155,16 +176,7 @@ def test_mutations(started_cluster): assert runner.total_inserted_rows > 0 assert runner.total_mutations > 0 - all_done = False - for i in range(100): # wait for replication 80 seconds max - time.sleep(0.8) - - def get_done_mutations(node): - return int(node.query("SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations'").rstrip()) - - if all([get_done_mutations(n) == runner.total_mutations for n in nodes]): - all_done = True - break + all_done = wait_for_mutations(nodes, runner.total_mutations) print node1.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames") assert all_done @@ -174,3 +186,34 @@ def test_mutations(started_cluster): for i, node in enumerate(nodes): actual_sums.append(int(node.query("SELECT sum(x) FROM test_mutations").rstrip())) assert actual_sums[i] == expected_sum + + +def test_mutations_dont_prevent_merges(started_cluster): + nodes = [node3, node4] + for year in range(2000, 2008): + rows = '' + date_str = '{}-01-{}'.format(year, random.randint(1, 10)) + for i in range(10): + rows += '{} {} {}\n'.format(date_str, random.randint(1, 10), i) + node3.query("INSERT INTO test_mutations FORMAT TSV", rows) + + # will run mutations of 8 parts in parallel, mutations will sleep for about 20 seconds + node3.query("ALTER TABLE test_mutations UPDATE i = sleepEachRow(2) WHERE 1") + + runner = Runner(nodes) + threads = [] + for thread_num in range(10): + threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, ))) + + # will insert approx 4-5 new parts per 1 second into each partition + for t in threads: + t.start() + + all_done = wait_for_mutations(nodes, 1) + + runner.stop_ev.set() + for t in threads: + t.join() + + assert all_done + assert all([str(e).find("Too many parts") < 0 for e in runner.exceptions]) From acce56095fc78c7d4fbe6e80746c4241fe698d89 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 22 Aug 2019 22:35:46 +0300 Subject: [PATCH 2/3] limit number of background threads for mutations --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 9 ++++++++- .../src/Storages/MergeTree/MergeTreeSettings.h | 1 + .../MergeTree/ReplicatedMergeTreeQueue.cpp | 18 +++++++++++------- dbms/src/Storages/StorageMergeTree.cpp | 7 ++----- .../Storages/StorageReplicatedMergeTree.cpp | 3 +-- 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 74193fa7156..5a9affaacd4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -157,7 +157,14 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_siz UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() { - return static_cast(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE); + size_t 
total_threads_in_pool = pool.getNumberOfThreads(); + size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); + + /// Allow mutations only if there are enough free threads, otherwise leave the free threads for merges + if (total_threads_in_pool - busy_threads_in_pool >= data.settings.number_of_free_entries_in_pool_to_execute_mutation) + return static_cast(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE); + + return 0; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 36e82b96961..afd0772a937 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -33,6 +33,7 @@ struct MergeTreeSettings : public SettingsCollection M(SettingUInt64, max_replicated_merges_in_queue, 16, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \ M(SettingUInt64, max_replicated_mutations_in_queue, 8, "How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.") \ M(SettingUInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.") \ + M(SettingUInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\".") \ M(SettingSeconds, old_parts_lifetime, 8 * 60, "How many seconds to keep obsolete parts.") \ M(SettingSeconds, temporary_directories_lifetime, 86400, "How many seconds to keep tmp_-directories.") \ \ diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index e6251502576..fd65f14fedb 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -956,15 +956,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( return false; } - /** Execute merge only if there are enough free threads in background pool to do merges of that size. - * But if all threads are free (maximal size of merge is allowed) then execute any merge, - * (because it may be ordered by OPTIMIZE or early with differrent settings). + UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge() + : merger_mutator.getMaxSourcePartSizeForMutation(); + /** If there are enough free threads in the background pool to do large merges (maximal size of merge is allowed), + * then ignore the value returned by getMaxSourcePartsSizeForMerge() and execute a merge of any size, + * because it may be ordered by OPTIMIZE or created earlier with different settings. + * The max_bytes_to_merge_at_max_space_in_pool setting still applies to regular merges, + * because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
*/ - UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); - if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool - && sum_parts_size_in_bytes > max_source_parts_size) + bool ignore_max_size = (entry.type == LogEntry::MERGE_PARTS) && (max_source_parts_size == data.settings.max_bytes_to_merge_at_max_space_in_pool); + + if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size) { - String reason = "Not executing log entry for part " + entry.new_part_name + String reason = "Not executing log entry " + entry.typeToString() + " for part " + entry.new_part_name + " because source parts size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes) + ") is greater than the current maximum (" + formatReadableSizeWithBinarySuffix(max_source_parts_size) + ")."; LOG_DEBUG(log, reason); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index d062bb197ca..779efe95a8c 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -624,8 +624,6 @@ bool StorageMergeTree::tryMutatePart() /// You must call destructor with unlocked `currently_merging_mutex`. std::optional tagger; { - auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); - std::lock_guard lock(currently_merging_mutex); if (current_mutations_by_version.empty()) @@ -641,8 +639,7 @@ bool StorageMergeTree::tryMutatePart() if (mutations_begin_it == mutations_end_it) continue; - auto estimated_needed_space = MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}); - if (estimated_needed_space > disk_space) + if (merger_mutator.getMaxSourcePartSizeForMutation() < part->bytes_on_disk) continue; for (auto it = mutations_begin_it; it != mutations_end_it; ++it) @@ -655,7 +652,7 @@ bool StorageMergeTree::tryMutatePart() future_part.part_info = new_part_info; future_part.name = part->getNewName(new_part_info); - tagger.emplace(future_part, estimated_needed_space, *this); + tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this); break; } } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 028235d9eef..7a946400658 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2208,14 +2208,13 @@ void StorageReplicatedMergeTree::mergeSelectingTask() UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation(); FutureMergedMutatedPart future_merged_part; - - /// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts if (max_source_parts_size_for_merge > 0 && merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred)) { success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, force_ttl); } + /// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0 && merges_and_mutations_queued.second < settings.max_replicated_mutations_in_queue) { From 9c054419323fb8db91b586f6c19208507c7452c5 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 22 Aug 2019 22:54:42 +0300 Subject: [PATCH 3/3] better test --- .../configs/merge_tree_max_parts.xml | 6 +++ .../configs/merge_tree_queue.xml | 7 --- 
.../test_replicated_mutations/test.py | 50 ++++++++++++------- 3 files changed, 39 insertions(+), 24 deletions(-) create mode 100644 dbms/tests/integration/test_replicated_mutations/configs/merge_tree_max_parts.xml delete mode 100644 dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml diff --git a/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_max_parts.xml b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_max_parts.xml new file mode 100644 index 00000000000..60047dcab2c --- /dev/null +++ b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_max_parts.xml @@ -0,0 +1,6 @@ + + + 50 + 50 + + \ No newline at end of file diff --git a/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml deleted file mode 100644 index ccc63f2eaec..00000000000 --- a/dbms/tests/integration/test_replicated_mutations/configs/merge_tree_queue.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - 50 - 8 - 4 - - \ No newline at end of file diff --git a/dbms/tests/integration/test_replicated_mutations/test.py b/dbms/tests/integration/test_replicated_mutations/test.py index dd42a70e280..0347ba4782c 100644 --- a/dbms/tests/integration/test_replicated_mutations/test.py +++ b/dbms/tests/integration/test_replicated_mutations/test.py @@ -14,10 +14,12 @@ node1 = cluster.add_instance('node1', macros={'cluster': 'test1'}, with_zookeepe # Check, that limits on max part size for merges doesn`t affect mutations node2 = cluster.add_instance('node2', macros={'cluster': 'test1'}, main_configs=["configs/merge_tree.xml"], with_zookeeper=True) -node3 = cluster.add_instance('node3', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_queue.xml"], with_zookeeper=True) -node4 = cluster.add_instance('node4', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_queue.xml"], with_zookeeper=True) +node3 = cluster.add_instance('node3', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_max_parts.xml"], with_zookeeper=True) +node4 = cluster.add_instance('node4', macros={'cluster': 'test2'}, main_configs=["configs/merge_tree_max_parts.xml"], with_zookeeper=True) -all_nodes = [node1, node2, node3, node4] +node5 = cluster.add_instance('node5', macros={'cluster': 'test3'}, main_configs=["configs/merge_tree_max_parts.xml"]) + +all_nodes = [node1, node2, node3, node4, node5] @pytest.fixture(scope="module") def started_cluster(): @@ -27,9 +29,11 @@ def started_cluster(): for node in all_nodes: node.query("DROP TABLE IF EXISTS test_mutations") - for node in all_nodes: + for node in [node1, node2, node3, node4]: node.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}') ORDER BY x PARTITION BY toYYYYMM(d)") + node5.query("CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE MergeTree() ORDER BY x PARTITION BY toYYYYMM(d)") + yield cluster finally: @@ -56,7 +60,7 @@ class Runner: self.exceptions = [] - def do_insert(self, thread_num): + def do_insert(self, thread_num, partitions_num): self.stop_ev.wait(random.random()) # Each thread inserts a small random number of rows with random year, month 01 and day determined @@ -74,7 +78,7 @@ class Runner: for x in xs: self.currently_inserting_xs[x] += 1 - year = 2000 + random.randint(0, 10) + year = 2000 + random.randint(0, partitions_num) date_str = '{year}-{month}-{day}'.format(year=year, month=month, 
day=day) payload = '' for x in xs: @@ -158,7 +162,7 @@ def test_mutations(started_cluster): threads = [] for thread_num in range(5): - threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, ))) + threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 10))) for thread_num in (11, 12, 13): threads.append(threading.Thread(target=runner.do_delete, args=(thread_num,))) @@ -178,7 +182,9 @@ def test_mutations(started_cluster): all_done = wait_for_mutations(nodes, runner.total_mutations) - print node1.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames") + print "Total mutations: ", runner.total_mutations + for node in nodes: + print node.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames") assert all_done expected_sum = runner.total_inserted_xs - runner.total_deleted_xs @@ -188,24 +194,30 @@ def test_mutations(started_cluster): assert actual_sums[i] == expected_sum -def test_mutations_dont_prevent_merges(started_cluster): - nodes = [node3, node4] - for year in range(2000, 2008): +@pytest.mark.parametrize( + ('nodes', ), + [ + ([node5, ], ), # MergeTree + ([node3, node4], ), # ReplicatedMergeTree + ] +) +def test_mutations_dont_prevent_merges(started_cluster, nodes): + for year in range(2000, 2016): rows = '' date_str = '{}-01-{}'.format(year, random.randint(1, 10)) for i in range(10): rows += '{} {} {}\n'.format(date_str, random.randint(1, 10), i) - node3.query("INSERT INTO test_mutations FORMAT TSV", rows) + nodes[0].query("INSERT INTO test_mutations FORMAT TSV", rows) - # will run mutations of 8 parts in parallel, mutations will sleep for about 20 seconds - node3.query("ALTER TABLE test_mutations UPDATE i = sleepEachRow(2) WHERE 1") + # will run mutations of 16 parts in parallel, mutations will sleep for about 20 seconds + nodes[0].query("ALTER TABLE test_mutations UPDATE i = sleepEachRow(2) WHERE 1") runner = Runner(nodes) threads = [] - for thread_num in range(10): - threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, ))) + for thread_num in range(2): + threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 0))) - # will insert approx 4-5 new parts per 1 second into each partition + # will insert approx 8-10 new parts per 1 second into one partition for t in threads: t.start() @@ -215,5 +227,9 @@ def test_mutations_dont_prevent_merges(started_cluster): for t in threads: t.join() + for node in nodes: + print node.query("SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames") + print node.query("SELECT partition, count(name), sum(active), sum(active*rows) FROM system.parts WHERE table ='test_mutations' GROUP BY partition FORMAT TSVWithNames") + assert all_done assert all([str(e).find("Too many parts") < 0 for e in runner.exceptions])
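
Editor's note (not part of the patches above): taken together, the series throttles mutations at two levels. On the leader, mergeSelectingTask() now stops assigning a new MUTATE_PART entry once the number of queued mutation tasks reaches max_replicated_mutations_in_queue (the combined count of queued merges and mutations is still capped by max_replicated_merges_in_queue). On every replica, getMaxSourcePartSizeForMutation() returns 0 unless the background pool still has at least number_of_free_entries_in_pool_to_execute_mutation free threads, so queued mutations wait instead of starving regular merges and triggering "Too many parts". A minimal Python sketch of those two gates follows; the function and variable names are hypothetical and the numbers are examples, not code taken from the patches:

    # Hypothetical model of the two gates introduced by this series (illustration only).
    def can_assign_mutation(queued_merges, queued_mutations, settings):
        # mergeSelectingTask(): overall cap on merge + mutation entries, plus a mutation-specific cap.
        total = queued_merges + queued_mutations
        return (total < settings['max_replicated_merges_in_queue']
                and queued_mutations < settings['max_replicated_mutations_in_queue'])

    def max_source_part_size_for_mutation(total_threads, busy_threads, unreserved_disk_bytes, settings):
        # getMaxSourcePartSizeForMutation(): execute mutations only while enough pool threads stay free.
        if total_threads - busy_threads >= settings['number_of_free_entries_in_pool_to_execute_mutation']:
            return unreserved_disk_bytes  # the patch divides unreserved space by DISK_USAGE_COEFFICIENT_TO_RESERVE
        return 0

    settings = {'max_replicated_merges_in_queue': 16,
                'max_replicated_mutations_in_queue': 8,
                'number_of_free_entries_in_pool_to_execute_mutation': 10}
    print(can_assign_mutation(queued_merges=10, queued_mutations=3, settings=settings))  # True
    print(max_source_part_size_for_mutation(16, 8, 10 ** 12, settings))                  # 0, so the mutation is deferred

If the defaults need tuning for a particular table, these MergeTree settings can presumably be overridden per table (for example via the SETTINGS clause of CREATE TABLE) or in the server's merge tree configuration, as the test configs in this series do.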