do not load inactive parts

This commit is contained in:
Anton Popov 2022-10-08 01:39:29 +00:00
parent 3d06db9a37
commit 9f1104c2ad
4 changed files with 297 additions and 27 deletions

View File

@ -942,12 +942,77 @@ Int64 MergeTreeData::getMaxBlockNumber() const
return max_block_num;
}
void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const DiskPtr & disk)
{
auto & current_ptr = root_by_partition[info.partition_id];
if (!current_ptr)
current_ptr = std::make_shared<Node>(MergeTreePartInfo{}, disk);
auto * current = current_ptr.get();
while (true)
{
auto it = current->children.lower_bound(info);
if (it != current->children.begin())
{
auto prev = std::prev(it);
const auto & prev_info = prev->first;
if (prev_info.contains(info))
{
current = prev->second.get();
continue;
}
else if (!prev_info.isDisjoint(info))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Part {} intersects previous part {}. It is a bug!",
info.getPartName(), prev_info.getPartName());
}
}
if (it != current->children.end())
{
const auto & next_info = it->first;
if (next_info.contains(info))
{
current = it->second.get();
continue;
}
else if (!next_info.isDisjoint(info))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Part {} intersects next part {}. It is a bug!",
info.getPartName(), next_info.getPartName());
}
}
current->children.emplace(info, std::make_shared<Node>(info, disk));
break;
}
}
MergeTreeData::PartLoadingTree
MergeTreeData::PartLoadingTree::build(std::vector<PartInfoWithDisk> nodes)
{
std::sort(nodes.begin(), nodes.end(),
[](const auto & lhs, const auto & rhs)
{
return lhs.first.level > rhs.first.level;
});
PartLoadingTree tree;
for (const auto & [info, disk] : nodes)
tree.add(info, disk);
return tree;
}
void MergeTreeData::loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
std::queue<PartLoadingTreeNodes> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings)
{
@ -962,9 +1027,10 @@ void MergeTreeData::loadDataPartsFromDisk(
/// Prepare data parts for parallel loading. Threads will focus on given disk first, then steal
/// others' tasks when finish current disk part loading process.
std::vector<std::vector<std::pair<String, DiskPtr>>> threads_parts(num_threads);
std::vector<PartLoadingTreeNodes> threads_parts(num_threads);
std::set<size_t> remaining_thread_parts;
std::queue<size_t> threads_queue;
for (size_t i = 0; i < num_threads; ++i)
{
remaining_thread_parts.insert(i);
@ -977,11 +1043,12 @@ void MergeTreeData::loadDataPartsFromDisk(
size_t i = threads_queue.front();
auto & need_parts = parts_per_thread[i];
assert(need_parts > 0);
auto & thread_parts = threads_parts[i];
auto & current_parts = parts_queue.front();
assert(!current_parts.empty());
auto parts_to_grab = std::min(need_parts, current_parts.size());
auto parts_to_grab = std::min(need_parts, current_parts.size());
thread_parts.insert(thread_parts.end(), current_parts.end() - parts_to_grab, current_parts.end());
current_parts.resize(current_parts.size() - parts_to_grab);
need_parts -= parts_to_grab;
@ -993,13 +1060,16 @@ void MergeTreeData::loadDataPartsFromDisk(
/// If current disk still has some parts, push it to the tail.
if (!current_parts.empty())
parts_queue.push(std::move(current_parts));
parts_queue.pop();
/// If current thread still want some parts, push it to the tail.
if (need_parts > 0)
threads_queue.push(i);
threads_queue.pop();
}
assert(threads_queue.empty());
assert(std::all_of(threads_parts.begin(), threads_parts.end(), [](const std::vector<std::pair<String, DiskPtr>> & parts)
{
@ -1011,16 +1081,13 @@ void MergeTreeData::loadDataPartsFromDisk(
std::atomic<bool> has_adaptive_parts = false;
std::atomic<bool> has_non_adaptive_parts = false;
std::atomic<bool> has_lightweight_in_parts = false;
std::mutex loading_mutex;
std::mutex mutex;
auto load_part = [&](const String & part_name, const DiskPtr & part_disk_ptr)
auto load_part = [&](const MergeTreePartInfo & part_info, const DiskPtr & part_disk_ptr)
{
auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
if (!part_opt)
return;
auto part_name = part_info.getPartName();
LOG_TRACE(log, "Loading part {} from disk {}", part_name, part_disk_ptr->getName());
const auto & part_info = *part_opt;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, part_name);
auto part = createPart(part_name, part_info, data_part_storage);
@ -1028,6 +1095,7 @@ void MergeTreeData::loadDataPartsFromDisk(
String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path))
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
@ -1036,11 +1104,13 @@ void MergeTreeData::loadDataPartsFromDisk(
"Detaching stale part {}{} (size: {}), which should have been deleted after a move. "
"That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(mutex);
std::lock_guard loading_lock(loading_mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part;
return;
return false;
}
try
@ -1082,19 +1152,21 @@ void MergeTreeData::loadDataPartsFromDisk(
if (size_of_part.has_value())
part_size_str = formatReadableSizeWithBinarySuffix(*size_of_part);
LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). "
"If it happened after update, it is likely because of backward incompatibility. "
"You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, part_size_str);
std::lock_guard loading_lock(mutex);
std::lock_guard loading_lock(loading_mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
if (size_of_part.has_value())
suspicious_broken_parts_bytes += *size_of_part;
return;
return false;
}
if (!part->index_granularity_info.mark_type.adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else
@ -1108,8 +1180,9 @@ void MergeTreeData::loadDataPartsFromDisk(
/// Assume that all parts are Active, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Active);
std::lock_guard loading_lock(mutex);
std::lock_guard loading_lock(loading_mutex);
auto [it, inserted] = data_parts_indexes.insert(part);
/// Remove duplicate parts with the same checksum.
if (!inserted)
{
@ -1124,6 +1197,7 @@ void MergeTreeData::loadDataPartsFromDisk(
addPartContributionToDataVolume(part);
LOG_TRACE(log, "Finished part {} load on disk {}", part_name, part_disk_ptr->getName());
return true;
};
std::mutex part_select_mutex;
@ -1135,15 +1209,16 @@ void MergeTreeData::loadDataPartsFromDisk(
{
while (true)
{
std::pair<String, DiskPtr> thread_part;
PartLoadingTree::NodePtr thread_part;
size_t thread_idx = thread;
{
const std::lock_guard lock{part_select_mutex};
std::lock_guard lock{part_select_mutex};
if (remaining_thread_parts.empty())
return;
/// Steal task if nothing to do
auto thread_idx = thread;
if (threads_parts[thread].empty())
{
// Try random steal tasks from the next thread
@ -1152,13 +1227,23 @@ void MergeTreeData::loadDataPartsFromDisk(
std::advance(it, distribution(thread_local_rng));
thread_idx = *it;
}
auto & thread_parts = threads_parts[thread_idx];
thread_part = thread_parts.back();
thread_parts.pop_back();
if (thread_parts.empty())
remaining_thread_parts.erase(thread_idx);
}
load_part(thread_part.first, thread_part.second);
bool part_is_ok = load_part(thread_part->info, thread_part->disk);
if (!part_is_ok)
{
std::lock_guard lock{part_select_mutex};
for (const auto & [_, node] : thread_part->children)
threads_parts[thread].push_back(node);
remaining_thread_parts.insert(thread);
}
}
});
}
@ -1287,17 +1372,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
}
/// Collect part names by disk.
std::map<String, std::vector<std::pair<String, DiskPtr>>> disk_part_map;
ThreadPool pool(disks.size());
std::vector<PartLoadingTree::PartInfoWithDisk> parts_with_disks;
for (const auto & disk_ptr : disks)
{
if (disk_ptr->isBroken())
continue;
auto & disk_parts = disk_part_map[disk_ptr->getName()];
pool.scheduleOrThrowOnError([&, disk_ptr]()
{
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
@ -1308,20 +1390,32 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
parts_with_disks.emplace_back(*part_info, disk_ptr);
}
});
}
pool.wait();
auto loading_tree = PartLoadingTree::build(std::move(parts_with_disks));
/// Collect parts by disks' names.
std::map<String, PartLoadingTreeNodes> disk_part_map;
for (const auto & elem : loading_tree.getRoots())
for (const auto & [_, node] : elem.second->children)
disk_part_map[node->disk->getName()].emplace_back(node);
size_t num_parts = 0;
std::queue<std::vector<std::pair<String, DiskPtr>>> parts_queue;
std::queue<PartLoadingTreeNodes> parts_queue;
for (auto & [disk_name, disk_parts] : disk_part_map)
{
LOG_INFO(log, "Found {} parts for disk '{}' to load", disk_name, disk_parts.size());
if (disk_parts.empty())
continue;
num_parts += disk_parts.size();
parts_queue.push(std::move(disk_parts));
}

View File

@ -1329,12 +1329,41 @@ private:
/// Returns default settings for storage with possible changes from global config.
virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0;
class PartLoadingTree
{
public:
struct Node
{
Node(const MergeTreePartInfo & info_, const DiskPtr & disk_)
: info(info_), disk(disk_)
{
}
MergeTreePartInfo info;
DiskPtr disk;
std::map<MergeTreePartInfo, std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;
using PartInfoWithDisk = std::pair<MergeTreePartInfo, DiskPtr>;
static PartLoadingTree build(std::vector<PartInfoWithDisk> nodes);
void add(const MergeTreePartInfo & info, const DiskPtr & disk);
const std::unordered_map<String, NodePtr> & getRoots() const { return root_by_partition; }
private:
std::unordered_map<String, NodePtr> root_by_partition;
};
using PartLoadingTreeNodes = std::vector<PartLoadingTree::NodePtr>;
void loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
std::queue<PartLoadingTreeNodes> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings);

View File

@ -0,0 +1,147 @@
import pytest
import helpers.client
import helpers.cluster
from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk
cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True, stay_alive=True)
node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_merge_tree_load_parts(started_cluster):
node1.query(
"""
CREATE TABLE mt_load_parts (pk UInt32, id UInt32, s String)
ENGINE = MergeTree ORDER BY id PARTITION BY pk"""
)
node1.query("SYSTEM STOP MERGES mt_load_parts")
for i in range(20):
node1.query(
f"INSERT INTO mt_load_parts VALUES (44, {i}, randomPrintableASCII(10))"
)
node1.restart_clickhouse(kill=True)
for i in range(1, 21):
assert node1.contains_in_log(f"Loading part 44_{i}_{i}_0")
node1.query("OPTIMIZE TABLE mt_load_parts FINAL")
node1.restart_clickhouse(kill=True)
assert node1.contains_in_log("Loading part 44_1_20")
for i in range(1, 21):
assert not node1.contains_in_log(f"Loading part 44_{i}_{i}_0")
assert node1.query("SELECT count() FROM mt_load_parts") == "20\n"
assert (
node1.query(
"SELECT count() FROM system.parts WHERE table = 'mt_load_parts' AND active"
)
== "1\n"
)
def test_merge_tree_load_parts_corrupted(started_cluster):
for i, node in enumerate([node1, node2]):
node.query(
f"""
CREATE TABLE mt_load_parts_2 (pk UInt32, id UInt32, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/mt_load_parts_2', '{i}') ORDER BY id PARTITION BY pk"""
)
"""min-max blocks in created parts: 1_1_0, 2_2_0, 1_2_1, 3_3_0, 1_3_2"""
for partition in [111, 222, 333]:
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 0, randomPrintableASCII(10))"
)
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 1, randomPrintableASCII(10))"
)
node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL")
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 2, randomPrintableASCII(10))"
)
node1.query(f"OPTIMIZE TABLE mt_load_parts_2 PARTITION {partition} FINAL")
node2.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)
def get_part_name(node, partition, min_block, max_block):
return node.query(
f"""
SELECT name FROM system.parts
WHERE table = 'mt_load_parts_2'
AND partition = '{partition}'
AND min_block_number = {min_block}
AND max_block_number = {max_block}"""
).strip()
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 111, 0, 2))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 2))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 222, 0, 1))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 0, 1))
corrupt_part_data_on_disk(node1, "mt_load_parts_2", get_part_name(node1, 333, 2, 2))
node1.restart_clickhouse(kill=True)
def check_parts_loading(node, partition, loaded, failed, skipped):
for (min_block, max_block) in loaded:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading part {part_name}")
assert node.contains_in_log(f"Finished part {part_name}")
for (min_block, max_block) in failed:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading part {part_name}")
assert not node.contains_in_log(f"Finished part {part_name}")
for (min_block, max_block) in skipped:
part_name = f"{partition}_{min_block}_{max_block}"
assert not node.contains_in_log(f"Loading part {part_name}")
assert not node.contains_in_log(f"Finished part {part_name}")
check_parts_loading(
node1, 111, loaded=[(0, 1), (2, 2)], failed=[(0, 2)], skipped=[(0, 0), (1, 1)]
)
check_parts_loading(
node1, 222, loaded=[(0, 0), (1, 1), (2, 2)], failed=[(0, 2), (0, 1)], skipped=[]
)
check_parts_loading(
node1, 333, loaded=[(0, 2)], failed=[], skipped=[(0, 0), (1, 1), (2, 2), (0, 1)]
)
node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)
node1.query("OPTIMIZE TABLE mt_load_parts_2 FINAL")
node1.query("SYSTEM SYNC REPLICA mt_load_parts_2", timeout=30)
assert (
node1.query(
"""
SELECT pk, count() FROM mt_load_parts_2
GROUP BY pk ORDER BY pk"""
)
== "111\t3\n222\t3\n333\t3\n"
)
assert (
node1.query(
"""
SELECT partition, count()
FROM system.parts WHERE table = 'mt_load_parts_2' AND active
GROUP BY partition ORDER BY partition"""
)
== "111\t1\n222\t1\n333\t1\n"
)