diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index 07e2edac129..a20ee53ff75 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -15,7 +15,8 @@ #include #include #include - +#include +#include namespace DB { @@ -68,8 +69,8 @@ DiskObjectStorage::DiskObjectStorage( , metadata_storage(std::move(metadata_storage_)) , object_storage(std::move(object_storage_)) , send_metadata(config.getBool(config_prefix + ".send_metadata", false)) - , read_resource_name(config.getString(config_prefix + ".read_resource", "")) - , write_resource_name(config.getString(config_prefix + ".write_resource", "")) + , read_resource_name_from_config(config.getString(config_prefix + ".read_resource", "")) + , write_resource_name_from_config(config.getString(config_prefix + ".write_resource", "")) , metadata_helper(std::make_unique(this, ReadSettings{}, WriteSettings{})) { data_source_description = DataSourceDescription{ @@ -80,6 +81,52 @@ DiskObjectStorage::DiskObjectStorage( .is_encrypted = false, .is_cached = object_storage->supportsCache(), }; + resource_changes_subscription = Context::getGlobalContextInstance()->getWorkloadEntityStorage().getAllEntitiesAndSubscribe( + [this] (const std::vector & events) + { + std::unique_lock lock{resource_mutex}; + for (auto [entity_type, resource_name, resource] : events) + { + if (entity_type == WorkloadEntityType::Resource) + { + if (resource) // CREATE RESOURCE + { + // We rely on the fact that every disk is allowed to be mentioned at most + // in one RESOURCE for READ and in one RESOURCE for WRITE + // TODO(serxa): add disk operations validation in workload entity storage + auto * create = typeid_cast(resource.get()); + chassert(create); + for (const auto & [mode, disk] : create->operations) + { + if (disk == name) + { + switch (mode) + { + case ASTCreateResourceQuery::AccessMode::Read: + { + read_resource_name_from_sql = resource_name; + break; + } + case ASTCreateResourceQuery::AccessMode::Write: + { + write_resource_name_from_sql = resource_name; + break; + } + } + } + } + } + else // DROP RESOURCE + { + if (read_resource_name_from_sql == resource_name) + read_resource_name_from_sql.clear(); + if (write_resource_name_from_sql == resource_name) + write_resource_name_from_sql.clear(); + } + break; + } + } + }); } StoredObjects DiskObjectStorage::getStorageObjects(const String & local_path) const @@ -480,13 +527,13 @@ static inline Settings updateIOSchedulingSettings(const Settings & settings, con String DiskObjectStorage::getReadResourceName() const { std::unique_lock lock(resource_mutex); - return read_resource_name; + return read_resource_name_from_config.empty() ? read_resource_name_from_sql : read_resource_name_from_config; } String DiskObjectStorage::getWriteResourceName() const { std::unique_lock lock(resource_mutex); - return write_resource_name; + return write_resource_name_from_config.empty() ? write_resource_name_from_sql : write_resource_name_from_config; } std::unique_ptr DiskObjectStorage::readFile( @@ -551,10 +598,10 @@ void DiskObjectStorage::applyNewSettings( { std::unique_lock lock(resource_mutex); - if (String new_read_resource_name = config.getString(config_prefix + ".read_resource", ""); new_read_resource_name != read_resource_name) - read_resource_name = new_read_resource_name; - if (String new_write_resource_name = config.getString(config_prefix + ".write_resource", ""); new_write_resource_name != write_resource_name) - write_resource_name = new_write_resource_name; + if (String new_read_resource_name = config.getString(config_prefix + ".read_resource", ""); new_read_resource_name != read_resource_name_from_config) + read_resource_name_from_config = new_read_resource_name; + if (String new_write_resource_name = config.getString(config_prefix + ".write_resource", ""); new_write_resource_name != write_resource_name_from_config) + write_resource_name_from_config = new_write_resource_name; } IDisk::applyNewSettings(config, context_, config_prefix, disk_map); diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index 5c45a258806..d4d4dc11ffa 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -6,6 +6,8 @@ #include #include +#include + #include "config.h" @@ -242,8 +244,11 @@ private: const bool send_metadata; mutable std::mutex resource_mutex; - String read_resource_name; - String write_resource_name; + String read_resource_name_from_config; // specified in disk config.xml + String write_resource_name_from_config; // specified in disk config.xml + String read_resource_name_from_sql; // described by CREATE RESOURCE queries + String write_resource_name_from_sql; // described by CREATE RESOURCE queries + scope_guard resource_changes_subscription; std::unique_ptr metadata_helper; }; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index d3cbbf76156..5de1dece884 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2979,17 +2979,7 @@ void Context::setUserDefinedSQLObjectsStorage(std::unique_ptruser_defined_sql_objects_storage = std::move(storage); } -const IWorkloadEntityStorage & Context::getWorkloadEntityStorage() const -{ - callOnce(shared->workload_entity_storage_initialized, [&] { - shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext()); - }); - - SharedLockGuard lock(shared->mutex); - return *shared->workload_entity_storage; -} - -IWorkloadEntityStorage & Context::getWorkloadEntityStorage() +IWorkloadEntityStorage & Context::getWorkloadEntityStorage() const { callOnce(shared->workload_entity_storage_initialized, [&] { shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext()); @@ -2999,12 +2989,6 @@ IWorkloadEntityStorage & Context::getWorkloadEntityStorage() return *shared->workload_entity_storage; } -void Context::setWorkloadEntityStorage(std::unique_ptr storage) -{ - std::lock_guard lock(shared->mutex); - shared->workload_entity_storage = std::move(storage); -} - #if USE_NLP SynonymsExtensions & Context::getSynonymsExtensions() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index be963d85757..114e2c96570 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -882,9 +882,7 @@ public: void setUserDefinedSQLObjectsStorage(std::unique_ptr storage); void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config); - const IWorkloadEntityStorage & getWorkloadEntityStorage() const; - IWorkloadEntityStorage & getWorkloadEntityStorage(); - void setWorkloadEntityStorage(std::unique_ptr storage); + IWorkloadEntityStorage & getWorkloadEntityStorage() const; #if USE_NLP SynonymsExtensions & getSynonymsExtensions() const;