Speed up part loading for JBOD

This commit is contained in:
Amos Bird 2021-08-30 23:29:09 +08:00
parent d0a3420c3b
commit 23602f4607
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
2 changed files with 333 additions and 167 deletions

View File

@ -897,6 +897,261 @@ Int64 MergeTreeData::getMaxBlockNumber() const
return max_block_num;
}
void MergeTreeData::loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings)
{
/// Parallel loading of data parts.
pool.setMaxThreads(std::min(size_t(settings->max_part_loading_threads), num_parts));
size_t num_threads = pool.getMaxThreads();
std::vector<size_t> parts_per_thread(num_threads, num_parts / num_threads);
for (size_t i = 0ul; i < num_parts % num_threads; ++i)
++parts_per_thread[i];
/// Prepare data parts for parallel loading. Threads will focus on given disk first, then steal
/// others' tasks when finish current disk part loading process.
std::vector<std::vector<std::pair<String, DiskPtr>>> threads_parts(num_threads);
std::set<size_t> remaining_thread_parts;
std::queue<size_t> threads_queue;
for (size_t i = 0; i < num_threads; ++i)
{
remaining_thread_parts.insert(i);
threads_queue.push(i);
}
while (!parts_queue.empty())
{
assert(!threads_queue.empty());
size_t i = threads_queue.front();
auto & need_parts = parts_per_thread[i];
assert(need_parts > 0);
auto & thread_parts = threads_parts[i];
auto & current_parts = parts_queue.front();
assert(!current_parts.empty());
auto parts_to_grab = std::min(need_parts, current_parts.size());
thread_parts.insert(thread_parts.end(), current_parts.end() - parts_to_grab, current_parts.end());
current_parts.resize(current_parts.size() - parts_to_grab);
need_parts -= parts_to_grab;
/// Before processing next thread, change disk if possible.
/// Different threads will likely start loading parts from different disk,
/// which may improve read parallelism for JBOD.
/// If current disk still has some parts, push it to the tail.
if (!current_parts.empty())
parts_queue.push(std::move(current_parts));
parts_queue.pop();
/// If current thread still want some parts, push it to the tail.
if (need_parts > 0)
threads_queue.push(i);
threads_queue.pop();
}
assert(threads_queue.empty());
assert(std::all_of(threads_parts.begin(), threads_parts.end(), [](const std::vector<std::pair<String, DiskPtr>> & parts)
{
return !parts.empty();
}));
size_t suspicious_broken_parts = 0;
size_t suspicious_broken_parts_bytes = 0;
std::atomic<bool> has_adaptive_parts = false;
std::atomic<bool> has_non_adaptive_parts = false;
std::mutex mutex;
auto load_part = [&](const String & part_name, const DiskPtr & part_disk_ptr)
{
auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
if (!part_opt)
return;
const auto & part_info = *part_opt;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto part = createPart(part_name, part_info, single_disk_volume, part_name);
bool broken = false;
String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path))
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
LOG_WARNING(log,
"Detaching stale part {}{} (size: {}), which should have been deleted after a move. "
"That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part;
return;
}
try
{
part->loadColumnsChecksumsIndexes(require_part_metadata, true);
}
catch (const Exception & e)
{
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if (isNotEnoughMemoryErrorCode(e.code()))
throw;
broken = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
broken = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Ignore broken parts that can appear as a result of hard server restart.
if (broken)
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). "
"If it happened after update, it is likely because of backward incompability. "
"You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part;
return;
}
if (!part->index_granularity_info.is_adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else
has_adaptive_parts.store(true, std::memory_order_relaxed);
part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed);
std::lock_guard loading_lock(mutex);
auto [it, inserted] = data_parts_indexes.insert(part);
/// Remove duplicate parts with the same checksum.
if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
}
addPartContributionToDataVolume(part);
};
std::mutex part_select_mutex;
try
{
for (size_t thread = 0; thread < num_threads; ++thread)
{
pool.scheduleOrThrowOnError([&, thread]
{
while (true)
{
std::pair<String, DiskPtr> thread_part;
{
const std::lock_guard lock{part_select_mutex};
if (remaining_thread_parts.empty())
return;
/// Steal task if nothing to do
auto thread_idx = thread;
if (threads_parts[thread].empty())
{
// Try random steal tasks from the next thread
std::uniform_int_distribution<size_t> distribution(0, remaining_thread_parts.size() - 1);
auto it = remaining_thread_parts.begin();
std::advance(it, distribution(thread_local_rng));
thread_idx = *it;
}
auto & thread_parts = threads_parts[thread_idx];
thread_part = thread_parts.back();
thread_parts.pop_back();
if (thread_parts.empty())
remaining_thread_parts.erase(thread_idx);
}
load_part(thread_part.first, thread_part.second);
}
});
}
}
catch (...)
{
/// If this is not done, then in case of an exception, tasks will be destroyed before the threads are completed, and it will be bad.
pool.wait();
throw;
}
pool.wait();
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception(
"Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled",
ErrorCodes::LOGICAL_ERROR);
has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously many ({}) broken parts to remove.",
suspicious_broken_parts);
if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously big size ({}) of all broken parts to remove.",
formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes));
}
void MergeTreeData::loadDataPartsFromWAL(
DataPartsVector & /* broken_parts_to_detach */,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal,
DataPartsLock & part_lock)
{
for (auto & part : parts_from_wal)
{
if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
continue;
part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed);
auto [it, inserted] = data_parts_indexes.insert(part);
if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
}
addPartContributionToDataVolume(part);
}
}
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
@ -904,7 +1159,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto settings = getSettings();
std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
MutableDataPartsVector parts_from_wal;
Strings part_file_names;
@ -934,193 +1188,90 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
}
/// Reversed order to load part from low priority disks firstly.
/// Used for keep part on low priority disk if duplication found
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
/// Collect part names by disk.
std::map<String, std::vector<std::pair<String, DiskPtr>>> disk_part_map;
std::map<String, MutableDataPartsVector> disk_wal_part_map;
ThreadPool pool(disks.size());
std::mutex wal_init_lock;
for (const auto & disk_ptr : disks)
{
auto disk_ptr = *disk_it;
auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
pool.scheduleOrThrowOnError([&, disk_ptr]()
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp")
|| it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
part_names_with_disks.emplace_back(it->name(), disk_ptr);
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
/// Create and correctly initialize global WAL object
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
{
std::unique_lock lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
ErrorCodes::CORRUPTED_DATA);
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
disk_wal_parts.push_back(std::move(part));
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext()))
disk_wal_parts.push_back(std::move(part));
}
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
}
}
auto part_lock = lockParts();
data_parts_indexes.clear();
if (part_names_with_disks.empty() && parts_from_wal.empty())
{
LOG_DEBUG(log, "There are no data parts");
return;
}
/// Parallel loading of data parts.
size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_names_with_disks.size());
std::mutex mutex;
DataPartsVector broken_parts_to_detach;
size_t suspicious_broken_parts = 0;
size_t suspicious_broken_parts_bytes = 0;
std::atomic<bool> has_adaptive_parts = false;
std::atomic<bool> has_non_adaptive_parts = false;
ThreadPool pool(num_threads);
for (auto & part_names_with_disk : part_names_with_disks)
{
pool.scheduleOrThrowOnError([&]
{
const auto & [part_name, part_disk_ptr] = part_names_with_disk;
auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
if (!part_opt)
return;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto part = createPart(part_name, *part_opt, single_disk_volume, part_name);
bool broken = false;
String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path))
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
LOG_WARNING(log,
"Detaching stale part {}{} (size: {}), which should have been deleted after a move. "
"That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part;
return;
}
try
{
part->loadColumnsChecksumsIndexes(require_part_metadata, true);
}
catch (const Exception & e)
{
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if (isNotEnoughMemoryErrorCode(e.code()))
throw;
broken = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
catch (...)
{
broken = true;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// Ignore broken parts that can appear as a result of hard server restart.
if (broken)
{
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
size_t size_of_part = IMergeTreeDataPart::calculateTotalSizeOnDisk(part->volume->getDisk(), part->getFullRelativePath());
LOG_ERROR(log,
"Detaching broken part {}{} (size: {}). "
"If it happened after update, it is likely because of backward incompability. "
"You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part;
return;
}
if (!part->index_granularity_info.is_adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else
has_adaptive_parts.store(true, std::memory_order_relaxed);
part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed);
std::lock_guard loading_lock(mutex);
if (!data_parts_indexes.insert(part).second)
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists", part->name);
addPartContributionToDataVolume(part);
});
}
pool.wait();
for (auto & part : parts_from_wal)
for (auto & [_, disk_wal_parts] : disk_wal_part_map)
parts_from_wal.insert(
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
size_t num_parts = 0;
std::queue<std::vector<std::pair<String, DiskPtr>>> parts_queue;
for (auto & [_, disk_parts] : disk_part_map)
{
if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
if (disk_parts.empty())
continue;
part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed);
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
addPartContributionToDataVolume(part);
num_parts += disk_parts.size();
parts_queue.push(std::move(disk_parts));
}
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR);
auto part_lock = lockParts();
data_parts_indexes.clear();
has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
if (num_parts == 0 && parts_from_wal.empty())
{
LOG_DEBUG(log, "There are no data parts");
return;
}
if (suspicious_broken_parts > settings->max_suspicious_broken_parts && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously many ({}) broken parts to remove.",
suspicious_broken_parts);
if (suspicious_broken_parts_bytes > settings->max_suspicious_broken_parts_bytes && !skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously big size ({}) of all broken parts to remove.",
formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes));
DataPartsVector broken_parts_to_detach;
DataPartsVector duplicate_parts_to_remove;
if (num_parts > 0)
loadDataPartsFromDisk(
broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
if (!parts_from_wal.empty())
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
for (auto & part : broken_parts_to_detach)
part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
for (auto & part : duplicate_parts_to_remove)
part->remove();
/// Delete from the set of current parts those parts that are covered by another part (those parts that
/// were merged), but that for some reason are still not deleted from the filesystem.

View File

@ -1143,6 +1143,21 @@ private:
/// Returns default settings for storage with possible changes from global config.
virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0;
void loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings);
void loadDataPartsFromWAL(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal,
DataPartsLock & part_lock);
};
/// RAII struct to record big parts that are submerging or emerging.