From 9f1104c2ad01b99455bd2270aeddd2dc2fea8dfe Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Sat, 8 Oct 2022 01:39:29 +0000 Subject: [PATCH] do not load inactive parts --- src/Storages/MergeTree/MergeTreeData.cpp | 146 +++++++++++++---- src/Storages/MergeTree/MergeTreeData.h | 31 +++- .../test_merge_tree_load_parts/__init__.py | 0 .../test_merge_tree_load_parts/test.py | 147 ++++++++++++++++++ 4 files changed, 297 insertions(+), 27 deletions(-) create mode 100644 tests/integration/test_merge_tree_load_parts/__init__.py create mode 100644 tests/integration/test_merge_tree_load_parts/test.py diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 83bedb34344..afe32833cd1 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -942,12 +942,77 @@ Int64 MergeTreeData::getMaxBlockNumber() const return max_block_num; } +void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const DiskPtr & disk) +{ + auto & current_ptr = root_by_partition[info.partition_id]; + if (!current_ptr) + current_ptr = std::make_shared(MergeTreePartInfo{}, disk); + + auto * current = current_ptr.get(); + while (true) + { + auto it = current->children.lower_bound(info); + if (it != current->children.begin()) + { + auto prev = std::prev(it); + const auto & prev_info = prev->first; + + if (prev_info.contains(info)) + { + current = prev->second.get(); + continue; + } + else if (!prev_info.isDisjoint(info)) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Part {} intersects previous part {}. It is a bug!", + info.getPartName(), prev_info.getPartName()); + } + } + + if (it != current->children.end()) + { + const auto & next_info = it->first; + + if (next_info.contains(info)) + { + current = it->second.get(); + continue; + } + else if (!next_info.isDisjoint(info)) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Part {} intersects next part {}. It is a bug!", + info.getPartName(), next_info.getPartName()); + } + } + + current->children.emplace(info, std::make_shared(info, disk)); + break; + } +} + +MergeTreeData::PartLoadingTree +MergeTreeData::PartLoadingTree::build(std::vector nodes) +{ + std::sort(nodes.begin(), nodes.end(), + [](const auto & lhs, const auto & rhs) + { + return lhs.first.level > rhs.first.level; + }); + + PartLoadingTree tree; + for (const auto & [info, disk] : nodes) + tree.add(info, disk); + return tree; +} + void MergeTreeData::loadDataPartsFromDisk( DataPartsVector & broken_parts_to_detach, DataPartsVector & duplicate_parts_to_remove, ThreadPool & pool, size_t num_parts, - std::queue>> & parts_queue, + std::queue & parts_queue, bool skip_sanity_checks, const MergeTreeSettingsPtr & settings) { @@ -962,9 +1027,10 @@ void MergeTreeData::loadDataPartsFromDisk( /// Prepare data parts for parallel loading. Threads will focus on given disk first, then steal /// others' tasks when finish current disk part loading process. - std::vector>> threads_parts(num_threads); + std::vector threads_parts(num_threads); std::set remaining_thread_parts; std::queue threads_queue; + for (size_t i = 0; i < num_threads; ++i) { remaining_thread_parts.insert(i); @@ -977,11 +1043,12 @@ void MergeTreeData::loadDataPartsFromDisk( size_t i = threads_queue.front(); auto & need_parts = parts_per_thread[i]; assert(need_parts > 0); + auto & thread_parts = threads_parts[i]; auto & current_parts = parts_queue.front(); assert(!current_parts.empty()); - auto parts_to_grab = std::min(need_parts, current_parts.size()); + auto parts_to_grab = std::min(need_parts, current_parts.size()); thread_parts.insert(thread_parts.end(), current_parts.end() - parts_to_grab, current_parts.end()); current_parts.resize(current_parts.size() - parts_to_grab); need_parts -= parts_to_grab; @@ -993,13 +1060,16 @@ void MergeTreeData::loadDataPartsFromDisk( /// If current disk still has some parts, push it to the tail. if (!current_parts.empty()) parts_queue.push(std::move(current_parts)); + parts_queue.pop(); /// If current thread still want some parts, push it to the tail. if (need_parts > 0) threads_queue.push(i); + threads_queue.pop(); } + assert(threads_queue.empty()); assert(std::all_of(threads_parts.begin(), threads_parts.end(), [](const std::vector> & parts) { @@ -1011,16 +1081,13 @@ void MergeTreeData::loadDataPartsFromDisk( std::atomic has_adaptive_parts = false; std::atomic has_non_adaptive_parts = false; std::atomic has_lightweight_in_parts = false; + std::mutex loading_mutex; - std::mutex mutex; - auto load_part = [&](const String & part_name, const DiskPtr & part_disk_ptr) + auto load_part = [&](const MergeTreePartInfo & part_info, const DiskPtr & part_disk_ptr) { - auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version); - if (!part_opt) - return; - + auto part_name = part_info.getPartName(); LOG_TRACE(log, "Loading part {} from disk {}", part_name, part_disk_ptr->getName()); - const auto & part_info = *part_opt; + auto single_disk_volume = std::make_shared("volume_" + part_name, part_disk_ptr, 0); auto data_part_storage = std::make_shared(single_disk_volume, relative_data_path, part_name); auto part = createPart(part_name, part_info, data_part_storage); @@ -1028,6 +1095,7 @@ void MergeTreeData::loadDataPartsFromDisk( String part_path = fs::path(relative_data_path) / part_name; String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME; + if (part_disk_ptr->exists(marker_path)) { /// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist @@ -1036,11 +1104,13 @@ void MergeTreeData::loadDataPartsFromDisk( "Detaching stale part {}{} (size: {}), which should have been deleted after a move. " "That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.", getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part)); - std::lock_guard loading_lock(mutex); + + std::lock_guard loading_lock(loading_mutex); + broken_parts_to_detach.push_back(part); ++suspicious_broken_parts; suspicious_broken_parts_bytes += size_of_part; - return; + return false; } try @@ -1082,19 +1152,21 @@ void MergeTreeData::loadDataPartsFromDisk( if (size_of_part.has_value()) part_size_str = formatReadableSizeWithBinarySuffix(*size_of_part); - LOG_ERROR(log, "Detaching broken part {}{} (size: {}). " "If it happened after update, it is likely because of backward incompatibility. " "You need to resolve this manually", getFullPathOnDisk(part_disk_ptr), part_name, part_size_str); - std::lock_guard loading_lock(mutex); + + std::lock_guard loading_lock(loading_mutex); + broken_parts_to_detach.push_back(part); ++suspicious_broken_parts; if (size_of_part.has_value()) suspicious_broken_parts_bytes += *size_of_part; - return; + return false; } + if (!part->index_granularity_info.mark_type.adaptive) has_non_adaptive_parts.store(true, std::memory_order_relaxed); else @@ -1108,8 +1180,9 @@ void MergeTreeData::loadDataPartsFromDisk( /// Assume that all parts are Active, covered parts will be detected and marked as Outdated later part->setState(DataPartState::Active); - std::lock_guard loading_lock(mutex); + std::lock_guard loading_lock(loading_mutex); auto [it, inserted] = data_parts_indexes.insert(part); + /// Remove duplicate parts with the same checksum. if (!inserted) { @@ -1124,6 +1197,7 @@ void MergeTreeData::loadDataPartsFromDisk( addPartContributionToDataVolume(part); LOG_TRACE(log, "Finished part {} load on disk {}", part_name, part_disk_ptr->getName()); + return true; }; std::mutex part_select_mutex; @@ -1135,15 +1209,16 @@ void MergeTreeData::loadDataPartsFromDisk( { while (true) { - std::pair thread_part; + PartLoadingTree::NodePtr thread_part; + size_t thread_idx = thread; + { - const std::lock_guard lock{part_select_mutex}; + std::lock_guard lock{part_select_mutex}; if (remaining_thread_parts.empty()) return; /// Steal task if nothing to do - auto thread_idx = thread; if (threads_parts[thread].empty()) { // Try random steal tasks from the next thread @@ -1152,13 +1227,23 @@ void MergeTreeData::loadDataPartsFromDisk( std::advance(it, distribution(thread_local_rng)); thread_idx = *it; } + auto & thread_parts = threads_parts[thread_idx]; thread_part = thread_parts.back(); thread_parts.pop_back(); if (thread_parts.empty()) remaining_thread_parts.erase(thread_idx); } - load_part(thread_part.first, thread_part.second); + + bool part_is_ok = load_part(thread_part->info, thread_part->disk); + + if (!part_is_ok) + { + std::lock_guard lock{part_select_mutex}; + for (const auto & [_, node] : thread_part->children) + threads_parts[thread].push_back(node); + remaining_thread_parts.insert(thread); + } } }); } @@ -1287,17 +1372,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) } } - /// Collect part names by disk. - std::map>> disk_part_map; ThreadPool pool(disks.size()); + std::vector parts_with_disks; for (const auto & disk_ptr : disks) { if (disk_ptr->isBroken()) continue; - auto & disk_parts = disk_part_map[disk_ptr->getName()]; - pool.scheduleOrThrowOnError([&, disk_ptr]() { for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next()) @@ -1308,20 +1390,32 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) continue; if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME)) - disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr)); + if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version)) + parts_with_disks.emplace_back(*part_info, disk_ptr); } }); } + pool.wait(); + auto loading_tree = PartLoadingTree::build(std::move(parts_with_disks)); + /// Collect parts by disks' names. + std::map disk_part_map; + + for (const auto & elem : loading_tree.getRoots()) + for (const auto & [_, node] : elem.second->children) + disk_part_map[node->disk->getName()].emplace_back(node); + size_t num_parts = 0; - std::queue>> parts_queue; + std::queue parts_queue; + for (auto & [disk_name, disk_parts] : disk_part_map) { LOG_INFO(log, "Found {} parts for disk '{}' to load", disk_name, disk_parts.size()); if (disk_parts.empty()) continue; + num_parts += disk_parts.size(); parts_queue.push(std::move(disk_parts)); } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index c3a70a9893b..54a8e4fffe8 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1329,12 +1329,41 @@ private: /// Returns default settings for storage with possible changes from global config. virtual std::unique_ptr getDefaultSettings() const = 0; + class PartLoadingTree + { + public: + struct Node + { + Node(const MergeTreePartInfo & info_, const DiskPtr & disk_) + : info(info_), disk(disk_) + { + } + + MergeTreePartInfo info; + DiskPtr disk; + + std::map> children; + }; + + using NodePtr = std::shared_ptr; + using PartInfoWithDisk = std::pair; + + static PartLoadingTree build(std::vector nodes); + void add(const MergeTreePartInfo & info, const DiskPtr & disk); + const std::unordered_map & getRoots() const { return root_by_partition; } + + private: + std::unordered_map root_by_partition; + }; + + using PartLoadingTreeNodes = std::vector; + void loadDataPartsFromDisk( DataPartsVector & broken_parts_to_detach, DataPartsVector & duplicate_parts_to_remove, ThreadPool & pool, size_t num_parts, - std::queue>> & parts_queue, + std::queue & parts_queue, bool skip_sanity_checks, const MergeTreeSettingsPtr & settings); diff --git a/tests/integration/test_merge_tree_load_parts/__init__.py b/tests/integration/test_merge_tree_load_parts/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py new file mode 100644 index 00000000000..cb0d902f677 --- /dev/null +++ b/tests/integration/test_merge_tree_load_parts/test.py @@ -0,0 +1,147 @@ +import pytest +import helpers.client +import helpers.cluster +from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk + + +cluster = helpers.cluster.ClickHouseCluster(__file__) +node1 = cluster.add_instance("node1", with_zookeeper=True, stay_alive=True) +node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def test_merge_tree_load_parts(started_cluster): + node1.query( + """ + CREATE TABLE mt_load_parts (pk UInt32, id UInt32, s String) + ENGINE = MergeTree ORDER BY id PARTITION BY pk""" + ) + + node1.query("SYSTEM STOP MERGES mt_load_parts") + + for i in range(20): + node1.query( + f"INSERT INTO mt_load_parts VALUES (44, {i}, randomPrintableASCII(10))" + ) + + node1.restart_clickhouse(kill=True) + for i in range(1, 21): + assert node1.contains_in_log(f"Loading part 44_{i}_{i}_0") + + node1.query("OPTIMIZE TABLE mt_load_parts FINAL") + node1.restart_clickhouse(kill=True) + + assert node1.contains_in_log("Loading part 44_1_20") + for i in range(1, 21): + assert not node1.contains_in_log(f"Loading part 44_{i}_{i}_0") + + assert node1.query("SELECT count() FROM mt_load_parts") == "20\n" + assert ( + node1.query( + "SELECT count() FROM system.parts WHERE table = 'mt_load_parts' AND active" + ) + == "1\n" + ) + + +def test_merge_tree_load_parts_corrupted(started_cluster): + for i, node in enumerate([node1, node2]): + node.query( + f""" + CREATE TABLE mt_load_parts_2 (pk UInt32, id UInt32, s String) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/mt_load_parts_2', '{i}') ORDER BY id PARTITION BY pk""" + ) + + """min-max blocks in created parts: 1_1_0, 2_2_0, 1_2_1, 3_3_0, 1_3_2""" + for partition in [111, 222, 333]: + node1.query( + f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 0, randomPrintableASCII(10))" + ) + node1.query( + f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 1, randomPrintableASCII(10))" + ) + + node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL") + + node1.query( + f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 2, randomPrintableASCII(10))" + ) + + node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL") + + node2.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30) + + def get_part_name(node, partition, min_block, max_block): + return node.query( + f""" + SELECT name FROM system.parts + WHERE table = 'mt_load_parts_2' + AND partition = '{partition}' + AND min_block_number = {min_block} + AND max_block_number = {max_block}""" + ).strip() + + corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 111, 0, 2)) + corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 2)) + corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 1)) + corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 0, 1)) + corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 2, 2)) + + node1.restart_clickhouse(kill=True) + + def check_parts_loading(node, partition, loaded, failed, skipped): + for (min_block, max_block) in loaded: + part_name = f"{partition}_{min_block}_{max_block}" + assert node.contains_in_log(f"Loading part {part_name}") + assert node.contains_in_log(f"Finished part {part_name}") + + for (min_block, max_block) in failed: + part_name = f"{partition}_{min_block}_{max_block}" + assert node.contains_in_log(f"Loading part {part_name}") + assert not node.contains_in_log(f"Finished part {part_name}") + + for (min_block, max_block) in skipped: + part_name = f"{partition}_{min_block}_{max_block}" + assert not node.contains_in_log(f"Loading part {part_name}") + assert not node.contains_in_log(f"Finished part {part_name}") + + check_parts_loading( + node1, 111, loaded=[(0, 1), (2, 2)], failed=[(0, 2)], skipped=[(0, 0), (1, 1)] + ) + check_parts_loading( + node1, 222, loaded=[(0, 0), (1, 1), (2, 2)], failed=[(0, 2), (0, 1)], skipped=[] + ) + check_parts_loading( + node1, 333, loaded=[(0, 2)], failed=[], skipped=[(0, 0), (1, 1), (2, 2), (0, 1)] + ) + + node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30) + node1.query("OPTIMIZE TABLE mt_load_parts_2 FINAL") + node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30) + + assert ( + node1.query( + """ + SELECT pk, count() FROM mt_load_parts_2 + GROUP BY pk ORDER BY pk""" + ) + == "111\t3\n222\t3\n333\t3\n" + ) + assert ( + node1.query( + """ + SELECT partition, count() + FROM system.parts WHERE table = 'mt_load_parts_2' AND active + GROUP BY partition ORDER BY partition""" + ) + == "111\t1\n222\t1\n333\t1\n" + )