remove outdated parts asynchronously

This commit is contained in:
Anton Popov 2022-10-14 14:52:57 +00:00
parent 71a223c1c9
commit 95fdb0a877
7 changed files with 497 additions and 385 deletions

View File

@ -1460,7 +1460,6 @@ void IMergeTreeDataPart::remove() const
return CanRemoveDescription{.can_remove_anything = can_remove, .files_not_to_remove = files_not_to_remove };
};
if (!isStoredOnDisk())
return;

View File

@ -992,6 +992,22 @@ void MergeTreeData::PartLoadingTree::add(const MergeTreePartInfo & info, const S
}
}
template <typename Func>
void MergeTreeData::PartLoadingTree::traverse(bool recursive, Func && func)
{
std::function<void(const NodePtr &)> traverse_impl = [&](const auto & node)
{
func(node);
if (recursive)
for (const auto & [_, child] : node->children)
traverse_impl(child);
};
for (const auto & elem : root_by_partition)
for (const auto & [_, node] : elem.second->children)
traverse_impl(node);
}
MergeTreeData::PartLoadingTree
MergeTreeData::PartLoadingTree::build(std::vector<PartLoadingInfo> nodes)
{
@ -1008,13 +1024,215 @@ MergeTreeData::PartLoadingTree::build(std::vector<PartLoadingInfo> nodes)
return tree;
}
void MergeTreeData::loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
static std::optional<size_t> calculatePartSizeSafe(
const MergeTreeData::DataPartPtr & part, Poco::Logger * log)
{
try
{
return part->data_part_storage->calculateTotalSizeOnDisk();
}
catch (...)
{
tryLogCurrentException(log, fmt::format("while calculating part size {} on path {}",
part->name, part->data_part_storage->getRelativePath()));
return {};
}
}
static void preparePartForRemoval(const MergeTreeMutableDataPartPtr & part)
{
part->remove_time.store(part->modification_time, std::memory_order_relaxed);
auto creation_csn = part->version.creation_csn.load(std::memory_order_relaxed);
if (creation_csn != Tx::RolledBackCSN && creation_csn != Tx::PrehistoricCSN && !part->version.isRemovalTIDLocked())
{
/// It's possible that covering part was created without transaction,
/// but if covered part was created with transaction (i.e. creation_tid is not prehistoric),
/// then it must have removal tid in metadata file.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data part {} is Outdated and has creation TID {} and CSN {}, "
"but does not have removal tid. It's a bug or a result of manual intervention.",
part->name, part->version.creation_tid, creation_csn);
}
/// Explicitly set removal_tid_lock for parts w/o transaction (i.e. w/o txn_version.txt)
/// to avoid keeping part forever (see VersionMetadata::canBeRemoved())
if (!part->version.isRemovalTIDLocked())
{
TransactionInfoContext transaction_context{part->storage.getStorageID(), part->name};
part->version.lockRemovalTID(Tx::PrehistoricTID, transaction_context);
}
}
MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
const MergeTreePartInfo & part_info,
const String & part_name,
const DiskPtr & part_disk_ptr,
MergeTreeDataPartState to_state,
std::mutex & part_loading_mutex)
{
LOG_TRACE(log, "Loading {} part {} from disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
LoadPartResult res;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, part_name);
res.part = createPart(part_name, part_info, data_part_storage);
String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path))
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist.
res.size_of_part = calculatePartSizeSafe(res.part, log);
res.is_broken = true;
auto part_size_str = res.size_of_part ? formatReadableSizeWithBinarySuffix(*res.size_of_part) : "failed to calculate size";
LOG_WARNING(log,
"Detaching stale part {} (size: {}), which should have been deleted after a move. "
"That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
res.part->data_part_storage->getFullPath(), part_size_str);
return res;
}
try
{
res.part->loadColumnsChecksumsIndexes(require_part_metadata, true);
}
catch (const Exception & e)
{
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if (isNotEnoughMemoryErrorCode(e.code()))
throw;
res.is_broken = true;
tryLogCurrentException(log, fmt::format("while loading part {} on path {}", res.part->name, part_path));
}
catch (...)
{
res.is_broken = true;
tryLogCurrentException(log, fmt::format("while loading part {} on path {}", res.part->name, part_path));
}
/// Ignore broken parts that can appear as a result of hard server restart.
if (res.is_broken)
{
res.size_of_part = calculatePartSizeSafe(res.part, log);
auto part_size_str = res.size_of_part ? formatReadableSizeWithBinarySuffix(*res.size_of_part) : "failed to calculate size";
LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). "
"If it happened after update, it is likely because of backward incompatibility. "
"You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, part_size_str);
return res;
}
res.part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
res.part->loadVersionMetadata();
if (res.part->wasInvolvedInTransaction())
{
/// Check if CSNs were written after committing transaction, update and write if needed.
bool version_updated = false;
auto & version = res.part->version;
chassert(!version.creation_tid.isEmpty());
if (!res.part->version.creation_csn)
{
auto min = TransactionLog::getCSN(res.part->version.creation_tid);
if (!min)
{
/// Transaction that created this part was not committed. Remove part.
TransactionLog::assertTIDIsNotOutdated(res.part->version.creation_tid);
min = Tx::RolledBackCSN;
}
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has creation_tid={}, setting creation_csn={}",
res.part->name, res.part->version.creation_tid, min);
version.creation_csn = min;
version_updated = true;
}
if (!version.removal_tid.isEmpty() && !version.removal_csn)
{
auto max = TransactionLog::getCSN(version.removal_tid);
if (max)
{
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has removal_tid={}, setting removal_csn={}",
res.part->name, version.removal_tid, max);
version.removal_csn = max;
}
else
{
TransactionLog::assertTIDIsNotOutdated(version.removal_tid);
/// Transaction that tried to remove this part was not committed. Clear removal_tid.
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}",
res.part->name, version.removal_tid);
version.unlockRemovalTID(version.removal_tid, TransactionInfoContext{getStorageID(), res.part->name});
}
version_updated = true;
}
/// Sanity checks
bool csn_order = !version.removal_csn || version.creation_csn <= version.removal_csn || version.removal_csn == Tx::PrehistoricCSN;
bool min_start_csn_order = version.creation_tid.start_csn <= version.creation_csn;
bool max_start_csn_order = version.removal_tid.start_csn <= version.removal_csn;
bool creation_csn_known = version.creation_csn;
if (!csn_order || !min_start_csn_order || !max_start_csn_order || !creation_csn_known)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid version metadata: {}", res.part->name, version.toString());
if (version_updated)
res.part->storeVersionMetadata(/* force */ true);
/// Deactivate part if creation was not committed or if removal was.
if (version.creation_csn == Tx::RolledBackCSN || version.removal_csn)
{
preparePartForRemoval(res.part);
to_state = DataPartState::Outdated;
}
}
res.part->setState(to_state);
DataPartIteratorByInfo it;
bool inserted;
{
std::lock_guard lock(part_loading_mutex);
std::tie(it, inserted) = data_parts_indexes.insert(res.part);
}
/// Remove duplicate parts with the same checksum.
if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == res.part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", data_part_storage->getFullPath());
res.part->is_duplicate = true;
return res;
}
else
throw Exception("Part " + res.part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
}
if (to_state == DataPartState::Active)
addPartContributionToDataVolume(res.part);
LOG_TRACE(log, "Finished loading {} part {} on disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
return res;
};
std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(
ThreadPool & pool,
size_t num_parts,
std::queue<PartLoadingTreeNodes> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings)
{
/// Parallel loading of data parts.
@ -1038,6 +1256,7 @@ void MergeTreeData::loadDataPartsFromDisk(
threads_queue.push(i);
}
std::vector<LoadPartResult> loaded_parts;
while (!parts_queue.empty())
{
assert(!threads_queue.empty());
@ -1077,132 +1296,11 @@ void MergeTreeData::loadDataPartsFromDisk(
return !parts.empty();
}));
size_t suspicious_broken_parts = 0;
size_t suspicious_broken_parts_bytes = 0;
std::atomic<bool> has_adaptive_parts = false;
std::atomic<bool> has_non_adaptive_parts = false;
std::atomic<bool> has_lightweight_in_parts = false;
std::mutex loading_mutex;
auto load_part = [&](const MergeTreePartInfo & part_info, const String & part_name, const DiskPtr & part_disk_ptr)
{
LOG_TRACE(log, "Loading part {} from disk {}", part_name, part_disk_ptr->getName());
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(single_disk_volume, relative_data_path, part_name);
auto part = createPart(part_name, part_info, data_part_storage);
bool broken = false;
String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path))
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_t size_of_part = data_part_storage->calculateTotalSizeOnDisk();
LOG_WARNING(log,
"Detaching stale part {}{} (size: {}), which should have been deleted after a move. "
"That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(loading_mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part;
return false;
}
try
{
part->loadColumnsChecksumsIndexes(require_part_metadata, true);
}
catch (const Exception & e)
{
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if (isNotEnoughMemoryErrorCode(e.code()))
throw;
broken = true;
tryLogCurrentException(log, fmt::format("while loading part {} on path {}", part->name, part_path));
}
catch (...)
{
broken = true;
tryLogCurrentException(log, fmt::format("while loading part {} on path {}", part->name, part_path));
}
/// Ignore broken parts that can appear as a result of hard server restart.
if (broken)
{
std::optional<size_t> size_of_part;
try
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_of_part = data_part_storage->calculateTotalSizeOnDisk();
}
catch (...)
{
tryLogCurrentException(log, fmt::format("while calculating part size {} on path {}", part->name, part_path));
}
std::string part_size_str = "failed to calculate size";
if (size_of_part.has_value())
part_size_str = formatReadableSizeWithBinarySuffix(*size_of_part);
LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). "
"If it happened after update, it is likely because of backward incompatibility. "
"You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, part_size_str);
std::lock_guard loading_lock(loading_mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
if (size_of_part.has_value())
suspicious_broken_parts_bytes += *size_of_part;
return false;
}
if (!part->index_granularity_info.mark_type.adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else
has_adaptive_parts.store(true, std::memory_order_relaxed);
/// Check if there is lightweight delete in part
if (part->hasLightweightDelete())
has_lightweight_in_parts.store(true, std::memory_order_relaxed);
part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
/// Assume that all parts are Active, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Active);
std::lock_guard loading_lock(loading_mutex);
auto [it, inserted] = data_parts_indexes.insert(part);
/// Remove duplicate parts with the same checksum.
if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", data_part_storage->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
}
addPartContributionToDataVolume(part);
LOG_TRACE(log, "Finished part {} load on disk {}", part_name, part_disk_ptr->getName());
return true;
};
std::mutex part_select_mutex;
try
{
std::mutex part_select_mutex;
std::mutex part_loading_mutex;
for (size_t thread = 0; thread < num_threads; ++thread)
{
pool.scheduleOrThrowOnError([&, thread]
@ -1235,15 +1333,19 @@ void MergeTreeData::loadDataPartsFromDisk(
remaining_thread_parts.erase(thread_idx);
}
bool part_is_ok = load_part(thread_part->info, thread_part->name, thread_part->disk);
thread_part->is_loaded = true;
auto res = loadDataPart(thread_part->info, thread_part->name, thread_part->disk, DataPartState::Active, part_loading_mutex);
bool is_active_part = res.part->getState() == DataPartState::Active && !res.part->is_duplicate;
if (!part_is_ok)
if (!is_active_part && !thread_part->children.empty())
{
std::lock_guard lock{part_select_mutex};
for (const auto & [_, node] : thread_part->children)
threads_parts[thread].push_back(node);
remaining_thread_parts.insert(thread);
}
loaded_parts.push_back(std::move(res));
}
});
}
@ -1256,53 +1358,37 @@ void MergeTreeData::loadDataPartsFromDisk(
}
pool.wait();
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception(
"Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled",
ErrorCodes::LOGICAL_ERROR);
has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
if (has_lightweight_in_parts)
has_lightweight_delete_parts.store(true);
if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously many ({} parts, {} in total) broken parts to remove while maximum allowed broken parts count is {}. You can change the maximum value "
"with merge tree setting 'max_suspicious_broken_parts' in <merge_tree> configuration section or in table settings in .sql file "
"(don't forget to return setting back to default value)",
suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes), settings->max_suspicious_broken_parts);
if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously big size ({} parts, {} in total) of all broken parts to remove while maximum allowed broken parts size is {}. "
"You can change the maximum value with merge tree setting 'max_suspicious_broken_parts_bytes' in <merge_tree> configuration "
"section or in table settings in .sql file (don't forget to return setting back to default value)",
suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes),
formatReadableSizeWithBinarySuffix(settings->max_suspicious_broken_parts_bytes));
return loaded_parts;
}
void MergeTreeData::loadDataPartsFromWAL(
DataPartsVector & /* broken_parts_to_detach */,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal)
void MergeTreeData::loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal)
{
std::sort(parts_from_wal.begin(), parts_from_wal.end(), [](const auto & lhs, const auto & rhs)
{
return std::tie(lhs->info.level, lhs->info.mutation) > std::tie(rhs->info.level, rhs->info.mutation);
});
for (auto & part : parts_from_wal)
{
part->modification_time = time(nullptr);
/// Assume that all parts are Active, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Active);
auto lo = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{DataPartState::Active, part->info});
if (lo != data_parts_by_state_and_info.begin() && (*std::prev(lo))->info.contains(part->info))
continue;
if (lo != data_parts_by_state_and_info.end() && (*lo)->info.contains(part->info))
continue;
auto [it, inserted] = data_parts_indexes.insert(part);
if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->data_part_storage->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
}
@ -1402,9 +1488,10 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Collect parts by disks' names.
std::map<String, PartLoadingTreeNodes> disk_part_map;
for (const auto & elem : loading_tree.getRoots())
for (const auto & [_, node] : elem.second->children)
disk_part_map[node->disk->getName()].emplace_back(node);
loading_tree.traverse(/*recursive=*/ false, [&](const auto & node)
{
disk_part_map[node->disk->getName()].emplace_back(node);
});
size_t num_parts = 0;
std::queue<PartLoadingTreeNodes> parts_queue;
@ -1426,9 +1513,40 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
DataPartsVector broken_parts_to_detach;
DataPartsVector duplicate_parts_to_remove;
size_t suspicious_broken_parts = 0;
size_t suspicious_broken_parts_bytes = 0;
bool have_adaptive_parts = false;
bool have_non_adaptive_parts = false;
bool have_lightweight_in_parts = false;
bool have_parts_with_version_metadata = false;
if (num_parts > 0)
loadDataPartsFromDisk(
broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
{
auto loaded_parts = loadDataPartsFromDisk(pool, num_parts, parts_queue, settings);
for (const auto & res : loaded_parts)
{
if (res.is_broken)
{
broken_parts_to_detach.push_back(res.part);
++suspicious_broken_parts;
if (res.size_of_part)
suspicious_broken_parts_bytes += *res.size_of_part;
}
else if (res.part->is_duplicate)
{
res.part->remove();
}
else
{
bool is_adaptive = res.part->index_granularity_info.mark_type.adaptive;
have_adaptive_parts |= is_adaptive;
have_non_adaptive_parts |= !is_adaptive;
have_lightweight_in_parts |= res.part->hasLightweightDelete();
have_parts_with_version_metadata |= res.part->wasInvolvedInTransaction();
}
}
}
if (settings->in_memory_parts_enable_wal)
{
@ -1478,8 +1596,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
parts_from_wal.insert(
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal);
loadDataPartsFromWAL(parts_from_wal);
num_parts += parts_from_wal.size();
}
@ -1490,6 +1607,30 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
return;
}
if (have_non_adaptive_parts && have_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception(
"Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled",
ErrorCodes::LOGICAL_ERROR);
has_non_adaptive_index_granularity_parts = have_non_adaptive_parts;
has_lightweight_delete_parts = have_lightweight_in_parts;
transactions_enabled = have_parts_with_version_metadata;
if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously many ({} parts, {} in total) broken parts to remove while maximum allowed broken parts count is {}. You can change the maximum value "
"with merge tree setting 'max_suspicious_broken_parts' in <merge_tree> configuration section or in table settings in .sql file "
"(don't forget to return setting back to default value)",
suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes), settings->max_suspicious_broken_parts);
if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously big size ({} parts, {} in total) of all broken parts to remove while maximum allowed broken parts size is {}. "
"You can change the maximum value with merge tree setting 'max_suspicious_broken_parts_bytes' in <merge_tree> configuration "
"section or in table settings in .sql file (don't forget to return setting back to default value)",
suspicious_broken_parts, formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes),
formatReadableSizeWithBinarySuffix(settings->max_suspicious_broken_parts_bytes));
for (auto & part : broken_parts_to_detach)
{
auto builder = part->data_part_storage->getBuilder();
@ -1497,178 +1638,69 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
builder->commit();
}
for (auto & part : duplicate_parts_to_remove)
part->remove();
auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it)
{
const DataPartPtr & part = *it;
part->remove_time.store(part->modification_time, std::memory_order_relaxed);
auto creation_csn = part->version.creation_csn.load(std::memory_order_relaxed);
if (creation_csn != Tx::RolledBackCSN && creation_csn != Tx::PrehistoricCSN && !part->version.isRemovalTIDLocked())
{
/// It's possible that covering part was created without transaction,
/// but if covered part was created with transaction (i.e. creation_tid is not prehistoric),
/// then it must have removal tid in metadata file.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data part {} is Outdated and has creation TID {} and CSN {}, "
"but does not have removal tid. It's a bug or a result of manual intervention.",
part->name, part->version.creation_tid, creation_csn);
}
modifyPartState(it, DataPartState::Outdated);
removePartContributionToDataVolume(part);
/// Explicitly set removal_tid_lock for parts w/o transaction (i.e. w/o txn_version.txt)
/// to avoid keeping part forever (see VersionMetadata::canBeRemoved())
if (!part->version.isRemovalTIDLocked())
{
TransactionInfoContext transaction_context{getStorageID(), part->name};
part->version.lockRemovalTID(Tx::PrehistoricTID, transaction_context);
}
};
/// All parts are in "Active" state after loading
assert(std::find_if(data_parts_by_state_and_info.begin(), data_parts_by_state_and_info.end(),
[](const auto & part)
{
return part->getState() != DataPartState::Active;
}) == data_parts_by_state_and_info.end());
bool have_parts_with_version_metadata = false;
auto iter = data_parts_by_state_and_info.begin();
while (iter != data_parts_by_state_and_info.end() && (*iter)->getState() == DataPartState::Active)
{
const DataPartPtr & part = *iter;
part->loadVersionMetadata();
VersionMetadata & version = part->version;
if (part->wasInvolvedInTransaction())
{
have_parts_with_version_metadata = true;
}
else
{
++iter;
continue;
}
/// Check if CSNs were written after committing transaction, update and write if needed.
bool version_updated = false;
chassert(!version.creation_tid.isEmpty());
if (!part->version.creation_csn)
{
auto min = TransactionLog::getCSN(version.creation_tid);
if (!min)
{
/// Transaction that created this part was not committed. Remove part.
TransactionLog::assertTIDIsNotOutdated(version.creation_tid);
min = Tx::RolledBackCSN;
}
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has creation_tid={}, setting creation_csn={}",
part->name, version.creation_tid, min);
version.creation_csn = min;
version_updated = true;
}
if (!version.removal_tid.isEmpty() && !part->version.removal_csn)
{
auto max = TransactionLog::getCSN(version.removal_tid);
if (max)
{
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: part has removal_tid={}, setting removal_csn={}",
part->name, version.removal_tid, max);
version.removal_csn = max;
}
else
{
TransactionLog::assertTIDIsNotOutdated(version.removal_tid);
/// Transaction that tried to remove this part was not committed. Clear removal_tid.
LOG_TRACE(log, "Will fix version metadata of {} after unclean restart: clearing removal_tid={}",
part->name, version.removal_tid);
version.unlockRemovalTID(version.removal_tid, TransactionInfoContext{getStorageID(), part->name});
}
version_updated = true;
}
/// Sanity checks
bool csn_order = !version.removal_csn || version.creation_csn <= version.removal_csn || version.removal_csn == Tx::PrehistoricCSN;
bool min_start_csn_order = version.creation_tid.start_csn <= version.creation_csn;
bool max_start_csn_order = version.removal_tid.start_csn <= version.removal_csn;
bool creation_csn_known = version.creation_csn;
if (!csn_order || !min_start_csn_order || !max_start_csn_order || !creation_csn_known)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid version metadata: {}", part->name, version.toString());
if (version_updated)
part->storeVersionMetadata(/* force */ true);
/// Deactivate part if creation was not committed or if removal was.
if (version.creation_csn == Tx::RolledBackCSN || version.removal_csn)
{
auto next_it = std::next(iter);
deactivate_part(iter);
iter = next_it;
}
else
{
++iter;
}
}
if (have_parts_with_version_metadata)
transactions_enabled.store(true);
/// Delete from the set of current parts those parts that are covered by another part (those parts that
/// were merged), but that for some reason are still not deleted from the filesystem.
/// Deletion of files will be performed later in the clearOldParts() method.
auto active_parts_range = getDataPartsStateRange(DataPartState::Active);
auto prev_it = active_parts_range.begin();
auto end_it = active_parts_range.end();
bool less_than_two_active_parts = prev_it == end_it || std::next(prev_it) == end_it;
if (!less_than_two_active_parts)
{
(*prev_it)->assertState({DataPartState::Active});
auto curr_it = std::next(prev_it);
while (curr_it != data_parts_by_state_and_info.end() && (*curr_it)->getState() == DataPartState::Active)
{
(*curr_it)->assertState({DataPartState::Active});
/// Don't consider data parts belonging to different partitions.
if ((*curr_it)->info.partition_id != (*prev_it)->info.partition_id)
{
++prev_it;
++curr_it;
continue;
}
if ((*curr_it)->contains(**prev_it))
{
deactivate_part(prev_it);
prev_it = curr_it;
++curr_it;
}
else if ((*prev_it)->contains(**curr_it))
{
auto next = std::next(curr_it);
deactivate_part(curr_it);
curr_it = next;
}
else
{
++prev_it;
++curr_it;
}
}
}
resetObjectColumnsFromActiveParts(part_lock);
calculateColumnAndSecondaryIndexSizesImpl();
PartLoadingTreeNodes unloaded_parts;
loading_tree.traverse(/*recursive=*/ true, [&](const auto & node)
{
if (!node->is_loaded)
unloaded_parts.push_back(node);
});
{
std::lock_guard lock(unloaded_outdated_parts_mutex);
unloaded_outdated_parts = std::move(unloaded_parts);
}
LOG_DEBUG(log, "Loaded data parts ({} items)", data_parts_indexes.size());
data_parts_loading_finished = true;
}
void MergeTreeData::loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load)
{
LOG_DEBUG(log, "Loading {} outdated data parts", parts_to_load.size());
if (parts_to_load.empty())
return;
for (const auto & part : parts_to_load)
{
auto res = loadDataPart(part->info, part->name, part->disk, MergeTreeDataPartState::Outdated, data_parts_mutex);
if (res.is_broken)
{
auto builder = res.part->data_part_storage->getBuilder();
res.part->renameToDetached("broken-on-start", builder); /// detached parts must not have '_' in prefixes
builder->commit();
}
else if (res.part->is_duplicate)
{
res.part->remove();
}
else
{
preparePartForRemoval(res.part);
}
}
LOG_DEBUG(log, "Loaded {} outdated data parts", parts_to_load.size());
}
void MergeTreeData::scheduleOutdatedDataPartsLoadingJob(BackgroundJobsAssignee & assignee)
{
std::lock_guard lock(unloaded_outdated_parts_mutex);
if (unloaded_outdated_parts.empty())
return;
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>(
[this, share_lock, parts_to_load = std::move(unloaded_outdated_parts)]
{
loadOutdatedDataParts(std::move(parts_to_load));
return false;
}, common_assignee_trigger, getStorageID()), /*need_trigger=*/ false);
}
/// Is the part directory old.
/// True if its modification time and the modification time of all files inside it is less then threshold.
/// (Only files on the first level of nesting are considered).

View File

@ -1021,7 +1021,6 @@ protected:
/// under lockForShare if rename is possible.
String relative_data_path;
/// Current column sizes in compressed and uncompressed form.
ColumnSizeByName column_sizes;
@ -1260,6 +1259,53 @@ protected:
void resetObjectColumnsFromActiveParts(const DataPartsLock & lock);
void updateObjectColumns(const DataPartPtr & part, const DataPartsLock & lock);
class PartLoadingTree
{
public:
struct Node
{
Node(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_)
: info(info_), name(name_), disk(disk_)
{
}
const MergeTreePartInfo info;
const String name;
const DiskPtr disk;
bool is_loaded = false;
std::map<MergeTreePartInfo, std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;
using PartLoadingInfo = std::tuple<MergeTreePartInfo, String, DiskPtr>;
static PartLoadingTree build(std::vector<PartLoadingInfo> nodes);
template <typename Func>
void traverse(bool recursive, Func && func);
private:
void add(const MergeTreePartInfo & info, const String & name, const DiskPtr & disk);
std::unordered_map<String, NodePtr> root_by_partition;
};
using PartLoadingTreeNodes = std::vector<PartLoadingTree::NodePtr>;
struct LoadPartResult
{
bool is_broken = false;
std::optional<size_t> size_of_part;
MutableDataPartPtr part;
};
mutable std::mutex unloaded_outdated_parts_mutex;
PartLoadingTreeNodes unloaded_outdated_parts TSA_GUARDED_BY(unloaded_outdated_parts_mutex);
void loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load);
void scheduleOutdatedDataPartsLoadingJob(BackgroundJobsAssignee & assignee);
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
@ -1333,49 +1379,20 @@ private:
/// Returns default settings for storage with possible changes from global config.
virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0;
class PartLoadingTree
{
public:
struct Node
{
Node(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_)
: info(info_), name(name_), disk(disk_)
{
}
LoadPartResult loadDataPart(
const MergeTreePartInfo & part_info,
const String & part_name,
const DiskPtr & part_disk_ptr,
MergeTreeDataPartState to_state,
std::mutex & part_loading_mutex);
const MergeTreePartInfo info;
const String name;
const DiskPtr disk;
std::map<MergeTreePartInfo, std::shared_ptr<Node>> children;
};
using NodePtr = std::shared_ptr<Node>;
using PartLoadingInfo = std::tuple<MergeTreePartInfo, String, DiskPtr>;
static PartLoadingTree build(std::vector<PartLoadingInfo> nodes);
void add(const MergeTreePartInfo & info, const String & name, const DiskPtr & disk);
const std::unordered_map<String, NodePtr> & getRoots() const { return root_by_partition; }
private:
std::unordered_map<String, NodePtr> root_by_partition;
};
using PartLoadingTreeNodes = std::vector<PartLoadingTree::NodePtr>;
void loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
std::vector<LoadPartResult> loadDataPartsFromDisk(
ThreadPool & pool,
size_t num_parts,
std::queue<PartLoadingTreeNodes> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings);
void loadDataPartsFromWAL(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal);
void loadDataPartsFromWAL(MutableDataPartsVector & parts_from_wal);
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.

View File

@ -158,6 +158,9 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
storage.cleanup_thread.start();
storage.part_check_thread.start();
if (first_time)
storage.scheduleOutdatedDataPartsLoadingJob(storage.background_operations_assignee);
return true;
}

View File

@ -110,7 +110,6 @@ StorageMergeTree::StorageMergeTree(
increment.set(getMaxBlockNumber());
loadMutations();
loadDeduplicationLog();
}
@ -136,6 +135,7 @@ void StorageMergeTree::startup()
try
{
background_operations_assignee.start();
scheduleOutdatedDataPartsLoadingJob(background_operations_assignee);
startBackgroundMovesIfNeeded();
}
catch (...)
@ -1216,6 +1216,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
scheduled = true;
}
return scheduled;
}

View File

@ -0,0 +1,9 @@
<clickhouse>
<background_processing_pool_thread_sleep_seconds>1</background_processing_pool_thread_sleep_seconds>
<background_processing_pool_thread_sleep_seconds_random_part>0</background_processing_pool_thread_sleep_seconds_random_part>
<background_processing_pool_thread_sleep_seconds_if_nothing_to_do>0.0</background_processing_pool_thread_sleep_seconds_if_nothing_to_do>
<background_processing_pool_task_sleep_seconds_when_no_work_min>0</background_processing_pool_task_sleep_seconds_when_no_work_min>
<background_processing_pool_task_sleep_seconds_when_no_work_max>1</background_processing_pool_task_sleep_seconds_when_no_work_max>
<background_processing_pool_task_sleep_seconds_when_no_work_multiplier>1</background_processing_pool_task_sleep_seconds_when_no_work_multiplier>
<background_processing_pool_task_sleep_seconds_when_no_work_random_part>0</background_processing_pool_task_sleep_seconds_when_no_work_random_part>
</clickhouse>

View File

@ -1,12 +1,23 @@
import pytest
import helpers.client
import helpers.cluster
import time
from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk
cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True, stay_alive=True)
node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/fast_background_pool.xml"],
with_zookeeper=True,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/fast_background_pool.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
@ -35,14 +46,14 @@ def test_merge_tree_load_parts(started_cluster):
node1.restart_clickhouse(kill=True)
for i in range(1, 21):
assert node1.contains_in_log(f"Loading part 44_{i}_{i}_0")
assert node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0")
node1.query("OPTIMIZE TABLE mt_load_parts FINAL")
node1.restart_clickhouse(kill=True)
assert node1.contains_in_log("Loading part 44_1_20")
assert node1.contains_in_log("Loading Active part 44_1_20")
for i in range(1, 21):
assert not node1.contains_in_log(f"Loading part 44_{i}_{i}_0")
assert not node1.contains_in_log(f"Loading Active part 44_{i}_{i}_0")
assert node1.query("SELECT count() FROM mt_load_parts") == "20\n"
assert (
@ -52,6 +63,45 @@ def test_merge_tree_load_parts(started_cluster):
== "1\n"
)
MAX_RETRY = 20
all_outdated_loaded = False
for _ in range(MAX_RETRY):
all_outdated_loaded = all(
[
node1.contains_in_log(f"Loading Outdated part 44_{i}_{i}_0")
for i in range(1, 21)
]
)
if all_outdated_loaded:
break
time.sleep(2)
assert all_outdated_loaded
node1.query("ALTER TABLE mt_load_parts MODIFY SETTING old_parts_lifetime = 1")
node1.query("DETACH TABLE mt_load_parts")
node1.query("ATTACH TABLE mt_load_parts")
table_path = node1.query(
"SELECT data_paths[1] FROM system.tables WHERE table = 'mt_load_parts'"
).strip()
part_dirs_ok = False
for _ in range(MAX_RETRY):
part_dirs = node1.exec_in_container(
["bash", "-c", f"ls {table_path}"], user="root"
)
part_dirs = list(
set(part_dirs.strip().split("\n")) - {"detached", "format_version.txt"}
)
part_dirs_ok = len(part_dirs) == 1 and part_dirs[0].startswith("44_1_20")
if part_dirs_ok:
break
time.sleep(2)
assert part_dirs_ok
def test_merge_tree_load_parts_corrupted(started_cluster):
for i, node in enumerate([node1, node2]):
@ -66,6 +116,7 @@ def test_merge_tree_load_parts_corrupted(started_cluster):
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 0, randomPrintableASCII(10))"
)
node1.query(
f"INSERT INTO mt_load_parts_2 VALUES ({partition}, 1, randomPrintableASCII(10))"
)
@ -101,18 +152,18 @@ def test_merge_tree_load_parts_corrupted(started_cluster):
def check_parts_loading(node, partition, loaded, failed, skipped):
for (min_block, max_block) in loaded:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading part {part_name}")
assert node.contains_in_log(f"Finished part {part_name}")
assert node.contains_in_log(f"Loading Active part {part_name}")
assert node.contains_in_log(f"Finished loading Active part {part_name}")
for (min_block, max_block) in failed:
part_name = f"{partition}_{min_block}_{max_block}"
assert node.contains_in_log(f"Loading part {part_name}")
assert not node.contains_in_log(f"Finished part {part_name}")
assert node.contains_in_log(f"Loading Active part {part_name}")
assert not node.contains_in_log(f"Finished loading Active part {part_name}")
for (min_block, max_block) in skipped:
part_name = f"{partition}_{min_block}_{max_block}"
assert not node.contains_in_log(f"Loading part {part_name}")
assert not node.contains_in_log(f"Finished part {part_name}")
assert not node.contains_in_log(f"Loading Active part {part_name}")
assert not node.contains_in_log(f"Finished loading Active part {part_name}")
check_parts_loading(
node1, 111, loaded=[(0, 1), (2, 2)], failed=[(0, 2)], skipped=[(0, 0), (1, 1)]