This commit is contained in:
Alexey Milovidov 2015-09-20 09:31:19 +03:00
parent 227b41b0eb
commit b14bbf3928

View File

@ -800,12 +800,41 @@ void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_ev
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry) bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{ {
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART) /// queue_mutex уже захвачен. Функция вызывается только из queueTask.
&& future_parts.count(entry.new_part_name))
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
{
/// Проверим, не создаётся ли сейчас этот же кусок другим действием.
if (future_parts.count(entry.new_part_name))
{ {
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name
<< " because another log entry for the same part is being processed. This shouldn't happen often."); << " because another log entry for the same part is being processed. This shouldn't happen often.");
return false; return false;
/** Когда соответствующее действие завершится, то shouldExecuteLogEntry, в следующий раз, пройдёт успешно,
* и элемент очереди будет обработан. Сразу же в функции executeLogEntry будет выяснено, что кусок у нас уже есть,
* и элемент очереди будет сразу считаться обработанным.
*/
}
/// Более сложная проверка - не создаётся ли сейчас другим действием кусок, который покроет этот кусок.
/// NOTE То, что выше - избыточно, но оставлено ради более удобного сообщения в логе.
ActiveDataPartSet::Part result_part;
ActiveDataPartSet::parsePartName(entry.new_part_name, result_part);
/// Оно может тормозить при большом размере future_parts. Но он не может быть большим, так как ограничен BackgroundProcessingPool.
for (const auto & future_part_name : future_parts)
{
ActiveDataPartSet::Part future_part;
ActiveDataPartSet::parsePartName(future_part_name, future_part);
if (future_part.contains(result_part))
{
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name
<< " because another log entry for covering part " << future_part_name << " is being processed.");
return false;
}
}
} }
if (entry.type == LogEntry::MERGE_PARTS) if (entry.type == LogEntry::MERGE_PARTS)
@ -819,7 +848,8 @@ bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{ {
if (future_parts.count(name)) if (future_parts.count(name))
{ {
LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " because part " << name << " is not ready yet."); LOG_TRACE(log, "Not merging into part " << entry.new_part_name
<< " because part " << name << " is not ready yet (log entry for that part is being processed).");
return false; return false;
} }
} }