MergeSelector: development [#METR-21841].

This commit is contained in:
Alexey Milovidov 2016-10-30 14:05:45 +03:00
parent 13f9d5cac4
commit bee00bfcfc
3 changed files with 26 additions and 13 deletions

View File

@ -86,7 +86,7 @@ private:
/** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое.
* Вызывается под queue_mutex.
*/
bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger);
bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, MergeTreeData & data);
/// После удаления элемента очереди, обновить времена insert-ов в оперативке. Выполняется под queue_mutex.
/// Возвращает информацию, какие времена изменились - эту информацию можно передать в updateTimesInZooKeeper.
@ -149,7 +149,7 @@ public:
* merger используется только чтобы проверить, не приостановлены ли мерджи.
*/
using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>;
SelectedEntry selectEntryToProcess(MergeTreeDataMerger & merger);
SelectedEntry selectEntryToProcess(MergeTreeDataMerger & merger, MergeTreeData & data);
/** Выполнить функцию func для обработки действия.
* При этом, на время выполнения, отметить элемент очереди как выполняющийся

View File

@ -448,7 +448,11 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
}
bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger)
bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
const LogEntry & entry,
String & out_postpone_reason,
MergeTreeDataMerger & merger,
MergeTreeData & data)
{
/// mutex уже захвачен. Функция вызывается только из selectEntryToProcess.
@ -498,6 +502,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, Str
* Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
* Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
*/
size_t sum_parts_size_in_bytes = 0;
for (const auto & name : entry.parts_to_merge)
{
if (future_parts.count(name))
@ -508,6 +513,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, Str
out_postpone_reason = reason;
return false;
}
auto part = data.getPartIfExists(name);
if (part)
sum_parts_size_in_bytes += part->size_in_bytes;
}
if (merger.isCancelled())
@ -517,6 +526,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(const LogEntry & entry, Str
out_postpone_reason = reason;
return false;
}
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge();
if (sum_parts_size_in_bytes > max_parts_size_for_merge)
{
String reason = "Not executing log entry for part " + entry.new_part_name
+ " because its size (" + formatReadableSizeWithBinarySuffix(sum_parts_size_in_bytes)
+ ") is greater than current maximum (" + formatReadableSizeWithBinarySuffix(max_parts_size_for_merge) + ").";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
return false;
}
}
return true;
@ -546,7 +566,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
}
ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMerger & merger)
ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMerger & merger, MergeTreeData & data)
{
std::lock_guard<std::mutex> lock(mutex);
@ -557,7 +577,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
if ((*it)->currently_executing)
continue;
if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger))
if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger, data))
{
entry = *it;
queue.splice(queue.end(), queue, it);

View File

@ -1137,13 +1137,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
if (!do_fetch)
{
size_t sum_parts_size_in_bytes = 0;
for (const MergeTreeData::DataPartPtr & part : parts)
sum_parts_size_in_bytes += part->size_in_bytes;
if (sum_parts_size_in_bytes > merger.getMaxPartsSizeForMerge())
return false;
size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts);
/// Может бросить исключение.
@ -1537,7 +1530,7 @@ bool StorageReplicatedMergeTree::queueTask()
try
{
selected = queue.selectEntryToProcess(merger);
selected = queue.selectEntryToProcess(merger, data);
}
catch (...)
{