diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 2777d4b9849..c9fc5b39fae 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -27,7 +27,7 @@ void MergeTreeBlockOutputStream::write(const Block & block) PartLog::addNewPart(storage.context, part, watch.elapsed()); /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. - storage.merge_task_handle->wake(); + storage.background_task_handle->wake(); } } diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 27a1ea0ed23..44c330ed476 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -78,7 +78,7 @@ StorageMergeTree::StorageMergeTree( void StorageMergeTree::startup() { - merge_task_handle = background_pool.addTask([this] { return mergeTask(); }); + background_task_handle = background_pool.addTask([this] { return backgroundTask(); }); data.clearOldPartsFromFilesystem(); @@ -95,8 +95,8 @@ void StorageMergeTree::shutdown() return; shutdown_called = true; merger_mutator.actions_blocker.cancelForever(); - if (merge_task_handle) - background_pool.removeTask(merge_task_handle); + if (background_task_handle) + background_pool.removeTask(background_task_handle); } @@ -285,89 +285,22 @@ struct CurrentlyMergingPartsTagger }; -void StorageMergeTree::mutate(const MutationCommands & commands, const Context & context) +void StorageMergeTree::mutate(const MutationCommands & commands, const Context &) { - Int64 version; - decltype(current_mutations_by_version.end()) mutation_it; { std::lock_guard lock(currently_merging_mutex); - version = increment.get(); + Int64 version = increment.get(); MergeTreeMutationEntry entry; entry.create_time = time(nullptr); entry.block_number = version; entry.commands = commands; - mutation_it = current_mutations_by_version.emplace(version, std::move(entry)); + current_mutations_by_version.emplace(version, std::move(entry)); } - size_t parts_mutated = 0; - while (true) - { - std::optional tagger; - MergeTreeDataMergerMutator::FuturePart future_mutated_part; - bool some_locked = false; - { - std::lock_guard lock(currently_merging_mutex); - - Int64 prev_version = 0; - if (mutation_it != current_mutations_by_version.begin()) - prev_version = std::prev(mutation_it)->first; - - for (const auto & part : data.getDataPartsVector()) - { - Int64 part_mutation_version = getCurrentMutationVersion(part, lock); - - if (part_mutation_version >= version) - continue; - - if (part_mutation_version < prev_version) - { - LOG_TRACE(log, - "Part " << part->name << " has mutation version " << part_mutation_version - << ", will wait until it has version " << prev_version); - some_locked = true; - continue; - } - - if (currently_merging.count(part)) - { - LOG_TRACE(log, "Part " << part->name << " is currently locked, will wait."); - some_locked = true; - continue; - } - - auto new_part_info = part->info; - new_part_info.mutation = version; - - future_mutated_part.parts.push_back(part); - future_mutated_part.part_info = new_part_info; - future_mutated_part.name = part->getNewName(new_part_info); - tagger.emplace({part}, part->bytes_on_disk, *this); - break; - } - } - - if (!future_mutated_part.parts.empty()) - { - auto new_part = merger_mutator.mutatePartToTemporaryPart( - future_mutated_part, mutation_it->second.commands, context); - data.renameTempPartAndReplace(new_part); - ++parts_mutated; - } - else if (some_locked) - sleep(1); - else - break; - } - - { - std::lock_guard lock(currently_merging_mutex); - current_mutations_by_version.erase(mutation_it); - } - - LOG_TRACE(log, "Finished, mutated " << parts_mutated << " parts."); + background_task_handle->wake(); } @@ -420,17 +353,8 @@ bool StorageMergeTree::merge( bool deduplicate, String * out_disable_reason) { - /// Clear old parts. It does not matter to do it more frequently than each second. - if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1)) - { - data.clearOldPartsFromFilesystem(); - data.clearOldTemporaryDirectories(); - } - auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__); - size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); - MergeTreeDataMergerMutator::FuturePart future_part; /// You must call destructor with unlocked `currently_merging_mutex`. @@ -455,6 +379,7 @@ bool StorageMergeTree::merge( } else { + size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason); } @@ -531,7 +456,114 @@ bool StorageMergeTree::merge( } -bool StorageMergeTree::mergeTask() +bool StorageMergeTree::tryMutatePart() +{ + auto structure_lock = lockStructure(true, __PRETTY_FUNCTION__); + + MergeTreeDataMergerMutator::FuturePart future_part; + MutationCommands commands; + /// You must call destructor with unlocked `currently_merging_mutex`. + std::optional tagger; + { + auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); + + std::lock_guard lock(currently_merging_mutex); + + if (current_mutations_by_version.empty()) + return false; + + auto mutations_end_it = current_mutations_by_version.end(); + for (const auto & part : data.getDataPartsVector()) + { + if (currently_merging.count(part)) + continue; + + auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); + if (mutations_begin_it == mutations_end_it) + continue; + + auto estimated_needed_space = MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}); + if (estimated_needed_space > disk_space) + continue; + + for (auto it = mutations_begin_it; it != mutations_end_it; ++it) + commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end()); + + auto new_part_info = part->info; + new_part_info.mutation = current_mutations_by_version.rbegin()->first; + + future_part.parts.push_back(part); + future_part.part_info = new_part_info; + future_part.name = part->getNewName(new_part_info); + + tagger.emplace({part}, estimated_needed_space, *this); + break; + } + } + + if (!tagger) + return false; + + Stopwatch stopwatch; + MergeTreeData::MutableDataPartPtr new_part; + + auto write_part_log = [&] (const ExecutionStatus & execution_status) + { + try + { + auto part_log = context.getPartLog(database_name); + if (!part_log) + return; + + PartLogElement part_log_elem; + + part_log_elem.event_type = PartLogElement::MUTATE_PART; + + part_log_elem.error = static_cast(execution_status.code); + part_log_elem.exception = execution_status.message; + + part_log_elem.event_time = time(nullptr); + part_log_elem.duration_ms = stopwatch.elapsed() / 1000000; + + part_log_elem.database_name = database_name; + part_log_elem.table_name = table_name; + part_log_elem.part_name = future_part.name; + + if (new_part) + { + part_log_elem.bytes_compressed_on_disk = new_part->bytes_on_disk; + part_log_elem.rows = new_part->rows_count; + } + + part_log_elem.source_part_names.reserve(future_part.parts.size()); + for (const auto & source_part : future_part.parts) + part_log_elem.source_part_names.push_back(source_part->name); + + part_log->add(part_log_elem); + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + }; + + try + { + new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, context); + data.renameTempPartAndReplace(new_part); + write_part_log({}); + } + catch (...) + { + write_part_log(ExecutionStatus::fromCurrentException()); + throw; + } + + return true; +} + + +bool StorageMergeTree::backgroundTask() { if (shutdown_called) return false; @@ -541,8 +573,19 @@ bool StorageMergeTree::mergeTask() try { + /// Clear old parts. It is unnecessary to do it more than once a second. + if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1)) + { + data.clearOldPartsFromFilesystem(); + data.clearOldTemporaryDirectories(); + } + size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; - return merge(aio_threshold, false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/); ///TODO: read deduplicate option from table config + ///TODO: read deduplicate option from table config + if (merge(aio_threshold, false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/)) + return true; + + return tryMutatePart(); } catch (Exception & e) { diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index bcb53b5dd94..72d0557537a 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -120,7 +120,7 @@ private: std::atomic shutdown_called {false}; - BackgroundProcessingPool::TaskHandle merge_task_handle; + BackgroundProcessingPool::TaskHandle background_task_handle; /** Determines what parts should be merged and merges it. * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query). @@ -129,7 +129,10 @@ private: bool merge(size_t aio_threshold, bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr); - bool mergeTask(); + /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true. + bool tryMutatePart(); + + bool backgroundTask(); Int64 getCurrentMutationVersion( const MergeTreeData::DataPartPtr & part, diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index d9668050c2e..c8b8b6d9706 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1155,7 +1155,6 @@ void StorageReplicatedMergeTree::writePartLog( if (merge_entry) { - part_log_elem.rows_read = (*merge_entry)->bytes_read_uncompressed; part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;