From 350a0fe1297f262ff002068fd20cb3ff62317c3b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 28 Mar 2019 22:58:41 +0300 Subject: [PATCH 1/2] Miscellaneous --- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 40 +++++++++---------- dbms/src/Storages/MergeTree/MergeTreeData.h | 7 ++-- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 139f8276a24..a41ff8d8c8a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -611,7 +611,7 @@ String MergeTreeData::MergingParams::getModeName() const Int64 MergeTreeData::getMaxBlockNumber() { - std::lock_guard lock_all(data_parts_mutex); + auto lock = lockParts(); Int64 max_block_num = 0; for (const DataPartPtr & part : data_parts_by_info) @@ -640,7 +640,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) DataPartsVector broken_parts_to_detach; size_t suspicious_broken_parts = 0; - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); data_parts_indexes.clear(); for (const String & file_name : part_file_names) @@ -866,7 +866,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() std::vector parts_to_delete; { - std::lock_guard lock_parts(data_parts_mutex); + auto parts_lock = lockParts(); auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated); for (auto it = outdated_parts_range.begin(); it != outdated_parts_range.end(); ++it) @@ -900,7 +900,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts() void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & parts) { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); for (auto & part : parts) { /// We should modify it under data_parts_mutex @@ -912,7 +912,7 @@ void MergeTreeData::rollbackDeletingParts(const MergeTreeData::DataPartsVector & void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & parts) { { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); /// TODO: use data_parts iterators instead of pointers for (auto & part : parts) @@ -980,7 +980,7 @@ void MergeTreeData::dropAllData() { LOG_TRACE(log, "dropAllData: waiting for locks."); - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); LOG_TRACE(log, "dropAllData: removing data from memory."); @@ -1717,7 +1717,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace( DataPartsVector covered_parts; { - std::unique_lock lock(data_parts_mutex); + auto lock = lockParts(); renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts); } return covered_parts; @@ -1814,7 +1814,7 @@ restore_covered) { LOG_INFO(log, "Renaming " << part_to_detach->relative_path << " to " << prefix << part_to_detach->name << " and forgiving it."); - auto data_parts_lock = lockParts(); + auto lock = lockParts(); auto it_part = data_parts_by_info.find(part_to_detach->info); if (it_part == data_parts_by_info.end()) @@ -1931,7 +1931,7 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part) { DataPartPtr part_to_delete; { - std::lock_guard lock_parts(data_parts_mutex); + auto lock = lockParts(); LOG_TRACE(log, "Trying to immediately remove part " << part->getNameWithState()); @@ -1967,7 +1967,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const { size_t res = 0; { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); for (auto & part : getDataPartsStateRange(DataPartState::Committed)) res += part->bytes_on_disk; @@ -1979,7 +1979,7 @@ size_t MergeTreeData::getTotalActiveSizeInBytes() const size_t MergeTreeData::getMaxPartsCountForPartition() const { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); size_t res = 0; size_t cur_count = 0; @@ -2006,7 +2006,7 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const std::optional MergeTreeData::getMinPartDataVersion() const { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); std::optional result; for (const DataPartPtr & part : getDataPartsStateRange(DataPartState::Committed)) @@ -2088,8 +2088,8 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart( MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info) { - DataPartsLock data_parts_lock(data_parts_mutex); - return getActiveContainingPart(part_info, DataPartState::Committed, data_parts_lock); + auto lock = lockParts(); + return getActiveContainingPart(part_info, DataPartState::Committed, lock); } MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) @@ -2103,7 +2103,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg { DataPartStateAndPartitionID state_with_partition{state, partition_id}; - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); return DataPartsVector( data_parts_by_state_and_info.lower_bound(state_with_partition), data_parts_by_state_and_info.upper_bound(state_with_partition)); @@ -2112,7 +2112,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInfo & part_info, const MergeTreeData::DataPartStates & valid_states) { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); auto it = data_parts_by_info.find(part_info); if (it == data_parts_by_info.end()) @@ -2331,7 +2331,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context String partition_id = partition.getID(*this); { - DataPartsLock data_parts_lock(data_parts_mutex); + auto data_parts_lock = lockParts(); DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock); if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value) { @@ -2352,7 +2352,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartS DataPartsVector res; DataPartsVector buf; { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); for (auto state : affordable_states) { @@ -2378,7 +2378,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat { DataPartsVector res; { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); res.assign(data_parts_by_info.begin(), data_parts_by_info.end()); if (out_states != nullptr) @@ -2396,7 +2396,7 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo { DataParts res; { - std::lock_guard lock(data_parts_mutex); + auto lock = lockParts(); for (auto state : affordable_states) { auto range = getDataPartsStateRange(state); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index ed2707c32d4..055a4d63b15 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -538,8 +538,7 @@ public: size_t getColumnCompressedSize(const std::string & name) const { - std::lock_guard lock{data_parts_mutex}; - + auto lock = lockParts(); const auto it = column_sizes.find(name); return it == std::end(column_sizes) ? 0 : it->second.data_compressed; } @@ -547,14 +546,14 @@ public: using ColumnSizeByName = std::unordered_map; ColumnSizeByName getColumnSizes() const { - std::lock_guard lock{data_parts_mutex}; + auto lock = lockParts(); return column_sizes; } /// Calculates column sizes in compressed form for the current state of data_parts. void recalculateColumnSizes() { - std::lock_guard lock{data_parts_mutex}; + auto lock = lockParts(); calculateColumnSizesImpl(); } From 3fb9814e3ef0ed4c03322c5b67fe04655b17f76b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 28 Mar 2019 22:59:07 +0300 Subject: [PATCH 2/2] Fixed race condition in fetchPart --- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index c51070c4b44..fd1859ff6e4 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2655,7 +2655,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin if (auto part = data.getPartIfExists(part_info, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting})) { - LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch"); + LOG_DEBUG(log, "Part " << part->name << " should be deleted after previous attempt before fetch"); /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. cleanup_thread.wakeup(); return false;