diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 790b95a9fa9..f20505a6673 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -896,6 +896,274 @@ Int64 MergeTreeData::getMaxBlockNumber() const
     return max_block_num;
 }
 
+/// Loads the parts queued in `parts_queue` (one vector of (part name, disk) pairs per disk) on `pool`.
+/// Stale and broken parts are appended to `broken_parts_to_detach`. A part whose name is already present
+/// in `data_parts_indexes` with the same total checksum is appended to `duplicate_parts_to_remove`;
+/// a name clash with different checksums throws DUPLICATE_DATA_PART.
+/// Unless `skip_sanity_checks` is set, throws TOO_MANY_UNEXPECTED_DATA_PARTS when the number or total size
+/// of broken parts exceeds `max_suspicious_broken_parts` / `max_suspicious_broken_parts_bytes`.
+void MergeTreeData::loadDataPartsFromDisk(
+    DataPartsVector & broken_parts_to_detach,
+    DataPartsVector & duplicate_parts_to_remove,
+    ThreadPool & pool,
+    size_t num_parts,
+    std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
+    bool skip_sanity_checks,
+    const MergeTreeSettingsPtr & settings)
+{
+    /// Parallel loading of data parts.
+    pool.setMaxThreads(std::min(size_t(settings->max_part_loading_threads), num_parts));
+    size_t num_threads = pool.getMaxThreads();
+    std::vector<size_t> parts_per_thread(num_threads, num_parts / num_threads);
+    for (size_t i = 0ul; i < num_parts % num_threads; ++i)
+        ++parts_per_thread[i];
+
+    /// Prepare data parts for parallel loading. Threads will focus on given disk first, then steal
+    /// others' tasks when finish current disk part loading process.
+    std::vector<std::vector<std::pair<String, DiskPtr>>> threads_parts(num_threads);
+    std::set<size_t> remaining_thread_parts;
+    std::queue<size_t> threads_queue;
+    for (size_t i = 0; i < num_threads; ++i)
+    {
+        remaining_thread_parts.insert(i);
+        threads_queue.push(i);
+    }
+
+    while (!parts_queue.empty())
+    {
+        assert(!threads_queue.empty());
+        size_t i = threads_queue.front();
+        auto & need_parts = parts_per_thread[i];
+        assert(need_parts > 0);
+        auto & thread_parts = threads_parts[i];
+        auto & current_parts = parts_queue.front();
+        assert(!current_parts.empty());
+        auto parts_to_grab = std::min(need_parts, current_parts.size());
+
+        thread_parts.insert(thread_parts.end(), current_parts.end() - parts_to_grab, current_parts.end());
+        current_parts.resize(current_parts.size() - parts_to_grab);
+        need_parts -= parts_to_grab;
+
+        /// Before processing next thread, change disk if possible.
+        /// Different threads will likely start loading parts from different disk,
+        /// which may improve read parallelism for JBOD.
+
+        /// If current disk still has some parts, push it to the tail.
+        if (!current_parts.empty())
+            parts_queue.push(std::move(current_parts));
+        parts_queue.pop();
+
+        /// If current thread still want some parts, push it to the tail.
+        if (need_parts > 0)
+            threads_queue.push(i);
+        threads_queue.pop();
+    }
+    assert(threads_queue.empty());
+    assert(std::all_of(threads_parts.begin(), threads_parts.end(), [](const std::vector<std::pair<String, DiskPtr>> & parts)
+    {
+        return !parts.empty();
+    }));
+
+    size_t suspicious_broken_parts = 0;
+    size_t suspicious_broken_parts_bytes = 0;
+    std::atomic<bool> has_adaptive_parts = false;
+    std::atomic<bool> has_non_adaptive_parts = false;
+
+    std::mutex mutex;
+    auto load_part = [&](const String & part_name, const DiskPtr & part_disk_ptr)
+    {
+        auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
+        if (!part_opt)
+            return;
+        const auto & part_info = *part_opt;
+        auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
+        auto part = createPart(part_name, part_info, single_disk_volume, part_name);
+        bool broken = false;
+
+        String part_path = fs::path(relative_data_path) / part_name;
+        String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
+        if (part_disk_ptr->exists(marker_path))
+        {
+            /// NOTE: getBytesOnDisk() cannot be used here, since it may be zero if checksums.txt does not exist
+            size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
+            LOG_WARNING(log,
+                "Detaching stale part {}{} (size: {}), which should have been deleted after a move. "
+                "That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
+                getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
+            std::lock_guard loading_lock(mutex);
+            broken_parts_to_detach.push_back(part);
+            ++suspicious_broken_parts;
+            suspicious_broken_parts_bytes += size_of_part;
+            return;
+        }
+
+        try
+        {
+            part->loadColumnsChecksumsIndexes(require_part_metadata, true);
+        }
+        catch (const Exception & e)
+        {
+            /// Don't count the part as broken if there is not enough memory to load it.
+            /// In fact, there can be many similar situations.
+            /// But it is OK, because there is a safety guard against deleting too many parts.
+            if (isNotEnoughMemoryErrorCode(e.code()))
+                throw;
+
+            broken = true;
+            tryLogCurrentException(__PRETTY_FUNCTION__);
+        }
+        catch (...)
+        {
+            broken = true;
+            tryLogCurrentException(__PRETTY_FUNCTION__);
+        }
+
+        /// Ignore broken parts that can appear as a result of hard server restart.
+        if (broken)
+        {
+            /// NOTE: getBytesOnDisk() cannot be used here, since it may be zero if checksums.txt does not exist
+            size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
+
+            LOG_ERROR(log,
+                "Detaching broken part {}{} (size: {}). "
+                "If it happened after update, it is likely because of backward incompability. "
+                "You need to resolve this manually",
+                getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
+            std::lock_guard loading_lock(mutex);
+            broken_parts_to_detach.push_back(part);
+            ++suspicious_broken_parts;
+            suspicious_broken_parts_bytes += size_of_part;
+            return;
+        }
+        if (!part->index_granularity_info.is_adaptive)
+            has_non_adaptive_parts.store(true, std::memory_order_relaxed);
+        else
+            has_adaptive_parts.store(true, std::memory_order_relaxed);
+
+        part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
+        /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
+        part->setState(DataPartState::Committed);
+
+        std::lock_guard loading_lock(mutex);
+        auto [it, inserted] = data_parts_indexes.insert(part);
+        /// Remove duplicate parts with the same checksum.
+        if (!inserted)
+        {
+            if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
+            {
+                LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
+                duplicate_parts_to_remove.push_back(part);
+                /// The copy that is already in the set has been accounted for; the duplicate is going
+                /// to be removed and must not contribute to the data volume counters.
+                return;
+            }
+            else
+                throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
+        }
+
+        addPartContributionToDataVolume(part);
+    };
+
+    std::mutex part_select_mutex;
+    try
+    {
+        for (size_t thread = 0; thread < num_threads; ++thread)
+        {
+            pool.scheduleOrThrowOnError([&, thread]
+            {
+                while (true)
+                {
+                    std::pair<String, DiskPtr> thread_part;
+                    {
+                        const std::lock_guard lock{part_select_mutex};
+
+                        if (remaining_thread_parts.empty())
+                            return;
+
+                        /// Steal task if nothing to do
+                        auto thread_idx = thread;
+                        if (threads_parts[thread].empty())
+                        {
+                            // Try random steal tasks from the next thread
+                            std::uniform_int_distribution<size_t> distribution(0, remaining_thread_parts.size() - 1);
+                            auto it = remaining_thread_parts.begin();
+                            std::advance(it, distribution(thread_local_rng));
+                            thread_idx = *it;
+                        }
+                        auto & thread_parts = threads_parts[thread_idx];
+                        thread_part = thread_parts.back();
+                        thread_parts.pop_back();
+                        if (thread_parts.empty())
+                            remaining_thread_parts.erase(thread_idx);
+                    }
+                    load_part(thread_part.first, thread_part.second);
+                }
+            });
+        }
+    }
+    catch (...)
+    {
+        /// If this is not done, then in case of an exception, tasks will be destroyed before the threads are completed, and it will be bad.
+        pool.wait();
+        throw;
+    }
+
+    pool.wait();
+
+    if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
+        throw Exception(
+            "Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled",
+            ErrorCodes::LOGICAL_ERROR);
+
+    has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
+
+    if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
+        throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
+            "Suspiciously many ({}) broken parts to remove.",
+            suspicious_broken_parts);
+
+    if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks)
+        throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
+            "Suspiciously big size ({}) of all broken parts to remove.",
+            formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes));
+}
+
+
+/// Adds the parts restored from the write-ahead log to `data_parts_indexes`, skipping every part for which
+/// getActiveContainingPart() finds a containing part. Same-checksum duplicates go to `duplicate_parts_to_remove`.
+void MergeTreeData::loadDataPartsFromWAL(
+    DataPartsVector & /* broken_parts_to_detach */,
+    DataPartsVector & duplicate_parts_to_remove,
+    MutableDataPartsVector & parts_from_wal,
+    DataPartsLock & part_lock)
+{
+    for (auto & part : parts_from_wal)
+    {
+        if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
+            continue;
+
+        part->modification_time = time(nullptr);
+        /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
+        part->setState(DataPartState::Committed);
+
+        auto [it, inserted] = data_parts_indexes.insert(part);
+        if (!inserted)
+        {
+            if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
+            {
+                LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
+                duplicate_parts_to_remove.push_back(part);
+                /// A duplicate that is about to be removed must not be counted in the data volume.
+                continue;
+            }
+            else
+                throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
+        }
+
+        addPartContributionToDataVolume(part);
+    }
+}
+
 
 void MergeTreeData::loadDataParts(bool skip_sanity_checks)
 {
@@ -903,7 +1171,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
 
     auto metadata_snapshot = getInMemoryMetadataPtr();
     const auto settings = getSettings();
-    std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
     MutableDataPartsVector parts_from_wal;
     Strings part_file_names;
 
@@ -933,193 +1200,90 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
         }
     }
 
-    /// Reversed order to load part from low priority disks firstly.
-    /// Used for keep part on low priority disk if duplication found
-    for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
+    /// Collect part names by disk.
+    std::map<String, std::vector<std::pair<String, DiskPtr>>> disk_part_map;
+    std::map<String, MutableDataPartsVector> disk_wal_part_map;
+    ThreadPool pool(disks.size());
+    std::mutex wal_init_lock;
+    for (const auto & disk_ptr : disks)
     {
-        auto disk_ptr = *disk_it;
+        auto & disk_parts = disk_part_map[disk_ptr->getName()];
+        auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
 
-        for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
+        pool.scheduleOrThrowOnError([&, disk_ptr]()
         {
-            /// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
-            if (startsWith(it->name(), "tmp")
-                || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
-                || it->name() == MergeTreeData::DETACHED_DIR_NAME)
-                continue;
-
-            if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
-                part_names_with_disks.emplace_back(it->name(), disk_ptr);
-            else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
+            for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
             {
-                /// Create and correctly initialize global WAL object
-                write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
-                for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
-                    parts_from_wal.push_back(std::move(part));
+                /// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
+                if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
+                    || it->name() == MergeTreeData::DETACHED_DIR_NAME)
+                    continue;
+
+                if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
+                    disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
+                else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
+                {
+                    std::unique_lock lock(wal_init_lock);
+                    if (write_ahead_log != nullptr)
+                        throw Exception(
+                            "There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
+                            ErrorCodes::CORRUPTED_DATA);
+
+                    write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
+                    for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
+                        disk_wal_parts.push_back(std::move(part));
+                }
+                else if (settings->in_memory_parts_enable_wal)
+                {
+                    MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
+                    for (auto && part : wal.restore(metadata_snapshot, getContext()))
+                        disk_wal_parts.push_back(std::move(part));
+                }
             }
-            else if (settings->in_memory_parts_enable_wal)
-            {
-                MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
-                for (auto && part : wal.restore(metadata_snapshot, getContext()))
-                    parts_from_wal.push_back(std::move(part));
-            }
-        }
-    }
-
-    auto part_lock = lockParts();
-    data_parts_indexes.clear();
-
-    if (part_names_with_disks.empty() && parts_from_wal.empty())
-    {
-        LOG_DEBUG(log, "There are no data parts");
-        return;
-    }
-
-    /// Parallel loading of data parts.
-    size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_names_with_disks.size());
-
-    std::mutex mutex;
-
-    DataPartsVector broken_parts_to_detach;
-    size_t suspicious_broken_parts = 0;
-    size_t suspicious_broken_parts_bytes = 0;
-
-    std::atomic<bool> has_adaptive_parts = false;
-    std::atomic<bool> has_non_adaptive_parts = false;
-
-    ThreadPool pool(num_threads);
-
-    for (auto & part_names_with_disk : part_names_with_disks)
-    {
-        pool.scheduleOrThrowOnError([&]
-        {
-            const auto & [part_name, part_disk_ptr] = part_names_with_disk;
-
-            auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
-
-            if (!part_opt)
-                return;
-
-            auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
-            auto part = createPart(part_name, *part_opt, single_disk_volume, part_name);
-            bool broken = false;
-
-            String part_path = fs::path(relative_data_path) / part_name;
-            String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
-
-            if (part_disk_ptr->exists(marker_path))
-            {
-                /// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
-                size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
-                LOG_WARNING(log,
-                    "Detaching stale part {}{} (size: {}), which should have been deleted after a move. "
-                    "That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
-                    getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
-                std::lock_guard loading_lock(mutex);
-
-                broken_parts_to_detach.push_back(part);
-
-                ++suspicious_broken_parts;
-                suspicious_broken_parts_bytes += size_of_part;
-
-                return;
-            }
-
-            try
-            {
-                part->loadColumnsChecksumsIndexes(require_part_metadata, true);
-            }
-            catch (const Exception & e)
-            {
-                /// Don't count the part as broken if there is not enough memory to load it.
-                /// In fact, there can be many similar situations.
-                /// But it is OK, because there is a safety guard against deleting too many parts.
-                if (isNotEnoughMemoryErrorCode(e.code()))
-                    throw;
-
-                broken = true;
-                tryLogCurrentException(__PRETTY_FUNCTION__);
-            }
-            catch (...)
-            {
-                broken = true;
-                tryLogCurrentException(__PRETTY_FUNCTION__);
-            }
-
-            /// Ignore broken parts that can appear as a result of hard server restart.
-            if (broken)
-            {
-                /// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
-                size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
-
-                LOG_ERROR(log,
-                    "Detaching broken part {}{} (size: {}). "
-                    "If it happened after update, it is likely because of backward incompability. "
-                    "You need to resolve this manually",
-                    getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
-                std::lock_guard loading_lock(mutex);
-
-                broken_parts_to_detach.push_back(part);
-
-                ++suspicious_broken_parts;
-                suspicious_broken_parts_bytes += size_of_part;
-
-                return;
-            }
-
-            if (!part->index_granularity_info.is_adaptive)
-                has_non_adaptive_parts.store(true, std::memory_order_relaxed);
-            else
-                has_adaptive_parts.store(true, std::memory_order_relaxed);
-
-            part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
-
-            /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
-            part->setState(DataPartState::Committed);
-
-            std::lock_guard loading_lock(mutex);
-
-            if (!data_parts_indexes.insert(part).second)
-                throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists", part->name);
-
-            addPartContributionToDataVolume(part);
         });
     }
 
     pool.wait();
 
-    for (auto & part : parts_from_wal)
+    for (auto & [_, disk_wal_parts] : disk_wal_part_map)
+        parts_from_wal.insert(
+            parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
+
+    size_t num_parts = 0;
+    std::queue<std::vector<std::pair<String, DiskPtr>>> parts_queue;
+    for (auto & [_, disk_parts] : disk_part_map)
     {
-        if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
+        if (disk_parts.empty())
             continue;
-
-        part->modification_time = time(nullptr);
-        /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
-        part->setState(DataPartState::Committed);
-
-        if (!data_parts_indexes.insert(part).second)
-            throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
-
-        addPartContributionToDataVolume(part);
+        num_parts += disk_parts.size();
+        parts_queue.push(std::move(disk_parts));
     }
 
-    if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
-        throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
+    auto part_lock = lockParts();
+    data_parts_indexes.clear();
 
-    has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
+    if (num_parts == 0 && parts_from_wal.empty())
+    {
+        LOG_DEBUG(log, "There are no data parts");
+        return;
+    }
 
-    if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
-        throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
-            "Suspiciously many ({}) broken parts to remove.",
-            suspicious_broken_parts);
 
-    if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks)
-        throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
-            "Suspiciously big size ({}) of all broken parts to remove.",
-            formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes));
+    DataPartsVector broken_parts_to_detach;
+    DataPartsVector duplicate_parts_to_remove;
+
+    if (num_parts > 0)
+        loadDataPartsFromDisk(
+            broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
+
+    if (!parts_from_wal.empty())
+        loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
 
     for (auto & part : broken_parts_to_detach)
         part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
 
+    for (auto & part : duplicate_parts_to_remove)
+        part->remove();
 
     /// Delete from the set of current parts those parts that are covered by another part (those parts that
     /// were merged), but that for some reason are still not deleted from the filesystem.
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index e7f1db8f3ec..b87a756bf9f 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -1143,6 +1143,25 @@ private:
 
     /// Returns default settings for storage with possible changes from global config.
     virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0;
+
+    /// Loads the parts queued in `parts_queue` on `pool`. Broken parts are appended to
+    /// `broken_parts_to_detach`, same-checksum duplicates to `duplicate_parts_to_remove`.
+    void loadDataPartsFromDisk(
+        DataPartsVector & broken_parts_to_detach,
+        DataPartsVector & duplicate_parts_to_remove,
+        ThreadPool & pool,
+        size_t num_parts,
+        std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
+        bool skip_sanity_checks,
+        const MergeTreeSettingsPtr & settings);
+
+    /// Adds the parts restored from the write-ahead log to the set of data parts.
+    /// `broken_parts_to_detach` is not used by the current implementation.
+    void loadDataPartsFromWAL(
+        DataPartsVector & broken_parts_to_detach,
+        DataPartsVector & duplicate_parts_to_remove,
+        MutableDataPartsVector & parts_from_wal,
+        DataPartsLock & part_lock);
 };
 
 /// RAII struct to record big parts that are submerging or emerging.