diff --git a/.gitignore b/.gitignore
index 4bc162c1b0f..8a745655cbf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -159,6 +159,7 @@ website/package-lock.json
/programs/server/store
/programs/server/uuid
/programs/server/coordination
+/programs/server/workload
# temporary test files
tests/queries/0_stateless/test_*
diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md
index 76d6f5388e3..02fa5a8ca58 100644
--- a/docs/en/operations/server-configuration-parameters/settings.md
+++ b/docs/en/operations/server-configuration-parameters/settings.md
@@ -3224,6 +3224,34 @@ Default value: "default"
**See Also**
- [Workload Scheduling](/docs/en/operations/workload-scheduling.md)
+## workload_path {#workload_path}
+
+The directory used as a storage for all `CREATE WORKLOAD` and `CREATE RESOURCE` queries. By default `/workload/` folder under server working directory is used.
+
+**Example**
+
+``` xml
+/var/lib/clickhouse/workload/
+```
+
+**See Also**
+- [Workload Hierarchy](/docs/en/operations/workload-scheduling.md#workloads)
+- [workload_zookeeper_path](#workload_zookeeper_path)
+
+## workload_zookeeper_path {#workload_zookeeper_path}
+
+The path to a ZooKeeper node, which is used as a storage for all `CREATE WORKLOAD` and `CREATE RESOURCE` queries. For consistency all SQL definitions are stored as a value of this single znode. By default ZooKeeper is not used and definitions are stored on [disk](#workload_path).
+
+**Example**
+
+``` xml
+/clickhouse/workload/definitions.sql
+```
+
+**See Also**
+- [Workload Hierarchy](/docs/en/operations/workload-scheduling.md#workloads)
+- [workload_path](#workload_path)
+
## max_authentication_methods_per_user {#max_authentication_methods_per_user}
The maximum number of authentication methods a user can be created with or altered to.
diff --git a/docs/en/operations/system-tables/resources.md b/docs/en/operations/system-tables/resources.md
new file mode 100644
index 00000000000..6329f05f610
--- /dev/null
+++ b/docs/en/operations/system-tables/resources.md
@@ -0,0 +1,37 @@
+---
+slug: /en/operations/system-tables/resources
+---
+# resources
+
+Contains information for [resources](/docs/en/operations/workload-scheduling.md#workload_entity_storage) residing on the local server. The table contains a row for every resource.
+
+Example:
+
+``` sql
+SELECT *
+FROM system.resources
+FORMAT Vertical
+```
+
+``` text
+Row 1:
+──────
+name: io_read
+read_disks: ['s3']
+write_disks: []
+create_query: CREATE RESOURCE io_read (READ DISK s3)
+
+Row 2:
+──────
+name: io_write
+read_disks: []
+write_disks: ['s3']
+create_query: CREATE RESOURCE io_write (WRITE DISK s3)
+```
+
+Columns:
+
+- `name` (`String`) - Resource name.
+- `read_disks` (`Array(String)`) - The array of disk names that uses this resource for read operations.
+- `write_disks` (`Array(String)`) - The array of disk names that uses this resource for write operations.
+- `create_query` (`String`) - The definition of the resource.
diff --git a/docs/en/operations/system-tables/workloads.md b/docs/en/operations/system-tables/workloads.md
new file mode 100644
index 00000000000..d9c62372044
--- /dev/null
+++ b/docs/en/operations/system-tables/workloads.md
@@ -0,0 +1,40 @@
+---
+slug: /en/operations/system-tables/workloads
+---
+# workloads
+
+Contains information for [workloads](/docs/en/operations/workload-scheduling.md#workload_entity_storage) residing on the local server. The table contains a row for every workload.
+
+Example:
+
+``` sql
+SELECT *
+FROM system.workloads
+FORMAT Vertical
+```
+
+``` text
+Row 1:
+──────
+name: production
+parent: all
+create_query: CREATE WORKLOAD production IN `all` SETTINGS weight = 9
+
+Row 2:
+──────
+name: development
+parent: all
+create_query: CREATE WORKLOAD development IN `all`
+
+Row 3:
+──────
+name: all
+parent:
+create_query: CREATE WORKLOAD `all`
+```
+
+Columns:
+
+- `name` (`String`) - Workload name.
+- `parent` (`String`) - Parent workload name.
+- `create_query` (`String`) - The definition of the workload.
diff --git a/docs/en/operations/workload-scheduling.md b/docs/en/operations/workload-scheduling.md
index 08629492ec6..a43bea7a5b1 100644
--- a/docs/en/operations/workload-scheduling.md
+++ b/docs/en/operations/workload-scheduling.md
@@ -43,6 +43,20 @@ Example:
```
+An alternative way to express which disks are used by a resource is SQL syntax:
+
+```sql
+CREATE RESOURCE resource_name (WRITE DISK disk1, READ DISK disk2)
+```
+
+Resource could be used for any number of disk for READ or WRITE or both for READ and WRITE. There a syntax allowing to use a resource for all the disks:
+
+```sql
+CREATE RESOURCE all_io (READ ANY DISK, WRITE ANY DISK);
+```
+
+Note that server configuration options have priority over SQL way to define resources.
+
## Workload markup {#workload_markup}
Queries can be marked with setting `workload` to distinguish different workloads. If `workload` is not set, than value "default" is used. Note that you are able to specify the other value using settings profiles. Setting constraints can be used to make `workload` constant if you want all queries from the user to be marked with fixed value of `workload` setting.
@@ -153,9 +167,48 @@ Example:
```
+## Workload hierarchy (SQL only) {#workloads}
+
+Defining resources and classifiers in XML could be challenging. ClickHouse provides SQL syntax that is much more convenient. All resources that were created with `CREATE RESOURCE` share the same structure of the hierarchy, but could differ in some aspects. Every workload created with `CREATE WORKLOAD` maintains a few automatically created scheduling nodes for every resource. A child workload can be created inside another parent workload. Here is the example that defines exactly the same hierarchy as XML configuration above:
+
+```sql
+CREATE RESOURCE network_write (WRITE DISK s3)
+CREATE RESOURCE network_read (READ DISK s3)
+CREATE WORKLOAD all SETTINGS max_requests = 100
+CREATE WORKLOAD development IN all
+CREATE WORKLOAD production IN all SETTINGS weight = 3
+```
+
+The name of a leaf workload without children could be used in query settings `SETTINGS workload = 'name'`. Note that workload classifiers are also created automatically when using SQL syntax.
+
+To customize workload the following settings could be used:
+* `priority` - sibling workloads are served according to static priority values (lower value means higher priority).
+* `weight` - sibling workloads having the same static priority share resources according to weights.
+* `max_requests` - the limit on the number of concurrent resource requests in this workload.
+* `max_cost` - the limit on the total inflight bytes count of concurrent resource requests in this workload.
+* `max_speed` - the limit on byte processing rate of this workload (the limit is independent for every resource).
+* `max_burst` - maximum number of bytes that could be processed by the workload without being throttled (for every resource independently).
+
+Note that workload settings are translated into a proper set of scheduling nodes. For more details, see the description of the scheduling node [types and options](#hierarchy).
+
+There is no way to specify different hierarchies of workloads for different resources. But there is a way to specify different workload setting value for a specific resource:
+
+```sql
+CREATE OR REPLACE WORKLOAD all SETTINGS max_requests = 100, max_speed = 1000000 FOR network_read, max_speed = 2000000 FOR network_write
+```
+
+Also note that workload or resource could not be dropped if it is referenced from another workload. To update a definition of a workload use `CREATE OR REPLACE WORKLOAD` query.
+
+## Workloads and resources storage {#workload_entity_storage}
+Definitions of all workloads and resources in the form of `CREATE WORKLOAD` and `CREATE RESOURCE` queries are stored persistently either on disk at `workload_path` or in ZooKeeper at `workload_zookeeper_path`. ZooKeeper storage is recommended to achieve consistency between nodes. Alternatively `ON CLUSTER` clause could be used along with disk storage.
+
## See also
- [system.scheduler](/docs/en/operations/system-tables/scheduler.md)
+ - [system.workloads](/docs/en/operations/system-tables/workloads.md)
+ - [system.resources](/docs/en/operations/system-tables/resources.md)
- [merge_workload](/docs/en/operations/settings/merge-tree-settings.md#merge_workload) merge tree setting
- [merge_workload](/docs/en/operations/server-configuration-parameters/settings.md#merge_workload) global server setting
- [mutation_workload](/docs/en/operations/settings/merge-tree-settings.md#mutation_workload) merge tree setting
- [mutation_workload](/docs/en/operations/server-configuration-parameters/settings.md#mutation_workload) global server setting
+ - [workload_path](/docs/en/operations/server-configuration-parameters/settings.md#workload_path) global server setting
+ - [workload_zookeeper_path](/docs/en/operations/server-configuration-parameters/settings.md#workload_zookeeper_path) global server setting
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index d061d134e69..826100f68e2 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -86,7 +86,7 @@
#include
#include
#include
-#include
+#include
#include
#include
#include "MetricsTransmitter.h"
@@ -920,7 +920,6 @@ try
registerFormats();
registerRemoteFileMetadatas();
registerSchedulerNodes();
- registerResourceManagers();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
@@ -2253,6 +2252,8 @@ try
database_catalog.assertDatabaseExists(default_database);
/// Load user-defined SQL functions.
global_context->getUserDefinedSQLObjectsStorage().loadObjects();
+ /// Load WORKLOADs and RESOURCEs.
+ global_context->getWorkloadEntityStorage().loadEntities();
global_context->getRefreshSet().setRefreshesStopped(false);
}
diff --git a/programs/server/config.xml b/programs/server/config.xml
index 15649b5c95d..9807f8c0d5a 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -1399,6 +1399,10 @@
If not specified they will be stored locally. -->
+
+
+
diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h
index 010d11e533a..242dfcd8c35 100644
--- a/src/Access/Common/AccessType.h
+++ b/src/Access/Common/AccessType.h
@@ -99,6 +99,8 @@ enum class AccessType : uint8_t
M(CREATE_ARBITRARY_TEMPORARY_TABLE, "", GLOBAL, CREATE) /* allows to create and manipulate temporary tables
with arbitrary table engine */\
M(CREATE_FUNCTION, "", GLOBAL, CREATE) /* allows to execute CREATE FUNCTION */ \
+ M(CREATE_WORKLOAD, "", GLOBAL, CREATE) /* allows to execute CREATE WORKLOAD */ \
+ M(CREATE_RESOURCE, "", GLOBAL, CREATE) /* allows to execute CREATE RESOURCE */ \
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE, "", GROUP, ALL) /* allows to execute {CREATE|ATTACH} */ \
\
@@ -108,6 +110,8 @@ enum class AccessType : uint8_t
implicitly enabled by the grant DROP_TABLE */\
M(DROP_DICTIONARY, "", DICTIONARY, DROP) /* allows to execute {DROP|DETACH} DICTIONARY */\
M(DROP_FUNCTION, "", GLOBAL, DROP) /* allows to execute DROP FUNCTION */\
+ M(DROP_WORKLOAD, "", GLOBAL, DROP) /* allows to execute DROP WORKLOAD */\
+ M(DROP_RESOURCE, "", GLOBAL, DROP) /* allows to execute DROP RESOURCE */\
M(DROP_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute DROP NAMED COLLECTION */\
M(DROP, "", GROUP, ALL) /* allows to execute {DROP|DETACH} */\
\
diff --git a/src/Access/ContextAccess.cpp b/src/Access/ContextAccess.cpp
index 949fd37e403..a5d0451714b 100644
--- a/src/Access/ContextAccess.cpp
+++ b/src/Access/ContextAccess.cpp
@@ -701,15 +701,17 @@ bool ContextAccess::checkAccessImplHelper(const ContextPtr & context, AccessFlag
const AccessFlags dictionary_ddl = AccessType::CREATE_DICTIONARY | AccessType::DROP_DICTIONARY;
const AccessFlags function_ddl = AccessType::CREATE_FUNCTION | AccessType::DROP_FUNCTION;
+ const AccessFlags workload_ddl = AccessType::CREATE_WORKLOAD | AccessType::DROP_WORKLOAD;
+ const AccessFlags resource_ddl = AccessType::CREATE_RESOURCE | AccessType::DROP_RESOURCE;
const AccessFlags table_and_dictionary_ddl = table_ddl | dictionary_ddl;
const AccessFlags table_and_dictionary_and_function_ddl = table_ddl | dictionary_ddl | function_ddl;
const AccessFlags write_table_access = AccessType::INSERT | AccessType::OPTIMIZE;
const AccessFlags write_dcl_access = AccessType::ACCESS_MANAGEMENT - AccessType::SHOW_ACCESS;
- const AccessFlags not_readonly_flags = write_table_access | table_and_dictionary_and_function_ddl | write_dcl_access | AccessType::SYSTEM | AccessType::KILL_QUERY;
+ const AccessFlags not_readonly_flags = write_table_access | table_and_dictionary_and_function_ddl | workload_ddl | resource_ddl | write_dcl_access | AccessType::SYSTEM | AccessType::KILL_QUERY;
const AccessFlags not_readonly_1_flags = AccessType::CREATE_TEMPORARY_TABLE;
- const AccessFlags ddl_flags = table_ddl | dictionary_ddl | function_ddl;
+ const AccessFlags ddl_flags = table_ddl | dictionary_ddl | function_ddl | workload_ddl | resource_ddl;
const AccessFlags introspection_flags = AccessType::INTROSPECTION;
};
static const PrecalculatedFlags precalc;
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 39499cc577d..3627d760d4c 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -136,6 +136,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Common/NamedCollections)
+add_headers_and_sources(dbms Common/Scheduler/Workload)
if (TARGET ch_contrib::amqp_cpp)
add_headers_and_sources(dbms Storages/RabbitMQ)
diff --git a/src/Common/Priority.h b/src/Common/Priority.h
index 8952fe4dd5a..f0e5787ae91 100644
--- a/src/Common/Priority.h
+++ b/src/Common/Priority.h
@@ -6,6 +6,7 @@
/// Separate type (rather than `Int64` is used just to avoid implicit conversion errors and to default-initialize
struct Priority
{
- Int64 value = 0; /// Note that lower value means higher priority.
- constexpr operator Int64() const { return value; } /// NOLINT
+ using Value = Int64;
+ Value value = 0; /// Note that lower value means higher priority.
+ constexpr operator Value() const { return value; } /// NOLINT
};
diff --git a/src/Common/Scheduler/IResourceManager.h b/src/Common/Scheduler/IResourceManager.h
index 8a7077ac3d5..c6f41346e11 100644
--- a/src/Common/Scheduler/IResourceManager.h
+++ b/src/Common/Scheduler/IResourceManager.h
@@ -26,6 +26,9 @@ class IClassifier : private boost::noncopyable
public:
virtual ~IClassifier() = default;
+ /// Returns true iff resource access is allowed by this classifier
+ virtual bool has(const String & resource_name) = 0;
+
/// Returns ResourceLink that should be used to access resource.
/// Returned link is valid until classifier destruction.
virtual ResourceLink get(const String & resource_name) = 0;
@@ -46,12 +49,15 @@ public:
/// Initialize or reconfigure manager.
virtual void updateConfiguration(const Poco::Util::AbstractConfiguration & config) = 0;
+ /// Returns true iff given resource is controlled through this manager.
+ virtual bool hasResource(const String & resource_name) const = 0;
+
/// Obtain a classifier instance required to get access to resources.
/// Note that it holds resource configuration, so should be destructed when query is done.
virtual ClassifierPtr acquire(const String & classifier_name) = 0;
/// For introspection, see `system.scheduler` table
- using VisitorFunc = std::function;
+ using VisitorFunc = std::function;
virtual void forEachNode(VisitorFunc visitor) = 0;
};
diff --git a/src/Common/Scheduler/ISchedulerConstraint.h b/src/Common/Scheduler/ISchedulerConstraint.h
index a976206de74..3bee9c1b424 100644
--- a/src/Common/Scheduler/ISchedulerConstraint.h
+++ b/src/Common/Scheduler/ISchedulerConstraint.h
@@ -15,8 +15,7 @@ namespace DB
* When constraint is again satisfied, scheduleActivation() is called from finishRequest().
*
* Derived class behaviour requirements:
- * - dequeueRequest() must fill `request->constraint` iff it is nullptr;
- * - finishRequest() must be recursive: call to `parent_constraint->finishRequest()`.
+ * - dequeueRequest() must call `request->addConstraint()`.
*/
class ISchedulerConstraint : public ISchedulerNode
{
@@ -25,34 +24,16 @@ public:
: ISchedulerNode(event_queue_, config, config_prefix)
{}
+ ISchedulerConstraint(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
+ : ISchedulerNode(event_queue_, info_)
+ {}
+
/// Resource consumption by `request` is finished.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void finishRequest(ResourceRequest * request) = 0;
- void setParent(ISchedulerNode * parent_) override
- {
- ISchedulerNode::setParent(parent_);
-
- // Assign `parent_constraint` to the nearest parent derived from ISchedulerConstraint
- for (ISchedulerNode * node = parent_; node != nullptr; node = node->parent)
- {
- if (auto * constraint = dynamic_cast(node))
- {
- parent_constraint = constraint;
- break;
- }
- }
- }
-
/// For introspection of current state (true = satisfied, false = violated)
virtual bool isSatisfied() = 0;
-
-protected:
- // Reference to nearest parent that is also derived from ISchedulerConstraint.
- // Request can traverse through multiple constraints while being dequeue from hierarchy,
- // while finishing request should traverse the same chain in reverse order.
- // NOTE: it must be immutable after initialization, because it is accessed in not thread-safe way from finishRequest()
- ISchedulerConstraint * parent_constraint = nullptr;
};
}
diff --git a/src/Common/Scheduler/ISchedulerNode.h b/src/Common/Scheduler/ISchedulerNode.h
index 0705c4f0a35..5e1239de274 100644
--- a/src/Common/Scheduler/ISchedulerNode.h
+++ b/src/Common/Scheduler/ISchedulerNode.h
@@ -57,7 +57,13 @@ struct SchedulerNodeInfo
SchedulerNodeInfo() = default;
- explicit SchedulerNodeInfo(const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
+ explicit SchedulerNodeInfo(double weight_, Priority priority_ = {})
+ {
+ setWeight(weight_);
+ setPriority(priority_);
+ }
+
+ explicit SchedulerNodeInfo(const Poco::Util::AbstractConfiguration & config, const String & config_prefix = {})
{
setWeight(config.getDouble(config_prefix + ".weight", weight));
setPriority(config.getInt64(config_prefix + ".priority", priority));
@@ -68,7 +74,7 @@ struct SchedulerNodeInfo
if (value <= 0 || !isfinite(value))
throw Exception(
ErrorCodes::INVALID_SCHEDULER_NODE,
- "Negative and non-finite node weights are not allowed: {}",
+ "Zero, negative and non-finite node weights are not allowed: {}",
value);
weight = value;
}
@@ -78,6 +84,11 @@ struct SchedulerNodeInfo
priority.value = value;
}
+ void setPriority(Priority value)
+ {
+ priority = value;
+ }
+
// To check if configuration update required
bool equals(const SchedulerNodeInfo & o) const
{
@@ -123,7 +134,14 @@ public:
, info(config, config_prefix)
{}
- virtual ~ISchedulerNode() = default;
+ ISchedulerNode(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
+ : event_queue(event_queue_)
+ , info(info_)
+ {}
+
+ virtual ~ISchedulerNode();
+
+ virtual const String & getTypeName() const = 0;
/// Checks if two nodes configuration is equal
virtual bool equals(ISchedulerNode * other)
@@ -134,10 +152,11 @@ public:
/// Attach new child
virtual void attachChild(const std::shared_ptr & child) = 0;
- /// Detach and destroy child
+ /// Detach child
+ /// NOTE: child might be destroyed if the only reference was stored in parent
virtual void removeChild(ISchedulerNode * child) = 0;
- /// Get attached child by name
+ /// Get attached child by name (for tests only)
virtual ISchedulerNode * getChild(const String & child_name) = 0;
/// Activation of child due to the first pending request
@@ -147,7 +166,7 @@ public:
/// Returns true iff node is active
virtual bool isActive() = 0;
- /// Returns number of active children
+ /// Returns number of active children (for introspection only).
virtual size_t activeChildren() = 0;
/// Returns the first request to be executed as the first component of resulting pair.
@@ -155,10 +174,10 @@ public:
virtual std::pair dequeueRequest() = 0;
/// Returns full path string using names of every parent
- String getPath()
+ String getPath() const
{
String result;
- ISchedulerNode * ptr = this;
+ const ISchedulerNode * ptr = this;
while (ptr->parent)
{
result = "/" + ptr->basename + result;
@@ -168,10 +187,7 @@ public:
}
/// Attach to a parent (used by attachChild)
- virtual void setParent(ISchedulerNode * parent_)
- {
- parent = parent_;
- }
+ void setParent(ISchedulerNode * parent_);
protected:
/// Notify parents about the first pending request or constraint becoming satisfied.
@@ -307,6 +323,15 @@ public:
pending.notify_one();
}
+ /// Removes an activation from queue
+ void cancelActivation(ISchedulerNode * node)
+ {
+ std::unique_lock lock{mutex};
+ if (node->is_linked())
+ activations.erase(activations.iterator_to(*node));
+ node->activation_event_id = 0;
+ }
+
/// Process single event if it exists
/// Note that postponing constraint are ignored, use it to empty the queue including postponed events on shutdown
/// Returns `true` iff event has been processed
@@ -471,6 +496,20 @@ private:
std::atomic manual_time{TimePoint()}; // for tests only
};
+inline ISchedulerNode::~ISchedulerNode()
+{
+ // Make sure there is no dangling reference in activations queue
+ event_queue->cancelActivation(this);
+}
+
+inline void ISchedulerNode::setParent(ISchedulerNode * parent_)
+{
+ parent = parent_;
+ // Avoid activation of a detached node
+ if (parent == nullptr)
+ event_queue->cancelActivation(this);
+}
+
inline void ISchedulerNode::scheduleActivation()
{
if (likely(parent))
diff --git a/src/Common/Scheduler/ISchedulerQueue.h b/src/Common/Scheduler/ISchedulerQueue.h
index b7a51870a24..6c77cee6b9d 100644
--- a/src/Common/Scheduler/ISchedulerQueue.h
+++ b/src/Common/Scheduler/ISchedulerQueue.h
@@ -21,6 +21,10 @@ public:
: ISchedulerNode(event_queue_, config, config_prefix)
{}
+ ISchedulerQueue(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
+ : ISchedulerNode(event_queue_, info_)
+ {}
+
// Wrapper for `enqueueRequest()` that should be used to account for available resource budget
// Returns `estimated_cost` that should be passed later to `adjustBudget()`
[[ nodiscard ]] ResourceCost enqueueRequestUsingBudget(ResourceRequest * request)
@@ -47,6 +51,11 @@ public:
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual bool cancelRequest(ResourceRequest * request) = 0;
+ /// Fails all the resource requests in queue and marks this queue as not usable.
+ /// Afterwards any new request will be failed on `enqueueRequest()`.
+ /// NOTE: This is done for queues that are about to be destructed.
+ virtual void purgeQueue() = 0;
+
/// For introspection
ResourceCost getBudget() const
{
diff --git a/src/Common/Scheduler/Nodes/ClassifiersConfig.cpp b/src/Common/Scheduler/Nodes/ClassifiersConfig.cpp
index 3be61801149..455d0880aa6 100644
--- a/src/Common/Scheduler/Nodes/ClassifiersConfig.cpp
+++ b/src/Common/Scheduler/Nodes/ClassifiersConfig.cpp
@@ -5,11 +5,6 @@
namespace DB
{
-namespace ErrorCodes
-{
- extern const int RESOURCE_NOT_FOUND;
-}
-
ClassifierDescription::ClassifierDescription(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
@@ -31,9 +26,11 @@ ClassifiersConfig::ClassifiersConfig(const Poco::Util::AbstractConfiguration & c
const ClassifierDescription & ClassifiersConfig::get(const String & classifier_name)
{
+ static ClassifierDescription empty;
if (auto it = classifiers.find(classifier_name); it != classifiers.end())
return it->second;
- throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unknown workload classifier '{}' to access resources", classifier_name);
+ else
+ return empty;
}
}
diff --git a/src/Common/Scheduler/Nodes/ClassifiersConfig.h b/src/Common/Scheduler/Nodes/ClassifiersConfig.h
index 186c49943ad..62db719568b 100644
--- a/src/Common/Scheduler/Nodes/ClassifiersConfig.h
+++ b/src/Common/Scheduler/Nodes/ClassifiersConfig.h
@@ -10,6 +10,7 @@ namespace DB
/// Mapping of resource name into path string (e.g. "disk1" -> "/path/to/class")
struct ClassifierDescription : std::unordered_map
{
+ ClassifierDescription() = default;
ClassifierDescription(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
};
diff --git a/src/Common/Scheduler/Nodes/DynamicResourceManager.cpp b/src/Common/Scheduler/Nodes/CustomResourceManager.cpp
similarity index 84%
rename from src/Common/Scheduler/Nodes/DynamicResourceManager.cpp
rename to src/Common/Scheduler/Nodes/CustomResourceManager.cpp
index 5bf884fc3df..b9ab89ee2b8 100644
--- a/src/Common/Scheduler/Nodes/DynamicResourceManager.cpp
+++ b/src/Common/Scheduler/Nodes/CustomResourceManager.cpp
@@ -1,7 +1,6 @@
-#include
+#include
#include
-#include
#include
#include
@@ -21,7 +20,7 @@ namespace ErrorCodes
extern const int INVALID_SCHEDULER_NODE;
}
-DynamicResourceManager::State::State(EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config)
+CustomResourceManager::State::State(EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config)
: classifiers(config)
{
Poco::Util::AbstractConfiguration::Keys keys;
@@ -35,7 +34,7 @@ DynamicResourceManager::State::State(EventQueue * event_queue, const Poco::Util:
}
}
-DynamicResourceManager::State::Resource::Resource(
+CustomResourceManager::State::Resource::Resource(
const String & name,
EventQueue * event_queue,
const Poco::Util::AbstractConfiguration & config,
@@ -92,7 +91,7 @@ DynamicResourceManager::State::Resource::Resource(
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "undefined root node path '/' for resource '{}'", name);
}
-DynamicResourceManager::State::Resource::~Resource()
+CustomResourceManager::State::Resource::~Resource()
{
// NOTE: we should rely on `attached_to` and cannot use `parent`,
// NOTE: because `parent` can be `nullptr` in case attachment is still in event queue
@@ -106,14 +105,14 @@ DynamicResourceManager::State::Resource::~Resource()
}
}
-DynamicResourceManager::State::Node::Node(const String & name, EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
+CustomResourceManager::State::Node::Node(const String & name, EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
: type(config.getString(config_prefix + ".type", "fifo"))
, ptr(SchedulerNodeFactory::instance().get(type, event_queue, config, config_prefix))
{
ptr->basename = name;
}
-bool DynamicResourceManager::State::Resource::equals(const DynamicResourceManager::State::Resource & o) const
+bool CustomResourceManager::State::Resource::equals(const CustomResourceManager::State::Resource & o) const
{
if (nodes.size() != o.nodes.size())
return false;
@@ -130,14 +129,14 @@ bool DynamicResourceManager::State::Resource::equals(const DynamicResourceManage
return true;
}
-bool DynamicResourceManager::State::Node::equals(const DynamicResourceManager::State::Node & o) const
+bool CustomResourceManager::State::Node::equals(const CustomResourceManager::State::Node & o) const
{
if (type != o.type)
return false;
return ptr->equals(o.ptr.get());
}
-DynamicResourceManager::Classifier::Classifier(const DynamicResourceManager::StatePtr & state_, const String & classifier_name)
+CustomResourceManager::Classifier::Classifier(const CustomResourceManager::StatePtr & state_, const String & classifier_name)
: state(state_)
{
// State is immutable, but nodes are mutable and thread-safe
@@ -162,20 +161,25 @@ DynamicResourceManager::Classifier::Classifier(const DynamicResourceManager::Sta
}
}
-ResourceLink DynamicResourceManager::Classifier::get(const String & resource_name)
+bool CustomResourceManager::Classifier::has(const String & resource_name)
+{
+ return resources.contains(resource_name);
+}
+
+ResourceLink CustomResourceManager::Classifier::get(const String & resource_name)
{
if (auto iter = resources.find(resource_name); iter != resources.end())
return iter->second;
throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "Access denied to resource '{}'", resource_name);
}
-DynamicResourceManager::DynamicResourceManager()
+CustomResourceManager::CustomResourceManager()
: state(new State())
{
scheduler.start();
}
-void DynamicResourceManager::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
+void CustomResourceManager::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
StatePtr new_state = std::make_shared(scheduler.event_queue, config);
@@ -217,7 +221,13 @@ void DynamicResourceManager::updateConfiguration(const Poco::Util::AbstractConfi
// NOTE: after mutex unlock `state` became available for Classifier(s) and must be immutable
}
-ClassifierPtr DynamicResourceManager::acquire(const String & classifier_name)
+bool CustomResourceManager::hasResource(const String & resource_name) const
+{
+ std::lock_guard lock{mutex};
+ return state->resources.contains(resource_name);
+}
+
+ClassifierPtr CustomResourceManager::acquire(const String & classifier_name)
{
// Acquire a reference to the current state
StatePtr state_ref;
@@ -229,7 +239,7 @@ ClassifierPtr DynamicResourceManager::acquire(const String & classifier_name)
return std::make_shared(state_ref, classifier_name);
}
-void DynamicResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
+void CustomResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
{
// Acquire a reference to the current state
StatePtr state_ref;
@@ -244,7 +254,7 @@ void DynamicResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
{
for (auto & [name, resource] : state_ref->resources)
for (auto & [path, node] : resource->nodes)
- visitor(name, path, node.type, node.ptr);
+ visitor(name, path, node.ptr.get());
promise.set_value();
});
@@ -252,9 +262,4 @@ void DynamicResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
future.get();
}
-void registerDynamicResourceManager(ResourceManagerFactory & factory)
-{
- factory.registerMethod("dynamic");
-}
-
}
diff --git a/src/Common/Scheduler/Nodes/DynamicResourceManager.h b/src/Common/Scheduler/Nodes/CustomResourceManager.h
similarity index 86%
rename from src/Common/Scheduler/Nodes/DynamicResourceManager.h
rename to src/Common/Scheduler/Nodes/CustomResourceManager.h
index 4b0a3a48b61..900a9c4e50b 100644
--- a/src/Common/Scheduler/Nodes/DynamicResourceManager.h
+++ b/src/Common/Scheduler/Nodes/CustomResourceManager.h
@@ -10,7 +10,9 @@ namespace DB
{
/*
- * Implementation of `IResourceManager` supporting arbitrary dynamic hierarchy of scheduler nodes.
+ * Implementation of `IResourceManager` supporting arbitrary hierarchy of scheduler nodes.
+ * Scheduling hierarchies for every resource is described through server xml or yaml configuration.
+ * Configuration could be changed dynamically without server restart.
* All resources are controlled by single root `SchedulerRoot`.
*
* State of manager is set of resources attached to the scheduler. States are referenced by classifiers.
@@ -24,11 +26,12 @@ namespace DB
* violation will apply to fairness. Old version exists as long as there is at least one classifier
* instance referencing it. Classifiers are typically attached to queries and will be destructed with them.
*/
-class DynamicResourceManager : public IResourceManager
+class CustomResourceManager : public IResourceManager
{
public:
- DynamicResourceManager();
+ CustomResourceManager();
void updateConfiguration(const Poco::Util::AbstractConfiguration & config) override;
+ bool hasResource(const String & resource_name) const override;
ClassifierPtr acquire(const String & classifier_name) override;
void forEachNode(VisitorFunc visitor) override;
@@ -79,6 +82,7 @@ private:
{
public:
Classifier(const StatePtr & state_, const String & classifier_name);
+ bool has(const String & resource_name) override;
ResourceLink get(const String & resource_name) override;
private:
std::unordered_map resources; // accessible resources by names
@@ -86,7 +90,7 @@ private:
};
SchedulerRoot scheduler;
- std::mutex mutex;
+ mutable std::mutex mutex;
StatePtr state;
};
diff --git a/src/Common/Scheduler/Nodes/FairPolicy.h b/src/Common/Scheduler/Nodes/FairPolicy.h
index 246642ff2fd..a865711c460 100644
--- a/src/Common/Scheduler/Nodes/FairPolicy.h
+++ b/src/Common/Scheduler/Nodes/FairPolicy.h
@@ -28,7 +28,7 @@ namespace ErrorCodes
* of a child is set to vruntime of "start" of the last request. This guarantees immediate processing
* of at least single request of newly activated children and thus best isolation and scheduling latency.
*/
-class FairPolicy : public ISchedulerNode
+class FairPolicy final : public ISchedulerNode
{
/// Scheduling state of a child
struct Item
@@ -48,6 +48,23 @@ public:
: ISchedulerNode(event_queue_, config, config_prefix)
{}
+ FairPolicy(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
+ : ISchedulerNode(event_queue_, info_)
+ {}
+
+ ~FairPolicy() override
+ {
+ // We need to clear `parent` in all children to avoid dangling references
+ while (!children.empty())
+ removeChild(children.begin()->second.get());
+ }
+
+ const String & getTypeName() const override
+ {
+ static String type_name("fair");
+ return type_name;
+ }
+
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
diff --git a/src/Common/Scheduler/Nodes/FifoQueue.h b/src/Common/Scheduler/Nodes/FifoQueue.h
index 90f8fffe665..9502fae1a45 100644
--- a/src/Common/Scheduler/Nodes/FifoQueue.h
+++ b/src/Common/Scheduler/Nodes/FifoQueue.h
@@ -23,13 +23,28 @@ namespace ErrorCodes
/*
* FIFO queue to hold pending resource requests
*/
-class FifoQueue : public ISchedulerQueue
+class FifoQueue final : public ISchedulerQueue
{
public:
FifoQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: ISchedulerQueue(event_queue_, config, config_prefix)
{}
+ FifoQueue(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
+ : ISchedulerQueue(event_queue_, info_)
+ {}
+
+ ~FifoQueue() override
+ {
+ purgeQueue();
+ }
+
+ const String & getTypeName() const override
+ {
+ static String type_name("fifo");
+ return type_name;
+ }
+
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
@@ -42,6 +57,8 @@ public:
void enqueueRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
+ if (is_not_usable)
+ throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Scheduler queue is about to be destructed");
queue_cost += request->cost;
bool was_empty = requests.empty();
requests.push_back(*request);
@@ -66,6 +83,8 @@ public:
bool cancelRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
+ if (is_not_usable)
+ return false; // Any request should already be failed or executed
if (request->is_linked())
{
// It's impossible to check that `request` is indeed inserted to this queue and not another queue.
@@ -88,6 +107,19 @@ public:
return false;
}
+ void purgeQueue() override
+ {
+ std::lock_guard lock(mutex);
+ is_not_usable = true;
+ while (!requests.empty())
+ {
+ ResourceRequest * request = &requests.front();
+ requests.pop_front();
+ request->failed(std::make_exception_ptr(
+ Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Scheduler queue with resource request is about to be destructed")));
+ }
+ }
+
bool isActive() override
{
std::lock_guard lock(mutex);
@@ -131,6 +163,7 @@ private:
std::mutex mutex;
Int64 queue_cost = 0;
boost::intrusive::list requests;
+ bool is_not_usable = false;
};
}
diff --git a/src/Common/Scheduler/Nodes/IOResourceManager.cpp b/src/Common/Scheduler/Nodes/IOResourceManager.cpp
new file mode 100644
index 00000000000..e2042a29a80
--- /dev/null
+++ b/src/Common/Scheduler/Nodes/IOResourceManager.cpp
@@ -0,0 +1,532 @@
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include