Fix Keeper Context

This commit is contained in:
Antonio Andelic 2023-10-13 11:30:37 +00:00
parent c40a79f0b4
commit 48933f52fe
2 changed files with 93 additions and 51 deletions

View File

@ -3,6 +3,7 @@
#include <Common/Config/ConfigProcessor.h>
#include <Common/Macros.h>
#include <Common/ThreadPool.h>
#include <Common/callOnce.h>
#include <Core/ServerSettings.h>
@ -14,6 +15,7 @@
namespace ProfileEvents
{
extern const Event ContextLock;
extern const Event ContextLockWaitMicroseconds;
}
namespace CurrentMetrics
@ -40,7 +42,7 @@ struct ContextSharedPart : boost::noncopyable
{}
/// For access of most of shared objects. Recursive mutex.
mutable std::recursive_mutex mutex;
mutable SharedMutex mutex;
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
@ -50,13 +52,16 @@ struct ContextSharedPart : boost::noncopyable
String path; /// Path to the data directory, with a slash at the end.
ConfigurationPtr config; /// Global configuration settings.
MultiVersion<Macros> macros; /// Substitutions extracted from config.
OnceFlag schedule_pool_initialized;
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
///
mutable OnceFlag readers_initialized;
mutable std::unique_ptr<IAsynchronousReader> asynchronous_remote_fs_reader;
mutable std::unique_ptr<IAsynchronousReader> asynchronous_local_fs_reader;
mutable std::unique_ptr<IAsynchronousReader> synchronous_local_fs_reader;
mutable OnceFlag threadpool_writer_initialized;
mutable std::unique_ptr<ThreadPool> threadpool_writer;
mutable ThrottlerPtr remote_read_throttler; /// A server-wide throttler for remote IO reads
@ -64,13 +69,14 @@ struct ContextSharedPart : boost::noncopyable
mutable ThrottlerPtr local_read_throttler; /// A server-wide throttler for local IO reads
mutable ThrottlerPtr local_write_throttler; /// A server-wide throttler for local IO writes
};
ContextData::ContextData() = default;
ContextData::ContextData(const ContextData &) = default;
Context::Context() = default;
Context::Context(const Context & rhs) : ContextData(rhs), std::enable_shared_from_this<Context>(rhs) {}
Context::~Context() = default;
Context::Context(const Context &) = default;
Context & Context::operator=(const Context &) = default;
SharedContextHolder::SharedContextHolder(SharedContextHolder &&) noexcept = default;
SharedContextHolder & SharedContextHolder::operator=(SharedContextHolder &&) noexcept = default;
@ -87,10 +93,10 @@ void Context::makeGlobalContext()
global_context = shared_from_this();
}
ContextMutablePtr Context::createGlobal(ContextSharedPart * shared)
ContextMutablePtr Context::createGlobal(ContextSharedPart * shared_part)
{
auto res = std::shared_ptr<Context>(new Context);
res->shared = shared;
res->shared = shared_part;
return res;
}
@ -105,6 +111,7 @@ SharedContextHolder Context::createShared()
return SharedContextHolder(std::make_unique<ContextSharedPart>());
}
ContextMutablePtr Context::getGlobalContext() const
{
auto ptr = global_context.lock();
@ -112,22 +119,55 @@ ContextMutablePtr Context::getGlobalContext() const
return ptr;
}
std::unique_lock<std::recursive_mutex> Context::getLock() const
std::unique_lock<SharedMutex> Context::getGlobalLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
return std::unique_lock(shared->mutex);
Stopwatch watch;
auto lock = std::unique_lock(shared->mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}
std::shared_lock<SharedMutex> Context::getGlobalSharedLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
Stopwatch watch;
auto lock = std::shared_lock(shared->mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}
std::unique_lock<SharedMutex> Context::getLocalLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
Stopwatch watch;
auto lock = std::unique_lock(mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}
std::shared_lock<SharedMutex> Context::getLocalSharedLock() const
{
ProfileEvents::increment(ProfileEvents::ContextLock);
CurrentMetrics::Increment increment{CurrentMetrics::ContextLockWait};
Stopwatch watch;
auto lock = std::shared_lock(mutex);
ProfileEvents::increment(ProfileEvents::ContextLockWaitMicroseconds, watch.elapsedMicroseconds());
return lock;
}
String Context::getPath() const
{
auto lock = getLock();
auto lock = getGlobalSharedLock();
return shared->path;
}
void Context::setPath(const String & path)
{
auto lock = getLock();
auto lock = getGlobalLock();
shared->path = path;
}
@ -143,15 +183,13 @@ void Context::setMacros(std::unique_ptr<Macros> && macros)
BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLock();
if (!shared->schedule_pool)
{
callOnce(shared->schedule_pool_initialized, [&] {
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
shared->server_settings.background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
CurrentMetrics::BackgroundSchedulePoolSize,
"BgSchPool");
}
});
return *shared->schedule_pool;
}
@ -168,30 +206,21 @@ const RemoteHostFilter & Context::getRemoteHostFilter() const
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
auto lock = getLock();
callOnce(shared->readers_initialized, [&] {
const auto & config = getConfigRef();
shared->asynchronous_remote_fs_reader = createThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER, config);
shared->asynchronous_local_fs_reader = createThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER, config);
shared->synchronous_local_fs_reader = createThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER, config);
});
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
if (!shared->asynchronous_remote_fs_reader)
shared->asynchronous_remote_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->asynchronous_local_fs_reader)
shared->asynchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->synchronous_local_fs_reader)
shared->synchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->synchronous_local_fs_reader;
}
}
}
@ -207,19 +236,19 @@ std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetche
void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
auto lock = getGlobalLock();
shared->config = config;
}
const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
{
auto lock = getLock();
auto lock = getGlobalSharedLock();
return shared->config ? *shared->config : Poco::Util::Application::instance().config();
}
std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
{
auto lock = getLock();
auto lock = getLocalLock();
if (!async_read_counters)
async_read_counters = std::make_shared<AsyncReadCounters>();
return async_read_counters;
@ -227,18 +256,14 @@ std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
ThreadPool & Context::getThreadPoolWriter() const
{
const auto & config = getConfigRef();
auto lock = getLock();
if (!shared->threadpool_writer)
{
callOnce(shared->threadpool_writer_initialized, [&] {
const auto & config = getConfigRef();
auto pool_size = config.getUInt(".threadpool_writer_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_writer_queue_size", 1000000);
shared->threadpool_writer = std::make_unique<ThreadPool>(
CurrentMetrics::IOWriterThreads, CurrentMetrics::IOWriterThreadsActive, pool_size, pool_size, queue_size);
}
});
return *shared->threadpool_writer;
}

View File

@ -6,6 +6,7 @@
#include <Common/MultiVersion.h>
#include <Common/RemoteHostFilter.h>
#include <Common/SharedMutex.h>
#include <Disks/IO/getThreadPoolReader.h>
@ -44,17 +45,9 @@ private:
std::unique_ptr<ContextSharedPart> shared;
};
class Context : public std::enable_shared_from_this<Context>
class ContextData
{
private:
/// Use copy constructor or createGlobal() instead
Context();
Context(const Context &);
Context & operator=(const Context &);
std::unique_lock<std::recursive_mutex> getLock() const;
protected:
ContextWeakMutablePtr global_context;
inline static ContextPtr global_context_instance;
ContextSharedPart * shared;
@ -63,9 +56,33 @@ private:
mutable std::shared_ptr<AsyncReadCounters> async_read_counters;
Settings settings; /// Setting for query execution.
public:
/// Use copy constructor or createGlobal() instead
ContextData();
ContextData(const ContextData &);
};
class Context : public ContextData, public std::enable_shared_from_this<Context>
{
private:
/// ContextData mutex
mutable SharedMutex mutex;
Context();
Context(const Context &);
std::unique_lock<SharedMutex> getGlobalLock() const;
std::shared_lock<SharedMutex> getGlobalSharedLock() const;
std::unique_lock<SharedMutex> getLocalLock() const;
std::shared_lock<SharedMutex> getLocalSharedLock() const;
public:
/// Create initial Context with ContextShared and etc.
static ContextMutablePtr createGlobal(ContextSharedPart * shared);
static ContextMutablePtr createGlobal(ContextSharedPart * shared_part);
static SharedContextHolder createShared();
ContextMutablePtr getGlobalContext() const;