Integrate workload entity storage into server

This commit is contained in:
serxa 2024-09-01 17:08:24 +00:00
parent bb2716251b
commit 6b6cfd4e16
6 changed files with 105 additions and 1 deletions

View File

@ -2088,6 +2088,8 @@ try
database_catalog.assertDatabaseExists(default_database);
/// Load user-defined SQL functions.
global_context->getUserDefinedSQLObjectsStorage().loadObjects();
/// Load WORKLOADs and RESOURCEs.
global_context->getWorkloadEntityStorage().loadObjects();
}
catch (...)
{

View File

@ -1386,6 +1386,10 @@
If not specified they will be stored locally. -->
<!-- <user_defined_zookeeper_path>/clickhouse/user_defined</user_defined_zookeeper_path> -->
<!-- Path in ZooKeeper to store workload and resource created by the command CREATE WORKLOAD and CREATE REESOURCE.
If not specified they will be stored locally. -->
<!-- <workload_zookeeper_path>/clickhouse/workload</workload_zookeeper_path> -->
<!-- Uncomment if you want data to be compressed 30-100% better.
Don't do that if you just started using ClickHouse.
-->

View File

@ -0,0 +1,48 @@
#include <Common/Scheduler/Workload/createWorkloadEntityStorage.h>
#include <Common/Scheduler/Workload/WorkloadEntityDiskStorage.h>
#include <Common/Scheduler/Workload/WorkloadEntityKeeperStorage.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
#include <memory>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
}
std::unique_ptr<IWorkloadEntityStorage> createWorkloadEntityStorage(const ContextMutablePtr & global_context)
{
const String zookeeper_path_key = "workload_zookeeper_path";
const String disk_path_key = "workload_path";
const auto & config = global_context->getConfigRef();
if (config.has(zookeeper_path_key))
{
if (config.has(disk_path_key))
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"'{}' and '{}' must not be both specified in the config",
zookeeper_path_key,
disk_path_key);
}
abort(); // TODO(serxa): crate WorkloadEntityKeeperStorage object
//return std::make_unique<WorkloadEntityKeeperStorage>(global_context, config.getString(zookeeper_path_key));
}
else
{
String default_path = fs::path{global_context->getPath()} / "workload" / "";
String path = config.getString(disk_path_key, default_path);
return std::make_unique<WorkloadEntityDiskStorage>(global_context, path);
}
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
namespace DB
{
std::unique_ptr<IWorkloadEntityStorage> createWorkloadEntityStorage(const ContextMutablePtr & global_context);
}

View File

@ -64,7 +64,6 @@
#include <Access/SettingsConstraintsAndProfileIDs.h>
#include <Access/ExternalAuthenticators.h>
#include <Access/GSSAcceptor.h>
#include <Common/Scheduler/ResourceManagerFactory.h>
#include <Backups/BackupsWorker.h>
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
@ -89,6 +88,8 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/Scheduler/ResourceManagerFactory.h>
#include <Common/Scheduler/Workload/createWorkloadEntityStorage.h>
#include <Common/StackTrace.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/Config/ConfigProcessor.h>
@ -270,6 +271,9 @@ struct ContextSharedPart : boost::noncopyable
mutable OnceFlag user_defined_sql_objects_storage_initialized;
mutable std::unique_ptr<IUserDefinedSQLObjectsStorage> user_defined_sql_objects_storage;
mutable OnceFlag workload_entity_storage_initialized;
mutable std::unique_ptr<IWorkloadEntityStorage> workload_entity_storage;
#if USE_NLP
mutable OnceFlag synonyms_extensions_initialized;
mutable std::optional<SynonymsExtensions> synonyms_extensions;
@ -609,6 +613,7 @@ struct ContextSharedPart : boost::noncopyable
SHUTDOWN(log, "dictionaries loader", external_dictionaries_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "UDFs loader", external_user_defined_executable_functions_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "another UDFs storage", user_defined_sql_objects_storage, stopWatching());
SHUTDOWN(log, "workload entity storage", workload_entity_storage, stopWatching());
LOG_TRACE(log, "Shutting down named sessions");
Session::shutdownNamedSessions();
@ -640,6 +645,7 @@ struct ContextSharedPart : boost::noncopyable
std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;
std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader;
std::unique_ptr<IUserDefinedSQLObjectsStorage> delete_user_defined_sql_objects_storage;
std::unique_ptr<IWorkloadEntityStorage> delete_workload_entity_storage;
std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool;
@ -724,6 +730,7 @@ struct ContextSharedPart : boost::noncopyable
delete_external_dictionaries_loader = std::move(external_dictionaries_loader);
delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader);
delete_user_defined_sql_objects_storage = std::move(user_defined_sql_objects_storage);
delete_workload_entity_storage = std::move(workload_entity_storage);
delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
@ -742,6 +749,7 @@ struct ContextSharedPart : boost::noncopyable
delete_external_dictionaries_loader.reset();
delete_external_user_defined_executable_functions_loader.reset();
delete_user_defined_sql_objects_storage.reset();
delete_workload_entity_storage.reset();
delete_ddl_worker.reset();
delete_buffer_flush_schedule_pool.reset();
delete_schedule_pool.reset();
@ -2903,6 +2911,32 @@ void Context::setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObj
shared->user_defined_sql_objects_storage = std::move(storage);
}
const IWorkloadEntityStorage & Context::getWorkloadEntityStorage() const
{
callOnce(shared->workload_entity_storage_initialized, [&] {
shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext());
});
SharedLockGuard lock(shared->mutex);
return *shared->workload_entity_storage;
}
IWorkloadEntityStorage & Context::getWorkloadEntityStorage()
{
callOnce(shared->workload_entity_storage_initialized, [&] {
shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext());
});
std::lock_guard lock(shared->mutex);
return *shared->workload_entity_storage;
}
void Context::setWorkloadEntityStorage(std::unique_ptr<IWorkloadEntityStorage> storage)
{
std::lock_guard lock(shared->mutex);
shared->workload_entity_storage = std::move(storage);
}
#if USE_NLP
SynonymsExtensions & Context::getSynonymsExtensions() const

View File

@ -70,6 +70,7 @@ class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalUserDefinedExecutableFunctionsLoader;
class IUserDefinedSQLObjectsStorage;
class IWorkloadEntityStorage;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
@ -879,6 +880,10 @@ public:
void setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage);
void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config);
const IWorkloadEntityStorage & getWorkloadEntityStorage() const;
IWorkloadEntityStorage & getWorkloadEntityStorage();
void setWorkloadEntityStorage(std::unique_ptr<IWorkloadEntityStorage> storage);
#if USE_NLP
SynonymsExtensions & getSynonymsExtensions() const;
Lemmatizers & getLemmatizers() const;