start loader, fix helpers, fix deadlock

This commit is contained in:
serxa 2023-05-10 19:56:48 +00:00
parent 1198f1fc4c
commit 6bbc061ba0
8 changed files with 16 additions and 9 deletions

View File

@ -779,6 +779,8 @@ try
0, // We don't need any threads one all the parts will be loaded
server_settings.outdated_part_loading_thread_pool_queue_size);
global_context->getAsyncLoader().start();
/// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs"))
{

View File

@ -185,7 +185,7 @@ inline void scheduleLoad(const LoadTaskPtrs & tasks)
}
template <class... Args>
inline void scheduleLoad(Args && ... args)
inline void scheduleLoadAll(Args && ... args)
{
(scheduleLoad(std::forward<Args>(args)), ...);
}
@ -208,7 +208,7 @@ inline void waitLoad(const LoadTaskPtrs & tasks)
}
template <class... Args>
inline void waitLoad(Args && ... args)
inline void waitLoadAll(Args && ... args)
{
(waitLoad(std::forward<Args>(args)), ...);
}
@ -216,8 +216,8 @@ inline void waitLoad(Args && ... args)
template <class... Args>
inline void scheduleAndWaitLoad(Args && ... args)
{
scheduleLoad(std::forward<Args>(args)...);
waitLoad(std::forward<Args>(args)...);
scheduleLoadAll(std::forward<Args>(args)...);
waitLoadAll(std::forward<Args>(args)...);
}
inline LoadJobSet getGoals(const LoadTaskPtrs & tasks)

View File

@ -443,9 +443,10 @@ void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, Loadin
LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{
auto base = DatabaseOrdinary::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
DatabaseOrdinary::startupDatabaseAsync(async_loader, std::move(startup_after), mode)->goals(),
base->goals(),
DATABASE_STARTUP_PRIORITY,
fmt::format("startup Atomic database {}", database_name),
[this, mode] (const LoadJobPtr &)

View File

@ -502,9 +502,10 @@ UInt64 DatabaseReplicated::getMetadataHash(const String & table_name) const
LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode)->goals(),
base->goals(),
DATABASE_STARTUP_PRIORITY,
fmt::format("startup Replicated database {}", database_name),
[this] (const LoadJobPtr &)

View File

@ -65,9 +65,10 @@ void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exceptio
LoadTaskPtr DatabaseMaterializedMySQL::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode)->goals(),
base->goals(),
DATABASE_STARTUP_PRIORITY,
fmt::format("startup MaterializedMySQL database {}", database_name),
[this, mode] (const LoadJobPtr &)

View File

@ -127,9 +127,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
LoadTaskPtr DatabaseMaterializedPostgreSQL::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode)->goals(),
base->goals(),
DATABASE_STARTUP_PRIORITY,
fmt::format("startup MaterializedMySQL database {}", database_name),
[this] (const LoadJobPtr &)

View File

@ -12,6 +12,7 @@
#include <Common/Stopwatch.h>
#include <Common/AsyncLoader.h>
namespace Poco
{
class Logger; // NOLINT(cppcoreguidelines-virtual-class-destructor)

View File

@ -2,7 +2,6 @@
#include <base/types.h>
#include <Common/isLocalAddress.h>
#include <Common/AsyncLoader.h>
#include <Common/MultiVersion.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/RemoteHostFilter.h>
@ -130,6 +129,7 @@ class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
template <class Queue>
class MergeTreeBackgroundExecutor;
class AsyncLoader;
/// Scheduling policy can be changed using `background_merges_mutations_scheduling_policy` config option.
/// By default concurrent merges are scheduled using "round_robin" to ensure fair and starvation-free operation.