diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h index dc35af33a32..c7d338a57ce 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -86,7 +86,7 @@ private: /** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое. * Вызывается под queue_mutex. */ - bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger); + bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, MergeTreeData & data); /// После удаления элемента очереди, обновить времена insert-ов в оперативке. Выполняется под queue_mutex. /// Возвращает информацию, какие времена изменились - эту информацию можно передать в updateTimesInZooKeeper. @@ -149,7 +149,7 @@ public: * merger используется только чтобы проверить, не приостановлены ли мерджи. */ using SelectedEntry = std::pair>; - SelectedEntry selectEntryToProcess(MergeTreeDataMerger & merger); + SelectedEntry selectEntryToProcess(MergeTreeDataMerger & merger, MergeTreeData & data); /** Выполнить функцию func для обработки действия. * При этом, на время выполнения, отметить элемент очереди как выполняющийся diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index f435116261f..6ef3b5aa6ae 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -448,7 +448,11 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z } -bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger) +bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( + const LogEntry & entry, + String & out_postpone_reason, + MergeTreeDataMerger & merger, + MergeTreeData & data) { /// mutex уже захвачен. Функция вызывается только из selectEntryToProcess. @@ -498,6 +502,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, Str * Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок. * Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди. */ + size_t sum_parts_size_in_bytes = 0; for (const auto & name : entry.parts_to_merge) { if (future_parts.count(name)) @@ -508,6 +513,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, Str out_postpone_reason = reason; return false; } + + auto part = data.getPartIfExists(name); + if (part) + sum_parts_size_in_bytes += part->size_in_bytes; } if (merger.isCancelled()) @@ -517,6 +526,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, Str out_postpone_reason = reason; return false; } + + size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(); + if (sum_parts_size_in_bytes > max_parts_size_for_merge) + { + String reason = "Not executing log entry for part " + entry.new_part_name + + " because its size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes) + + ") is greater than current maximum (" + formatReadableSizeWithBinarySuffix(max_parts_size_for_merge) + ")."; + LOG_DEBUG(log, reason); + out_postpone_reason = reason; + return false; + } } return true; @@ -546,7 +566,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting() } -ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMerger & merger) +ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMerger & merger, MergeTreeData & data) { std::lock_guard lock(mutex); @@ -557,7 +577,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP if ((*it)->currently_executing) continue; - if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger)) + if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger, data)) { entry = *it; queue.splice(queue.end(), queue, it); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index cdce86f185b..d067cfa7709 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1137,13 +1137,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) if (!do_fetch) { - size_t sum_parts_size_in_bytes = 0; - for (const MergeTreeData::DataPartPtr & part : parts) - sum_parts_size_in_bytes += part->size_in_bytes; - - if (sum_parts_size_in_bytes > merger.getMaxPartsSizeForMerge()) - return false; - size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts); /// Может бросить исключение. @@ -1537,7 +1530,7 @@ bool StorageReplicatedMergeTree::queueTask() try { - selected = queue.selectEntryToProcess(merger); + selected = queue.selectEntryToProcess(merger, data); } catch (...) {