IO scheduling subsystem

This commit is contained in:
serxa 2022-09-27 13:26:41 +00:00
parent b1d8593d18
commit b057d07977
33 changed files with 2313 additions and 1 deletions

View File

@ -69,6 +69,8 @@
#include <QueryPipeline/ConnectionCollector.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <IO/Resource/registerSchedulerNodes.h>
#include <IO/Resource/registerResourceManagers.h>
#include <Common/Config/ConfigReloader.h>
#include <Server/HTTPHandlerFactory.h>
#include "MetricsTransmitter.h"
@ -678,6 +680,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerDisks();
registerFormats();
registerRemoteFileMetadatas();
registerSchedulerNodes();
registerResourceManagers();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
@ -1248,6 +1252,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getDistributedSchedulePool().increaseThreadsCount(new_pool_size);
}
if (config->has("resources"))
{
global_context->getResourceManager()->updateConfiguration(*config);
}
if (!initial_loading)
{
/// We do not load ZooKeeper configuration on the first config loading

View File

@ -91,6 +91,7 @@ add_headers_and_sources(clickhouse_common_io Common)
add_headers_and_sources(clickhouse_common_io Common/HashTable)
add_headers_and_sources(clickhouse_common_io IO)
add_headers_and_sources(clickhouse_common_io IO/Archives)
add_headers_and_sources(clickhouse_common_io IO/Resource)
add_headers_and_sources(clickhouse_common_io IO/S3)
list (REMOVE_ITEM clickhouse_common_io_sources Common/malloc.cpp Common/new_delete.cpp)

View File

@ -637,6 +637,9 @@
M(666, CANNOT_USE_CACHE) \
M(667, NOT_INITIALIZED) \
M(668, INVALID_STATE) \
M(669, INVALID_SCHEDULER_NODE) \
M(670, RESOURCE_ACCESS_DENIED) \
M(671, RESOURCE_NOT_FOUND) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -635,6 +635,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Map, additional_table_filters, "", "Additional filter expression which would be applied after reading from specified table. Syntax: {'table1': 'expression', 'database.table2': 'expression'}", 0) \
M(String, additional_result_filter, "", "Additional filter expression which would be applied to query result", 0) \
\
M(String, workload, "default", "Name of workload to be used to access resources", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \

53
src/IO/IResourceManager.h Normal file
View File

@ -0,0 +1,53 @@
#pragma once
#include <IO/ResourceRequest.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <unordered_map>
namespace DB
{
/*
* Instance of derived class holds everything required for resource consumption,
* including resources currently registered at `SchedulerRoot`. This is required to avoid
* problems during configuration update. Do not hold instances longer than required.
* Should be created on query start and destructed when query is done.
*/
class IClassifier : private boost::noncopyable
{
public:
    virtual ~IClassifier() = default;

    /// Looks up the ResourceLink through which the resource named `resource_name` should be accessed.
    /// The returned link is valid only until this classifier is destructed.
    virtual ResourceLink get(const String & resource_name) = 0;
};
using ClassifierPtr = std::shared_ptr<IClassifier>;
/*
* Represents control plane of resource scheduling. Derived class is responsible for reading
* configuration, creating all required `ISchedulerNode` objects and
* managing their lifespan.
*/
class IResourceManager : private boost::noncopyable
{
public:
    virtual ~IResourceManager() = default;

    /// Performs the initial configuration of the manager or reconfigures it from `config`.
    virtual void updateConfiguration(const Poco::Util::AbstractConfiguration & config) = 0;

    /// Returns the classifier instance that is needed to access resources.
    /// The classifier holds resource configuration, so it should be destructed as soon as the query is done.
    virtual ClassifierPtr acquire(const String & classifier_name) = 0;
};
using ResourceManagerPtr = std::shared_ptr<IResourceManager>;
}

View File

@ -0,0 +1,55 @@
#pragma once
#include <IO/ISchedulerNode.h>
namespace DB
{
/*
* Constraint defined on the set of requests in consumption state.
* It allows to track two events:
* - dequeueRequest(): resource consumption begins
* - finishRequest(): resource consumption finishes
* This allows to keep track of in-flight requests and implement different constraints (e.g. in-flight limit).
* When constraint is violated, node must be deactivated by dequeueRequest() returning `false`.
* When constraint is again satisfied, scheduleActivation() is called from finishRequest().
*
* Derived class behaviour requirements:
* - dequeueRequest() must fill `request->constraint` iff it is nullptr;
* - finishRequest() must be recursive: call to `parent_constraint->finishRequest()`.
*/
class ISchedulerConstraint : public ISchedulerNode
{
public:
ISchedulerConstraint(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}
/// Resource consumption by `request` is finised.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void finishRequest(ResourceRequest * request) = 0;
void setParent(ISchedulerNode * parent_) override
{
ISchedulerNode::setParent(parent_);
// Assign `parent_constraint` to the nearest parent derived from ISchedulerConstraint
for (ISchedulerNode * node = parent_; node != nullptr; node = node->parent)
{
if (auto * constraint = dynamic_cast<ISchedulerConstraint *>(node))
{
parent_constraint = constraint;
break;
}
}
}
protected:
// Reference to nearest parent that is also derived from ISchedulerConstraint.
// Request can traverse through multiple constraints while being dequeue from hierarchy,
// while finishing request should traverse the same chain in reverse order.
// NOTE: it must be immutable after initialization, because it is accessed in not thread-safe way from finishRequest()
ISchedulerConstraint * parent_constraint = nullptr;
};
}

221
src/IO/ISchedulerNode.h Normal file
View File

@ -0,0 +1,221 @@
#pragma once
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <IO/ResourceRequest.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/XMLConfiguration.h>
#include <boost/noncopyable.hpp>
#include <cmath>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
class ISchedulerNode;
/// Returns a reference to a shared default-constructed XMLConfiguration.
/// The instance is a function-local static, so it is created on the first call and reused afterwards.
inline const Poco::Util::AbstractConfiguration & emptyConfig()
{
    static Poco::AutoPtr<Poco::Util::XMLConfiguration> empty_configuration(new Poco::Util::XMLConfiguration());
    return *empty_configuration;
}
/*
* Information that the parent node reads and writes for scheduling purposes
*/
struct SchedulerNodeInfo
{
double weight = 1.0; /// Weight of this node among it's siblings
Int64 priority = 0; /// Priority of this node among it's siblings (higher value means higher priority)
/// Arbitrary data accessed/stored by parent
union {
size_t idx;
void * ptr;
} parent;
SchedulerNodeInfo() = default;
SchedulerNodeInfo(const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
{
setWeight(config.getDouble(config_prefix + ".weight", weight));
setPriority(config.getInt64(config_prefix + ".priority", priority));
}
void setWeight(double value)
{
if (value <= 0 || !isfinite(value))
throw Exception(
ErrorCodes::INVALID_SCHEDULER_NODE,
"Negative and non-finite node weights are not allowed: {}",
value);
weight = value;
}
void setPriority(int value)
{
priority = value;
}
};
/*
* Simple waitable thread-safe FIFO task queue.
* Intended to hold postponed events for later handling (usually by scheduler thread).
*/
class EventQueue
{
public:
    using Event = std::function<void()>;

    /// Appends `event` to the queue, taking ownership of it (the callable is moved, not copied).
    /// Wakes up a waiter in process() when the queue was empty before the call.
    void enqueue(Event && event)
    {
        std::unique_lock lock{mutex};
        bool was_empty = queue.empty();
        // `event` is an lvalue here despite its rvalue-reference type, so it must be moved explicitly
        queue.emplace_back(std::move(event));
        if (was_empty)
            pending.notify_one();
    }

    /// Process single event if it exists
    /// Returns `true` iff event has been processed
    bool tryProcess()
    {
        std::unique_lock lock{mutex};
        if (queue.empty())
            return false;
        Event event = std::move(queue.front());
        queue.pop_front();
        lock.unlock(); // do not hold queue mutex while processing events
        event();
        return true;
    }

    /// Wait for single event (if not available) and process it
    void process()
    {
        std::unique_lock lock{mutex};
        pending.wait(lock, [&] { return !queue.empty(); });
        Event event = std::move(queue.front());
        queue.pop_front();
        lock.unlock(); // do not hold queue mutex while processing events
        event();
    }

private:
    std::mutex mutex; /// Guards `queue`
    std::condition_variable pending; /// Notified when the queue becomes non-empty
    std::deque<Event> queue; /// Events in arrival order
};
/*
* Node of hierarchy for scheduling requests for resource. Base class for all
* kinds of scheduling elements (queues, policies, constraints and schedulers).
*
* Root node is a scheduler, which has its own thread to dequeue requests,
* execute requests (see ResourceRequest) and process events in a thread-safe manner.
* Immediate children of the scheduler represent independent resources.
* Each resource has its own hierarchy to achieve required scheduling policies.
* Non-leaf nodes do not hold requests, but keep scheduling state
* (e.g. consumption history, amount of in-flight requests, etc).
* Leafs of hierarchy are queues capable of holding pending requests.
*
* scheduler (SchedulerRoot)
* / \
* constraint constraint (SemaphoreConstraint)
* | |
* policy policy (PriorityPolicy)
* / \ / \
* q1 q2 q3 q4 (FifoQueue)
*
* Dequeueing request from an inner node will dequeue request from one of active leaf-queues in its subtree.
* Node is considered to be active iff:
* - it has at least one pending request in one of leaves of its subtree;
* - and enforced constraints, if any, are satisfied
* (e.g. amount of concurrent requests is not greater than some number).
*
* All methods must be called only from scheduler thread for thread-safety.
*/
class ISchedulerNode : private boost::noncopyable
{
public:
    ISchedulerNode(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
        : event_queue(event_queue_)
        , info(config, config_prefix)
    {
    }

    virtual ~ISchedulerNode() = default;

    /// Tells whether the configuration of this node is equal to the configuration of `other`
    virtual bool equals(ISchedulerNode * other) = 0;

    /// Attaches a new child
    virtual void attachChild(const std::shared_ptr<ISchedulerNode> & child) = 0;

    /// Detaches and destroys a child
    virtual void removeChild(ISchedulerNode * child) = 0;

    /// Finds an attached child by its name
    virtual ISchedulerNode * getChild(const String & child_name) = 0;

    /// Activation of a child caused by its first pending request.
    /// Should be called on a leaf node (i.e. queue) to propagate the activation signal through the chain to the root
    virtual void activateChild(ISchedulerNode * child) = 0;

    /// Returns true iff the node is active
    virtual bool isActive() = 0;

    /// The first component of the resulting pair is the first request to be executed.
    /// The second component is `true` iff the node is still active after dequeueing.
    virtual std::pair<ResourceRequest *, bool> dequeueRequest() = 0;

    /// Builds the full path string from the basenames of this node and of every ancestor that itself has a parent.
    /// A node without a parent yields "/".
    String getPath()
    {
        String result;
        for (ISchedulerNode * node = this; node->parent != nullptr; node = node->parent)
            result = "/" + node->basename + result;
        if (result.empty())
            return "/";
        return result;
    }

    /// Attaches this node to a parent (used by attachChild)
    virtual void setParent(ISchedulerNode * parent_)
    {
        parent = parent_;
    }

protected:
    /// Notifies the parent about the first pending request or about a constraint becoming satisfied.
    /// The notification is enqueued into `event_queue` instead of being delivered directly,
    /// so this method is intended to be called from outside of the scheduler thread.
    void scheduleActivation()
    {
        if (likely(parent))
        {
            event_queue->enqueue([this] { parent->activateChild(this); });
        }
    }

public:
    EventQueue * const event_queue;
    String basename;
    SchedulerNodeInfo info;
    ISchedulerNode * parent = nullptr;
};
using SchedulerNodePtr = std::shared_ptr<ISchedulerNode>;
}

26
src/IO/ISchedulerQueue.h Normal file
View File

@ -0,0 +1,26 @@
#pragma once
#include <IO/ISchedulerNode.h>
#include <memory>
namespace DB
{
/*
* Queue for pending requests for specific resource, leaf of hierarchy.
*/
class ISchedulerQueue : public ISchedulerNode
{
public:
/// Forwards all arguments to the ISchedulerNode constructor.
ISchedulerQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}
/// Enqueue new request to be executed using underlying resource.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;
};
}

View File

@ -0,0 +1,40 @@
#include <IO/Resource/ClassifiersConfig.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int RESOURCE_NOT_FOUND;
}
/// Fills the map with one entry per direct child key of `config_prefix`:
/// the key is used as the map key and its string value as the mapped value.
ClassifierDescription::ClassifierDescription(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
    Poco::Util::AbstractConfiguration::Keys resource_names;
    config.keys(config_prefix, resource_names);

    const String key_prefix = config_prefix + ".";
    for (const auto & resource_name : resource_names)
    {
        emplace(resource_name, config.getString(key_prefix + resource_name));
    }
}
/// Builds one ClassifierDescription for every direct child key of the `classifiers` config section.
ClassifiersConfig::ClassifiersConfig(const Poco::Util::AbstractConfiguration & config)
{
    const String config_prefix = "classifiers";

    Poco::Util::AbstractConfiguration::Keys classifier_names;
    config.keys(config_prefix, classifier_names);

    for (const auto & classifier_name : classifier_names)
    {
        const String classifier_prefix = config_prefix + "." + classifier_name;
        // Construct the description in place from (config, prefix)
        classifiers.emplace(
            std::piecewise_construct,
            std::forward_as_tuple(classifier_name),
            std::forward_as_tuple(config, classifier_prefix));
    }
}
/// Returns the description registered under `classifier_name`.
/// Throws RESOURCE_NOT_FOUND when no classifier with that name was loaded.
const ClassifierDescription & ClassifiersConfig::get(const String & classifier_name)
{
    auto found = classifiers.find(classifier_name);
    if (found == classifiers.end())
        throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unknown classifier '{}' to access resources", classifier_name);
    return found->second;
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <base/types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <unordered_map>
namespace DB
{
/// Mapping of resource name into path string (e.g. "disk1" -> "/path/to/class")
struct ClassifierDescription : std::unordered_map<String, String>
{
/// Loads the mapping from the children of `config_prefix` in `config`.
ClassifierDescription(const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
};
/*
* Loads a config with the following format:
* <classifiers>
* <classifier1>
* <resource1>/path/to/queue</resource1>
* <resource2>/path/to/another/queue</resource2>
* </classifier1>
* ...
* <classifierN>...</classifierN>
* </classifiers>
*/
class ClassifiersConfig
{
public:
/// Creates a config without any classifiers.
ClassifiersConfig() = default;
/// Loads all classifiers found in `config`.
ClassifiersConfig(const Poco::Util::AbstractConfiguration & config);
/// Returns the description of the classifier with the given name.
const ClassifierDescription & get(const String & classifier_name);
private:
std::unordered_map<String, ClassifierDescription> classifiers; // by classifier_name
};
}

View File

@ -0,0 +1,13 @@
#include <IO/Resource/FifoQueue.h>
#include <IO/SchedulerNodeFactory.h>
namespace DB
{
/// Registers FifoQueue in `factory` under the name "fifo".
void registerFifoQueue(SchedulerNodeFactory & factory)
{
factory.registerMethod<FifoQueue>("fifo");
}
}

View File

@ -0,0 +1,91 @@
#pragma once
#include <Common/Stopwatch.h>
#include <IO/ISchedulerQueue.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <deque>
#include <mutex>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
/*
* FIFO queue to hold pending resource requests
*/
class FifoQueue : public ISchedulerQueue
{
public:
FifoQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: ISchedulerQueue(event_queue_, config, config_prefix)
{}
bool equals(ISchedulerNode * other) override
{
if (auto * o = dynamic_cast<FifoQueue *>(other))
return true;
return false;
}
void enqueueRequest(ResourceRequest * request) override
{
std::unique_lock lock(mutex);
request->enqueue_ns = clock_gettime_ns();
bool was_empty = requests.empty();
requests.push_back(request);
if (was_empty)
scheduleActivation();
}
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
std::unique_lock lock(mutex);
if (requests.empty())
return {nullptr, false};
ResourceRequest * result = requests.front();
requests.pop_front();
return {result, !requests.empty()};
}
bool isActive() override
{
std::unique_lock lock(mutex);
return !requests.empty();
}
void activateChild(ISchedulerNode *) override
{
assert(false); // queue cannot have children
}
void attachChild(const SchedulerNodePtr &) override
{
throw Exception(
ErrorCodes::INVALID_SCHEDULER_NODE,
"Cannot add child to leaf scheduler queue: {}",
getPath());
}
void removeChild(ISchedulerNode *) override
{
}
ISchedulerNode * getChild(const String &) override
{
return nullptr;
}
private:
std::mutex mutex;
std::deque<ResourceRequest *> requests;
};
}

View File

@ -0,0 +1,13 @@
#include <IO/Resource/PriorityPolicy.h>
#include <IO/SchedulerNodeFactory.h>
namespace DB
{
/// Registers PriorityPolicy in `factory` under the name "priority".
void registerPriorityPolicy(SchedulerNodeFactory & factory)
{
factory.registerMethod<PriorityPolicy>("priority");
}
}

View File

@ -0,0 +1,143 @@
#pragma once
#include <IO/ISchedulerQueue.h>
#include <IO/SchedulerRoot.h>
#include <algorithm>
#include <unordered_map>
#include <vector>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
/*
* Scheduler node that implements priority scheduling policy.
* Requests are scheduled in order of priorities.
*/
class PriorityPolicy : public ISchedulerNode
{
/// Scheduling state of a child
struct Item
{
ISchedulerNode * child = nullptr;
Int64 priority = 0; // higher value means higher priority
/// For max-heap by priority
bool operator<(const Item& rhs) const noexcept
{
return priority < rhs.priority;
}
};
public:
PriorityPolicy(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}
bool equals(ISchedulerNode * other) override
{
if (auto * o = dynamic_cast<PriorityPolicy *>(other))
return true;
return false;
}
void attachChild(const SchedulerNodePtr & child) override
{
// Take ownership
assert(child->parent == nullptr);
if (auto [it, inserted] = children.emplace(child->basename, child); !inserted)
throw Exception(
ErrorCodes::INVALID_SCHEDULER_NODE,
"Can't add another child with the same path: {}",
it->second->getPath());
// Attach
child->setParent(this);
// Activate child if it is not empty
if (child->isActive())
activateChild(child.get());
}
void removeChild(ISchedulerNode * child) override
{
if (auto iter = children.find(child->basename); iter != children.end())
{
SchedulerNodePtr removed = iter->second;
// Deactivate: detach is not very common operation, so we can afford O(N) here
for (auto i = items.begin(), e = items.end(); i != e; ++i)
{
if (i->child == removed.get())
{
items.erase(i);
// Element was removed from inside of heap -- heap must be rebuilt
std::make_heap(items.begin(), items.end());
break;
}
}
// Detach
removed->setParent(nullptr);
// Get rid of ownership
children.erase(iter);
}
}
ISchedulerNode * getChild(const String & child_name) override
{
if (auto iter = children.find(child_name); iter != children.end())
return iter->second.get();
else
return nullptr;
}
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (items.empty())
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
// Deactivate child if it is empty
if (!child_active)
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
}
return {request, !items.empty()};
}
bool isActive() override
{
return !items.empty();
}
void activateChild(ISchedulerNode * child) override
{
bool activate_parent = items.empty();
items.emplace_back(Item{child, child->info.priority});
std::push_heap(items.begin(), items.end());
if (activate_parent && parent)
parent->activateChild(this);
}
private:
/// Heap of active children
std::vector<Item> items;
/// All children with ownership
std::unordered_map<String, SchedulerNodePtr> children; // basename -> child
};
}

View File

@ -0,0 +1,13 @@
#include <IO/Resource/SemaphoreConstraint.h>
#include <IO/SchedulerNodeFactory.h>
namespace DB
{
/// Registers SemaphoreConstraint in `factory` under the name "inflight_limit".
void registerSemaphoreConstraint(SchedulerNodeFactory & factory)
{
factory.registerMethod<SemaphoreConstraint>("inflight_limit");
}
}

View File

@ -0,0 +1,138 @@
#pragma once
#include <IO/ISchedulerConstraint.h>
#include <IO/SchedulerRoot.h>
#include <mutex>
#include <limits>
#include <utility>
namespace DB
{
/*
* Limited concurrency constraint.
* Blocks if either number of concurrent in-flight requests exceeds `max_requests`, or their total cost exceeds `max_cost`
*/
class SemaphoreConstraint : public ISchedulerConstraint
{
    static constexpr Int64 default_max_requests = std::numeric_limits<Int64>::max();
    static constexpr Int64 default_max_cost = std::numeric_limits<Int64>::max();

public:
    /// Reads `<config_prefix>.max_requests` and `<config_prefix>.max_cost` from `config`.
    /// When `max_cost` is absent, `<config_prefix>.max_bytes` is used instead; both limits default to the maximum Int64.
    SemaphoreConstraint(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
        : ISchedulerConstraint(event_queue_, config, config_prefix)
        , max_requests(config.getInt64(config_prefix + ".max_requests", default_max_requests))
        , max_cost(config.getInt64(config_prefix + ".max_cost", config.getInt64(config_prefix + ".max_bytes", default_max_cost)))
    {}

    /// Two semaphore constraints are equal iff both of their limits are equal.
    bool equals(ISchedulerNode * other) override
    {
        if (auto * o = dynamic_cast<SemaphoreConstraint *>(other))
            return max_requests == o->max_requests && max_cost == o->max_cost;
        return false;
    }

    /// Takes ownership of the single child, replacing the previously held one (if any).
    void attachChild(const std::shared_ptr<ISchedulerNode> & child_) override
    {
        // Take ownership
        child = child_;
        child->setParent(this);

        // Activate if required
        if (child->isActive())
            activateChild(child.get());
    }

    /// Detaches and releases the child if `child_` is the currently attached one; does nothing otherwise.
    void removeChild(ISchedulerNode * child_) override
    {
        // The null check makes removal safe when nothing is attached
        if (child && child.get() == child_)
        {
            {
                // `child_active` is read under the mutex by finishRequest(), which runs on other threads
                std::unique_lock lock(mutex);
                child_active = false; // deactivate
            }
            child->setParent(nullptr); // detach
            child.reset();
        }
    }

    /// Returns the attached child if its basename is `child_name`, or nullptr
    /// (also when no child is attached at all).
    ISchedulerNode * getChild(const String & child_name) override
    {
        if (child && child->basename == child_name)
            return child.get();
        else
            return nullptr;
    }

    /// Pulls a request from the child and accounts it as in-flight.
    /// Returns {nullptr, false} when there is no child or the child has nothing to give.
    std::pair<ResourceRequest *, bool> dequeueRequest() override
    {
        // Nothing can be dequeued before a child is attached or after it is removed
        if (!child)
            return {nullptr, false};

        // Dequeue request from the child
        auto [request, child_now_active] = child->dequeueRequest();
        if (!request)
            return {nullptr, false};

        // Request has reference to the first (closest to leaf) `constraint`, which can have `parent_constraint`.
        // The former is initialized here dynamically and the latter is initialized once during hierarchy construction.
        if (!request->constraint)
            request->constraint = this;

        // Update state on request arrival
        std::unique_lock lock(mutex);
        requests++;
        cost += request->cost;
        child_active = child_now_active;
        return {request, active()};
    }

    /// Releases the in-flight accounting of `request` in the parent constraints first and then in this one.
    /// Schedules activation when this makes the constraint satisfied again while the child is active.
    void finishRequest(ResourceRequest * request) override
    {
        // Recursive traverse of parent flow controls in reverse order
        if (parent_constraint)
            parent_constraint->finishRequest(request);

        // Update state on request departure
        std::unique_lock lock(mutex);
        bool was_active = active();
        requests--;
        cost -= request->cost;

        // Schedule activation on transition from inactive state
        if (!was_active && active())
            scheduleActivation();
    }

    /// Marks the child as active; the parent is notified only on the inactive -> active transition
    /// of the child and only while the constraint is satisfied.
    void activateChild(ISchedulerNode * child_) override
    {
        std::unique_lock lock(mutex);
        if (child_ == child.get())
            if (!std::exchange(child_active, true) && satisfied() && parent)
                parent->activateChild(this);
    }

    bool isActive() override
    {
        std::unique_lock lock(mutex);
        return active();
    }

private:
    /// Both limits are strict: the constraint is satisfied only below `max_requests` and below `max_cost`.
    bool satisfied() const
    {
        return requests < max_requests && cost < max_cost;
    }

    bool active() const
    {
        return satisfied() && child_active;
    }

private:
    std::mutex mutex; // guards `requests`, `cost` and `child_active`
    Int64 requests = 0; // number of in-flight requests
    Int64 cost = 0; // total cost of in-flight requests
    bool child_active = false;

    SchedulerNodePtr child;
    Int64 max_requests = default_max_requests;
    Int64 max_cost = default_max_cost;
};
}

View File

@ -0,0 +1,138 @@
#include <IO/Resource/StaticResourceManager.h>
#include <IO/SchedulerNodeFactory.h>
#include <IO/ResourceManagerFactory.h>
#include <IO/ISchedulerQueue.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <map>
#include <tuple>
#include <algorithm>
namespace DB
{
namespace ErrorCodes
{
extern const int RESOURCE_ACCESS_DENIED;
extern const int RESOURCE_NOT_FOUND;
extern const int INVALID_SCHEDULER_NODE;
}
/// Creates all scheduler nodes of the resource `name` described under `config_prefix` and links them
/// into a hierarchy according to their `path` attributes.
/// Only child keys starting with "node" are considered; the node type is read from `<node>.type` ("fifo" by default).
/// Throws INVALID_SCHEDULER_NODE when a path is missing, does not start with '/', is duplicated,
/// has no parent node, or when the root path '/' is not defined.
StaticResourceManager::Resource::Resource(
    const String & name,
    EventQueue * event_queue,
    const Poco::Util::AbstractConfiguration & config,
    const std::string & config_prefix)
{
    Poco::Util::AbstractConfiguration::Keys keys;
    config.keys(config_prefix, keys);

    // Validate paths and order nodes by path. std::map iterates in lexicographic order and a parent
    // path is always a proper prefix of the paths of its children, so parents are visited (and hence
    // created) before their children regardless of the order of nodes in the config.
    std::map<String, String> path_to_key;
    for (const auto & key : keys)
    {
        if (!startsWith(key, "node"))
            continue;

        // Validate path
        String path = config.getString(config_prefix + "." + key + "[@path]", "");
        if (path.empty())
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Attribute 'path' must be specified in all nodes for resource '{}'", name);
        if (path[0] != '/')
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "path must start with '/' for resource '{}'", name);
        if (auto [_, inserted] = path_to_key.emplace(path, key); !inserted)
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Duplicate path '{}' for resource '{}'", path, name);
    }

    for (const auto & [path, key] : path_to_key)
    {
        // Create node
        String type = config.getString(config_prefix + "." + key + ".type", "fifo");
        SchedulerNodePtr node = SchedulerNodeFactory::instance().get(type, event_queue, config, config_prefix + "." + key);
        node->basename = path.substr(1);

        // Take ownership (paths are unique here: duplicates were rejected above)
        nodes.emplace(path, node);

        // Attach created node to parent (if not root)
        if (path != "/")
        {
            String parent_path = path.substr(0, path.rfind('/'));
            if (parent_path.empty())
                parent_path = "/";
            if (auto parent = nodes.find(parent_path); parent != nodes.end())
                parent->second->attachChild(node);
            else
                throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Parent doesn't exist for path '{}' for resource '{}'", path, name);
        }
    }

    if (nodes.find("/") == nodes.end())
        throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "undefined root node path '/' for resource '{}'", name);
}
/// Resolves every (resource name, path) pair of `cfg` into a ResourceLink.
/// A resource that is not known to `manager` gets a default-constructed link;
/// a known resource must have a queue node at the given path, otherwise RESOURCE_NOT_FOUND is thrown.
StaticResourceManager::Classifier::Classifier(const StaticResourceManager & manager, const ClassifierDescription & cfg)
{
    for (const auto & [resource_name, path] : cfg)
    {
        auto resource_iter = manager.resources.find(resource_name);
        if (resource_iter == manager.resources.end())
        {
            resources.emplace(resource_name, ResourceLink{}); // resource not configured - unlimited
            continue;
        }

        const Resource & resource = resource_iter->second;
        auto node_iter = resource.nodes.find(path);
        if (node_iter == resource.nodes.end())
            throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Path '{}' for resource '{}' does not exist", path, resource_name);

        // Only leaf queues can accept requests
        auto * queue = dynamic_cast<ISchedulerQueue *>(node_iter->second.get());
        if (queue == nullptr)
            throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Unable to access non-queue node at path '{}' for resource '{}'", path, resource_name);

        resources.emplace(resource_name, ResourceLink{.queue = queue});
    }
}
/// Returns the link stored for `resource_name`.
/// Throws RESOURCE_ACCESS_DENIED when the classifier has no entry for that resource.
ResourceLink StaticResourceManager::Classifier::get(const String & resource_name)
{
    auto found = resources.find(resource_name);
    if (found == resources.end())
        throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "Access denied to resource '{}'", resource_name);
    return found->second;
}
/// Performs the one-time initialization from `config`: creates a resource for every child of `resources`,
/// attaches the root node of each resource to the scheduler, loads classifiers and starts the scheduler.
/// Calls made after at least one resource has been created return immediately.
void StaticResourceManager::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
    // Already initialized: configuration update is not supported
    if (!resources.empty())
        return;

    const String config_prefix = "resources";
    Poco::Util::AbstractConfiguration::Keys resource_names;
    config.keys(config_prefix, resource_names);

    // Create resource for every element under <resources> tag
    for (const auto & resource_name : resource_names)
    {
        auto emplaced = resources.emplace(
            std::piecewise_construct,
            std::forward_as_tuple(resource_name),
            std::forward_as_tuple(resource_name, scheduler.event_queue, config, config_prefix + "." + resource_name));

        // Attach root of resource to scheduler
        Resource & resource = emplaced.first->second;
        scheduler.attachChild(resource.nodes.find("/")->second);
    }

    // Initialize classifiers
    classifiers = std::make_unique<ClassifiersConfig>(config);

    // Run scheduler thread
    scheduler.start();
}
/// Creates a classifier that gives access to the resources listed for `classifier_name`.
/// Throws RESOURCE_NOT_FOUND when the classifier is unknown, including the case when the manager
/// has not been initialized by updateConfiguration() yet (previously a null pointer dereference).
ClassifierPtr StaticResourceManager::acquire(const String & classifier_name)
{
    // `classifiers` is created only by updateConfiguration(), which may never have been called
    if (!classifiers)
        throw Exception(
            ErrorCodes::RESOURCE_NOT_FOUND,
            "Unknown classifier '{}' to access resources: resource manager is not initialized",
            classifier_name);

    return std::make_shared<Classifier>(*this, classifiers->get(classifier_name));
}
/// Registers StaticResourceManager in `factory` under the name "static".
void registerStaticResourceManager(ResourceManagerFactory & factory)
{
factory.registerMethod<StaticResourceManager>("static");
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <IO/IResourceManager.h>
#include <IO/SchedulerRoot.h>
#include <IO/Resource/ClassifiersConfig.h>
#include <mutex>
namespace DB
{
/*
* Reads `<resources>` from config at startup and registers them in single `SchedulerRoot`.
* Do not support configuration updates, server restart is required.
*/
class StaticResourceManager : public IResourceManager
{
public:
// Just initialization, any further updates are ignored for the sake of simplicity
// NOTE: manager must be initialized before any acquire() calls to avoid races
void updateConfiguration(const Poco::Util::AbstractConfiguration & config) override;
// Creates a classifier for the given classifier name
ClassifierPtr acquire(const String & classifier_name) override;
private:
// All scheduler nodes of one resource
struct Resource
{
std::unordered_map<String, SchedulerNodePtr> nodes; // by paths
// Builds the nodes of resource `name` from the config section `config_prefix`
Resource(
const String & name,
EventQueue * event_queue,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix);
};
// Classifier implementation that stores one ResourceLink per resource name
struct Classifier : public IClassifier
{
Classifier(const StaticResourceManager & manager, const ClassifierDescription & cfg);
ResourceLink get(const String & resource_name) override;
std::unordered_map<String, ResourceLink> resources; // accessible resources by names
};
private:
SchedulerRoot scheduler;
std::unordered_map<String, Resource> resources; // by name
std::unique_ptr<ClassifiersConfig> classifiers; // null until updateConfiguration() initializes the manager
};
}

View File

@ -0,0 +1,15 @@
#include <IO/Resource/registerResourceManagers.h>
#include <IO/ResourceManagerFactory.h>
namespace DB
{
void registerStaticResourceManager(ResourceManagerFactory &);
/// Registers every resource manager implementation listed below in the ResourceManagerFactory singleton.
void registerResourceManagers()
{
auto & factory = ResourceManagerFactory::instance();
registerStaticResourceManager(factory);
}
}

View File

@ -0,0 +1,8 @@
#pragma once
namespace DB
{
/// Registers resource manager implementations in ResourceManagerFactory.
void registerResourceManagers();
}

View File

@ -0,0 +1,28 @@
#include <IO/Resource/registerSchedulerNodes.h>
#include <IO/ISchedulerNode.h>
#include <IO/ISchedulerConstraint.h>
#include <IO/SchedulerNodeFactory.h>
namespace DB
{
void registerPriorityPolicy(SchedulerNodeFactory &);
void registerSemaphoreConstraint(SchedulerNodeFactory &);
void registerFifoQueue(SchedulerNodeFactory &);
/// Registers every scheduler node implementation listed below in the SchedulerNodeFactory singleton.
void registerSchedulerNodes()
{
auto & factory = SchedulerNodeFactory::instance();
// ISchedulerNode
registerPriorityPolicy(factory);
// ISchedulerConstraint
registerSemaphoreConstraint(factory);
// ISchedulerQueue
registerFifoQueue(factory);
}
}

View File

@ -0,0 +1,8 @@
#pragma once
namespace DB
{
/// Registers scheduler node implementations in SchedulerNodeFactory.
void registerSchedulerNodes();
}

View File

@ -0,0 +1,309 @@
#pragma once
#include <IO/IResourceManager.h>
#include <IO/SchedulerRoot.h>
#include <IO/ResourceGuard.h>
#include <IO/SchedulerNodeFactory.h>
#include <IO/Resource/PriorityPolicy.h>
#include <IO/Resource/FifoQueue.h>
#include <IO/Resource/SemaphoreConstraint.h>
#include <IO/Resource/registerSchedulerNodes.h>
#include <IO/Resource/registerResourceManagers.h>
#include <Poco/Util/XMLConfiguration.h>
#include <atomic>
#include <barrier>
#include <unordered_map>
#include <mutex>
#include <set>
#include <sstream>
namespace DB
{
/// Common base of resource scheduling tests: registers node/manager types and helps to build node hierarchies.
struct ResourceTestBase
{
ResourceTestBase()
{
// The function-local static makes the registration run only once, no matter how many fixtures are created
[[maybe_unused]] static bool typesRegistered = [] { registerSchedulerNodes(); registerResourceManagers(); return true; }();
}
/// Creates a node of type TClass configured by the `xml` snippet and places it at `path` of the hierarchy.
/// For path "/" the node becomes `root_node` (which must be empty); otherwise the node is attached
/// to the parent found by walking the path components from `root_node`.
/// Returns a raw pointer to the created node.
template <class TClass>
static TClass * add(EventQueue * event_queue, SchedulerNodePtr & root_node, const String & path, const String & xml = {})
{
// Wrap the snippet into a minimal config document, so the node is constructed from the "node" prefix
std::stringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
stream << "<resource><node path=\"" << path << "\">" << xml << "</node></resource>";
Poco::AutoPtr config{new Poco::Util::XMLConfiguration(stream)};
String config_prefix = "node";
if (path == "/")
{
EXPECT_TRUE(root_node.get() == nullptr);
root_node.reset(new TClass(event_queue, *config, config_prefix));
return static_cast<TClass *>(root_node.get());
}
EXPECT_TRUE(root_node.get() != nullptr); // root should be initialized first
ISchedulerNode * parent = root_node.get();
// Skip the leading '/', then descend one path component per iteration
size_t pos = 1;
String child_name;
while (pos < path.length())
{
size_t slash = path.find('/', pos);
if (slash != String::npos)
{
// Intermediate component: it must name an already attached child
parent = parent->getChild(path.substr(pos, slash - pos));
EXPECT_TRUE(parent != nullptr); // parent does not exist
pos = slash + 1;
}
else
{
// Last component: the name of the node being added; npos terminates the loop
child_name = path.substr(pos);
pos = String::npos;
}
}
EXPECT_TRUE(!child_name.empty()); // wrong path
SchedulerNodePtr node = std::make_shared<TClass>(event_queue, *config, config_prefix);
node->basename = child_name;
parent->attachChild(node);
return static_cast<TClass *>(node.get());
}
};
struct FlowCtlTest : public SemaphoreConstraint
{
FlowCtlTest(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: SemaphoreConstraint(event_queue_, config, config_prefix)
{}
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
auto [request, active] = SemaphoreConstraint::dequeueRequest();
if (request)
{
std::unique_lock lock(mutex);
requests.insert(request);
}
return {request, active};
}
void finishRequest(ResourceRequest * request) override
{
{
std::unique_lock lock(mutex);
requests.erase(request);
}
SemaphoreConstraint::finishRequest(request);
}
std::mutex mutex;
std::set<ResourceRequest *> requests;
};
/// Single-threaded fixture: builds a scheduler hierarchy, enqueues requests into its queues,
/// dequeues them from the root and accounts consumed cost per queue name.
class ResourceTestClass : public ResourceTestBase
{
    /// Heap-allocated request tagged with the name of the queue it was enqueued into
    struct Request : public ResourceRequest
    {
        String name;

        Request(ResourceCost cost_, const String & name_)
            : ResourceRequest(cost_)
            , name(name_)
        {}

        void execute() override
        {
        }
    };

public:
    /// Adds a node of type `TClass` at `path` into this fixture's hierarchy
    template <class TClass>
    void add(const String & path, const String & xml = {})
    {
        ResourceTestBase::add<TClass>(&event_queue, root_node, path, xml);
    }

    /// Enqueues one request per element of `costs` into the queue located at `path`
    void enqueue(const String & path, const std::vector<ResourceCost> & costs)
    {
        ASSERT_TRUE(root_node.get() != nullptr); // root should be initialized first

        ISchedulerNode * node = root_node.get();
        size_t segment_begin = 1;
        while (segment_begin < path.length())
        {
            const size_t segment_end = path.find('/', segment_begin);
            if (segment_end == String::npos)
            {
                // Last path segment: this is the target node itself
                node = node->getChild(path.substr(segment_begin));
                break;
            }
            node = node->getChild(path.substr(segment_begin, segment_end - segment_begin));
            ASSERT_TRUE(node != nullptr); // does not exist
            segment_begin = segment_end + 1;
        }

        ISchedulerQueue * queue = dynamic_cast<ISchedulerQueue *>(node);
        ASSERT_TRUE(queue != nullptr); // not a queue

        for (ResourceCost cost : costs)
            queue->enqueueRequest(new Request(cost, queue->basename)); // freed in handle()

        processEvents(); // to activate queues
    }

    /// Dequeues requests from the root until the hierarchy is empty or one of the limits is exhausted
    void dequeue(size_t count_limit = size_t(-1), ResourceCost cost_limit = ResourceCostMax)
    {
        while (count_limit > 0 && cost_limit > 0)
        {
            ResourceRequest * request = root_node->dequeueRequest().first;
            if (!request)
                break;

            count_limit--;
            cost_limit -= request->cost;
            handle(static_cast<Request *>(request));
        }
    }

    /// Accounts the request's cost under its queue name and destroys the request
    void handle(Request * request)
    {
        consumed_cost[request->name] += request->cost;
        delete request;
    }

    /// Expects that `value` (+/- `error`) was consumed by queue `name` since the previous check,
    /// then subtracts `value` from the accumulated counter
    void consumed(const String & name, ResourceCost value, ResourceCost error = 0)
    {
        ResourceCost & accumulated = consumed_cost[name];
        EXPECT_TRUE(accumulated >= value - error);
        EXPECT_TRUE(accumulated <= value + error);
        accumulated -= value;
    }

    /// Processes all events pending in the event queue
    void processEvents()
    {
        while (event_queue.tryProcess()) {}
    }

private:
    EventQueue event_queue;
    SchedulerNodePtr root_node;
    std::unordered_map<String, ResourceCost> consumed_cost;
};
// Multi-threaded fixture around a resource manager of type `TManager`.
// Owns the manager, the worker threads started by a test, and a barrier used to
// synchronize the start of a busy period between a leader thread and follower threads.
template <class TManager>
struct ResourceTestManager : public ResourceTestBase
{
ResourceManagerPtr manager;
std::vector<ThreadFromGlobalPool> threads; // joined in the destructor
std::barrier<> busy_period; // sized by the constructor's `thread_count`
// Resource guard that reports to the fixture around locking:
// the base constructor is invoked with PostponeLocking, then onEnqueue(), lock(), onExecute() are called in that order.
struct Guard : public ResourceGuard
{
ResourceTestManager & t;
Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost)
: ResourceGuard(link_, cost, PostponeLocking)
, t(t_)
{
t.onEnqueue(link);
lock();
t.onExecute(link);
}
};
// Per-link counters shared between all threads working with the same queue
struct TItem
{
std::atomic<Int64> enqueued = 0; // number of enqueued requests
std::atomic<Int64> left = 0; // number of requests left to be executed
};
// Hashes a ResourceLink by its queue pointer
struct ResourceQueueHash
{
size_t operator()(const ResourceLink & link) const
{
return std::hash<ISchedulerQueue*>()(link.queue);
}
};
std::mutex link_data_mutex; // protects lookups/insertions in `link_data`
std::unordered_map<ResourceLink, TItem, ResourceQueueHash> link_data;
// `thread_count` is the number of participants expected at every `busy_period` barrier phase
explicit ResourceTestManager(size_t thread_count = 1)
: manager(new TManager)
, busy_period(thread_count)
{}
~ResourceTestManager()
{
for (auto & thread : threads)
thread.join();
}
// Parses `xml` as a configuration and passes it to the manager
void update(const String & xml)
{
std::istringstream stream(xml); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
Poco::AutoPtr config{new Poco::Util::XMLConfiguration(stream)};
manager->updateConfiguration(*config);
}
// Returns counters for `link`, creating them on first access.
// Only the map lookup is done under the mutex; the returned reference is used after the lock is released.
auto & getLinkData(ResourceLink link)
{
std::unique_lock lock{link_data_mutex};
return link_data[link];
}
// Use at least two threads for each queue to avoid queue being deactivated:
// while the first request is executing, the second request is in queue - holding it active.
// use onEnqueue() and onExecute() functions for this purpose.
void onEnqueue(ResourceLink link)
{
getLinkData(link).enqueued.fetch_add(1, std::memory_order_relaxed);
}
// Decrements both counters for `link`, then spins (yielding) while requests are still left
// but none of them is currently counted as enqueued.
void onExecute(ResourceLink link)
{
auto & data = getLinkData(link);
Int64 left = data.left.fetch_sub(1, std::memory_order_relaxed) - 1;
Int64 enqueued = data.enqueued.fetch_sub(1, std::memory_order_relaxed) - 1;
while (left > 0 && enqueued <= 0) // Ensure at least one thread has already enqueued itself (or there is no more requests)
{
std::this_thread::yield();
left = data.left.load();
enqueued = data.enqueued.load();
}
}
// This is required for proper busy period start, i.e. everyone to be seen by scheduler as appeared at the same time:
// - resource is blocked with queries by leader thread;
// - leader thread notifies followers to enqueue their requests;
// - leader thread unblocks resource;
// - busy period begins.
// NOTE: actually leader's request(s) make their own small busy period.
// Leader side: holds a locked guard of cost 1 across both barrier phases; the guard is released when this function returns.
void blockResource(ResourceLink link)
{
ResourceGuard g(link, 1, ResourceGuard::PostponeLocking);
g.lock();
// NOTE: at this point we assume resource to be blocked by single request (<max_requests>1</max_requests>)
busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked
busy_period.arrive_and_wait(); // (2) wait all followers to enqueue their requests
}
// Follower side: registers `total_requests + 1` requests as left for `link` (the extra one is the request created here),
// enqueues a request between the two barrier phases and waits for it to be executed.
void startBusyPeriod(ResourceLink link, ResourceCost cost, Int64 total_requests)
{
getLinkData(link).left += total_requests + 1;
busy_period.arrive_and_wait(); // (1) wait leader to block resource
ResourceGuard g(link, cost, ResourceGuard::PostponeLocking);
onEnqueue(link);
busy_period.arrive_and_wait(); // (2) notify leader to unblock
g.lock();
onExecute(link);
}
};
}

View File

@ -0,0 +1,122 @@
#include <gtest/gtest.h>
#include <IO/Resource/tests/ResourceTest.h>
#include <IO/Resource/PriorityPolicy.h>
using namespace DB;
using ResourceTest = ResourceTestClass;
// Checks that the node type name "priority" is known to SchedulerNodeFactory and produces a PriorityPolicy.
TEST(IOResourcePriorityPolicy, Factory)
{
ResourceTest t; // constructing the fixture registers scheduler node types
Poco::AutoPtr cfg = new Poco::Util::XMLConfiguration();
SchedulerNodePtr prio = SchedulerNodeFactory::instance().get("priority", /* event_queue = */ nullptr, *cfg, "");
EXPECT_TRUE(dynamic_cast<PriorityPolicy *>(prio.get()) != nullptr);
}
// Three queues with priorities 1 (A), 2 (B), 3 (C), each holding three requests of cost 10.
// The expectations below require C to be drained first, then B, then A,
// i.e. the queue with the larger <priority> value is served first.
TEST(IOResourcePriorityPolicy, Priorities)
{
ResourceTest t;
t.add<PriorityPolicy>("/");
t.add<FifoQueue>("/A", "<priority>1</priority>");
t.add<FifoQueue>("/B", "<priority>2</priority>");
t.add<FifoQueue>("/C", "<priority>3</priority>");
t.enqueue("/A", {10, 10, 10});
t.enqueue("/B", {10, 10, 10});
t.enqueue("/C", {10, 10, 10});
// First two requests both come from C
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 0);
t.consumed("C", 20);
// The last request of C, then the first of B
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 10);
t.consumed("C", 10);
// The remaining two requests of B
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 20);
t.consumed("C", 0);
// Everything that is left belongs to A
t.dequeue();
t.consumed("A", 30);
t.consumed("B", 0);
t.consumed("C", 0);
}
// Checks that queues which become empty and are later refilled take part in prioritization again:
// B and C are drained, A is served meanwhile, then B and C are re-enqueued and must preempt A.
TEST(IOResourcePriorityPolicy, Activation)
{
ResourceTest t;
t.add<PriorityPolicy>("/");
t.add<FifoQueue>("/A", "<priority>1</priority>");
t.add<FifoQueue>("/B", "<priority>2</priority>");
t.add<FifoQueue>("/C", "<priority>3</priority>");
t.enqueue("/A", {10, 10, 10, 10, 10, 10});
t.enqueue("/B", {10});
t.enqueue("/C", {10, 10});
// C (two requests) and B (one request) are fully drained before A is touched
t.dequeue(3);
t.consumed("A", 0);
t.consumed("B", 10);
t.consumed("C", 20);
// Only A has requests now
t.dequeue(2);
t.consumed("A", 20);
t.consumed("B", 0);
t.consumed("C", 0);
// B is refilled and is expected to be served ahead of A
t.enqueue("/B", {10, 10, 10});
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 20);
t.consumed("C", 0);
// C is refilled: its two requests go first, then the one remaining request of B
t.enqueue("/C", {10, 10});
t.dequeue(3);
t.consumed("A", 0);
t.consumed("B", 10);
t.consumed("C", 20);
// Back to A once B and C are empty
t.dequeue(2);
t.consumed("A", 20);
t.consumed("B", 0);
t.consumed("C", 0);
}
// A priority policy with a single child queue (no explicit <priority> in its config):
// interleaves enqueues and single-request dequeues and expects exactly one request of cost 10 per dequeue.
TEST(IOResourcePriorityPolicy, SinglePriority)
{
ResourceTest t;
t.add<PriorityPolicy>("/");
t.add<FifoQueue>("/A");
for (int i = 0; i < 3; i++)
{
// Two requests are enqueued but only one is dequeued, so one request stays queued during the inner loop
t.enqueue("/A", {10, 10});
t.dequeue(1);
t.consumed("A", 10);
for (int j = 0; j < 3; j++)
{
t.enqueue("/A", {10, 10, 10});
t.dequeue(1);
t.consumed("A", 10);
t.dequeue(1);
t.consumed("A", 10);
t.dequeue(1);
t.consumed("A", 10);
}
// Dequeue the request left over from the beginning of this iteration
t.dequeue(1);
t.consumed("A", 10);
}
}

View File

@ -0,0 +1,103 @@
#include <gtest/gtest.h>
#include <IO/Resource/tests/ResourceTest.h>
#include <IO/Resource/StaticResourceManager.h>
#include <Poco/Util/XMLConfiguration.h>
using namespace DB;
using ResourceTest = ResourceTestManager<StaticResourceManager>;
using TestGuard = ResourceTest::Guard;
// Loads a configuration with one resource ("res1") and two classifiers ("A", "B"),
// then repeatedly acquires and releases the resource through both classifiers from a single thread.
TEST(IOResourceStaticResourceManager, Smoke)
{
ResourceTest t;
t.update(R"CONFIG(
<clickhouse>
<resources>
<res1>
<node path="/"> <type>inflight_limit</type><max_requests>10</max_requests></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/A"></node>
<node path="/prio/B"><priority>1</priority></node>
</res1>
</resources>
<classifiers>
<A><res1>/prio/A</res1></A>
<B><res1>/prio/B</res1></B>
</classifiers>
</clickhouse>
)CONFIG");
ClassifierPtr cA = t.manager->acquire("A");
ClassifierPtr cB = t.manager->acquire("B");
for (int i = 0; i < 10; i++)
{
// Both guards lock in their constructors and are released at the end of each iteration
ResourceGuard gA(cA->get("res1"));
ResourceGuard gB(cB->get("res1"));
}
}
// Starts T threads for each of the queues A, B, C, D, lets all of them enqueue requests while the
// resource is blocked by the leader (this thread), and then checks that requests are executed in
// non-increasing order of queue priority.
TEST(IOResourceStaticResourceManager, Prioritization)
{
    constexpr size_t T = 2; // threads per queue
    int N = 100; // requests per thread

    // IMPORTANT: everything the worker threads capture by reference is declared BEFORE `t`.
    // The threads are joined only in the destructor of `t`, and locals are destroyed in reverse
    // declaration order, so state declared after `t` would be destroyed while threads still use it.
    std::optional<Int64> last_priority;
    auto check = [&] (Int64 priority)
    {
        // Lock is not required here because this is called during request execution and we have max_requests = 1
        if (last_priority)
            EXPECT_TRUE(priority <= *last_priority); // Should be true if every queue arrived at the same time at busy period start
        last_priority = priority;
    };

    ResourceTest t(4 * T + 1); // barrier participants: T threads for each of 4 queues + the leader
    t.update(R"CONFIG(
<clickhouse>
<resources>
<res1>
<node path="/"> <type>inflight_limit</type><max_requests>1</max_requests></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/A"> <priority>-1</priority></node>
<node path="/prio/B"> <priority>1</priority></node>
<node path="/prio/C"> </node>
<node path="/prio/D"> </node>
<node path="/prio/leader"></node>
</res1>
</resources>
<classifiers>
<A><res1>/prio/A</res1></A>
<B><res1>/prio/B</res1></B>
<C><res1>/prio/C</res1></C>
<D><res1>/prio/D</res1></D>
<leader><res1>/prio/leader</res1></leader>
</classifiers>
</clickhouse>
)CONFIG");

    for (String name : {"A", "B", "C", "D"})
    {
        for (size_t thr = 0; thr < T; thr++)
        {
            t.threads.emplace_back([&, name]
            {
                ClassifierPtr c = t.manager->acquire(name);
                ResourceLink link = c->get("res1");
                t.startBusyPeriod(link, 1, N);
                for (int req = 0; req < N; req++)
                {
                    TestGuard g(t, link, 1);
                    check(link.queue->info.priority);
                }
            });
        }
    }

    // Leader: block the resource until every follower has enqueued its first request
    ClassifierPtr c = t.manager->acquire("leader");
    ResourceLink link = c->get("res1");
    t.blockResource(link);
}

View File

@ -0,0 +1,103 @@
#include <gtest/gtest.h>
#include <IO/SchedulerRoot.h>
#include <IO/Resource/tests/ResourceTest.h>
#include <future>
using namespace DB;
// Fixture owning a SchedulerRoot whose scheduler thread runs for the whole lifetime of the fixture.
struct ResourceTest : public ResourceTestBase
{
SchedulerRoot scheduler;
ResourceTest()
{
scheduler.start();
}
~ResourceTest()
{
// Graceful stop: SchedulerRoot::stop(true) also executes pending requests and events after joining the thread
scheduler.stop(true);
}
};
/// Owns the node hierarchy of one resource and (un)registers it in the scheduler of the fixture.
/// The fixture `t` must outlive the holder and its scheduler thread must be running while
/// unregisterResource() (and therefore the destructor) is executed.
struct ResourceHolder
{
    ResourceTest & t;
    SchedulerNodePtr root_node;

    explicit ResourceHolder(ResourceTest & t_)
        : t(t_)
    {}

    ~ResourceHolder()
    {
        unregisterResource();
    }

    /// Adds a node of type `TClass` at `path` into the hierarchy of this resource
    template <class TClass>
    TClass * add(const String & path, const String & xml = {})
    {
        return ResourceTest::add<TClass>(t.scheduler.event_queue, root_node, path, xml);
    }

    /// Adds a FifoQueue at `path` and returns a link that can be used to enqueue requests into it
    ResourceLink addQueue(const String & path, const String & xml = {})
    {
        return {.queue = static_cast<ISchedulerQueue *>(ResourceTest::add<FifoQueue>(t.scheduler.event_queue, root_node, path, xml))};
    }

    /// Asynchronously attaches the hierarchy to the scheduler (done inside the scheduler's event queue)
    void registerResource()
    {
        t.scheduler.event_queue->enqueue([this]
        {
            t.scheduler.attachChild(root_node);
        });
    }

    /// Detaches the hierarchy from the scheduler and waits until it is actually done.
    /// Waiting is essential: this is called from the destructor and the enqueued event accesses
    /// `this`, so it must be processed before the holder (and `root_node`) is destroyed.
    void unregisterResource()
    {
        std::promise<void> removed;
        std::future<void> removed_future = removed.get_future();
        t.scheduler.event_queue->enqueue([this, &removed]
        {
            t.scheduler.removeChild(root_node.get());
            removed.set_value();
        });
        removed_future.get();
    }
};
// Registers two independent resources in one scheduler, each being a FlowCtlTest constraint
// over a priority policy with two queues, and checks that a request acquired through a queue
// is tracked by the constraint of the resource that queue belongs to.
TEST(IOSchedulerRoot, Smoke)
{
ResourceTest t;
// Resource 1: queues A and B
ResourceHolder r1(t);
auto fc1 = r1.add<FlowCtlTest>("/", "<max_requests>1</max_requests>");
r1.add<PriorityPolicy>("/prio");
auto a = r1.addQueue("/prio/A", "<priority>1</priority>");
auto b = r1.addQueue("/prio/B", "<priority>2</priority>");
r1.registerResource();
// Resource 2: queues C and D
ResourceHolder r2(t);
auto fc2 = r2.add<FlowCtlTest>("/", "<max_requests>1</max_requests>");
r2.add<PriorityPolicy>("/prio");
auto c = r2.addQueue("/prio/C", "<priority>-1</priority>");
auto d = r2.addQueue("/prio/D", "<priority>-2</priority>");
r2.registerResource();
// Each guard blocks in its constructor until the request is executed; while the guard is alive
// the request must be present in the `requests` set of the corresponding constraint.
{
ResourceGuard rg(a);
EXPECT_TRUE(fc1->requests.contains(&rg.request));
}
{
ResourceGuard rg(b);
EXPECT_TRUE(fc1->requests.contains(&rg.request));
}
{
ResourceGuard rg(c);
EXPECT_TRUE(fc2->requests.contains(&rg.request));
}
{
ResourceGuard rg(d);
EXPECT_TRUE(fc2->requests.contains(&rg.request));
}
}

93
src/IO/ResourceGuard.h Normal file
View File

@ -0,0 +1,93 @@
#pragma once
#include <base/types.h>
#include <IO/ResourceRequest.h>
#include <IO/ISchedulerQueue.h>
#include <IO/ISchedulerConstraint.h>
#include <future>
namespace DB
{
/*
 * Scoped resource guard.
 * Waits for resource to be available in constructor and releases resource in destructor.
 *
 * If `link.queue` is null the guard is a no-op: nothing is enqueued, locked or released.
 *
 * NOTE: a guard created with `PostponeLocking` must be locked with `lock()` before it is
 * unlocked or destroyed: the request is referenced by the scheduler until it is dequeued,
 * and request cancelling is not supported.
 */
class ResourceGuard
{
public:
    enum ResourceGuardCtor
    {
        LockStraightAway, /// Lock inside constructor (default)
        PostponeLocking /// Don't lock in constructor, but during later `lock()` call
    };

    struct Request : public ResourceRequest
    {
        /// Promise to be set on request execution
        std::promise<void> dequeued;

        explicit Request(ResourceCost cost_ = 1)
            : ResourceRequest(cost_)
        {}

        void execute() override
        {
            // This function is executed inside scheduler thread and wakes thread issued this `request` (using ResourceGuard)
            // That thread will continue execution and do real consumption of requested resource synchronously.
            dequeued.set_value();
        }
    };

    /// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
    explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
        : link(link_)
        , request(cost)
    {
        if (link.queue)
        {
            // The future must be obtained before enqueueing: after that the scheduler thread may set the promise at any moment
            dequeued_future = request.dequeued.get_future();
            link.queue->enqueueRequest(&request);
            if (ctor == LockStraightAway)
                lock();
        }
    }

    ~ResourceGuard()
    {
        unlock();
    }

    /// Blocks until resource is available.
    /// Repeated calls are no-ops: the future is consumed by the first call.
    void lock()
    {
        if (link.queue && dequeued_future.valid())
            dequeued_future.get();
    }

    /// Report request execution has finished.
    /// Safe to call more than once (e.g. explicitly and then from the destructor):
    /// the constraint is notified only by the first call.
    void unlock()
    {
        if (link.queue && !finished)
        {
            assert(!dequeued_future.valid()); // unlock must be called only after lock()
            finished = true;
            if (request.constraint)
                request.constraint->finishRequest(&request);
        }
    }

    /// Mark request as unsuccessful; by default request is considered to be successful
    void setFailure()
    {
        request.successful = false;
    }

public:
    ResourceLink link;
    Request request;
    std::future<void> dequeued_future;

private:
    bool finished = false; /// Set once the constraint has been notified about request completion
};
}

View File

@ -0,0 +1,55 @@
#pragma once
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <IO/IResourceManager.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <mutex>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
/// Process-wide registry of resource manager types.
/// Maps a type name to a function creating a new manager of that type.
class ResourceManagerFactory : private boost::noncopyable
{
public:
    static ResourceManagerFactory & instance()
    {
        static ResourceManagerFactory ret;
        return ret;
    }

    /// Creates a new resource manager of the type registered under `name`.
    /// Throws INVALID_SCHEDULER_NODE if no such type is registered.
    ResourceManagerPtr get(const String & name)
    {
        std::lock_guard lock{mutex};
        if (auto iter = methods.find(name); iter != methods.end())
            return iter->second();
        throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Unknown resource manager type: {}", name);
    }

    /// Registers `TDerived` (must be default-constructible) under `name`.
    /// A repeated registration under the same name replaces the previous one.
    template <class TDerived>
    void registerMethod(const String & name)
    {
        std::lock_guard lock{mutex};
        methods[name] = [] ()
        {
            return std::make_shared<TDerived>();
        };
    }

private:
    std::mutex mutex; /// Protects `methods`
    using Method = std::function<ResourceManagerPtr()>;
    std::unordered_map<String, Method> methods;
};
}

92
src/IO/ResourceRequest.h Normal file
View File

@ -0,0 +1,92 @@
#pragma once
#include <base/types.h>
#include <limits>
namespace DB
{
// Forward declarations
class ISchedulerQueue;
class ISchedulerNode;
class ISchedulerConstraint;
/// Cost in terms of used resource (e.g. bytes for network IO)
using ResourceCost = Int64;
constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
/// Internal identifier of a resource (for arrays; unique per scheduler)
using ResourceIdx = size_t;
constexpr ResourceIdx ResourceIdxNotSet = ResourceIdx(-1);
/// Timestamps (nanoseconds since epoch)
using ResourceNs = UInt64;
/*
 * Info required for resource consumption.
 * Holds a non-owning pointer to the queue that requests should be enqueued into;
 * a default-constructed link has a null queue.
 */
struct ResourceLink
{
ISchedulerQueue * queue = nullptr;
// Links are equal iff they refer to the same queue
bool operator==(const ResourceLink &) const = default;
};
/*
 * Request for a resource consumption. The main moving part of the scheduling subsystem.
 * Resource requests processing workflow:
 *
 *     ----1=2222222222222=3=4=555555555555555=6-----> time
 *         ^       ^       ^ ^        ^        ^
 *         |       |       | |        |        |
 *      enqueue  wait  dequeue execute consume finish
 *
 * 1) Request is enqueued using ISchedulerQueue::enqueueRequest().
 * 2) Request competes with others for access to a resource; effectively just waiting in a queue.
 * 3) Scheduler calls ISchedulerNode::dequeueRequest() that returns the request.
 * 4) Callback ResourceRequest::execute() is called to provide access to the resource.
 * 5) The resource consumption is happening outside of the scheduling subsystem.
 * 6) request->constraint->finishRequest() is called when consumption is finished.
 *
 * Steps (5) and (6) can be omitted if constraint is not used by the resource.
 *
 * Request can be created on stack or heap.
 * Request ownership is done outside of the scheduling subsystem.
 * After (6) request can be destructed safely.
 *
 * Request cancelling is not supported yet.
 */
class ResourceRequest
{
public:
/// Cost of request execution; should be filled before request enqueueing.
/// NOTE: If cost is not known in advance, credit model can be used:
/// NOTE: for the first request use 1 and
/// NOTE(review): the note above is cut off in the original text; the intended rule for subsequent requests is not stated here.
ResourceCost cost;
/// Request outcome
/// Should be filled during resource consumption
bool successful = true;
/// Scheduler node to be notified on consumption finish
/// Auto-filled during request enqueue/dequeue
ISchedulerConstraint * constraint = nullptr;
/// Timestamps for introspection
ResourceNs enqueue_ns = 0;
ResourceNs execute_ns = 0;
ResourceNs finish_ns = 0;
/// Creates a request with the given cost (1 by default)
explicit ResourceRequest(ResourceCost cost_ = 1)
: cost(cost_)
{}
virtual ~ResourceRequest() = default;
/// Callback to trigger resource consumption.
/// IMPORTANT: is called from scheduler thread and must be fast,
/// just triggering start of a consumption, not doing the consumption itself
/// (e.g. setting an std::promise or creating a job in a thread pool)
virtual void execute() = 0;
};
}

View File

@ -0,0 +1,57 @@
#pragma once
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <IO/ISchedulerNode.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <mutex>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
/// Process-wide registry of scheduler node types.
/// Maps a type name to a function constructing a node of that type from configuration.
class SchedulerNodeFactory : private boost::noncopyable
{
public:
    static SchedulerNodeFactory & instance()
    {
        static SchedulerNodeFactory factory;
        return factory;
    }

    /// Constructs a node of the type registered under `name`.
    /// Throws INVALID_SCHEDULER_NODE if no such type is registered.
    SchedulerNodePtr get(const String & name, EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
    {
        std::lock_guard lock{mutex};
        const auto found = methods.find(name);
        if (found == methods.end())
            throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Unknown scheduler node type: {}", name);
        return found->second(event_queue, config, config_prefix);
    }

    /// Registers `TDerived` under `name`; it must be constructible from (event_queue, config, config_prefix).
    template <class TDerived>
    void registerMethod(const String & name)
    {
        std::lock_guard lock{mutex};
        methods[name] = [] (EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
        {
            return std::make_shared<TDerived>(event_queue, config, config_prefix);
        };
    }

private:
    using Method = std::function<SchedulerNodePtr(EventQueue * event_queue, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)>;

    std::mutex mutex; /// Protects `methods`
    std::unordered_map<String, Method> methods;
};
}

250
src/IO/SchedulerRoot.h Normal file
View File

@ -0,0 +1,250 @@
#pragma once
#include <base/defines.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <IO/ISchedulerNode.h>
#include <IO/ISchedulerConstraint.h>
#include <Poco/Util/XMLConfiguration.h>
#include <unordered_map>
#include <map>
#include <memory>
#include <atomic>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
/*
* Resource scheduler root node with a dedicated thread.
* Immediate children correspond to different resources.
*/
class SchedulerRoot : public ISchedulerNode
{
private:
struct TResource
{
SchedulerNodePtr root;
// Intrusive cyclic list of active resources
TResource * next = nullptr;
TResource * prev = nullptr;
TResource(const SchedulerNodePtr & root_)
: root(root_)
{
root->info.parent.ptr = this;
}
// Get pointer stored by ctor in info
static TResource * get(SchedulerNodeInfo & info)
{
return reinterpret_cast<TResource *>(info.parent.ptr);
}
};
public:
SchedulerRoot()
: ISchedulerNode(&events)
{}
~SchedulerRoot() override
{
stop();
}
/// Runs separate scheduler thread
void start()
{
if (!scheduler.joinable())
scheduler = ThreadFromGlobalPool([this] { schedulerThread(); });
}
/// Joins scheduler threads and execute every pending request iff graceful
void stop(bool graceful = true)
{
if (scheduler.joinable())
{
stop_flag.store(true);
events.enqueue([]{}); // just to wake up thread
scheduler.join();
if (graceful)
{
// Do the same cycle as schedulerThread() but never block, just exit instead
bool has_work = true;
while (has_work)
{
auto [request, _] = dequeueRequest();
if (request)
execute(request);
else
has_work = false;
while (events.tryProcess())
has_work = true;
}
}
}
}
bool equals(ISchedulerNode * other) override
{
if (auto * o = dynamic_cast<SchedulerRoot *>(other))
return true;
return false;
}
void attachChild(const SchedulerNodePtr & child) override
{
// Take ownership
assert(child->parent == nullptr);
if (auto [it, inserted] = children.emplace(child.get(), child); !inserted)
throw Exception(
ErrorCodes::INVALID_SCHEDULER_NODE,
"Can't add the same scheduler node twice");
// Attach
child->setParent(this);
// Activate child if required
if (child->isActive())
activateChild(child.get());
}
void removeChild(ISchedulerNode * child) override
{
if (auto iter = children.find(child); iter != children.end())
{
SchedulerNodePtr removed = iter->second.root;
// Deactivate if required
deactivate(&iter->second);
// Detach
removed->setParent(nullptr);
// Remove ownership
children.erase(iter);
}
}
ISchedulerNode * getChild(const String &) override
{
abort(); // scheduler is allowed to have multiple children with the same name
}
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (current == nullptr) // No active resources
return {nullptr, false};
// Dequeue request from current resource
auto [request, resource_active] = current->root->dequeueRequest();
assert(request != nullptr);
// Deactivate resource if required
if (!resource_active)
deactivate(current);
else
current = current->next; // Just move round-robin pointer
return {request, current != nullptr};
}
bool isActive() override
{
return current != nullptr;
}
void activateChild(ISchedulerNode * child) override
{
activate(TResource::get(child->info));
}
void setParent(ISchedulerNode *) override
{
abort(); // scheduler must be the root and this function should not be called
}
private:
void activate(TResource * value)
{
assert(value->next == nullptr && value->prev == nullptr);
if (current == nullptr) // No active children
{
current = value;
value->prev = value;
value->next = value;
}
else
{
current->prev->next = value;
value->prev = current->prev;
current->prev = value;
value->next = current;
}
}
void deactivate(TResource * value)
{
if (value->next == nullptr)
return; // Already deactivated
assert(current != nullptr);
if (current == value)
{
if (current->next == current) // We are going to remove the last active child
{
value->next = nullptr;
value->prev = nullptr;
current = nullptr;
return;
}
else // Just move current to next to avoid invalidation
current = current->next;
}
value->prev->next = value->next;
value->next->prev = value->prev;
value->prev = nullptr;
value->next = nullptr;
}
private:
void schedulerThread()
{
while (!stop_flag.load())
{
// Dequeue and execute single request
auto [request, _] = dequeueRequest();
if (request)
execute(request);
else // No more requests -- block until any event happens
events.process();
// Process all events before dequeing to ensure fair competition
while (events.tryProcess()) {}
}
}
void execute(ResourceRequest * request)
{
request->execute_ns = clock_gettime_ns();
request->execute();
}
private:
TResource * current = nullptr; // round-robin pointer
std::unordered_map<ISchedulerNode *, TResource> children; // resources by pointer
std::atomic<bool> stop_flag = false;
EventQueue events;
ThreadFromGlobalPool scheduler;
};
}

View File

@ -47,6 +47,7 @@
#include <Access/SettingsConstraintsAndProfileIDs.h>
#include <Access/ExternalAuthenticators.h>
#include <Access/GSSAcceptor.h>
#include <IO/ResourceManagerFactory.h>
#include <Backups/BackupsWorker.h>
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
@ -209,6 +210,7 @@ struct ContextSharedPart : boost::noncopyable
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
std::unique_ptr<AccessControl> access_control;
mutable ResourceManagerPtr resource_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
@ -950,6 +952,21 @@ std::vector<UUID> Context::getEnabledProfiles() const
}
/// Returns the shared resource manager, creating it on first use.
/// The manager type is read from the `resource_manager` config key and defaults to "static".
ResourceManagerPtr Context::getResourceManager() const
{
auto lock = getLock();
if (!shared->resource_manager)
shared->resource_manager = ResourceManagerFactory::instance().get(getConfigRef().getString("resource_manager", "static"));
return shared->resource_manager;
}
/// Acquires a classifier from the resource manager for the value of the `workload` setting of this context.
ClassifierPtr Context::getClassifier() const
{
auto lock = getLock();
// NOTE(review): getResourceManager() calls getLock() again while `lock` is still held;
// this presumably relies on the context lock being re-entrant -- confirm.
return getResourceManager()->acquire(getSettingsRef().workload);
}
const Scalars & Context::getScalars() const
{
return scalars;

View File

@ -17,7 +17,7 @@
#include <base/types.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/ColumnsDescription.h>
#include <IO/IResourceManager.h>
#include "config_core.h"
@ -520,6 +520,10 @@ public:
std::shared_ptr<const EnabledQuota> getQuota() const;
std::optional<QuotaUsage> getQuotaUsage() const;
/// Resource management related
ResourceManagerPtr getResourceManager() const;
ClassifierPtr getClassifier() const;
/// We have to copy external tables inside executeQuery() to track limits. Therefore, set callback for it. Must set once.
void setExternalTablesInitializer(ExternalTablesInitializer && initializer);
/// This method is called in executeQuery() and will call the external tables initializer.