This commit is contained in:
Michael Kolupaev 2014-07-04 15:18:04 +04:00
parent 3925f6dcf9
commit 1425813cdf
2 changed files with 8 additions and 1 deletions

View File

@ -110,7 +110,7 @@ private:
CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_) CurrentlyMergingPartsTagger(const MergeTreeData::DataPartsVector & parts_, size_t total_size, StorageMergeTree & storage_)
: parts(parts_), storage(storage_) : parts(parts_), storage(storage_)
{ {
/// Здесь не лочится мьютекс, так как конструктор вызывается внутри mergeThread, где он уже залочен. /// Здесь не лочится мьютекс, так как конструктор вызывается внутри mergeTask, где он уже залочен.
reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// Может бросить исключение. reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// Может бросить исключение.
for (const auto & part : parts) for (const auto & part : parts)
{ {

View File

@ -706,6 +706,13 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry) bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{ {
if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) &&future_parts.count(entry.new_part_name))
{
LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name <<
" because another log entry for the same part is being processed. This shouldn't happen often.");
return false;
}
if (entry.type == LogEntry::MERGE_PARTS) if (entry.type == LogEntry::MERGE_PARTS)
{ {
/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции. /** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.