support ANY DISK clause in RESOURCE definition

This commit is contained in:
serxa 2024-10-23 13:23:48 +00:00
parent 7c58e7a93b
commit 4081ed87e9
7 changed files with 115 additions and 58 deletions

View File

@ -46,7 +46,13 @@ Example:
An alternative way to express which disks are used by a resource is SQL syntax:
```sql
CREATE RESOURCE (WRITE DISK disk1, READ DISK disk2)
CREATE RESOURCE resource_name (WRITE DISK disk1, READ DISK disk2)
```
A resource can be used for any number of disks: for READ, for WRITE, or for both READ and WRITE. There is also a syntax that allows a resource to be used for all disks:
```sql
CREATE RESOURCE all_io (READ ANY DISK, WRITE ANY DISK);
```
Note that server configuration options have priority over the SQL way of defining resources.

View File

@ -88,63 +88,81 @@ DiskObjectStorage::DiskObjectStorage(
[this] (const std::vector<IWorkloadEntityStorage::Event> & events)
{
std::unique_lock lock{resource_mutex};
for (auto [entity_type, resource_name, resource] : events)
// Sets of matching resource names. Required to resolve possible conflicts in deterministic way
std::set<String> new_read_resource_name_from_sql{read_resource_name_from_sql};
std::set<String> new_write_resource_name_from_sql{write_resource_name_from_sql};
std::set<String> new_read_resource_name_from_sql_any{read_resource_name_from_sql_any};
std::set<String> new_write_resource_name_from_sql_any{write_resource_name_from_sql_any};
for (const auto & [entity_type, resource_name, resource] : events)
{
if (entity_type == WorkloadEntityType::Resource)
{
if (resource) // CREATE RESOURCE
{
// We rely on the fact that every disk is allowed to be mentioned at most
// in one RESOURCE for READ and in one RESOURCE for WRITE
// TODO(serxa): add disk operations validation in workload entity storage
auto * create = typeid_cast<ASTCreateResourceQuery *>(resource.get());
chassert(create);
for (const auto & [mode, disk] : create->operations)
{
if (disk == name)
if (!disk)
{
switch (mode)
{
case ASTCreateResourceQuery::AccessMode::Read:
{
if (read_resource_name_from_config.empty())
LOG_INFO(log, "Using resource '{}' for READ", resource_name);
else
LOG_INFO(log, "Resource '{}' should be used for READ, but it is overridden by config to resource '{}'",
resource_name, read_resource_name_from_config);
read_resource_name_from_sql = resource_name;
break;
}
case ASTCreateResourceQuery::AccessMode::Write:
{
if (write_resource_name_from_config.empty())
LOG_INFO(log, "Using resource '{}' for WRITE", resource_name);
else
LOG_INFO(log, "Resource '{}' should be used for WRITE, but it is overridden by config to resource '{}'",
resource_name, write_resource_name_from_config);
write_resource_name_from_sql = resource_name;
break;
}
case ASTCreateResourceQuery::AccessMode::Read: new_read_resource_name_from_sql_any.insert(resource_name); break;
case ASTCreateResourceQuery::AccessMode::Write: new_write_resource_name_from_sql_any.insert(resource_name); break;
}
}
else if (*disk == name)
{
switch (mode)
{
case ASTCreateResourceQuery::AccessMode::Read: new_read_resource_name_from_sql.insert(resource_name); break;
case ASTCreateResourceQuery::AccessMode::Write: new_write_resource_name_from_sql.insert(resource_name); break;
}
}
}
}
else // DROP RESOURCE
{
if (read_resource_name_from_sql == resource_name)
{
LOG_INFO(log, "Stop using resource '{}' for READ", resource_name);
read_resource_name_from_sql.clear();
}
if (write_resource_name_from_sql == resource_name)
{
LOG_INFO(log, "Stop using resource '{}' for WRITE", resource_name);
write_resource_name_from_sql.clear();
}
new_read_resource_name_from_sql.erase(resource_name);
new_write_resource_name_from_sql.erase(resource_name);
new_read_resource_name_from_sql_any.erase(resource_name);
new_write_resource_name_from_sql_any.erase(resource_name);
}
break;
}
}
String old_read_resource = getReadResourceNameNoLock();
String old_write_resource = getWriteResourceNameNoLock();
if (!new_read_resource_name_from_sql_any.empty())
read_resource_name_from_sql_any = *new_read_resource_name_from_sql_any.begin();
else
read_resource_name_from_sql_any.clear();
if (!new_write_resource_name_from_sql_any.empty())
write_resource_name_from_sql_any = *new_write_resource_name_from_sql_any.begin();
else
write_resource_name_from_sql_any.clear();
if (!new_read_resource_name_from_sql.empty())
read_resource_name_from_sql = *new_read_resource_name_from_sql.begin();
else
read_resource_name_from_sql.clear();
if (!new_write_resource_name_from_sql.empty())
write_resource_name_from_sql = *new_write_resource_name_from_sql.begin();
else
write_resource_name_from_sql.clear();
String new_read_resource = getReadResourceNameNoLock();
String new_write_resource = getWriteResourceNameNoLock();
// Report a change of the effective resource for each access mode.
// Each message pairs the new name with the previous name of the SAME mode
// (the READ message must not print the old WRITE resource).
if (old_read_resource != new_read_resource)
    LOG_INFO(log, "Using resource '{}' instead of '{}' for READ", new_read_resource, old_read_resource);
if (old_write_resource != new_write_resource)
    LOG_INFO(log, "Using resource '{}' instead of '{}' for WRITE", new_write_resource, old_write_resource);
});
}
@ -545,13 +563,29 @@ static inline Settings updateIOSchedulingSettings(const Settings & settings, con
String DiskObjectStorage::getReadResourceName() const
{
std::unique_lock lock(resource_mutex);
return read_resource_name_from_config.empty() ? read_resource_name_from_sql : read_resource_name_from_config;
return getReadResourceNameNoLock();
}
String DiskObjectStorage::getWriteResourceName() const
{
std::unique_lock lock(resource_mutex);
return write_resource_name_from_config.empty() ? write_resource_name_from_sql : write_resource_name_from_config;
return getWriteResourceNameNoLock();
}
/// Returns the resource name to use for READ operations on this disk.
/// Does not take resource_mutex itself; the callers visible here lock it before calling.
/// Precedence: disk config, then a resource naming this disk, then a READ ANY DISK resource.
String DiskObjectStorage::getReadResourceNameNoLock() const
{
    // Server configuration overrides anything defined through SQL.
    if (!read_resource_name_from_config.empty())
        return read_resource_name_from_config;

    // A resource that mentions this disk explicitly wins over an ANY DISK resource.
    if (!read_resource_name_from_sql.empty())
        return read_resource_name_from_sql;

    // May be empty when no resource applies.
    return read_resource_name_from_sql_any;
}
/// Returns the resource name to use for WRITE operations on this disk.
/// Does not take resource_mutex itself; the callers visible here lock it before calling.
/// Precedence: disk config, then a resource naming this disk, then a WRITE ANY DISK resource.
String DiskObjectStorage::getWriteResourceNameNoLock() const
{
    // Server configuration overrides anything defined through SQL.
    if (!write_resource_name_from_config.empty())
        return write_resource_name_from_config;

    // A resource that mentions this disk explicitly wins over an ANY DISK resource.
    if (!write_resource_name_from_sql.empty())
        return write_resource_name_from_sql;

    // May be empty when no resource applies.
    return write_resource_name_from_sql_any;
}
std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(

View File

@ -226,6 +226,8 @@ private:
String getReadResourceName() const;
String getWriteResourceName() const;
String getReadResourceNameNoLock() const;
String getWriteResourceNameNoLock() const;
const String object_key_prefix;
LoggerPtr log;
@ -244,10 +246,12 @@ private:
const bool send_metadata;
mutable std::mutex resource_mutex;
String read_resource_name_from_config; // specified in disk config.xml
String write_resource_name_from_config; // specified in disk config.xml
String read_resource_name_from_sql; // described by CREATE RESOURCE queries
String write_resource_name_from_sql; // described by CREATE RESOURCE queries
String read_resource_name_from_config; // specified in disk config.xml read_resource element
String write_resource_name_from_config; // specified in disk config.xml write_resource element
String read_resource_name_from_sql; // described by CREATE RESOURCE query with READ DISK clause
String write_resource_name_from_sql; // described by CREATE RESOURCE query with WRITE DISK clause
String read_resource_name_from_sql_any; // described by CREATE RESOURCE query with READ ANY DISK clause
String write_resource_name_from_sql_any; // described by CREATE RESOURCE query with WRITE ANY DISK clause
scope_guard resource_changes_subscription;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;

View File

@ -52,17 +52,22 @@ void ASTCreateResourceQuery::formatImpl(const IAST::FormatSettings & format, IAS
{
case AccessMode::Read:
{
format.ostr << (format.hilite ? hilite_keyword : "") << "READ DISK ";
format.ostr << (format.hilite ? hilite_keyword : "") << "READ ";
break;
}
case AccessMode::Write:
{
format.ostr << (format.hilite ? hilite_keyword : "") << "WRITE DISK ";
format.ostr << (format.hilite ? hilite_keyword : "") << "WRITE ";
break;
}
}
format.ostr << (format.hilite ? hilite_none : "");
format.ostr << (format.hilite ? hilite_identifier : "") << backQuoteIfNeed(operation.disk) << (format.hilite ? hilite_none : "");
if (operation.disk)
{
format.ostr << "DISK " << (format.hilite ? hilite_none : "");
format.ostr << (format.hilite ? hilite_identifier : "") << backQuoteIfNeed(*operation.disk) << (format.hilite ? hilite_none : "");
}
else
format.ostr << "ANY DISK" << (format.hilite ? hilite_none : "");
}
format.ostr << ")";

View File

@ -18,7 +18,7 @@ public:
struct Operation
{
AccessMode mode;
String disk;
std::optional<String> disk; // Applies to all disks if not set
friend bool operator ==(const Operation & lhs, const Operation & rhs) { return lhs.mode == rhs.mode && lhs.disk == rhs.disk; }
friend bool operator !=(const Operation & lhs, const Operation & rhs) { return !(lhs == rhs); }

View File

@ -19,7 +19,7 @@ bool parseOneOperation(ASTCreateResourceQuery::Operation & operation, IParser::P
ASTCreateResourceQuery::AccessMode mode;
ASTPtr node;
String disk;
std::optional<String> disk;
if (ParserKeyword(Keyword::WRITE).ignore(pos, expected))
mode = ASTCreateResourceQuery::AccessMode::Write;
@ -28,14 +28,23 @@ bool parseOneOperation(ASTCreateResourceQuery::Operation & operation, IParser::P
else
return false;
if (!ParserKeyword(Keyword::DISK).ignore(pos, expected))
return false;
if (ParserKeyword(Keyword::ANY).ignore(pos, expected))
{
if (!ParserKeyword(Keyword::DISK).ignore(pos, expected))
return false;
}
else
{
if (!ParserKeyword(Keyword::DISK).ignore(pos, expected))
return false;
if (!disk_name_p.parse(pos, node, expected))
return false;
if (!disk_name_p.parse(pos, node, expected))
return false;
if (!tryGetIdentifierNameInto(node, disk))
return false;
disk.emplace();
if (!tryGetIdentifierNameInto(node, *disk))
return false;
}
operation.mode = mode;
operation.disk = std::move(disk);

View File

@ -4,7 +4,6 @@
#include <Parsers/queryToString.h>
#include <Storages/System/StorageSystemResources.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include "Parsers/ASTCreateQuery.h"
#include <Parsers/ASTCreateResourceQuery.h>
@ -40,12 +39,12 @@ void StorageSystemResources::fillData(MutableColumns & res_columns, ContextPtr c
{
case DB::ASTCreateResourceQuery::AccessMode::Read:
{
read_disks.emplace_back(disk);
read_disks.emplace_back(disk ? *disk : "ANY");
break;
}
case DB::ASTCreateResourceQuery::AccessMode::Write:
{
write_disks.emplace_back(disk);
write_disks.emplace_back(disk ? *disk : "ANY");
break;
}
}