From 95fdb0a877c4b9d9308ceea6c264e36a1c616a92 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 14 Oct 2022 14:52:57 +0000 Subject: [PATCH] remove outdated parts asynchronously --- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 1 - src/Storages/MergeTree/MergeTreeData.cpp | 700 +++++++++--------- src/Storages/MergeTree/MergeTreeData.h | 93 ++- .../ReplicatedMergeTreeRestartingThread.cpp | 3 + src/Storages/StorageMergeTree.cpp | 3 +- .../configs/fast_background_pool.xml | 9 + .../test_merge_tree_load_parts/test.py | 73 +- 7 files changed, 497 insertions(+), 385 deletions(-) create mode 100644 tests/integration/test_merge_tree_load_parts/configs/fast_background_pool.xml diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 46323f12305..d8dda924b15 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1460,7 +1460,6 @@ void IMergeTreeDataPart::remove() const return CanRemoveDescription{.can_remove_anything = can_remove, .files_not_to_remove = files_not_to_remove }; }; - if (!isStoredOnDisk()) return; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 8c787e7c45a..1916db740be 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -992,6 +992,22 @@ void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const S } } +template +void MergeTreeData::PartLoadingTree::traverse(bool recursive, Func && func) +{ + std::function traverse_impl = [&](const auto & node) + { + func(node); + if (recursive) + for (const auto & [_, child] : node->children) + traverse_impl(child); + }; + + for (const auto & elem : root_by_partition) + for (const auto & [_, node] : elem.second->children) + traverse_impl(node); +} + MergeTreeData::PartLoadingTree MergeTreeData::PartLoadingTree::build(std::vector nodes) { @@ -1008,13 +1024,215 @@ MergeTreeData::PartLoadingTree::build(std::vector nodes) return tree; } -void MergeTreeData::loadDataPartsFromDisk( - DataPartsVector & broken_parts_to_detach, - DataPartsVector & duplicate_parts_to_remove, +static std::optional calculatePartSizeSafe( + const MergeTreeData::DataPartPtr & part, Poco::Logger * log) +{ + try + { + return part->data_part_storage->calculateTotalSizeOnDisk(); + } + catch (...) + { + tryLogCurrentException(log, fmt::format("while calculating part size {} on path {}", + part->name, part->data_part_storage->getRelativePath())); + return {}; + } +} + +static void preparePartForRemoval(const MergeTreeMutableDataPartPtr & part) +{ + part->remove_time.store(part->modification_time, std::memory_order_relaxed); + auto creation_csn = part->version.creation_csn.load(std::memory_order_relaxed); + if (creation_csn != Tx::RolledBackCSN && creation_csn != Tx::PrehistoricCSN && !part->version.isRemovalTIDLocked()) + { + /// It's possible that covering part was created without transaction, + /// but if covered part was created with transaction (i.e. creation_tid is not prehistoric), + /// then it must have removal tid in metadata file. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Data part {} is Outdated and has creation TID {} and CSN {}, " + "but does not have removal tid. It's a bug or a result of manual intervention.", + part->name, part->version.creation_tid, creation_csn); + } + + /// Explicitly set removal_tid_lock for parts w/o transaction (i.e. w/o txn_version.txt) + /// to avoid keeping part forever (see VersionMetadata::canBeRemoved()) + if (!part->version.isRemovalTIDLocked()) + { + TransactionInfoContext transaction_context{part->storage.getStorageID(), part->name}; + part->version.lockRemovalTID(Tx::PrehistoricTID, transaction_context); + } +} + +MergeTreeData::LoadPartResult MergeTreeData::loadDataPart( + const MergeTreePartInfo & part_info, + const String & part_name, + const DiskPtr & part_disk_ptr, + MergeTreeDataPartState to_state, + std::mutex & part_loading_mutex) +{ + LOG_TRACE(log, "Loading {} part {} from disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName()); + + LoadPartResult res; + auto single_disk_volume = std::make_shared("volume_" + part_name, part_disk_ptr, 0); + auto data_part_storage = std::make_shared(single_disk_volume, relative_data_path, part_name); + + res.part = createPart(part_name, part_info, data_part_storage); + + String part_path = fs::path(relative_data_path) / part_name; + String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME; + + if (part_disk_ptr->exists(marker_path)) + { + /// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist. + res.size_of_part = calculatePartSizeSafe(res.part, log); + res.is_broken = true; + + auto part_size_str = res.size_of_part ? formatReadableSizeWithBinarySuffix(*res.size_of_part) : "failed to calculate size"; + + LOG_WARNING(log, + "Detaching stale part {} (size: {}), which should have been deleted after a move. " + "That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.", + res.part->data_part_storage->getFullPath(), part_size_str); + + return res; + } + + try + { + res.part->loadColumnsChecksumsIndexes(require_part_metadata, true); + } + catch (const Exception & e) + { + /// Don't count the part as broken if there is not enough memory to load it. + /// In fact, there can be many similar situations. + /// But it is OK, because there is a safety guard against deleting too many parts. + if (isNotEnoughMemoryErrorCode(e.code())) + throw; + + res.is_broken = true; + tryLogCurrentException(log, fmt::format("while loading part {} on path {}", res.part->name, part_path)); + } + catch (...) + { + res.is_broken = true; + tryLogCurrentException(log, fmt::format("while loading part {} on path {}", res.part->name, part_path)); + } + + /// Ignore broken parts that can appear as a result of hard server restart. + if (res.is_broken) + { + res.size_of_part = calculatePartSizeSafe(res.part, log); + auto part_size_str = res.size_of_part ? formatReadableSizeWithBinarySuffix(*res.size_of_part) : "failed to calculate size"; + + LOG_ERROR(log, + "Detaching broken part {}{} (size: {}). " + "If it happened after update, it is likely because of backward incompatibility. " + "You need to resolve this manually", + getFullPathOnDisk(part_disk_ptr), part_name, part_size_str); + + return res; + } + + res.part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime(); + res.part->loadVersionMetadata(); + + if (res.part->wasInvolvedInTransaction()) + { + /// Check if CSNs were written after committing transaction, update and write if needed. + bool version_updated = false; + auto & version = res.part->version; + chassert(!version.creation_tid.isEmpty()); + + if (!res.part->version.creation_csn) + { + auto min = TransactionLog::getCSN(res.part->version.creation_tid); + if (!min) + { + /// Transaction that created this part was not committed. Remove part. + TransactionLog::assertTIDIsNotOutdated(res.part->version.creation_tid); + min = Tx::RolledBackCSN; + } + + LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has creation_tid={}, setting creation_csn={}", + res.part->name, res.part->version.creation_tid, min); + + version.creation_csn = min; + version_updated = true; + } + + if (!version.removal_tid.isEmpty() && !version.removal_csn) + { + auto max = TransactionLog::getCSN(version.removal_tid); + if (max) + { + LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has removal_tid={}, setting removal_csn={}", + res.part->name, version.removal_tid, max); + version.removal_csn = max; + } + else + { + TransactionLog::assertTIDIsNotOutdated(version.removal_tid); + /// Transaction that tried to remove this part was not committed. Clear removal_tid. + LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}", + res.part->name, version.removal_tid); + version.unlockRemovalTID(version.removal_tid, TransactionInfoContext{getStorageID(), res.part->name}); + } + + version_updated = true; + } + + /// Sanity checks + bool csn_order = !version.removal_csn || version.creation_csn <= version.removal_csn || version.removal_csn == Tx::PrehistoricCSN; + bool min_start_csn_order = version.creation_tid.start_csn <= version.creation_csn; + bool max_start_csn_order = version.removal_tid.start_csn <= version.removal_csn; + bool creation_csn_known = version.creation_csn; + if (!csn_order || !min_start_csn_order || !max_start_csn_order || !creation_csn_known) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid version metadata: {}", res.part->name, version.toString()); + + if (version_updated) + res.part->storeVersionMetadata(/* force */ true); + + /// Deactivate part if creation was not committed or if removal was. + if (version.creation_csn == Tx::RolledBackCSN || version.removal_csn) + { + preparePartForRemoval(res.part); + to_state = DataPartState::Outdated; + } + } + + res.part->setState(to_state); + + DataPartIteratorByInfo it; + bool inserted; + + { + std::lock_guard lock(part_loading_mutex); + std::tie(it, inserted) = data_parts_indexes.insert(res.part); + } + + /// Remove duplicate parts with the same checksum. + if (!inserted) + { + if ((*it)->checksums.getTotalChecksumHex() == res.part->checksums.getTotalChecksumHex()) + { + LOG_ERROR(log, "Remove duplicate part {}", data_part_storage->getFullPath()); + res.part->is_duplicate = true; + return res; + } + else + throw Exception("Part " + res.part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART); + } + + if (to_state == DataPartState::Active) + addPartContributionToDataVolume(res.part); + + LOG_TRACE(log, "Finished loading {} part {} on disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName()); + return res; +}; + +std::vector MergeTreeData::loadDataPartsFromDisk( ThreadPool & pool, size_t num_parts, std::queue & parts_queue, - bool skip_sanity_checks, const MergeTreeSettingsPtr & settings) { /// Parallel loading of data parts. @@ -1038,6 +1256,7 @@ void MergeTreeData::loadDataPartsFromDisk( threads_queue.push(i); } + std::vector loaded_parts; while (!parts_queue.empty()) { assert(!threads_queue.empty()); @@ -1077,132 +1296,11 @@ void MergeTreeData::loadDataPartsFromDisk( return !parts.empty(); })); - size_t suspicious_broken_parts = 0; - size_t suspicious_broken_parts_bytes = 0; - std::atomic has_adaptive_parts = false; - std::atomic has_non_adaptive_parts = false; - std::atomic has_lightweight_in_parts = false; - std::mutex loading_mutex; - - auto load_part = [&](const MergeTreePartInfo & part_info, const String & part_name, const DiskPtr & part_disk_ptr) - { - LOG_TRACE(log, "Loading part {} from disk {}", part_name, part_disk_ptr->getName()); - - auto single_disk_volume = std::make_shared("volume_" + part_name, part_disk_ptr, 0); - auto data_part_storage = std::make_shared(single_disk_volume, relative_data_path, part_name); - auto part = createPart(part_name, part_info, data_part_storage); - bool broken = false; - - String part_path = fs::path(relative_data_path) / part_name; - String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME; - - if (part_disk_ptr->exists(marker_path)) - { - /// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist - size_t size_of_part = data_part_storage->calculateTotalSizeOnDisk(); - LOG_WARNING(log, - "Detaching stale part {}{} (size: {}), which should have been deleted after a move. " - "That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.", - getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part)); - - std::lock_guard loading_lock(loading_mutex); - - broken_parts_to_detach.push_back(part); - ++suspicious_broken_parts; - suspicious_broken_parts_bytes += size_of_part; - return false; - } - - try - { - part->loadColumnsChecksumsIndexes(require_part_metadata, true); - } - catch (const Exception & e) - { - /// Don't count the part as broken if there is not enough memory to load it. - /// In fact, there can be many similar situations. - /// But it is OK, because there is a safety guard against deleting too many parts. - if (isNotEnoughMemoryErrorCode(e.code())) - throw; - - broken = true; - tryLogCurrentException(log, fmt::format("while loading part {} on path {}", part->name, part_path)); - } - catch (...) - { - broken = true; - tryLogCurrentException(log, fmt::format("while loading part {} on path {}", part->name, part_path)); - } - - /// Ignore broken parts that can appear as a result of hard server restart. - if (broken) - { - std::optional size_of_part; - try - { - /// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist - size_of_part = data_part_storage->calculateTotalSizeOnDisk(); - } - catch (...) - { - tryLogCurrentException(log, fmt::format("while calculating part size {} on path {}", part->name, part_path)); - } - - std::string part_size_str = "failed to calculate size"; - if (size_of_part.has_value()) - part_size_str = formatReadableSizeWithBinarySuffix(*size_of_part); - - LOG_ERROR(log, - "Detaching broken part {}{} (size: {}). " - "If it happened after update, it is likely because of backward incompatibility. " - "You need to resolve this manually", - getFullPathOnDisk(part_disk_ptr), part_name, part_size_str); - - std::lock_guard loading_lock(loading_mutex); - - broken_parts_to_detach.push_back(part); - ++suspicious_broken_parts; - if (size_of_part.has_value()) - suspicious_broken_parts_bytes += *size_of_part; - return false; - } - - if (!part->index_granularity_info.mark_type.adaptive) - has_non_adaptive_parts.store(true, std::memory_order_relaxed); - else - has_adaptive_parts.store(true, std::memory_order_relaxed); - - /// Check if there is lightweight delete in part - if (part->hasLightweightDelete()) - has_lightweight_in_parts.store(true, std::memory_order_relaxed); - - part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime(); - /// Assume that all parts are Active, covered parts will be detected and marked as Outdated later - part->setState(DataPartState::Active); - - std::lock_guard loading_lock(loading_mutex); - auto [it, inserted] = data_parts_indexes.insert(part); - - /// Remove duplicate parts with the same checksum. - if (!inserted) - { - if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex()) - { - LOG_ERROR(log, "Remove duplicate part {}", data_part_storage->getFullPath()); - duplicate_parts_to_remove.push_back(part); - } - else - throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART); - } - - addPartContributionToDataVolume(part); - LOG_TRACE(log, "Finished part {} load on disk {}", part_name, part_disk_ptr->getName()); - return true; - }; - - std::mutex part_select_mutex; try { + std::mutex part_select_mutex; + std::mutex part_loading_mutex; + for (size_t thread = 0; thread < num_threads; ++thread) { pool.scheduleOrThrowOnError([&, thread] @@ -1235,15 +1333,19 @@ void MergeTreeData::loadDataPartsFromDisk( remaining_thread_parts.erase(thread_idx); } - bool part_is_ok = load_part(thread_part->info, thread_part->name, thread_part->disk); + thread_part->is_loaded = true; + auto res = loadDataPart(thread_part->info, thread_part->name, thread_part->disk, DataPartState::Active, part_loading_mutex); + bool is_active_part = res.part->getState() == DataPartState::Active && !res.part->is_duplicate; - if (!part_is_ok) + if (!is_active_part && !thread_part->children.empty()) { std::lock_guard lock{part_select_mutex}; for (const auto & [_, node] : thread_part->children) threads_parts[thread].push_back(node); remaining_thread_parts.insert(thread); } + + loaded_parts.push_back(std::move(res)); } }); } @@ -1256,53 +1358,37 @@ void MergeTreeData::loadDataPartsFromDisk( } pool.wait(); - - if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts) - throw Exception( - "Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", - ErrorCodes::LOGICAL_ERROR); - - has_non_adaptive_index_granularity_parts = has_non_adaptive_parts; - - if (has_lightweight_in_parts) - has_lightweight_delete_parts.store(true); - - if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks) - throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, - "Suspiciously many ({} parts, {} in total) broken parts to remove while maximum allowed broken parts count is {}. You can change the maximum value " - "with merge tree setting 'max_suspicious_broken_parts' in configuration section or in table settings in .sql file " - "(don't forget to return setting back to default value)", - suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes), settings->max_suspicious_broken_parts); - - if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks) - throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, - "Suspiciously big size ({} parts, {} in total) of all broken parts to remove while maximum allowed broken parts size is {}. " - "You can change the maximum value with merge tree setting 'max_suspicious_broken_parts_bytes' in configuration " - "section or in table settings in .sql file (don't forget to return setting back to default value)", - suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes), - formatReadableSizeWithBinarySuffix(settings->max_suspicious_broken_parts_bytes)); + return loaded_parts; } -void MergeTreeData::loadDataPartsFromWAL( - DataPartsVector & /* broken_parts_to_detach */, - DataPartsVector & duplicate_parts_to_remove, - MutableDataPartsVector & parts_from_wal) +void MergeTreeData::loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal) { + std::sort(parts_from_wal.begin(), parts_from_wal.end(), [](const auto & lhs, const auto & rhs) + { + return std::tie(lhs->info.level, lhs->info.mutation) > std::tie(rhs->info.level, rhs->info.mutation); + }); + for (auto & part : parts_from_wal) { part->modification_time = time(nullptr); /// Assume that all parts are Active, covered parts will be detected and marked as Outdated later part->setState(DataPartState::Active); + auto lo = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{DataPartState::Active, part->info}); + + if (lo != data_parts_by_state_and_info.begin() && (*std::prev(lo))->info.contains(part->info)) + continue; + + if (lo != data_parts_by_state_and_info.end() && (*lo)->info.contains(part->info)) + continue; + auto [it, inserted] = data_parts_indexes.insert(part); + if (!inserted) { if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex()) - { LOG_ERROR(log, "Remove duplicate part {}", part->data_part_storage->getFullPath()); - duplicate_parts_to_remove.push_back(part); - } else throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART); } @@ -1402,9 +1488,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) /// Collect parts by disks' names. std::map disk_part_map; - for (const auto & elem : loading_tree.getRoots()) - for (const auto & [_, node] : elem.second->children) - disk_part_map[node->disk->getName()].emplace_back(node); + loading_tree.traverse(/*recursive=*/ false, [&](const auto & node) + { + disk_part_map[node->disk->getName()].emplace_back(node); + }); size_t num_parts = 0; std::queue parts_queue; @@ -1426,9 +1513,40 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) DataPartsVector broken_parts_to_detach; DataPartsVector duplicate_parts_to_remove; + size_t suspicious_broken_parts = 0; + size_t suspicious_broken_parts_bytes = 0; + bool have_adaptive_parts = false; + bool have_non_adaptive_parts = false; + bool have_lightweight_in_parts = false; + bool have_parts_with_version_metadata = false; + if (num_parts > 0) - loadDataPartsFromDisk( - broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings); + { + auto loaded_parts = loadDataPartsFromDisk(pool, num_parts, parts_queue, settings); + + for (const auto & res : loaded_parts) + { + if (res.is_broken) + { + broken_parts_to_detach.push_back(res.part); + ++suspicious_broken_parts; + if (res.size_of_part) + suspicious_broken_parts_bytes += *res.size_of_part; + } + else if (res.part->is_duplicate) + { + res.part->remove(); + } + else + { + bool is_adaptive = res.part->index_granularity_info.mark_type.adaptive; + have_adaptive_parts |= is_adaptive; + have_non_adaptive_parts |= !is_adaptive; + have_lightweight_in_parts |= res.part->hasLightweightDelete(); + have_parts_with_version_metadata |= res.part->wasInvolvedInTransaction(); + } + } + } if (settings->in_memory_parts_enable_wal) { @@ -1478,8 +1596,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) parts_from_wal.insert( parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end())); - loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal); - + loadDataPartsFromWAL(parts_from_wal); num_parts += parts_from_wal.size(); } @@ -1490,6 +1607,30 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) return; } + if (have_non_adaptive_parts && have_adaptive_parts && !settings->enable_mixed_granularity_parts) + throw Exception( + "Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", + ErrorCodes::LOGICAL_ERROR); + + has_non_adaptive_index_granularity_parts = have_non_adaptive_parts; + has_lightweight_delete_parts = have_lightweight_in_parts; + transactions_enabled = have_parts_with_version_metadata; + + if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks) + throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, + "Suspiciously many ({} parts, {} in total) broken parts to remove while maximum allowed broken parts count is {}. You can change the maximum value " + "with merge tree setting 'max_suspicious_broken_parts' in configuration section or in table settings in .sql file " + "(don't forget to return setting back to default value)", + suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes), settings->max_suspicious_broken_parts); + + if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks) + throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, + "Suspiciously big size ({} parts, {} in total) of all broken parts to remove while maximum allowed broken parts size is {}. " + "You can change the maximum value with merge tree setting 'max_suspicious_broken_parts_bytes' in configuration " + "section or in table settings in .sql file (don't forget to return setting back to default value)", + suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes), + formatReadableSizeWithBinarySuffix(settings->max_suspicious_broken_parts_bytes)); + for (auto & part : broken_parts_to_detach) { auto builder = part->data_part_storage->getBuilder(); @@ -1497,178 +1638,69 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) builder->commit(); } - for (auto & part : duplicate_parts_to_remove) - part->remove(); - - auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it) - { - const DataPartPtr & part = *it; - - part->remove_time.store(part->modification_time, std::memory_order_relaxed); - auto creation_csn = part->version.creation_csn.load(std::memory_order_relaxed); - if (creation_csn != Tx::RolledBackCSN && creation_csn != Tx::PrehistoricCSN && !part->version.isRemovalTIDLocked()) - { - /// It's possible that covering part was created without transaction, - /// but if covered part was created with transaction (i.e. creation_tid is not prehistoric), - /// then it must have removal tid in metadata file. - throw Exception(ErrorCodes::LOGICAL_ERROR, "Data part {} is Outdated and has creation TID {} and CSN {}, " - "but does not have removal tid. It's a bug or a result of manual intervention.", - part->name, part->version.creation_tid, creation_csn); - } - modifyPartState(it, DataPartState::Outdated); - removePartContributionToDataVolume(part); - - /// Explicitly set removal_tid_lock for parts w/o transaction (i.e. w/o txn_version.txt) - /// to avoid keeping part forever (see VersionMetadata::canBeRemoved()) - if (!part->version.isRemovalTIDLocked()) - { - TransactionInfoContext transaction_context{getStorageID(), part->name}; - part->version.lockRemovalTID(Tx::PrehistoricTID, transaction_context); - } - }; - - /// All parts are in "Active" state after loading - assert(std::find_if(data_parts_by_state_and_info.begin(), data_parts_by_state_and_info.end(), - [](const auto & part) - { - return part->getState() != DataPartState::Active; - }) == data_parts_by_state_and_info.end()); - - bool have_parts_with_version_metadata = false; - auto iter = data_parts_by_state_and_info.begin(); - while (iter != data_parts_by_state_and_info.end() && (*iter)->getState() == DataPartState::Active) - { - const DataPartPtr & part = *iter; - part->loadVersionMetadata(); - VersionMetadata & version = part->version; - if (part->wasInvolvedInTransaction()) - { - have_parts_with_version_metadata = true; - } - else - { - ++iter; - continue; - } - - /// Check if CSNs were written after committing transaction, update and write if needed. - bool version_updated = false; - chassert(!version.creation_tid.isEmpty()); - if (!part->version.creation_csn) - { - auto min = TransactionLog::getCSN(version.creation_tid); - if (!min) - { - /// Transaction that created this part was not committed. Remove part. - TransactionLog::assertTIDIsNotOutdated(version.creation_tid); - min = Tx::RolledBackCSN; - } - LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has creation_tid={}, setting creation_csn={}", - part->name, version.creation_tid, min); - version.creation_csn = min; - version_updated = true; - } - if (!version.removal_tid.isEmpty() && !part->version.removal_csn) - { - auto max = TransactionLog::getCSN(version.removal_tid); - if (max) - { - LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has removal_tid={}, setting removal_csn={}", - part->name, version.removal_tid, max); - version.removal_csn = max; - } - else - { - TransactionLog::assertTIDIsNotOutdated(version.removal_tid); - /// Transaction that tried to remove this part was not committed. Clear removal_tid. - LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}", - part->name, version.removal_tid); - version.unlockRemovalTID(version.removal_tid, TransactionInfoContext{getStorageID(), part->name}); - } - version_updated = true; - } - - /// Sanity checks - bool csn_order = !version.removal_csn || version.creation_csn <= version.removal_csn || version.removal_csn == Tx::PrehistoricCSN; - bool min_start_csn_order = version.creation_tid.start_csn <= version.creation_csn; - bool max_start_csn_order = version.removal_tid.start_csn <= version.removal_csn; - bool creation_csn_known = version.creation_csn; - if (!csn_order || !min_start_csn_order || !max_start_csn_order || !creation_csn_known) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid version metadata: {}", part->name, version.toString()); - - if (version_updated) - part->storeVersionMetadata(/* force */ true); - - /// Deactivate part if creation was not committed or if removal was. - if (version.creation_csn == Tx::RolledBackCSN || version.removal_csn) - { - auto next_it = std::next(iter); - deactivate_part(iter); - iter = next_it; - } - else - { - ++iter; - } - } - - if (have_parts_with_version_metadata) - transactions_enabled.store(true); - - /// Delete from the set of current parts those parts that are covered by another part (those parts that - /// were merged), but that for some reason are still not deleted from the filesystem. - /// Deletion of files will be performed later in the clearOldParts() method. - - auto active_parts_range = getDataPartsStateRange(DataPartState::Active); - auto prev_it = active_parts_range.begin(); - auto end_it = active_parts_range.end(); - - bool less_than_two_active_parts = prev_it == end_it || std::next(prev_it) == end_it; - - if (!less_than_two_active_parts) - { - (*prev_it)->assertState({DataPartState::Active}); - auto curr_it = std::next(prev_it); - - while (curr_it != data_parts_by_state_and_info.end() && (*curr_it)->getState() == DataPartState::Active) - { - (*curr_it)->assertState({DataPartState::Active}); - - /// Don't consider data parts belonging to different partitions. - if ((*curr_it)->info.partition_id != (*prev_it)->info.partition_id) - { - ++prev_it; - ++curr_it; - continue; - } - - if ((*curr_it)->contains(**prev_it)) - { - deactivate_part(prev_it); - prev_it = curr_it; - ++curr_it; - } - else if ((*prev_it)->contains(**curr_it)) - { - auto next = std::next(curr_it); - deactivate_part(curr_it); - curr_it = next; - } - else - { - ++prev_it; - ++curr_it; - } - } - } - resetObjectColumnsFromActiveParts(part_lock); calculateColumnAndSecondaryIndexSizesImpl(); + PartLoadingTreeNodes unloaded_parts; + loading_tree.traverse(/*recursive=*/ true, [&](const auto & node) + { + if (!node->is_loaded) + unloaded_parts.push_back(node); + }); + + { + std::lock_guard lock(unloaded_outdated_parts_mutex); + unloaded_outdated_parts = std::move(unloaded_parts); + } + LOG_DEBUG(log, "Loaded data parts ({} items)", data_parts_indexes.size()); data_parts_loading_finished = true; } +void MergeTreeData::loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load) +{ + LOG_DEBUG(log, "Loading {} outdated data parts", parts_to_load.size()); + if (parts_to_load.empty()) + return; + + for (const auto & part : parts_to_load) + { + auto res = loadDataPart(part->info, part->name, part->disk, MergeTreeDataPartState::Outdated, data_parts_mutex); + + if (res.is_broken) + { + auto builder = res.part->data_part_storage->getBuilder(); + res.part->renameToDetached("broken-on-start", builder); /// detached parts must not have '_' in prefixes + builder->commit(); + } + else if (res.part->is_duplicate) + { + res.part->remove(); + } + else + { + preparePartForRemoval(res.part); + } + } + + LOG_DEBUG(log, "Loaded {} outdated data parts", parts_to_load.size()); +} + +void MergeTreeData::scheduleOutdatedDataPartsLoadingJob(BackgroundJobsAssignee & assignee) +{ + std::lock_guard lock(unloaded_outdated_parts_mutex); + if (unloaded_outdated_parts.empty()) + return; + + auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); + assignee.scheduleCommonTask(std::make_shared( + [this, share_lock, parts_to_load = std::move(unloaded_outdated_parts)] + { + loadOutdatedDataParts(std::move(parts_to_load)); + return false; + }, common_assignee_trigger, getStorageID()), /*need_trigger=*/ false); +} + /// Is the part directory old. /// True if its modification time and the modification time of all files inside it is less then threshold. /// (Only files on the first level of nesting are considered). diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 5dbeaa0e958..69b41610643 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1021,7 +1021,6 @@ protected: /// under lockForShare if rename is possible. String relative_data_path; - /// Current column sizes in compressed and uncompressed form. ColumnSizeByName column_sizes; @@ -1260,6 +1259,53 @@ protected: void resetObjectColumnsFromActiveParts(const DataPartsLock & lock); void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock); + class PartLoadingTree + { + public: + struct Node + { + Node(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_) + : info(info_), name(name_), disk(disk_) + { + } + + const MergeTreePartInfo info; + const String name; + const DiskPtr disk; + + bool is_loaded = false; + std::map> children; + }; + + using NodePtr = std::shared_ptr; + using PartLoadingInfo = std::tuple; + + static PartLoadingTree build(std::vector nodes); + + template + void traverse(bool recursive, Func && func); + + private: + void add(const MergeTreePartInfo & info, const String & name, const DiskPtr & disk); + + std::unordered_map root_by_partition; + }; + + using PartLoadingTreeNodes = std::vector; + + struct LoadPartResult + { + bool is_broken = false; + std::optional size_of_part; + MutableDataPartPtr part; + }; + + mutable std::mutex unloaded_outdated_parts_mutex; + PartLoadingTreeNodes unloaded_outdated_parts TSA_GUARDED_BY(unloaded_outdated_parts_mutex); + + void loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load); + void scheduleOutdatedDataPartsLoadingJob(BackgroundJobsAssignee & assignee); + static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type); static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type); @@ -1333,49 +1379,20 @@ private: /// Returns default settings for storage with possible changes from global config. virtual std::unique_ptr getDefaultSettings() const = 0; - class PartLoadingTree - { - public: - struct Node - { - Node(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_) - : info(info_), name(name_), disk(disk_) - { - } + LoadPartResult loadDataPart( + const MergeTreePartInfo & part_info, + const String & part_name, + const DiskPtr & part_disk_ptr, + MergeTreeDataPartState to_state, + std::mutex & part_loading_mutex); - const MergeTreePartInfo info; - const String name; - const DiskPtr disk; - - std::map> children; - }; - - using NodePtr = std::shared_ptr; - using PartLoadingInfo = std::tuple; - - static PartLoadingTree build(std::vector nodes); - void add(const MergeTreePartInfo & info, const String & name, const DiskPtr & disk); - const std::unordered_map & getRoots() const { return root_by_partition; } - - private: - std::unordered_map root_by_partition; - }; - - using PartLoadingTreeNodes = std::vector; - - void loadDataPartsFromDisk( - DataPartsVector & broken_parts_to_detach, - DataPartsVector & duplicate_parts_to_remove, + std::vector loadDataPartsFromDisk( ThreadPool & pool, size_t num_parts, std::queue & parts_queue, - bool skip_sanity_checks, const MergeTreeSettingsPtr & settings); - void loadDataPartsFromWAL( - DataPartsVector & broken_parts_to_detach, - DataPartsVector & duplicate_parts_to_remove, - MutableDataPartsVector & parts_from_wal); + void loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal); /// Create zero-copy exclusive lock for part and disk. Useful for coordination of /// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree. diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index e2b23d75746..c68adfbf3c6 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -158,6 +158,9 @@ bool ReplicatedMergeTreeRestartingThread::runImpl() storage.cleanup_thread.start(); storage.part_check_thread.start(); + if (first_time) + storage.scheduleOutdatedDataPartsLoadingJob(storage.background_operations_assignee); + return true; } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index aea853b6c39..7cf6bf04a16 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -110,7 +110,6 @@ StorageMergeTree::StorageMergeTree( increment.set(getMaxBlockNumber()); loadMutations(); - loadDeduplicationLog(); } @@ -136,6 +135,7 @@ void StorageMergeTree::startup() try { background_operations_assignee.start(); + scheduleOutdatedDataPartsLoadingJob(background_operations_assignee); startBackgroundMovesIfNeeded(); } catch (...) @@ -1216,6 +1216,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign scheduled = true; } + return scheduled; } diff --git a/tests/integration/test_merge_tree_load_parts/configs/fast_background_pool.xml b/tests/integration/test_merge_tree_load_parts/configs/fast_background_pool.xml new file mode 100644 index 00000000000..74038c5472f --- /dev/null +++ b/tests/integration/test_merge_tree_load_parts/configs/fast_background_pool.xml @@ -0,0 +1,9 @@ + + 1 + 0 + 0.0 + 0 + 1 + 1 + 0 + diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py index cb0d902f677..4b5dec68c6e 100644 --- a/tests/integration/test_merge_tree_load_parts/test.py +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -1,12 +1,23 @@ import pytest import helpers.client import helpers.cluster +import time from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk cluster = helpers.cluster.ClickHouseCluster(__file__) -node1 = cluster.add_instance("node1", with_zookeeper=True, stay_alive=True) -node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True) +node1 = cluster.add_instance( + "node1", + main_configs=["configs/fast_background_pool.xml"], + with_zookeeper=True, + stay_alive=True, +) +node2 = cluster.add_instance( + "node2", + main_configs=["configs/fast_background_pool.xml"], + with_zookeeper=True, + stay_alive=True, +) @pytest.fixture(scope="module") @@ -35,14 +46,14 @@ def test_merge_tree_load_parts(started_cluster): node1.restart_clickhouse(kill=True) for i in range(1, 21): - assert node1.contains_in_log(f"Loading part 44_{i}_{i}_0") + assert node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0") node1.query("OPTIMIZE TABLE mt_load_parts FINAL") node1.restart_clickhouse(kill=True) - assert node1.contains_in_log("Loading part 44_1_20") + assert node1.contains_in_log("Loading Active part 44_1_20") for i in range(1, 21): - assert not node1.contains_in_log(f"Loading part 44_{i}_{i}_0") + assert not node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0") assert node1.query("SELECT count() FROM mt_load_parts") == "20\n" assert ( @@ -52,6 +63,45 @@ def test_merge_tree_load_parts(started_cluster): == "1\n" ) + MAX_RETRY = 20 + + all_outdated_loaded = False + for _ in range(MAX_RETRY): + all_outdated_loaded = all( + [ + node1.contains_in_log(f"Loading Outdated part 44_{i}_{i}_0") + for i in range(1, 21) + ] + ) + if all_outdated_loaded: + break + time.sleep(2) + + assert all_outdated_loaded + + node1.query("ALTER TABLE mt_load_parts MODIFY SETTING old_parts_lifetime = 1") + node1.query("DETACH TABLE mt_load_parts") + node1.query("ATTACH TABLE mt_load_parts") + + table_path = node1.query( + "SELECT data_paths[1] FROM system.tables WHERE table = 'mt_load_parts'" + ).strip() + + part_dirs_ok = False + for _ in range(MAX_RETRY): + part_dirs = node1.exec_in_container( + ["bash", "-c", f"ls {table_path}"], user="root" + ) + part_dirs = list( + set(part_dirs.strip().split("\n")) - {"detached", "format_version.txt"} + ) + part_dirs_ok = len(part_dirs) == 1 and part_dirs[0].startswith("44_1_20") + if part_dirs_ok: + break + time.sleep(2) + + assert part_dirs_ok + def test_merge_tree_load_parts_corrupted(started_cluster): for i, node in enumerate([node1, node2]): @@ -66,6 +116,7 @@ def test_merge_tree_load_parts_corrupted(started_cluster): node1.query( f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 0, randomPrintableASCII(10))" ) + node1.query( f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 1, randomPrintableASCII(10))" ) @@ -101,18 +152,18 @@ def test_merge_tree_load_parts_corrupted(started_cluster): def check_parts_loading(node, partition, loaded, failed, skipped): for (min_block, max_block) in loaded: part_name = f"{partition}_{min_block}_{max_block}" - assert node.contains_in_log(f"Loading part {part_name}") - assert node.contains_in_log(f"Finished part {part_name}") + assert node.contains_in_log(f"Loading Active part {part_name}") + assert node.contains_in_log(f"Finished loading Active part {part_name}") for (min_block, max_block) in failed: part_name = f"{partition}_{min_block}_{max_block}" - assert node.contains_in_log(f"Loading part {part_name}") - assert not node.contains_in_log(f"Finished part {part_name}") + assert node.contains_in_log(f"Loading Active part {part_name}") + assert not node.contains_in_log(f"Finished loading Active part {part_name}") for (min_block, max_block) in skipped: part_name = f"{partition}_{min_block}_{max_block}" - assert not node.contains_in_log(f"Loading part {part_name}") - assert not node.contains_in_log(f"Finished part {part_name}") + assert not node.contains_in_log(f"Loading Active part {part_name}") + assert not node.contains_in_log(f"Finished loading Active part {part_name}") check_parts_loading( node1, 111, loaded=[(0, 1), (2, 2)], failed=[(0, 2)], skipped=[(0, 0), (1, 1)]