Merge pull request #47009 from ClickHouse/disks-and-io-sheduler

This commit is contained in:
Sergei Trifonov 2023-09-11 07:22:14 +02:00 committed by GitHub
commit 08bad4d440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 806 additions and 33 deletions

View File

@ -1139,6 +1139,8 @@ Optional parameters:
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_put_rps`.
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_get_rps`.
- `read_resource` — Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of read requests to this disk. Default value is an empty string, which means IO scheduling is not enabled for this disk.
- `write_resource` — Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of write requests to this disk. Default value is an empty string, which means IO scheduling is not enabled for this disk.
### Configuring the cache
@ -1251,6 +1253,8 @@ Other parameters:
* `cache_enabled` - Allows to cache mark and index files on local FS. Default value is `true`.
* `cache_path` - Path on local FS where to store cached mark and index files. Default value is `/var/lib/clickhouse/disks/<disk_name>/cache/`.
* `skip_access_check` - If true, disk access checks will not be performed on disk start-up. Default value is `false`.
* `read_resource` - Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of read requests to this disk. Default value is an empty string, which means IO scheduling is not enabled for this disk.
* `write_resource` - Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of write requests to this disk. Default value is an empty string, which means IO scheduling is not enabled for this disk.
Examples of working configurations can be found in integration tests directory (see e.g. [test_merge_tree_azure_blob_storage](https://github.com/ClickHouse/ClickHouse/blob/master/tests/integration/test_merge_tree_azure_blob_storage/configs/config.d/storage_conf.xml) or [test_azure_blob_storage_zero_copy_replication](https://github.com/ClickHouse/ClickHouse/blob/master/tests/integration/test_azure_blob_storage_zero_copy_replication/configs/config.d/storage_conf.xml)).

View File

@ -0,0 +1,64 @@
---
slug: /en/operations/system-tables/scheduler
---
# scheduler
Contains information and status for [scheduling nodes](/docs/en/operations/workload-scheduling.md/#hierarchy) residing on the local server.
This table can be used for monitoring. The table contains a row for every scheduling node.
Example:
``` sql
SELECT *
FROM system.scheduler
WHERE resource = 'network_read' AND path = '/prio/fair/prod'
FORMAT Vertical
```
``` text
Row 1:
──────
resource: network_read
path: /prio/fair/prod
type: fifo
weight: 5
priority: 0
is_active: 0
active_children: 0
dequeued_requests: 67
dequeued_cost: 4692272
busy_periods: 63
vruntime: 938454.1999999989
system_vruntime: ᴺᵁᴸᴸ
queue_length: 0
queue_cost: 0
budget: -60524
is_satisfied: ᴺᵁᴸᴸ
inflight_requests: ᴺᵁᴸᴸ
inflight_cost: ᴺᵁᴸᴸ
max_requests: ᴺᵁᴸᴸ
max_cost: ᴺᵁᴸᴸ
```
Columns:
- `resource` (`String`) - Resource name
- `path` (`String`) - Path to a scheduling node within this resource scheduling hierarchy
- `type` (`String`) - Type of a scheduling node.
- `weight` (`Float64`) - Weight of a node, used by a parent node of `fair` type.
- `priority` (`Int64`) - Priority of a node, used by a parent node of `priority` type (lower value means higher priority).
- `is_active` (`UInt8`) - Whether this node is currently active - has resource requests to be dequeued and constraints satisfied.
- `active_children` (`UInt64`) - The number of children in active state.
- `dequeued_requests` (`UInt64`) - The total number of resource requests dequeued from this node.
- `dequeued_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests dequeued from this node.
- `busy_periods` (`UInt64`) - The total number of deactivations of this node.
- `vruntime` (`Nullable(Float64)`) - For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner.
- `system_vruntime` (`Nullable(Float64)`) - For `fair` nodes only. Virtual runtime showing `vruntime` of the last processed resource request. Used during child activation as the new value of `vruntime`.
- `queue_length` (`Nullable(UInt64)`) - For `fifo` nodes only. Current number of resource requests residing in the queue.
- `queue_cost` (`Nullable(UInt64)`) - For `fifo` nodes only. Sum of costs (e.g. size in bytes) of all requests residing in the queue.
- `budget` (`Nullable(Int64)`) - For `fifo` nodes only. The number of available "cost units" for new resource requests. A non-zero value can appear in case of a discrepancy between the estimated and real costs of resource requests (e.g. after a read/write failure).
- `is_satisfied` (`Nullable(UInt8)`) - For constraint nodes only (e.g. `inflight_limit`). Equals `1` if all the constraints of this node are satisfied.
- `inflight_requests` (`Nullable(Int64)`) - For `inflight_limit` nodes only. The number of resource requests dequeued from this node, that are currently in consumption state.
- `inflight_cost` (`Nullable(Int64)`) - For `inflight_limit` nodes only. The sum of costs (e.g. bytes) of all resource requests dequeued from this node, that are currently in consumption state.
- `max_requests` (`Nullable(Int64)`) - For `inflight_limit` nodes only. Upper limit for `inflight_requests` leading to constraint violation.
- `max_cost` (`Nullable(Int64)`) - For `inflight_limit` nodes only. Upper limit for `inflight_cost` leading to constraint violation.
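As a monitoring sketch (reusing the `network_read` resource from the example above; adjust the resource name to your own hierarchy), queue backlog and constraint status of every node of one resource can be inspected with a query like:
``` sql
-- Show backlog and constraint status for all nodes of one resource
SELECT path, type, is_active, queue_length, queue_cost, is_satisfied, inflight_requests, max_requests
FROM system.scheduler
WHERE resource = 'network_read'
ORDER BY path
```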

View File

@ -0,0 +1,153 @@
---
slug: /en/operations/workload-scheduling
sidebar_position: 69
sidebar_label: "Workload scheduling"
title: "Workload scheduling"
---
When ClickHouse executes multiple queries simultaneously, they may use shared resources (e.g. disks). Scheduling constraints and policies can be applied to regulate how resources are utilized and shared between different workloads. For every resource a scheduling hierarchy can be configured. The hierarchy root represents a resource, while the leaves are queues that hold requests exceeding the resource capacity.
:::note
Currently, only remote disk IO can be scheduled using the described method. For CPU scheduling, see the settings about thread pools and [`concurrent_threads_soft_limit_num`](server-configuration-parameters/settings.md#concurrent_threads_soft_limit_num). For flexible memory limits, see [Memory overcommit](settings/memory-overcommit.md).
:::
## Disk configuration {#disk-config}
To enable IO scheduling for a specific disk, specify `read_resource` and/or `write_resource` in the storage configuration. This tells ClickHouse which resource should be used for every read and write request to that disk. A read and a write resource can refer to the same resource name, which is useful for local SSDs or HDDs. Multiple different disks can also refer to the same resource, which is useful for remote disks: for example, if you want to allow fair division of network bandwidth between "production" and "development" workloads.
Example:
```xml
<clickhouse>
<storage_configuration>
...
<disks>
<s3>
<type>s3</type>
<endpoint>https://clickhouse-public-datasets.s3.amazonaws.com/my-bucket/root-path/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
<read_resource>network_read</read_resource>
<write_resource>network_write</write_resource>
</s3>
</disks>
<policies>
<s3_main>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3_main>
</policies>
</storage_configuration>
</clickhouse>
```
## Workload markup {#workload_markup}
Queries can be marked with the setting `workload` to distinguish different workloads. If `workload` is not set, the value "default" is used. Note that you can specify another value using settings profiles. Settings constraints can be used to make `workload` constant if you want all queries from a user to be marked with a fixed value of the `workload` setting.
Let's consider an example of a system with two different workloads: "production" and "development".
```sql
SELECT count() FROM my_table WHERE value = 42 SETTINGS workload = 'production'
SELECT count() FROM my_table WHERE value = 13 SETTINGS workload = 'development'
```
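As mentioned above, settings constraints can pin `workload` for a user. A minimal sketch, assuming a hypothetical user `dev_user`:
```sql
-- Make the workload setting read-only for this user, so all of its queries
-- are scheduled as the 'development' workload
ALTER USER dev_user SETTINGS workload = 'development' READONLY;
```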
## Resource scheduling hierarchy {#hierarchy}
From the standpoint of the scheduling subsystem, a resource represents a hierarchy of scheduling nodes.
```mermaid
graph TD
subgraph network_read
nr_root(("/"))
-->|100 concurrent requests| nr_fair("fair")
-->|75% bandwidth| nr_prod["prod"]
nr_fair
-->|25% bandwidth| nr_dev["dev"]
end
subgraph network_write
nw_root(("/"))
-->|100 concurrent requests| nw_fair("fair")
-->|75% bandwidth| nw_prod["prod"]
nw_fair
-->|25% bandwidth| nw_dev["dev"]
end
```
**Possible node types:**
* `inflight_limit` (constraint) - blocks if either the number of concurrent in-flight requests exceeds `max_requests` or their total cost exceeds `max_cost`; must have a single child.
* `fair` (policy) - selects the next request to serve from one of its children nodes according to max-min fairness; children nodes can specify `weight` (default is 1).
* `priority` (policy) - selects the next request to serve from one of its children nodes according to static priorities (lower value means higher priority); children nodes can specify `priority` (default is 0).
* `fifo` (queue) - leaf of the hierarchy capable of holding requests that exceed resource capacity.
The following example shows how to define IO scheduling hierarchies shown in the picture:
```xml
<clickhouse>
<resources>
<network_read>
<node path="/">
<type>inflight_limit</type>
<max_requests>100</max_requests>
</node>
<node path="/fair">
<type>fair</type>
</node>
<node path="/fair/prod">
<type>fifo</type>
<weight>3</weight>
</node>
<node path="/fair/dev">
<type>fifo</type>
</node>
</network_read>
<network_write>
<node path="/">
<type>inflight_limit</type>
<max_requests>100</max_requests>
</node>
<node path="/fair">
<type>fair</type>
</node>
<node path="/fair/prod">
<type>fifo</type>
<weight>3</weight>
</node>
<node path="/fair/dev">
<type>fifo</type>
</node>
</network_write>
</resources>
</clickhouse>
```
## Workload classifiers {#workload_classifiers}
Workload classifiers are used to define the mapping from the `workload` specified by a query to the leaf queues that should be used for specific resources. At the moment, workload classification is simple: only a static mapping is available.
Example:
```xml
<clickhouse>
<workload_classifiers>
<production>
<network_read>/fair/prod</network_read>
<network_write>/fair/prod</network_write>
</production>
<development>
<network_read>/fair/dev</network_read>
<network_write>/fair/dev</network_write>
</development>
<default>
<network_read>/fair/dev</network_read>
<network_write>/fair/dev</network_write>
</default>
</workload_classifiers>
</clickhouse>
```
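To check that marked queries actually end up in the intended queues, the counters of the [scheduler](/docs/en/operations/system-tables/scheduler.md) system table can be inspected (a sketch using the resources and paths defined above):
```sql
-- dequeued_requests on /fair/prod should grow after running queries with workload = 'production'
SELECT resource, path, dequeued_requests, dequeued_cost
FROM system.scheduler
WHERE path IN ('/fair/prod', '/fair/dev')
```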
## See also
- [system.scheduler](/docs/en/operations/system-tables/scheduler.md)

View File

@ -97,6 +97,14 @@ ThreadGroupPtr CurrentThread::getGroup()
return current_thread->getThreadGroup();
}
ContextPtr CurrentThread::getQueryContext()
{
if (unlikely(!current_thread))
return {};
return current_thread->getQueryContext();
}
std::string_view CurrentThread::getQueryId()
{
if (unlikely(!current_thread))

View File

@ -86,6 +86,10 @@ public:
static void finalizePerformanceCounters();
/// Returns a non-empty string if the thread is attached to a query
/// Returns attached query context
static ContextPtr getQueryContext();
static std::string_view getQueryId();
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor

View File

@ -268,6 +268,16 @@ ReadSettings Context::getReadSettings() const
return ReadSettings{};
}
ResourceManagerPtr Context::getResourceManager() const
{
return nullptr;
}
ClassifierPtr Context::getWorkloadClassifier() const
{
return nullptr;
}
void Context::initializeKeeperDispatcher([[maybe_unused]] bool start_async) const
{
const auto & config_ref = getConfigRef();

View File

@ -13,6 +13,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <IO/AsyncReadCounters.h>
#include <IO/IResourceManager.h>
#include <Poco/Util/Application.h>
@ -118,6 +119,10 @@ public:
ReadSettings getReadSettings() const;
/// Resource management related
ResourceManagerPtr getResourceManager() const;
ClassifierPtr getWorkloadClassifier() const;
std::shared_ptr<KeeperDispatcher> getKeeperDispatcher() const;
std::shared_ptr<KeeperDispatcher> tryGetKeeperDispatcher() const;
void initializeKeeperDispatcher(bool start_async) const;

View File

@ -7,6 +7,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
@ -65,6 +66,8 @@ DiskObjectStorage::DiskObjectStorage(
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(config.getBool(config_prefix + ".send_metadata", false))
, read_resource_name(config.getString(config_prefix + ".read_resource", ""))
, write_resource_name(config.getString(config_prefix + ".write_resource", ""))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
{}
@ -480,6 +483,32 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
config_prefix);
}
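/// Returns a copy of `settings` with `resource_link` taken from the current query's workload classifier.
/// Returns `settings` unchanged if no resource is configured for this disk or there is no query context.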
template <class Settings>
static inline Settings updateResourceLink(const Settings & settings, const String & resource_name)
{
if (resource_name.empty())
return settings;
if (auto query_context = CurrentThread::getQueryContext())
{
Settings result(settings);
result.resource_link = query_context->getWorkloadClassifier()->get(resource_name);
return result;
}
return settings;
}
String DiskObjectStorage::getReadResourceName() const
{
std::unique_lock lock(resource_mutex);
return read_resource_name;
}
String DiskObjectStorage::getWriteResourceName() const
{
std::unique_lock lock(resource_mutex);
return write_resource_name;
}
std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
const String & path,
const ReadSettings & settings,
@ -495,7 +524,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
return object_storage->readObjects(
storage_objects,
object_storage->getAdjustedSettingsFromMetadataFile(settings, path),
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getReadResourceName()), path),
read_hint,
file_size);
}
@ -513,7 +542,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
path,
buf_size,
mode,
object_storage->getAdjustedSettingsFromMetadataFile(settings, path));
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getWriteResourceName()), path));
}
Strings DiskObjectStorage::getBlobPath(const String & path) const
@ -543,6 +572,15 @@ void DiskObjectStorage::applyNewSettings(
/// FIXME we cannot use config_prefix that was passed through arguments because the disk may be wrapped with cache and we need another name
const auto config_prefix = "storage_configuration.disks." + name;
object_storage->applyNewSettings(config, config_prefix, context_);
{
std::unique_lock lock(resource_mutex);
if (String new_read_resource_name = config.getString(config_prefix + ".read_resource", ""); new_read_resource_name != read_resource_name)
read_resource_name = new_read_resource_name;
if (String new_write_resource_name = config.getString(config_prefix + ".write_resource", ""); new_write_resource_name != write_resource_name)
write_resource_name = new_write_resource_name;
}
IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
}

View File

@ -212,6 +212,9 @@ private:
/// execution.
DiskTransactionPtr createObjectStorageTransaction();
String getReadResourceName() const;
String getWriteResourceName() const;
const String object_storage_root_path;
Poco::Logger * log;
@ -226,6 +229,10 @@ private:
const bool send_metadata;
mutable std::mutex resource_mutex;
String read_resource_name;
String write_resource_name;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
};

View File

@ -54,6 +54,7 @@ void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
std::move(hdfs_storage),
config,
config_prefix);
disk->startup(context, skip_access_check);
return disk;

View File

@ -54,6 +54,7 @@ void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check)
object_storage,
config,
config_prefix);
disk->startup(context, skip_access_check);
return disk;
};

View File

@ -7,7 +7,7 @@
#include <boost/noncopyable.hpp>
#include <memory>
#include <unordered_map>
#include <functional>
namespace DB
{
@ -23,7 +23,7 @@ class IClassifier : private boost::noncopyable
public:
virtual ~IClassifier() {}
/// Returns ResouceLink that should be used to access resource.
/// Returns ResourceLink that should be used to access resource.
/// Returned link is valid until classifier destruction.
virtual ResourceLink get(const String & resource_name) = 0;
};
@ -46,6 +46,10 @@ public:
/// Obtain a classifier instance required to get access to resources.
/// Note that it holds resource configuration, so should be destructed when query is done.
virtual ClassifierPtr acquire(const String & classifier_name) = 0;
/// For introspection, see `system.scheduler` table
using VisitorFunc = std::function<void(const String & resource, const String & path, const String & type, const SchedulerNodePtr & node)>;
virtual void forEachNode(VisitorFunc visitor) = 0;
};
using ResourceManagerPtr = std::shared_ptr<IResourceManager>;

View File

@ -21,7 +21,7 @@ namespace DB
class ISchedulerConstraint : public ISchedulerNode
{
public:
ISchedulerConstraint(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
explicit ISchedulerConstraint(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}
@ -44,6 +44,9 @@ public:
}
}
/// For introspection of current state (true = satisfied, false = violated)
virtual bool isSatisfied() = 0;
protected:
// Reference to nearest parent that is also derived from ISchedulerConstraint.
// Request can traverse through multiple constraints while being dequeue from hierarchy,

View File

@ -3,6 +3,7 @@
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/Priority.h>
#include <base/types.h>
#include <IO/ResourceRequest.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -68,6 +69,13 @@ struct SchedulerNodeInfo
{
priority.value = value;
}
// To check if configuration update required
bool equals(const SchedulerNodeInfo & o) const
{
// `parent` data is not compared intentionally (it is not part of configuration settings)
return weight == o.weight && priority == o.priority;
}
};
/*
@ -157,8 +165,11 @@ public:
virtual ~ISchedulerNode() {}
// Checks if two nodes configuration is equal
virtual bool equals(ISchedulerNode * other) = 0;
/// Checks if two nodes configuration is equal
virtual bool equals(ISchedulerNode * other)
{
return info.equals(other->info);
}
/// Attach new child
virtual void attachChild(const std::shared_ptr<ISchedulerNode> & child) = 0;
@ -176,7 +187,10 @@ public:
/// Returns true iff node is active
virtual bool isActive() = 0;
/// Returns the first request to be executed as the first component of resuting pair.
/// Returns number of active children
virtual size_t activeChildren() = 0;
/// Returns the first request to be executed as the first component of resulting pair.
/// The second pair component is `true` iff node is still active after dequeueing.
virtual std::pair<ResourceRequest *, bool> dequeueRequest() = 0;
@ -215,6 +229,11 @@ public:
String basename;
SchedulerNodeInfo info;
ISchedulerNode * parent = nullptr;
/// Introspection
std::atomic<UInt64> dequeued_requests{0};
std::atomic<ResourceCost> dequeued_cost{0};
std::atomic<UInt64> busy_periods{0};
};
using SchedulerNodePtr = std::shared_ptr<ISchedulerNode>;

View File

@ -50,6 +50,12 @@ public:
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;
/// For introspection
ResourceCost getBudget() const
{
return budget.get();
}
private:
// Allows multiple consumers to synchronize with common "debit/credit" balance.
// 1) (positive) to avoid wasting of allocated but not used resource (e.g in case of a failure);

View File

@ -21,7 +21,7 @@ ClassifierDescription::ClassifierDescription(const Poco::Util::AbstractConfigura
ClassifiersConfig::ClassifiersConfig(const Poco::Util::AbstractConfiguration & config)
{
Poco::Util::AbstractConfiguration::Keys keys;
const String config_prefix = "classifiers";
const String config_prefix = "workload_classifiers";
config.keys(config_prefix, keys);
for (const auto & key : keys)
classifiers.emplace(std::piecewise_construct,
@ -34,7 +34,7 @@ const ClassifierDescription & ClassifiersConfig::get(const String & classifier_n
if (auto it = classifiers.find(classifier_name); it != classifiers.end())
return it->second;
else
throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unknown classifier '{}' to access resources", classifier_name);
throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unknown workload classifier '{}' to access resources", classifier_name);
}
}

View File

@ -15,14 +15,14 @@ struct ClassifierDescription : std::unordered_map<String, String>
/*
* Loads a config with the following format:
* <classifiers>
* <workload_classifiers>
* <classifier1>
* <resource1>/path/to/queue</resource1>
* <resource2>/path/to/another/queue</resource2>
* </classifier1>
* ...
* <classifierN>...</classifierN>
* </classifiers>
* </workload_classifiers>
*/
class ClassifiersConfig
{

View File

@ -9,6 +9,7 @@
#include <map>
#include <tuple>
#include <future>
namespace DB
{
@ -217,13 +218,36 @@ void DynamicResourceManager::updateConfiguration(const Poco::Util::AbstractConfi
ClassifierPtr DynamicResourceManager::acquire(const String & classifier_name)
{
// Acquire a reference to the current state
StatePtr state_;
StatePtr state_ref;
{
std::lock_guard lock{mutex};
state_ = state;
state_ref = state;
}
return std::make_shared<Classifier>(state_, classifier_name);
return std::make_shared<Classifier>(state_ref, classifier_name);
}
void DynamicResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
{
// Acquire a reference to the current state
StatePtr state_ref;
{
std::lock_guard lock{mutex};
state_ref = state;
}
std::promise<void> promise;
auto future = promise.get_future();
scheduler.event_queue->enqueue([state_ref, visitor, &promise]
{
for (auto & [name, resource] : state_ref->resources)
for (auto & [path, node] : resource->nodes)
visitor(name, path, node.type, node.ptr);
promise.set_value();
});
// Block until execution is done in the scheduler thread
future.get();
}
void registerDynamicResourceManager(ResourceManagerFactory & factory)

View File

@ -19,7 +19,7 @@ namespace DB
* `ClassifierPtr` is acquired and held.
*
* Manager can update configuration after initialization. During update, new version of resources are also
* attached to scheduler, so multiple version can coexist for a short perid. This will violate constraints
* attached to scheduler, so multiple version can coexist for a short period. This will violate constraints
* (e.g. in-fly-limit), because different version have independent nodes to impose constraints, the same
* violation will apply to fairness. Old version exists as long as there is at least one classifier
* instance referencing it. Classifiers are typically attached to queries and will be destructed with them.
@ -30,6 +30,7 @@ public:
DynamicResourceManager();
void updateConfiguration(const Poco::Util::AbstractConfiguration & config) override;
ClassifierPtr acquire(const String & classifier_name) override;
void forEachNode(VisitorFunc visitor) override;
private:
/// Holds everything required to work with one specific configuration

View File

@ -6,6 +6,7 @@
#include <Common/Stopwatch.h>
#include <algorithm>
#include <optional>
#include <unordered_map>
#include <vector>
@ -50,6 +51,8 @@ public:
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<FairPolicy *>(other))
return true;
return false;
@ -176,8 +179,11 @@ public:
max_vruntime = 0;
}
system_vruntime = max_vruntime;
busy_periods++;
}
dequeued_requests++;
dequeued_cost += request->cost;
return {request, heap_size > 0};
}
@ -186,12 +192,33 @@ public:
return heap_size > 0;
}
size_t activeChildren() override
{
return heap_size;
}
void activateChild(ISchedulerNode * child) override
{
// Find this child; this is O(1), thanks to inactive index we hold in `parent.idx`
activateChildImpl(child->info.parent.idx);
}
// For introspection
double getSystemVRuntime() const
{
return system_vruntime;
}
std::optional<double> getChildVRuntime(ISchedulerNode * child) const
{
for (const auto & item : items)
{
if (child == item.child)
return item.vruntime;
}
return std::nullopt;
}
private:
void activateChildImpl(size_t inactive_idx)
{

View File

@ -30,6 +30,8 @@ public:
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<FifoQueue *>(other))
return true;
return false;
@ -39,6 +41,7 @@ public:
{
std::unique_lock lock(mutex);
request->enqueue_ns = clock_gettime_ns();
queue_cost += request->cost;
bool was_empty = requests.empty();
requests.push_back(request);
if (was_empty)
@ -52,6 +55,11 @@ public:
return {nullptr, false};
ResourceRequest * result = requests.front();
requests.pop_front();
if (requests.empty())
busy_periods++;
queue_cost -= result->cost;
dequeued_requests++;
dequeued_cost += result->cost;
return {result, !requests.empty()};
}
@ -61,6 +69,11 @@ public:
return !requests.empty();
}
size_t activeChildren() override
{
return 0;
}
void activateChild(ISchedulerNode *) override
{
assert(false); // queue cannot have children
@ -83,8 +96,15 @@ public:
return nullptr;
}
std::pair<UInt64, Int64> getQueueLengthAndCost()
{
std::unique_lock lock(mutex);
return {requests.size(), queue_cost};
}
private:
std::mutex mutex;
Int64 queue_cost = 0;
std::deque<ResourceRequest *> requests;
};

View File

@ -42,6 +42,8 @@ public:
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<PriorityPolicy *>(other))
return true;
return false;
@ -113,8 +115,12 @@ public:
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
if (items.empty())
busy_periods++;
}
dequeued_requests++;
dequeued_cost += request->cost;
return {request, !items.empty()};
}
@ -123,6 +129,11 @@ public:
return !items.empty();
}
size_t activeChildren() override
{
return items.size();
}
void activateChild(ISchedulerNode * child) override
{
bool activate_parent = items.empty();

View File

@ -27,6 +27,8 @@ public:
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<SemaphoreConstraint *>(other))
return max_requests == o->max_requests && max_cost == o->max_cost;
return false;
@ -78,7 +80,10 @@ public:
requests++;
cost += request->cost;
child_active = child_now_active;
if (!active())
busy_periods++;
dequeued_requests++;
dequeued_cost += request->cost;
return {request, active()};
}
@ -113,6 +118,30 @@ public:
return active();
}
size_t activeChildren() override
{
std::unique_lock lock(mutex);
return child_active;
}
bool isSatisfied() override
{
std::unique_lock lock(mutex);
return satisfied();
}
std::pair<Int64, Int64> getInflights()
{
std::unique_lock lock(mutex);
return {requests, cost};
}
std::pair<Int64, Int64> getLimits()
{
std::unique_lock lock(mutex);
return {max_requests, max_cost};
}
private:
bool satisfied() const
{

View File

@ -22,6 +22,11 @@ public:
ClassifierPtr acquire(const String & classifier_name) override;
void forEachNode(VisitorFunc visitor) override
{
UNUSED(visitor);
}
private:
struct Resource
{

View File

@ -24,10 +24,10 @@ TEST(IOResourceDynamicResourceManager, Smoke)
<node path="/fair/B"><type>fifo</type><weight>3</weight></node>
</res1>
</resources>
<classifiers>
<workload_classifiers>
<A><res1>/fair/A</res1></A>
<B><res1>/fair/B</res1></B>
</classifiers>
</workload_classifiers>
</clickhouse>
)CONFIG");
@ -71,11 +71,11 @@ TEST(IOResourceDynamicResourceManager, Fairness)
<node path="/fair/leader"><type>fifo</type></node>
</res1>
</resources>
<classifiers>
<workload_classifiers>
<A><res1>/fair/A</res1></A>
<B><res1>/fair/B</res1></B>
<leader><res1>/fair/leader</res1></leader>
</classifiers>
</workload_classifiers>
</clickhouse>
)CONFIG");

View File

@ -24,10 +24,10 @@ TEST(IOResourceStaticResourceManager, Smoke)
<node path="/prio/B"><priority>1</priority></node>
</res1>
</resources>
<classifiers>
<workload_classifiers>
<A><res1>/prio/A</res1></A>
<B><res1>/prio/B</res1></B>
</classifiers>
</workload_classifiers>
</clickhouse>
)CONFIG");
@ -70,13 +70,13 @@ TEST(IOResourceStaticResourceManager, Prioritization)
<node path="/prio/leader"></node>
</res1>
</resources>
<classifiers>
<workload_classifiers>
<A><res1>/prio/A</res1></A>
<B><res1>/prio/B</res1></B>
<C><res1>/prio/C</res1></C>
<D><res1>/prio/D</res1></D>
<leader><res1>/prio/leader</res1></leader>
</classifiers>
</workload_classifiers>
</clickhouse>
)CONFIG");

View File

@ -48,6 +48,11 @@ public:
available.fetch_add(estimated_cost - real_cost);
}
ResourceCost get() const
{
return available.load();
}
private:
std::atomic<ResourceCost> available = 0; // requested - consumed
};

View File

@ -45,8 +45,7 @@ class ResourceRequest
{
public:
/// Cost of request execution; should be filled before request enqueueing.
/// NOTE: If cost is not known in advance, credit model can be used:
/// NOTE: for the first request use 1 and
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
ResourceCost cost;
/// Request outcome

View File

@ -97,6 +97,8 @@ public:
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
return false;
if (auto * o = dynamic_cast<SchedulerRoot *>(other))
return true;
return false;
@ -156,6 +158,8 @@ public:
else
current = current->next; // Just move round-robin pointer
dequeued_requests++;
dequeued_cost += request->cost;
return {request, current != nullptr};
}
@ -164,6 +168,11 @@ public:
return current != nullptr;
}
size_t activeChildren() override
{
return 0;
}
void activateChild(ISchedulerNode * child) override
{
activate(TResource::get(child->info));
@ -205,6 +214,7 @@ private:
value->next = nullptr;
value->prev = nullptr;
current = nullptr;
busy_periods++;
return;
}
else // Just move current to next to avoid invalidation

View File

@ -1292,10 +1292,12 @@ ResourceManagerPtr Context::getResourceManager() const
return shared->resource_manager;
}
ClassifierPtr Context::getClassifier() const
ClassifierPtr Context::getWorkloadClassifier() const
{
auto lock = getLock();
return getResourceManager()->acquire(getSettingsRef().workload);
if (!classifier)
classifier = getResourceManager()->acquire(getSettingsRef().workload);
return classifier;
}

View File

@ -413,6 +413,10 @@ private:
/// Temporary data for query execution accounting.
TemporaryDataOnDiskScopePtr temp_data_on_disk;
/// Resource classifier for a query, holds smart pointers required for ResourceLink
/// NOTE: all resource links become invalid after `classifier` destruction
mutable ClassifierPtr classifier;
/// Prepared sets that can be shared between different queries. One use case is when is to share prepared sets between
/// mutation tasks of one mutation executed against different parts of the same table.
PreparedSetsCachePtr prepared_sets_cache;
@ -578,7 +582,7 @@ public:
/// Resource management related
ResourceManagerPtr getResourceManager() const;
ClassifierPtr getClassifier() const;
ClassifierPtr getWorkloadClassifier() const;
/// We have to copy external tables inside executeQuery() to track limits. Therefore, set callback for it. Must set once.
void setExternalTablesInitializer(ExternalTablesInitializer && initializer);

View File

@ -0,0 +1,107 @@
#include <Storages/System/StorageSystemScheduler.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <IO/ISchedulerNode.h>
#include <IO/IResourceManager.h>
#include <IO/Resource/FairPolicy.h>
#include <IO/Resource/PriorityPolicy.h>
#include <IO/Resource/SemaphoreConstraint.h>
#include <IO/Resource/FifoQueue.h>
#include <Interpreters/Context.h>
#include "IO/ResourceRequest.h"
namespace DB
{
NamesAndTypesList StorageSystemScheduler::getNamesAndTypes()
{
NamesAndTypesList names_and_types{
{"resource", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"type", std::make_shared<DataTypeString>()},
{"weight", std::make_shared<DataTypeFloat64>()},
{"priority", std::make_shared<DataTypeInt64>()},
{"is_active", std::make_shared<DataTypeUInt8>()},
{"active_children", std::make_shared<DataTypeUInt64>()},
{"dequeued_requests", std::make_shared<DataTypeUInt64>()},
{"dequeued_cost", std::make_shared<DataTypeInt64>()},
{"busy_periods", std::make_shared<DataTypeUInt64>()},
{"vruntime", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>())},
{"system_vruntime", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>())},
{"queue_length", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
{"queue_cost", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
{"budget", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
{"is_satisfied", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>())},
{"inflight_requests", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
{"inflight_cost", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
{"max_requests", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
{"max_cost", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())},
};
return names_and_types;
}
void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
context->getResourceManager()->forEachNode([&] (const String & resource, const String & path, const String & type, const SchedulerNodePtr & node)
{
size_t i = 0;
res_columns[i++]->insert(resource);
res_columns[i++]->insert(path);
res_columns[i++]->insert(type);
res_columns[i++]->insert(node->info.weight);
res_columns[i++]->insert(node->info.priority.value);
res_columns[i++]->insert(node->isActive());
res_columns[i++]->insert(node->activeChildren());
res_columns[i++]->insert(node->dequeued_requests.load());
res_columns[i++]->insert(node->dequeued_cost.load());
res_columns[i++]->insert(node->busy_periods.load());
Field vruntime;
Field system_vruntime;
Field queue_length;
Field queue_cost;
Field budget;
Field is_satisfied;
Field inflight_requests;
Field inflight_cost;
Field max_requests;
Field max_cost;
if (auto * parent = dynamic_cast<FairPolicy *>(node->parent))
{
if (auto value = parent->getChildVRuntime(node.get()))
vruntime = *value;
}
if (auto * ptr = dynamic_cast<FairPolicy *>(node.get()))
system_vruntime = ptr->getSystemVRuntime();
if (auto * ptr = dynamic_cast<FifoQueue *>(node.get()))
std::tie(queue_length, queue_cost) = ptr->getQueueLengthAndCost();
if (auto * ptr = dynamic_cast<ISchedulerQueue *>(node.get()))
budget = ptr->getBudget();
if (auto * ptr = dynamic_cast<ISchedulerConstraint *>(node.get()))
is_satisfied = ptr->isSatisfied();
if (auto * ptr = dynamic_cast<SemaphoreConstraint *>(node.get()))
{
std::tie(inflight_requests, inflight_cost) = ptr->getInflights();
std::tie(max_requests, max_cost) = ptr->getLimits();
}
res_columns[i++]->insert(vruntime);
res_columns[i++]->insert(system_vruntime);
res_columns[i++]->insert(queue_length);
res_columns[i++]->insert(queue_cost);
res_columns[i++]->insert(budget);
res_columns[i++]->insert(is_satisfied);
res_columns[i++]->insert(inflight_requests);
res_columns[i++]->insert(inflight_cost);
res_columns[i++]->insert(max_requests);
res_columns[i++]->insert(max_cost);
});
}
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
/// Implements `system.scheduler` table, which allows you to get information about scheduling nodes.
class StorageSystemScheduler final : public IStorageSystemOneBlock<StorageSystemScheduler>
{
public:
std::string getName() const override { return "SystemScheduler"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
};
}

View File

@ -83,6 +83,7 @@
#include <Storages/System/StorageSystemDroppedTables.h>
#include <Storages/System/StorageSystemZooKeeperConnection.h>
#include <Storages/System/StorageSystemJemalloc.h>
#include <Storages/System/StorageSystemScheduler.h>
#if USE_RDKAFKA
#include <Storages/System/StorageSystemKafkaConsumers.h>
@ -148,6 +149,7 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
attach<StorageSystemBackups>(context, system_database, "backups");
attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
attach<StorageSystemDroppedTables>(context, system_database, "dropped_tables");
attach<StorageSystemScheduler>(context, system_database, "scheduler");
#if USE_RDKAFKA
attach<StorageSystemKafkaConsumers>(context, system_database, "kafka_consumers");
#endif

View File

@ -0,0 +1,62 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
<s3_max_put_rps>10</s3_max_put_rps>
<s3_max_get_rps>10</s3_max_get_rps>
<read_resource>network_read</read_resource>
<write_resource>network_write</write_resource>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<resources>
<network_read>
<node path="/"> <type>inflight_limit</type><max_cost>1000000</max_cost></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/admin"> <type>fifo</type><priority>0</priority></node>
<node path="/prio/fair"> <type>fair</type><priority>1</priority></node>
<node path="/prio/fair/prod"><type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev"> <type>fifo</type><weight>1</weight></node>
</network_read>
<network_write>
<node path="/"> <type>inflight_limit</type><max_cost>1000000</max_cost></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/admin"> <type>fifo</type><priority>0</priority></node>
<node path="/prio/fair"> <type>fair</type><priority>1</priority></node>
<node path="/prio/fair/prod"><type>fifo</type><weight>9</weight></node>
<node path="/prio/fair/dev"> <type>fifo</type><weight>1</weight></node>
</network_write>
</resources>
<workload_classifiers>
<admin>
<network_read>/prio/admin</network_read>
<network_write>/prio/admin</network_write>
</admin>
<production>
<network_read>/prio/fair/prod</network_read>
<network_write>/prio/fair/prod</network_write>
</production>
<development>
<network_read>/prio/fair/dev</network_read>
<network_write>/prio/fair/dev</network_write>
</development>
<default>
<network_read>/prio/fair/dev</network_read>
<network_write>/prio/fair/dev</network_write>
</default>
</workload_classifiers>
</clickhouse>

View File

@ -0,0 +1,112 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import time
import threading
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
stay_alive=True,
main_configs=["configs/scheduler.xml"],
with_minio=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield
finally:
cluster.shutdown()
def test_s3_disk():
node.query(
f"""
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, storage_policy='s3';
"""
)
def write_query(workload):
try:
node.query(
f"insert into data select * from numbers(1e5) settings workload='{workload}'"
)
except QueryRuntimeException:
pass
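# Run inserts concurrently under three different workloads so that every write queue receives requests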
thread1 = threading.Thread(target=write_query, args=["development"])
thread2 = threading.Thread(target=write_query, args=["production"])
thread3 = threading.Thread(target=write_query, args=["admin"])
thread1.start()
thread2.start()
thread3.start()
thread3.join()
thread2.join()
thread1.join()
assert (
node.query(
f"select dequeued_requests>0 from system.scheduler where resource='network_write' and path='/prio/admin'"
)
== "1\n"
)
assert (
node.query(
f"select dequeued_requests>0 from system.scheduler where resource='network_write' and path='/prio/fair/dev'"
)
== "1\n"
)
assert (
node.query(
f"select dequeued_requests>0 from system.scheduler where resource='network_write' and path='/prio/fair/prod'"
)
== "1\n"
)
def read_query(workload):
try:
node.query(f"select sum(key*key) from data settings workload='{workload}'")
except QueryRuntimeException:
pass
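# Run reads concurrently under three different workloads so that every read queue receives requests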
thread1 = threading.Thread(target=read_query, args=["development"])
thread2 = threading.Thread(target=read_query, args=["production"])
thread3 = threading.Thread(target=read_query, args=["admin"])
thread1.start()
thread2.start()
thread3.start()
thread3.join()
thread2.join()
thread1.join()
assert (
node.query(
f"select dequeued_requests>0 from system.scheduler where resource='network_read' and path='/prio/admin'"
)
== "1\n"
)
assert (
node.query(
f"select dequeued_requests>0 from system.scheduler where resource='network_read' and path='/prio/fair/dev'"
)
== "1\n"
)
assert (
node.query(
f"select dequeued_requests>0 from system.scheduler where resource='network_read' and path='/prio/fair/prod'"
)
== "1\n"
)

View File

@ -1346,17 +1346,18 @@ defaultRoles
defaultValueOfArgumentType
defaultValueOfTypeName
delim
deltaLake
deltaSum
deltaSumTimestamp
deltalake
deltaLake
deltasum
deltaSum
deltasumtimestamp
deltaSumTimestamp
demangle
denormalize
denormalized
denormalizing
denormals
dequeued
deserialization
deserialized
deserializing
@ -1441,6 +1442,7 @@ farmFingerprint
farmHash
fastops
fcoverage
fifo
filesystem
filesystemAvailable
filesystemCapacity
@ -1608,6 +1610,7 @@ incrementing
indexHint
indexOf
infi
inflight
initcap
initcapUTF
initialQueryID
@ -2538,6 +2541,7 @@ visitParamExtractRaw
visitParamExtractString
visitParamExtractUInt
visitParamHas
vruntime
wchc
wchs
webpage