integrate DiskObjectStorage and RESOURCEs

This commit is contained in:
serxa 2024-09-24 15:52:29 +00:00
parent 709c8489f7
commit 9db958dcdc
4 changed files with 65 additions and 31 deletions

View File

@ -15,7 +15,8 @@
#include <Disks/FakeDiskTransaction.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Parsers/ASTCreateResourceQuery.h>
namespace DB
{
@ -68,8 +69,8 @@ DiskObjectStorage::DiskObjectStorage(
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(config.getBool(config_prefix + ".send_metadata", false))
, read_resource_name(config.getString(config_prefix + ".read_resource", ""))
, write_resource_name(config.getString(config_prefix + ".write_resource", ""))
, read_resource_name_from_config(config.getString(config_prefix + ".read_resource", ""))
, write_resource_name_from_config(config.getString(config_prefix + ".write_resource", ""))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}, WriteSettings{}))
{
data_source_description = DataSourceDescription{
@ -80,6 +81,52 @@ DiskObjectStorage::DiskObjectStorage(
.is_encrypted = false,
.is_cached = object_storage->supportsCache(),
};
resource_changes_subscription = Context::getGlobalContextInstance()->getWorkloadEntityStorage().getAllEntitiesAndSubscribe(
[this] (const std::vector<IWorkloadEntityStorage::Event> & events)
{
std::unique_lock lock{resource_mutex};
for (auto [entity_type, resource_name, resource] : events)
{
if (entity_type == WorkloadEntityType::Resource)
{
if (resource) // CREATE RESOURCE
{
// We rely on the fact that every disk is allowed to be mentioned at most
// in one RESOURCE for READ and in one RESOURCE for WRITE
// TODO(serxa): add disk operations validation in workload entity storage
auto * create = typeid_cast<ASTCreateResourceQuery *>(resource.get());
chassert(create);
for (const auto & [mode, disk] : create->operations)
{
if (disk == name)
{
switch (mode)
{
case ASTCreateResourceQuery::AccessMode::Read:
{
read_resource_name_from_sql = resource_name;
break;
}
case ASTCreateResourceQuery::AccessMode::Write:
{
write_resource_name_from_sql = resource_name;
break;
}
}
}
}
}
else // DROP RESOURCE
{
if (read_resource_name_from_sql == resource_name)
read_resource_name_from_sql.clear();
if (write_resource_name_from_sql == resource_name)
write_resource_name_from_sql.clear();
}
break;
}
}
});
}
StoredObjects DiskObjectStorage::getStorageObjects(const String & local_path) const
@ -480,13 +527,13 @@ static inline Settings updateIOSchedulingSettings(const Settings & settings, con
String DiskObjectStorage::getReadResourceName() const
{
std::unique_lock lock(resource_mutex);
return read_resource_name;
return read_resource_name_from_config.empty() ? read_resource_name_from_sql : read_resource_name_from_config;
}
String DiskObjectStorage::getWriteResourceName() const
{
std::unique_lock lock(resource_mutex);
return write_resource_name;
return write_resource_name_from_config.empty() ? write_resource_name_from_sql : write_resource_name_from_config;
}
std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
@ -551,10 +598,10 @@ void DiskObjectStorage::applyNewSettings(
{
std::unique_lock lock(resource_mutex);
if (String new_read_resource_name = config.getString(config_prefix + ".read_resource", ""); new_read_resource_name != read_resource_name)
read_resource_name = new_read_resource_name;
if (String new_write_resource_name = config.getString(config_prefix + ".write_resource", ""); new_write_resource_name != write_resource_name)
write_resource_name = new_write_resource_name;
if (String new_read_resource_name = config.getString(config_prefix + ".read_resource", ""); new_read_resource_name != read_resource_name_from_config)
read_resource_name_from_config = new_read_resource_name;
if (String new_write_resource_name = config.getString(config_prefix + ".write_resource", ""); new_write_resource_name != write_resource_name_from_config)
write_resource_name_from_config = new_write_resource_name;
}
IDisk::applyNewSettings(config, context_, config_prefix, disk_map);

View File

@ -6,6 +6,8 @@
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Common/re2.h>
#include <base/scope_guard.h>
#include "config.h"
@ -242,8 +244,11 @@ private:
const bool send_metadata;
mutable std::mutex resource_mutex;
String read_resource_name;
String write_resource_name;
String read_resource_name_from_config; // specified in disk config.xml
String write_resource_name_from_config; // specified in disk config.xml
String read_resource_name_from_sql; // described by CREATE RESOURCE queries
String write_resource_name_from_sql; // described by CREATE RESOURCE queries
scope_guard resource_changes_subscription;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
};

View File

@ -2979,17 +2979,7 @@ void Context::setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObj
shared->user_defined_sql_objects_storage = std::move(storage);
}
const IWorkloadEntityStorage & Context::getWorkloadEntityStorage() const
{
callOnce(shared->workload_entity_storage_initialized, [&] {
shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext());
});
SharedLockGuard lock(shared->mutex);
return *shared->workload_entity_storage;
}
IWorkloadEntityStorage & Context::getWorkloadEntityStorage()
IWorkloadEntityStorage & Context::getWorkloadEntityStorage() const
{
callOnce(shared->workload_entity_storage_initialized, [&] {
shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext());
@ -2999,12 +2989,6 @@ IWorkloadEntityStorage & Context::getWorkloadEntityStorage()
return *shared->workload_entity_storage;
}
void Context::setWorkloadEntityStorage(std::unique_ptr<IWorkloadEntityStorage> storage)
{
std::lock_guard lock(shared->mutex);
shared->workload_entity_storage = std::move(storage);
}
#if USE_NLP
SynonymsExtensions & Context::getSynonymsExtensions() const

View File

@ -882,9 +882,7 @@ public:
void setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage);
void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config);
const IWorkloadEntityStorage & getWorkloadEntityStorage() const;
IWorkloadEntityStorage & getWorkloadEntityStorage();
void setWorkloadEntityStorage(std::unique_ptr<IWorkloadEntityStorage> storage);
IWorkloadEntityStorage & getWorkloadEntityStorage() const;
#if USE_NLP
SynonymsExtensions & getSynonymsExtensions() const;