better waiting for outdated data parts

This commit is contained in:
Anton Popov 2022-11-22 14:47:24 +00:00
parent 92ac9385b9
commit e3db29aebf
4 changed files with 80 additions and 26 deletions

View File

@ -167,6 +167,7 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
extern const int CANNOT_RESTORE_TABLE;
extern const int ZERO_COPY_REPLICATION_ERROR;
extern const int NOT_INITIALIZED;
}
@ -1619,7 +1620,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
resetObjectColumnsFromActiveParts(part_lock);
LOG_DEBUG(log, "There are no data parts");
are_outdated_data_parts_loaded = true;
outdated_data_parts_loading_finished = true;
return;
}
@ -1670,11 +1671,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
loadOutdatedDataParts(std::move(unloaded_parts));
});
outdated_data_parts_loading_task->deactivate();
}
else
are_outdated_data_parts_loaded = true;
outdated_data_parts_loading_finished = true;
LOG_DEBUG(log, "Loaded data parts ({} items)", data_parts_indexes.size());
data_parts_loading_finished = true;
@ -1689,30 +1688,66 @@ void MergeTreeData::loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load)
for (const auto & part : parts_to_load)
{
auto res = loadDataPart(part->info, part->name, part->disk, MergeTreeDataPartState::Outdated, data_parts_mutex);
if (outdated_data_parts_loading_canceled)
{
LOG_DEBUG(log, "Stopped loading outdated parts because task was canceled");
return;
}
if (res.is_broken)
res.part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
else if (res.part->is_duplicate)
res.part->remove();
else
preparePartForRemoval(res.part);
std::string_view what;
try
{
what = "load";
auto res = loadDataPart(part->info, part->name, part->disk, MergeTreeDataPartState::Outdated, data_parts_mutex);
if (res.is_broken)
{
what = "rename to detached";
res.part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
}
else if (res.part->is_duplicate)
{
what = "remove";
res.part->remove();
}
else
{
what = "prepare for removal";
preparePartForRemoval(res.part);
}
}
catch (...)
{
LOG_ERROR(log, "Failed to {} outdated data part {}. You need to resolve it manually. Exception: {}",
what, part->name, getCurrentExceptionMessage(false));
continue;
}
}
LOG_DEBUG(log, "Loaded {} outdated data parts", parts_to_load.size());
are_outdated_data_parts_loaded.store(true);
are_outdated_data_parts_loaded.notify_all();
{
std::lock_guard lock(outdated_data_parts_mutex);
outdated_data_parts_loading_finished = true;
}
outdated_data_parts_cv.notify_all();
}
void MergeTreeData::waitForOutdatedPartsToBeLoaded() const
{
if (!are_outdated_data_parts_loaded)
{
LOG_TRACE(log, "Will wait for outdated data parts to be loaded");
are_outdated_data_parts_loaded.wait(false);
LOG_TRACE(log, "Finished waiting for outdated data parts to be loaded");
}
if (outdated_data_parts_loading_finished)
return;
LOG_TRACE(log, "Will wait for outdated data parts to be loaded");
std::unique_lock lock(outdated_data_parts_mutex);
outdated_data_parts_cv.wait(lock, [this] { return outdated_data_parts_loading_finished || outdated_data_parts_loading_canceled; });
if (outdated_data_parts_loading_canceled)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of outdated data parts was canceled");
LOG_TRACE(log, "Finished waiting for outdated data parts to be loaded");
}
void MergeTreeData::startOutdatedDataPartsLoadingTask()
@ -1721,6 +1756,20 @@ void MergeTreeData::startOutdatedDataPartsLoadingTask()
outdated_data_parts_loading_task->activateAndSchedule();
}
void MergeTreeData::stopOutdatedDataPartsLoadingTask()
{
if (!outdated_data_parts_loading_task)
return;
{
std::lock_guard lock(outdated_data_parts_mutex);
outdated_data_parts_loading_canceled = true;
}
outdated_data_parts_loading_task->deactivate();
outdated_data_parts_cv.notify_all();
}
/// Is the part directory old.
/// True if its modification time and the modification time of all files inside it is less then threshold.
/// (Only files on the first level of nesting are considered).
@ -4294,6 +4343,7 @@ Pipe MergeTreeData::alterPartition(
{
if (command.replace)
checkPartitionCanBeDropped(command.partition, query_context);
String from_database = query_context->resolveDatabase(command.from_database);
auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context);

View File

@ -1323,11 +1323,17 @@ protected:
};
BackgroundSchedulePool::TaskHolder outdated_data_parts_loading_task;
std::atomic_bool are_outdated_data_parts_loaded = false;
mutable std::mutex outdated_data_parts_mutex;
mutable std::condition_variable outdated_data_parts_cv;
std::atomic_bool outdated_data_parts_loading_finished = false;
std::atomic_bool outdated_data_parts_loading_canceled = false;
void loadOutdatedDataParts(PartLoadingTreeNodes parts_to_load);
void waitForOutdatedPartsToBeLoaded() const;
void startOutdatedDataPartsLoadingTask();
void stopOutdatedDataPartsLoadingTask();
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);

View File

@ -177,9 +177,10 @@ void StorageMergeTree::shutdown()
mutation_wait_event.notify_all();
}
/// We need to for loading of oudated data parts
/// for correct execution of 'clearOldPartsFromFilesystem'
waitForOutdatedPartsToBeLoaded();
if (outdated_data_parts_loading_task)
outdated_data_parts_loading_task->deactivate();
stopOutdatedDataPartsLoadingTask();
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();

View File

@ -4305,10 +4305,7 @@ void StorageReplicatedMergeTree::shutdown()
return;
session_expired_callback_handler.reset();
waitForOutdatedPartsToBeLoaded();
if (outdated_data_parts_loading_task)
outdated_data_parts_loading_task->deactivate();
stopOutdatedDataPartsLoadingTask();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();