Merge pull request #33753 from azat/mt-fix-count-race

Fix tiny race between count() and INSERT/merges/... in MergeTree
This commit is contained in:
alexey-milovidov 2022-01-20 06:33:54 +03:00 committed by GitHub
commit 994584df09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 19 deletions

View File

@ -2569,11 +2569,13 @@ bool MergeTreeData::renameTempPartAndReplace(
++reduce_parts;
}
decreaseDataVolume(reduce_bytes, reduce_rows, reduce_parts);
modifyPartState(part_it, DataPartState::Active);
addPartContributionToColumnAndSecondaryIndexSizes(part);
addPartContributionToDataVolume(part);
ssize_t diff_bytes = part->getBytesOnDisk() - reduce_bytes;
ssize_t diff_rows = part->rows_count - reduce_rows;
ssize_t diff_parts = 1 - reduce_parts;
increaseDataVolume(diff_bytes, diff_rows, diff_parts);
}
auto part_in_memory = asInMemoryPart(part);
@ -3102,8 +3104,9 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
auto part_it = data_parts_indexes.insert(part_copy).first;
modifyPartState(part_it, DataPartState::Active);
removePartContributionToDataVolume(original_active_part);
addPartContributionToDataVolume(part_copy);
ssize_t diff_bytes = part_copy->getBytesOnDisk() - original_active_part->getBytesOnDisk();
ssize_t diff_rows = part_copy->rows_count - original_active_part->rows_count;
increaseDataVolume(diff_bytes, diff_rows, /* parts= */ 0);
auto disk = original_active_part->volume->getDisk();
String marker_path = fs::path(original_active_part->getFullRelativePath()) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
@ -4304,8 +4307,11 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
data.addPartContributionToColumnAndSecondaryIndexSizes(part);
}
}
data.decreaseDataVolume(reduce_bytes, reduce_rows, reduce_parts);
data.increaseDataVolume(add_bytes, add_rows, add_parts);
ssize_t diff_bytes = add_bytes - reduce_bytes;
ssize_t diff_rows = add_rows - reduce_rows;
ssize_t diff_parts = add_parts - reduce_parts;
data.increaseDataVolume(diff_bytes, diff_rows, diff_parts);
}
clear();
@ -5690,23 +5696,16 @@ void MergeTreeData::addPartContributionToDataVolume(const DataPartPtr & part)
void MergeTreeData::removePartContributionToDataVolume(const DataPartPtr & part)
{
decreaseDataVolume(part->getBytesOnDisk(), part->rows_count, 1);
increaseDataVolume(-part->getBytesOnDisk(), -part->rows_count, -1);
}
void MergeTreeData::increaseDataVolume(size_t bytes, size_t rows, size_t parts)
void MergeTreeData::increaseDataVolume(ssize_t bytes, ssize_t rows, ssize_t parts)
{
total_active_size_bytes.fetch_add(bytes, std::memory_order_acq_rel);
total_active_size_rows.fetch_add(rows, std::memory_order_acq_rel);
total_active_size_parts.fetch_add(parts, std::memory_order_acq_rel);
}
void MergeTreeData::decreaseDataVolume(size_t bytes, size_t rows, size_t parts)
{
total_active_size_bytes.fetch_sub(bytes, std::memory_order_acq_rel);
total_active_size_rows.fetch_sub(rows, std::memory_order_acq_rel);
total_active_size_parts.fetch_sub(parts, std::memory_order_acq_rel);
}
void MergeTreeData::setDataVolume(size_t bytes, size_t rows, size_t parts)
{
total_active_size_bytes.store(bytes, std::memory_order_release);

View File

@ -1150,9 +1150,7 @@ private:
void addPartContributionToDataVolume(const DataPartPtr & part);
void removePartContributionToDataVolume(const DataPartPtr & part);
void increaseDataVolume(size_t bytes, size_t rows, size_t parts);
void decreaseDataVolume(size_t bytes, size_t rows, size_t parts);
void increaseDataVolume(ssize_t bytes, ssize_t rows, ssize_t parts);
void setDataVolume(size_t bytes, size_t rows, size_t parts);
std::atomic<size_t> total_active_size_bytes = 0;