Merge pull request #28363 from amosbird/fastload2

Speed up part loading for JBOD
This commit is contained in:
Anton Popov 2021-10-16 02:37:32 +03:00 committed by GitHub
commit d2ba3bcaa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 333 additions and 167 deletions

View File

@ -896,116 +896,86 @@ Int64 MergeTreeData::getMaxBlockNumber() const
return max_block_num; return max_block_num;
} }
void MergeTreeData::loadDataPartsFromDisk(
void MergeTreeData::loadDataParts(bool skip_sanity_checks) DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings)
{ {
LOG_DEBUG(log, "Loading data parts");
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto settings = getSettings();
std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
MutableDataPartsVector parts_from_wal;
Strings part_file_names;
auto disks = getStoragePolicy()->getDisks();
/// Only check if user did touch storage configuration for this table.
if (!getStoragePolicy()->isDefaultPolicy() && !skip_sanity_checks)
{
/// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
std::unordered_set<String> defined_disk_names;
for (const auto & disk_ptr : disks)
defined_disk_names.insert(disk_ptr->getName());
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (defined_disk_names.count(disk_name) == 0 && disk->exists(relative_data_path))
{
for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
if (MergeTreePartInfo::tryParsePartName(it->name(), format_version))
throw Exception(ErrorCodes::UNKNOWN_DISK,
"Part {} was found on disk {} which is not defined in the storage policy",
backQuote(it->name()), backQuote(disk_name));
}
}
}
}
/// Reversed order to load part from low priority disks firstly.
/// Used for keep part on low priority disk if duplication found
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{
auto disk_ptr = *disk_it;
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp")
|| it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
part_names_with_disks.emplace_back(it->name(), disk_ptr);
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
{
/// Create and correctly initialize global WAL object
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
}
}
auto part_lock = lockParts();
data_parts_indexes.clear();
if (part_names_with_disks.empty() && parts_from_wal.empty())
{
LOG_DEBUG(log, "There are no data parts");
return;
}
/// Parallel loading of data parts. /// Parallel loading of data parts.
size_t num_threads = std::min(size_t(settings->max_part_loading_threads), part_names_with_disks.size()); pool.setMaxThreads(std::min(size_t(settings->max_part_loading_threads), num_parts));
size_t num_threads = pool.getMaxThreads();
std::vector<size_t> parts_per_thread(num_threads, num_parts / num_threads);
for (size_t i = 0ul; i < num_parts % num_threads; ++i)
++parts_per_thread[i];
std::mutex mutex; /// Prepare data parts for parallel loading. Threads will focus on given disk first, then steal
/// others' tasks when finish current disk part loading process.
std::vector<std::vector<std::pair<String, DiskPtr>>> threads_parts(num_threads);
std::set<size_t> remaining_thread_parts;
std::queue<size_t> threads_queue;
for (size_t i = 0; i < num_threads; ++i)
{
remaining_thread_parts.insert(i);
threads_queue.push(i);
}
while (!parts_queue.empty())
{
assert(!threads_queue.empty());
size_t i = threads_queue.front();
auto & need_parts = parts_per_thread[i];
assert(need_parts > 0);
auto & thread_parts = threads_parts[i];
auto & current_parts = parts_queue.front();
assert(!current_parts.empty());
auto parts_to_grab = std::min(need_parts, current_parts.size());
thread_parts.insert(thread_parts.end(), current_parts.end() - parts_to_grab, current_parts.end());
current_parts.resize(current_parts.size() - parts_to_grab);
need_parts -= parts_to_grab;
/// Before processing next thread, change disk if possible.
/// Different threads will likely start loading parts from different disk,
/// which may improve read parallelism for JBOD.
/// If current disk still has some parts, push it to the tail.
if (!current_parts.empty())
parts_queue.push(std::move(current_parts));
parts_queue.pop();
/// If current thread still want some parts, push it to the tail.
if (need_parts > 0)
threads_queue.push(i);
threads_queue.pop();
}
assert(threads_queue.empty());
assert(std::all_of(threads_parts.begin(), threads_parts.end(), [](const std::vector<std::pair<String, DiskPtr>> & parts)
{
return !parts.empty();
}));
DataPartsVector broken_parts_to_detach;
size_t suspicious_broken_parts = 0; size_t suspicious_broken_parts = 0;
size_t suspicious_broken_parts_bytes = 0; size_t suspicious_broken_parts_bytes = 0;
std::atomic<bool> has_adaptive_parts = false; std::atomic<bool> has_adaptive_parts = false;
std::atomic<bool> has_non_adaptive_parts = false; std::atomic<bool> has_non_adaptive_parts = false;
ThreadPool pool(num_threads); std::mutex mutex;
auto load_part = [&](const String & part_name, const DiskPtr & part_disk_ptr)
for (auto & part_names_with_disk : part_names_with_disks)
{ {
pool.scheduleOrThrowOnError([&]
{
const auto & [part_name, part_disk_ptr] = part_names_with_disk;
auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version); auto part_opt = MergeTreePartInfo::tryParsePartName(part_name, format_version);
if (!part_opt) if (!part_opt)
return; return;
const auto & part_info = *part_opt;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0); auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
auto part = createPart(part_name, *part_opt, single_disk_volume, part_name); auto part = createPart(part_name, part_info, single_disk_volume, part_name);
bool broken = false; bool broken = false;
String part_path = fs::path(relative_data_path) / part_name; String part_path = fs::path(relative_data_path) / part_name;
String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME; String marker_path = fs::path(part_path) / IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
if (part_disk_ptr->exists(marker_path)) if (part_disk_ptr->exists(marker_path))
{ {
/// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist /// NOTE: getBytesOnDisk() cannot be used here, since it maybe zero of checksums.txt will not exist
@ -1015,12 +985,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
"That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.", "That can only happen after unclean restart of ClickHouse after move of a part having an operation blocking that stale copy of part.",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part)); getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(mutex); std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part); broken_parts_to_detach.push_back(part);
++suspicious_broken_parts; ++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part; suspicious_broken_parts_bytes += size_of_part;
return; return;
} }
@ -1057,53 +1024,87 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
"You need to resolve this manually", "You need to resolve this manually",
getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part)); getFullPathOnDisk(part_disk_ptr), part_name, formatReadableSizeWithBinarySuffix(size_of_part));
std::lock_guard loading_lock(mutex); std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part); broken_parts_to_detach.push_back(part);
++suspicious_broken_parts; ++suspicious_broken_parts;
suspicious_broken_parts_bytes += size_of_part; suspicious_broken_parts_bytes += size_of_part;
return; return;
} }
if (!part->index_granularity_info.is_adaptive) if (!part->index_granularity_info.is_adaptive)
has_non_adaptive_parts.store(true, std::memory_order_relaxed); has_non_adaptive_parts.store(true, std::memory_order_relaxed);
else else
has_adaptive_parts.store(true, std::memory_order_relaxed); has_adaptive_parts.store(true, std::memory_order_relaxed);
part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime(); part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed); part->setState(DataPartState::Committed);
std::lock_guard loading_lock(mutex); std::lock_guard loading_lock(mutex);
auto [it, inserted] = data_parts_indexes.insert(part);
if (!data_parts_indexes.insert(part).second) /// Remove duplicate parts with the same checksum.
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists", part->name); if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
}
addPartContributionToDataVolume(part); addPartContributionToDataVolume(part);
};
std::mutex part_select_mutex;
try
{
for (size_t thread = 0; thread < num_threads; ++thread)
{
pool.scheduleOrThrowOnError([&, thread]
{
while (true)
{
std::pair<String, DiskPtr> thread_part;
{
const std::lock_guard lock{part_select_mutex};
if (remaining_thread_parts.empty())
return;
/// Steal task if nothing to do
auto thread_idx = thread;
if (threads_parts[thread].empty())
{
// Try random steal tasks from the next thread
std::uniform_int_distribution<size_t> distribution(0, remaining_thread_parts.size() - 1);
auto it = remaining_thread_parts.begin();
std::advance(it, distribution(thread_local_rng));
thread_idx = *it;
}
auto & thread_parts = threads_parts[thread_idx];
thread_part = thread_parts.back();
thread_parts.pop_back();
if (thread_parts.empty())
remaining_thread_parts.erase(thread_idx);
}
load_part(thread_part.first, thread_part.second);
}
}); });
} }
}
catch (...)
{
/// If this is not done, then in case of an exception, tasks will be destroyed before the threads are completed, and it will be bad.
pool.wait();
throw;
}
pool.wait(); pool.wait();
for (auto & part : parts_from_wal)
{
if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
continue;
part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed);
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
addPartContributionToDataVolume(part);
}
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts) if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
throw Exception("Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled", ErrorCodes::LOGICAL_ERROR); throw Exception(
"Table contains parts with adaptive and non adaptive marks, but `setting enable_mixed_granularity_parts` is disabled",
ErrorCodes::LOGICAL_ERROR);
has_non_adaptive_index_granularity_parts = has_non_adaptive_parts; has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
@ -1116,10 +1117,160 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS, throw Exception(ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS,
"Suspiciously big size ({}) of all broken parts to remove.", "Suspiciously big size ({}) of all broken parts to remove.",
formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes)); formatReadableSizeWithBinarySuffix(suspicious_broken_parts_bytes));
}
void MergeTreeData::loadDataPartsFromWAL(
DataPartsVector & /* broken_parts_to_detach */,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal,
DataPartsLock & part_lock)
{
for (auto & part : parts_from_wal)
{
if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
continue;
part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->setState(DataPartState::Committed);
auto [it, inserted] = data_parts_indexes.insert(part);
if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", part->getFullPath());
duplicate_parts_to_remove.push_back(part);
}
else
throw Exception("Part " + part->name + " already exists but with different checksums", ErrorCodes::DUPLICATE_DATA_PART);
}
addPartContributionToDataVolume(part);
}
}
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, "Loading data parts");
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto settings = getSettings();
MutableDataPartsVector parts_from_wal;
Strings part_file_names;
auto disks = getStoragePolicy()->getDisks();
/// Only check if user did touch storage configuration for this table.
if (!getStoragePolicy()->isDefaultPolicy() && !skip_sanity_checks)
{
/// Check extra parts at different disks, in order to not allow to miss data parts at undefined disks.
std::unordered_set<String> defined_disk_names;
for (const auto & disk_ptr : disks)
defined_disk_names.insert(disk_ptr->getName());
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (defined_disk_names.count(disk_name) == 0 && disk->exists(relative_data_path))
{
for (const auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
if (MergeTreePartInfo::tryParsePartName(it->name(), format_version))
throw Exception(ErrorCodes::UNKNOWN_DISK,
"Part {} was found on disk {} which is not defined in the storage policy",
backQuote(it->name()), backQuote(disk_name));
}
}
}
}
/// Collect part names by disk.
std::map<String, std::vector<std::pair<String, DiskPtr>>> disk_part_map;
std::map<String, MutableDataPartsVector> disk_wal_part_map;
ThreadPool pool(disks.size());
std::mutex wal_init_lock;
for (const auto & disk_ptr : disks)
{
auto & disk_parts = disk_part_map[disk_ptr->getName()];
auto & disk_wal_parts = disk_wal_part_map[disk_ptr->getName()];
pool.scheduleOrThrowOnError([&, disk_ptr]()
{
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
{
std::unique_lock lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
ErrorCodes::CORRUPTED_DATA);
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
disk_wal_parts.push_back(std::move(part));
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext()))
disk_wal_parts.push_back(std::move(part));
}
}
});
}
pool.wait();
for (auto & [_, disk_wal_parts] : disk_wal_part_map)
parts_from_wal.insert(
parts_from_wal.end(), std::make_move_iterator(disk_wal_parts.begin()), std::make_move_iterator(disk_wal_parts.end()));
size_t num_parts = 0;
std::queue<std::vector<std::pair<String, DiskPtr>>> parts_queue;
for (auto & [_, disk_parts] : disk_part_map)
{
if (disk_parts.empty())
continue;
num_parts += disk_parts.size();
parts_queue.push(std::move(disk_parts));
}
auto part_lock = lockParts();
data_parts_indexes.clear();
if (num_parts == 0 && parts_from_wal.empty())
{
LOG_DEBUG(log, "There are no data parts");
return;
}
DataPartsVector broken_parts_to_detach;
DataPartsVector duplicate_parts_to_remove;
if (num_parts > 0)
loadDataPartsFromDisk(
broken_parts_to_detach, duplicate_parts_to_remove, pool, num_parts, parts_queue, skip_sanity_checks, settings);
if (!parts_from_wal.empty())
loadDataPartsFromWAL(broken_parts_to_detach, duplicate_parts_to_remove, parts_from_wal, part_lock);
for (auto & part : broken_parts_to_detach) for (auto & part : broken_parts_to_detach)
part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
for (auto & part : duplicate_parts_to_remove)
part->remove();
/// Delete from the set of current parts those parts that are covered by another part (those parts that /// Delete from the set of current parts those parts that are covered by another part (those parts that
/// were merged), but that for some reason are still not deleted from the filesystem. /// were merged), but that for some reason are still not deleted from the filesystem.

View File

@ -1143,6 +1143,21 @@ private:
/// Returns default settings for storage with possible changes from global config. /// Returns default settings for storage with possible changes from global config.
virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0; virtual std::unique_ptr<MergeTreeSettings> getDefaultSettings() const = 0;
void loadDataPartsFromDisk(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
ThreadPool & pool,
size_t num_parts,
std::queue<std::vector<std::pair<String, DiskPtr>>> & parts_queue,
bool skip_sanity_checks,
const MergeTreeSettingsPtr & settings);
void loadDataPartsFromWAL(
DataPartsVector & broken_parts_to_detach,
DataPartsVector & duplicate_parts_to_remove,
MutableDataPartsVector & parts_from_wal,
DataPartsLock & part_lock);
}; };
/// RAII struct to record big parts that are submerging or emerging. /// RAII struct to record big parts that are submerging or emerging.