Do not wait for outdated parts to be loaded at shutdown

This commit is contained in:
Anton Popov 2022-12-28 16:07:14 +00:00
parent 0722020cae
commit 2adf0e9db3
4 changed files with 69 additions and 51 deletions

View File

@ -1034,9 +1034,7 @@ MergeTreeData::PartLoadingTree::build(PartLoadingInfos nodes)
{
std::sort(nodes.begin(), nodes.end(), [](const auto & lhs, const auto & rhs)
{
const auto & lhs_info = std::get<0>(lhs);
const auto & rhs_info = std::get<0>(rhs);
return std::tie(lhs_info.level, lhs_info.mutation) > std::tie(rhs_info.level, rhs_info.mutation);
return std::tie(lhs.info.level, lhs.info.mutation) > std::tie(rhs.info.level, rhs.info.mutation);
});
PartLoadingTree tree;
@ -1367,7 +1365,7 @@ std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(
auto res = loadDataPart(thread_part->info, thread_part->name, thread_part->disk, DataPartState::Active, part_loading_mutex);
thread_part->is_loaded = true;
bool is_active_part = res.part->getState() == DataPartState::Active && !res.part->is_duplicate;
bool is_active_part = res.part->getState() == DataPartState::Active;
/// If part is broken or duplicate or should be removed according to transaction
/// and it has any covered parts then try to load them to replace this part.
if (!is_active_part && !thread_part->children.empty())
@ -1564,6 +1562,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
bool have_lightweight_in_parts = false;
bool have_parts_with_version_metadata = false;
bool is_static_storage = isStaticStorage();
if (num_parts > 0)
{
auto loaded_parts = loadDataPartsFromDisk(pool, num_parts, parts_queue, settings);
@ -1593,8 +1593,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
}
bool is_static_storage = isStaticStorage();
if (settings->in_memory_parts_enable_wal)
{
pool.setMaxThreads(disks.size());
@ -1652,7 +1650,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
resetObjectColumnsFromActiveParts(part_lock);
LOG_DEBUG(log, "There are no data parts");
outdated_data_parts_loading_finished = true;
return;
}
@ -1697,38 +1694,50 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!unloaded_parts.empty())
{
LOG_DEBUG(log, "Found {} outdated data parts. They will be loaded asynchronously", unloaded_parts.size());
outdated_unloaded_data_parts = std::move(unloaded_parts);
outdated_data_parts_loading_task = getContext()->getSchedulePool().createTask(
"MergeTreeData::loadOutdatedDataParts",
[this, unloaded_parts]() mutable
{
loadOutdatedDataParts(std::move(unloaded_parts));
});
[this] { loadOutdatedDataParts(/*is_async=*/ true); });
}
else
outdated_data_parts_loading_finished = true;
LOG_DEBUG(log, "Loaded data parts ({} items)", data_parts_indexes.size());
data_parts_loading_finished = true;
}
void MergeTreeData::loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load)
void MergeTreeData::loadOutdatedDataParts(bool is_async)
try
{
if (parts_to_load.empty())
return;
LOG_DEBUG(log, "Loading {} outdated data parts", parts_to_load.size());
for (const auto & part : parts_to_load)
{
if (outdated_data_parts_loading_canceled)
std::lock_guard lock(outdated_data_parts_mutex);
LOG_DEBUG(log, "Loading {} outdated data parts", outdated_unloaded_data_parts.size());
}
size_t num_loaded_parts = 0;
while (true)
{
PartLoadingTree::NodePtr part;
{
LOG_DEBUG(log, "Stopped loading outdated parts because task was canceled");
return;
std::lock_guard lock(outdated_data_parts_mutex);
if (is_async && outdated_data_parts_loading_canceled)
{
LOG_DEBUG(log,
"Stopped loading outdated parts because task was canceled. "
"Loaded {} parts, {} left unloaded", num_loaded_parts, outdated_unloaded_data_parts.size());
return;
}
if (outdated_unloaded_data_parts.empty())
break;
part = std::move(outdated_unloaded_data_parts.back());
outdated_unloaded_data_parts.pop_back();
}
auto res = loadDataPart(part->info, part->name, part->disk, MergeTreeDataPartState::Outdated, data_parts_mutex);
++num_loaded_parts;
if (res.is_broken)
res.part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
@ -1738,13 +1747,7 @@ try
preparePartForRemoval(res.part);
}
LOG_DEBUG(log, "Loaded {} outdated data parts", parts_to_load.size());
{
std::lock_guard lock(outdated_data_parts_mutex);
outdated_data_parts_loading_finished = true;
}
LOG_DEBUG(log, "Loaded {} outdated data parts", num_loaded_parts);
outdated_data_parts_cv.notify_all();
}
catch (...)
@ -1761,13 +1764,13 @@ void MergeTreeData::waitForOutdatedPartsToBeLoaded() const
if (isStaticStorage())
return;
if (outdated_data_parts_loading_finished)
std::unique_lock lock(outdated_data_parts_mutex);
if (outdated_unloaded_data_parts.empty())
return;
LOG_TRACE(log, "Will wait for outdated data parts to be loaded");
std::unique_lock lock(outdated_data_parts_mutex);
outdated_data_parts_cv.wait(lock, [this] { return outdated_data_parts_loading_finished || outdated_data_parts_loading_canceled; });
outdated_data_parts_cv.wait(lock, [this] { return outdated_unloaded_data_parts.empty() || outdated_data_parts_loading_canceled; });
if (outdated_data_parts_loading_canceled)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of outdated data parts was canceled");

View File

@ -1359,8 +1359,21 @@ protected:
std::map<MergeTreePartInfo, std::shared_ptr<Node>> children;
};
/// Parameters that identify one part to be loaded: its parsed info, its name and its disk.
struct PartLoadingInfo
{
    /// The name is kept next to the parsed info on purpose: in tables
    /// with old syntax it cannot be easily retrieved from the info alone.
    MergeTreePartInfo info;
    String name;
    DiskPtr disk;

    /// Copies all three arguments into the corresponding fields.
    PartLoadingInfo(const MergeTreePartInfo & info_, const String & name_, const DiskPtr & disk_)
        : info(info_)
        , name(name_)
        , disk(disk_)
    {
    }
};
using NodePtr = std::shared_ptr<Node>;
using PartLoadingInfo = std::tuple<MergeTreePartInfo, String, DiskPtr>;
using PartLoadingInfos = std::vector<PartLoadingInfo>;
/// Builds a tree from the list of part infos.
@ -1387,15 +1400,14 @@ protected:
MutableDataPartPtr part;
};
BackgroundSchedulePool::TaskHolder outdated_data_parts_loading_task;
mutable std::mutex outdated_data_parts_mutex;
mutable std::condition_variable outdated_data_parts_cv;
std::atomic_bool outdated_data_parts_loading_finished = false;
std::atomic_bool outdated_data_parts_loading_canceled = false;
BackgroundSchedulePool::TaskHolder outdated_data_parts_loading_task;
PartLoadingTreeNodes outdated_unloaded_data_parts;
bool outdated_data_parts_loading_canceled = false;
void loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load);
void loadOutdatedDataParts(bool is_async);
void startOutdatedDataPartsLoadingTask();
void stopOutdatedDataPartsLoadingTask();

View File

@ -177,11 +177,6 @@ void StorageMergeTree::shutdown()
mutation_wait_event.notify_all();
}
/// We need to wait for loading of outdated data parts
/// for correct execution of 'clearOldPartsFromFilesystem'
waitForOutdatedPartsToBeLoaded();
stopOutdatedDataPartsLoadingTask();
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();

View File

@ -902,6 +902,21 @@ void StorageReplicatedMergeTree::drop()
dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings());
}
/// Wait for loading of all outdated parts because
/// in case of zero copy recursive removal of directory
/// is not supported and table cannot be dropped.
if (canUseZeroCopyReplication())
{
    /// Load remaining parts synchronously because task
    /// for loading is already cancelled in shutdown().
    bool has_unloaded_parts = false;
    {
        /// Hold the mutex only while inspecting the shared state.
        /// loadOutdatedDataParts() acquires outdated_data_parts_mutex itself,
        /// and std::mutex is not recursive, so calling it with the lock
        /// still held would deadlock.
        std::lock_guard lock(outdated_data_parts_mutex);
        has_unloaded_parts = !outdated_unloaded_data_parts.empty();
        assert(!has_unloaded_parts || outdated_data_parts_loading_canceled);
    }

    if (has_unloaded_parts)
        loadOutdatedDataParts(/*is_async=*/ false);
}
dropAllData();
}
@ -4318,13 +4333,6 @@ void StorageReplicatedMergeTree::shutdown()
return;
session_expired_callback_handler.reset();
/// Wait for loading of all outdated parts because
/// in case of zero copy recursive removal of directory
/// is not supported and table cannot be dropped.
if (canUseZeroCopyReplication())
waitForOutdatedPartsToBeLoaded();
stopOutdatedDataPartsLoadingTask();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.