This commit is contained in:
Sergei Trifonov 2024-09-18 15:34:05 -05:00 committed by GitHub
commit cb2f74cb75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
79 changed files with 4124 additions and 169 deletions

View File

@ -85,7 +85,7 @@
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Common/Scheduler/Nodes/registerSchedulerNodes.h>
#include <Common/Scheduler/Nodes/registerResourceManagers.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Common/Config/ConfigReloader.h>
#include <Server/HTTPHandlerFactory.h>
#include "MetricsTransmitter.h"
@ -780,7 +780,6 @@ try
registerFormats();
registerRemoteFileMetadatas();
registerSchedulerNodes();
registerResourceManagers();
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
@ -2089,6 +2088,8 @@ try
database_catalog.assertDatabaseExists(default_database);
/// Load user-defined SQL functions.
global_context->getUserDefinedSQLObjectsStorage().loadObjects();
/// Load WORKLOADs and RESOURCEs.
global_context->getWorkloadEntityStorage().loadEntities();
}
catch (...)
{

View File

@ -1386,6 +1386,10 @@
If not specified they will be stored locally. -->
<!-- <user_defined_zookeeper_path>/clickhouse/user_defined</user_defined_zookeeper_path> -->
<!-- Path in ZooKeeper to store workloads and resources created by the commands CREATE WORKLOAD and CREATE RESOURCE.
If not specified they will be stored locally. -->
<!-- <workload_zookeeper_path>/clickhouse/workload</workload_zookeeper_path> -->
<!-- Uncomment if you want data to be compressed 30-100% better.
Don't do that if you just started using ClickHouse.
-->

View File

@ -99,6 +99,8 @@ enum class AccessType : uint8_t
M(CREATE_ARBITRARY_TEMPORARY_TABLE, "", GLOBAL, CREATE) /* allows to create and manipulate temporary tables
with arbitrary table engine */\
M(CREATE_FUNCTION, "", GLOBAL, CREATE) /* allows to execute CREATE FUNCTION */ \
M(CREATE_WORKLOAD, "", GLOBAL, CREATE) /* allows to execute CREATE WORKLOAD */ \
M(CREATE_RESOURCE, "", GLOBAL, CREATE) /* allows to execute CREATE RESOURCE */ \
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE, "", GROUP, ALL) /* allows to execute {CREATE|ATTACH} */ \
\
@ -108,6 +110,8 @@ enum class AccessType : uint8_t
implicitly enabled by the grant DROP_TABLE */\
M(DROP_DICTIONARY, "", DICTIONARY, DROP) /* allows to execute {DROP|DETACH} DICTIONARY */\
M(DROP_FUNCTION, "", GLOBAL, DROP) /* allows to execute DROP FUNCTION */\
M(DROP_WORKLOAD, "", GLOBAL, DROP) /* allows to execute DROP WORKLOAD */\
M(DROP_RESOURCE, "", GLOBAL, DROP) /* allows to execute DROP RESOURCE */\
M(DROP_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute DROP NAMED COLLECTION */\
M(DROP, "", GROUP, ALL) /* allows to execute {DROP|DETACH} */\
\

View File

@ -689,15 +689,17 @@ bool ContextAccess::checkAccessImplHelper(const ContextPtr & context, AccessFlag
const AccessFlags dictionary_ddl = AccessType::CREATE_DICTIONARY | AccessType::DROP_DICTIONARY;
const AccessFlags function_ddl = AccessType::CREATE_FUNCTION | AccessType::DROP_FUNCTION;
const AccessFlags workload_ddl = AccessType::CREATE_WORKLOAD | AccessType::DROP_WORKLOAD;
const AccessFlags resource_ddl = AccessType::CREATE_RESOURCE | AccessType::DROP_RESOURCE;
const AccessFlags table_and_dictionary_ddl = table_ddl | dictionary_ddl;
const AccessFlags table_and_dictionary_and_function_ddl = table_ddl | dictionary_ddl | function_ddl;
const AccessFlags write_table_access = AccessType::INSERT | AccessType::OPTIMIZE;
const AccessFlags write_dcl_access = AccessType::ACCESS_MANAGEMENT - AccessType::SHOW_ACCESS;
const AccessFlags not_readonly_flags = write_table_access | table_and_dictionary_and_function_ddl | write_dcl_access | AccessType::SYSTEM | AccessType::KILL_QUERY;
const AccessFlags not_readonly_flags = write_table_access | table_and_dictionary_and_function_ddl | workload_ddl | resource_ddl | write_dcl_access | AccessType::SYSTEM | AccessType::KILL_QUERY;
const AccessFlags not_readonly_1_flags = AccessType::CREATE_TEMPORARY_TABLE;
const AccessFlags ddl_flags = table_ddl | dictionary_ddl | function_ddl;
const AccessFlags ddl_flags = table_ddl | dictionary_ddl | function_ddl | workload_ddl | resource_ddl;
const AccessFlags introspection_flags = AccessType::INTROSPECTION;
};
static const PrecalculatedFlags precalc;

View File

@ -114,6 +114,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Common/NamedCollections)
add_headers_and_sources(dbms Common/Scheduler/Workload)
if (TARGET ch_contrib::amqp_cpp)
add_headers_and_sources(dbms Storages/RabbitMQ)

View File

@ -609,7 +609,12 @@
M(728, UNEXPECTED_DATA_TYPE) \
M(729, ILLEGAL_TIME_SERIES_TAGS) \
M(730, REFRESH_FAILED) \
<<<<<<< HEAD
M(731, WORKLOAD_ENTITY_ALREADY_EXISTS) \
M(732, UNKNOWN_WORKLOAD_ENTITY) \
=======
M(731, QUERY_CACHE_USED_WITH_NON_THROW_OVERFLOW_MODE) \
>>>>>>> master
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -6,6 +6,7 @@
/// Separate type (rather than `Int64`) is used just to avoid implicit conversion errors and to default-initialize
struct Priority
{
Int64 value = 0; /// Note that lower value means higher priority.
constexpr operator Int64() const { return value; } /// NOLINT
using Value = Int64;
Value value = 0; /// Note that lower value means higher priority.
constexpr operator Value() const { return value; } /// NOLINT
};

View File

@ -51,7 +51,7 @@ public:
virtual ClassifierPtr acquire(const String & classifier_name) = 0;
/// For introspection, see `system.scheduler` table
using VisitorFunc = std::function<void(const String & resource, const String & path, const String & type, const SchedulerNodePtr & node)>;
using VisitorFunc = std::function<void(const String & resource, const String & path, ISchedulerNode * node)>;
virtual void forEachNode(VisitorFunc visitor) = 0;
};

View File

@ -15,8 +15,7 @@ namespace DB
* When constraint is again satisfied, scheduleActivation() is called from finishRequest().
*
* Derived class behaviour requirements:
* - dequeueRequest() must fill `request->constraint` iff it is nullptr;
* - finishRequest() must be recursive: call to `parent_constraint->finishRequest()`.
* - dequeueRequest() must call `request->addConstraint()`.
*/
class ISchedulerConstraint : public ISchedulerNode
{
@ -25,34 +24,16 @@ public:
: ISchedulerNode(event_queue_, config, config_prefix)
{}
ISchedulerConstraint(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
: ISchedulerNode(event_queue_, info_)
{}
/// Resource consumption by `request` is finished.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void finishRequest(ResourceRequest * request) = 0;
void setParent(ISchedulerNode * parent_) override
{
ISchedulerNode::setParent(parent_);
// Assign `parent_constraint` to the nearest parent derived from ISchedulerConstraint
for (ISchedulerNode * node = parent_; node != nullptr; node = node->parent)
{
if (auto * constraint = dynamic_cast<ISchedulerConstraint *>(node))
{
parent_constraint = constraint;
break;
}
}
}
/// For introspection of current state (true = satisfied, false = violated)
virtual bool isSatisfied() = 0;
protected:
// Reference to nearest parent that is also derived from ISchedulerConstraint.
// Request can traverse through multiple constraints while being dequeue from hierarchy,
// while finishing request should traverse the same chain in reverse order.
// NOTE: it must be immutable after initialization, because it is accessed in not thread-safe way from finishRequest()
ISchedulerConstraint * parent_constraint = nullptr;
};
}

View File

@ -57,7 +57,13 @@ struct SchedulerNodeInfo
SchedulerNodeInfo() = default;
explicit SchedulerNodeInfo(const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
explicit SchedulerNodeInfo(double weight_, Priority priority_ = {})
{
setWeight(weight_);
setPriority(priority_);
}
explicit SchedulerNodeInfo(const Poco::Util::AbstractConfiguration & config, const String & config_prefix = {})
{
setWeight(config.getDouble(config_prefix + ".weight", weight));
setPriority(config.getInt64(config_prefix + ".priority", priority));
@ -78,6 +84,11 @@ struct SchedulerNodeInfo
priority.value = value;
}
void setPriority(Priority value)
{
priority = value;
}
// To check if configuration update required
bool equals(const SchedulerNodeInfo & o) const
{
@ -123,8 +134,15 @@ public:
, info(config, config_prefix)
{}
ISchedulerNode(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
: event_queue(event_queue_)
, info(info_)
{}
virtual ~ISchedulerNode() = default;
virtual const String & getTypeName() const = 0;
/// Checks if two nodes configuration is equal
virtual bool equals(ISchedulerNode * other)
{
@ -134,10 +152,11 @@ public:
/// Attach new child
virtual void attachChild(const std::shared_ptr<ISchedulerNode> & child) = 0;
/// Detach and destroy child
/// Detach child
/// NOTE: child might be destroyed if the only reference was stored in parent
virtual void removeChild(ISchedulerNode * child) = 0;
/// Get attached child by name
/// Get attached child by name (for tests only)
virtual ISchedulerNode * getChild(const String & child_name) = 0;
/// Activation of child due to the first pending request
@ -147,7 +166,7 @@ public:
/// Returns true iff node is active
virtual bool isActive() = 0;
/// Returns number of active children
/// Returns number of active children (for introspection only).
virtual size_t activeChildren() = 0;
/// Returns the first request to be executed as the first component of resulting pair.
@ -155,10 +174,10 @@ public:
virtual std::pair<ResourceRequest *, bool> dequeueRequest() = 0;
/// Returns full path string using names of every parent
String getPath()
String getPath() const
{
String result;
ISchedulerNode * ptr = this;
const ISchedulerNode * ptr = this;
while (ptr->parent)
{
result = "/" + ptr->basename + result;

View File

@ -21,6 +21,10 @@ public:
: ISchedulerNode(event_queue_, config, config_prefix)
{}
ISchedulerQueue(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
: ISchedulerNode(event_queue_, info_)
{}
// Wrapper for `enqueueRequest()` that should be used to account for available resource budget
// Returns `estimated_cost` that should be passed later to `adjustBudget()`
[[ nodiscard ]] ResourceCost enqueueRequestUsingBudget(ResourceRequest * request)
@ -47,6 +51,11 @@ public:
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual bool cancelRequest(ResourceRequest * request) = 0;
/// Fails all the resource requests in queue and marks this queue as not usable.
/// Afterwards any new request will be failed on `enqueueRequest()`.
/// NOTE: This is done for queues that are about to be destructed.
virtual void purgeQueue() = 0;
/// For introspection
ResourceCost getBudget() const
{

View File

@ -1,7 +1,6 @@
#include <Common/Scheduler/Nodes/DynamicResourceManager.h>
#include <Common/Scheduler/Nodes/SchedulerNodeFactory.h>
#include <Common/Scheduler/ResourceManagerFactory.h>
#include <Common/Scheduler/ISchedulerQueue.h>
#include <Common/Exception.h>
@ -245,7 +244,7 @@ void DynamicResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
{
for (auto & [name, resource] : state_ref->resources)
for (auto & [path, node] : resource->nodes)
visitor(name, path, node.type, node.ptr);
visitor(name, path, node.ptr.get());
promise.set_value();
});
@ -253,9 +252,4 @@ void DynamicResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
future.get();
}
void registerDynamicResourceManager(ResourceManagerFactory & factory)
{
factory.registerMethod<DynamicResourceManager>("dynamic");
}
}

View File

@ -48,6 +48,16 @@ public:
: ISchedulerNode(event_queue_, config, config_prefix)
{}
FairPolicy(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
: ISchedulerNode(event_queue_, info_)
{}
/// Returns the type name of this scheduler node, which is always "fair".
const String & getTypeName() const override
{
    static String fair_type_name = "fair";
    return fair_type_name;
}
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))

View File

@ -30,6 +30,21 @@ public:
: ISchedulerQueue(event_queue_, config, config_prefix)
{}
FifoQueue(EventQueue * event_queue_, const SchedulerNodeInfo & info_)
: ISchedulerQueue(event_queue_, info_)
{}
~FifoQueue() override
{
chassert(requests.empty());
}
/// Returns the type name of this scheduler node, which is always "fifo".
const String & getTypeName() const override
{
    static String fifo_type_name = "fifo";
    return fifo_type_name;
}
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
@ -42,6 +57,8 @@ public:
void enqueueRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
if (is_not_usable)
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Scheduler queue is about to be destructed");
queue_cost += request->cost;
bool was_empty = requests.empty();
requests.push_back(*request);
@ -66,6 +83,8 @@ public:
bool cancelRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
if (is_not_usable)
return false; // Any request should already be failed or executed
if (request->is_linked())
{
// It's impossible to check that `request` is indeed inserted to this queue and not another queue.
@ -88,6 +107,19 @@ public:
return false;
}
void purgeQueue() override
{
std::lock_guard lock(mutex);
is_not_usable = true;
while (!requests.empty())
{
ResourceRequest * request = &requests.front();
requests.pop_front();
request->failed(std::make_exception_ptr(
Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Scheduler queue with resource request is about to be destructed")));
}
}
bool isActive() override
{
std::lock_guard lock(mutex);
@ -131,6 +163,7 @@ private:
std::mutex mutex;
Int64 queue_cost = 0;
boost::intrusive::list<ResourceRequest> requests;
bool is_not_usable = false;
};
}

View File

@ -0,0 +1,502 @@
#include "Common/Scheduler/IResourceManager.h"
#include <Common/Scheduler/Nodes/IOResourceManager.h>
#include <Common/Scheduler/Nodes/FifoQueue.h>
#include <Common/Scheduler/Nodes/FairPolicy.h>
#include <Common/Exception.h>
#include <Common/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/Priority.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTCreateResourceQuery.h>
#include <memory>
#include <mutex>
#include <map>
namespace DB
{
namespace ErrorCodes
{
extern const int RESOURCE_ACCESS_DENIED;
extern const int RESOURCE_NOT_FOUND;
extern const int INVALID_SCHEDULER_NODE;
extern const int LOGICAL_ERROR;
}
namespace
{
/// Extracts the entity name from a CREATE WORKLOAD or CREATE RESOURCE query AST.
/// Returns the placeholder "unknown-workload-entity" if `ast` is neither of them.
String getEntityName(const ASTPtr & ast)
{
    auto * entity = ast.get();

    auto * workload_query = typeid_cast<ASTCreateWorkloadQuery *>(entity);
    if (workload_query != nullptr)
        return workload_query->getWorkloadName();

    auto * resource_query = typeid_cast<ASTCreateResourceQuery *>(entity);
    if (resource_query != nullptr)
        return resource_query->getResourceName();

    return "unknown-workload-entity";
}
}
/// Builds the description of a workload node from its CREATE WORKLOAD query.
/// NOTE: `ast` is dereferenced without a check, so it is expected to hold an ASTCreateWorkloadQuery.
IOResourceManager::NodeInfo::NodeInfo(const ASTPtr & ast, const String & resource_name)
{
    auto * workload_query = typeid_cast<ASTCreateWorkloadQuery *>(ast.get());

    name = workload_query->getWorkloadName();
    parent = workload_query->getWorkloadParent();

    // TODO(serxa): parse workload settings specifically for `resource_name`
    UNUSED(resource_name);
}
/// Remembers the CREATE RESOURCE query, derives the resource name from it
/// and starts the scheduler of this resource.
IOResourceManager::Resource::Resource(const ASTPtr & resource_entity_)
    : resource_entity(resource_entity_)
    , resource_name(getEntityName(resource_entity))
{
    scheduler.start();
}
/// Stops the scheduler of this resource.
IOResourceManager::Resource::~Resource()
{
    // TODO(serxa): destroy all workloads, purge all queue, abort all resource requests
    scheduler.stop();
}
void IOResourceManager::Resource::createNode(const NodeInfo & info)
{
// TODO(serxa): make sure all possible callers validate empty workload name!
if (info.name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Workload must have a name in resource '{}'",
resource_name);
// TODO(serxa): make sure all possible callers validate self-reference!
if (info.name == info.parent)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Self-referencing workload '{}' is not allowed in resource '{}'",
info.name, resource_name);
if (node_for_workload.contains(info.name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Node for creating workload '{}' already exist in resource '{}'",
info.name, resource_name);
// TODO(serxa): make sure all possible callers validate parent existence, add tests for creating workload with invalid parent
if (!info.parent.empty() && !node_for_workload.contains(info.parent))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Parent node '{}' for creating workload '{}' does not exist in resource '{}'",
info.parent, info.name, resource_name);
// TODO(serxa): make sure all possible callers validate second root, add tests for creating the second root
if (info.parent.empty() && root_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The second root workload '{}' is not allowed (current root '{}') in resource '{}'",
info.name, root_node->basename, resource_name);
executeInSchedulerThread([&, this]
{
auto node = std::make_shared<UnifiedSchedulerNode>(scheduler.event_queue, info.settings);
node->basename = info.name;
if (!info.parent.empty())
node_for_workload[info.parent]->attachUnifiedChild(node);
else
{
root_node = node;
scheduler.attachChild(root_node);
}
node_for_workload[info.name] = node;
updateCurrentVersion();
});
}
/// Removes the scheduler node of workload `info.name` from this resource.
/// The node is detached from its parent workload node (or from the scheduler if it is the root),
/// forgotten in `node_for_workload`, and a new version of the hierarchy is published.
/// Throws LOGICAL_ERROR if the node or its parent is unknown, or if the node still has children.
void IOResourceManager::Resource::deleteNode(const NodeInfo & info)
{
    if (!node_for_workload.contains(info.name))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Node for removing workload '{}' does not exist in resource '{}'",
            info.name, resource_name);

    if (!info.parent.empty() && !node_for_workload.contains(info.parent))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Parent node '{}' for removing workload '{}' does not exist in resource '{}'",
            info.parent, info.name, resource_name);

    // Local copy keeps the node referenced while it is being detached.
    auto node = node_for_workload[info.name];

    // TODO(serxa): make sure all possible callers validate that removing workload has no children workloads
    if (node->hasUnifiedChildren())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Removing workload '{}' with children in resource '{}'",
            info.name, resource_name);

    executeInSchedulerThread([&, this]
    {
        if (!info.parent.empty())
            node_for_workload[info.parent]->detachUnifiedChild(node);
        else
        {
            chassert(node == root_node);
            scheduler.removeChild(root_node.get());
            root_node.reset();
        }

        // Forget the node: otherwise createNode() would reject a workload with the same name,
        // and attachClassifier() / forEachResourceNode() would still find the removed workload.
        node_for_workload.erase(info.name);

        updateCurrentVersion();
    });
}
/// Applies a change of a workload (`old_info` -> `new_info`) to its node in this resource.
/// The node can get new scheduling settings and can be moved to another parent workload.
/// Renaming is not allowed, and neither is changing between root and non-root:
/// a root cannot get a parent, and a non-root workload cannot become a root.
/// Throws LOGICAL_ERROR if the update is invalid or any referenced node is unknown.
void IOResourceManager::Resource::updateNode(const NodeInfo & old_info, const NodeInfo & new_info)
{
    if (old_info.name != new_info.name)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Updating a name of workload '{}' to '{}' is not allowed in resource '{}'",
            old_info.name, new_info.name, resource_name);

    // Both the old and the new parent must be checked: with an empty parent on either side
    // the code below would look up `node_for_workload[""]` and dereference a null pointer.
    if (old_info.parent != new_info.parent && (old_info.parent.empty() || new_info.parent.empty()))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Workload '{}' invalid update of parent from '{}' to '{}' in resource '{}'",
            old_info.name, old_info.parent, new_info.parent, resource_name);

    if (!node_for_workload.contains(old_info.name))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Node for updating workload '{}' does not exist in resource '{}'",
            old_info.name, resource_name);

    if (!old_info.parent.empty() && !node_for_workload.contains(old_info.parent))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Old parent node '{}' for updating workload '{}' does not exist in resource '{}'",
            old_info.parent, old_info.name, resource_name);

    if (!new_info.parent.empty() && !node_for_workload.contains(new_info.parent))
        throw Exception(ErrorCodes::LOGICAL_ERROR, "New parent node '{}' for updating workload '{}' does not exist in resource '{}'",
            new_info.parent, new_info.name, resource_name);

    executeInSchedulerThread([&, this]
    {
        auto node = node_for_workload[old_info.name];
        bool detached = false;

        // Moving to another parent: detach first, re-attach after the settings are updated.
        if (old_info.parent != new_info.parent)
        {
            node_for_workload[old_info.parent]->detachUnifiedChild(node);
            detached = true;
        }

        node->updateSchedulingSettings(new_info.settings);

        // Node stays under the same parent: the parent only has to learn about a priority change.
        if (!detached && !old_info.parent.empty() && old_info.settings.priority != new_info.settings.priority)
            node_for_workload[old_info.parent]->updateUnifiedChildPriority(
                node,
                old_info.settings.priority,
                new_info.settings.priority);

        if (detached)
            node_for_workload[new_info.parent]->attachUnifiedChild(node);

        updateCurrentVersion();
    });
}
/// Publishes a new version of the hierarchy: collects the nodes reachable from the current root
/// into a fresh `Version` and links the previous version (if any) to it.
void IOResourceManager::Resource::updateCurrentVersion()
{
    auto previous_version = std::move(current_version);

    // Create a full list of constraints and queues in the current hierarchy
    current_version = std::make_shared<Version>();
    if (root_node)
        root_node->addRawPointerNodes(current_version->nodes);

    if (!previous_version)
        return;

    // See details in version control section of description in IOResourceManager.h
    previous_version->newer_version = current_version;
    // TODO(serxa): Node activations might be in event queue on destruction. How to process them? Should we just process all events in queue on important updates? Add a separate queue for hierarchy modifications? Or maybe everything works as expected, we need unit tests for this.
    // Looks like the problem of activations could be solved just by unlinking activation from intrusive list on destruction, but we must make sure all destructions are done under event_queue::mutex (which seems impossible)
    previous_version.reset(); // Destroys previous version nodes if there are no classifiers referencing it
}
/// Registers a workload: creates a node for it in every resource currently known to the manager.
IOResourceManager::Workload::Workload(IOResourceManager * resource_manager_, const ASTPtr & workload_entity_)
    : resource_manager(resource_manager_)
    , workload_entity(workload_entity_)
{
    for (auto & [resource_name, resource] : resource_manager->resources)
    {
        NodeInfo node_info(workload_entity, resource_name);
        resource->createNode(node_info);
    }
}
/// Unregisters a workload: deletes its node from every resource currently known to the manager.
/// NOTE(review): `deleteNode()` throws on validation failures, and this is a destructor - confirm that is acceptable.
IOResourceManager::Workload::~Workload()
{
    for (auto & [resource_name, resource] : resource_manager->resources)
    {
        NodeInfo node_info(workload_entity, resource_name);
        resource->deleteNode(node_info);
    }
}
void IOResourceManager::Workload::updateWorkload(const ASTPtr & new_entity)
{
for (auto & [resource_name, resource] : resource_manager->resources)
resource->updateNode(NodeInfo(workload_entity, resource_name), NodeInfo(new_entity, resource_name));
workload_entity = new_entity;
}
/// Returns the parent workload name taken from the stored CREATE WORKLOAD query.
String IOResourceManager::Workload::getParent() const
{
    auto * workload_query = typeid_cast<ASTCreateWorkloadQuery *>(workload_entity.get());
    return workload_query->getWorkloadParent();
}
/// Subscribes to changes of WORKLOADs and RESOURCEs in `storage_`.
/// A non-null `entity` in a notification means "created or changed", a null one means "removed".
/// Errors raised while applying a change are logged and not propagated out of the subscription callbacks.
IOResourceManager::IOResourceManager(IWorkloadEntityStorage & storage_)
    : storage(storage_)
{
    workload_change_subscription = storage.subscribeForChanges(WorkloadEntityType::Workload, [this] (
        WorkloadEntityType,
        const String & entity_name,
        const ASTPtr & entity /* new or changed entity, null if removed */)
    {
        try
        {
            if (entity)
                createOrUpdateWorkload(entity_name, entity);
            else
                deleteWorkload(entity_name);
        }
        catch (...)
        {
            // TODO(serxa): handle CRUD errors
            // Do not lose the error silently: at least make it visible in the server log.
            tryLogCurrentException("IOResourceManager", "Cannot apply change of workload '" + entity_name + "'");
        }
    });

    resource_change_subscription = storage.subscribeForChanges(WorkloadEntityType::Resource, [this] (
        WorkloadEntityType,
        const String & entity_name,
        const ASTPtr & entity /* new or changed entity, null if removed */)
    {
        try
        {
            if (entity)
                createResource(entity_name, entity);
            else
                deleteResource(entity_name);
        }
        catch (...)
        {
            // TODO(serxa): handle CRUD errors
            // Do not lose the error silently: at least make it visible in the server log.
            tryLogCurrentException("IOResourceManager", "Cannot apply change of resource '" + entity_name + "'");
        }
    });
}
/// Does nothing: the configuration argument is ignored by this manager.
void IOResourceManager::updateConfiguration(const Poco::Util::AbstractConfiguration & /* config */)
{
    // No-op
}
void IOResourceManager::createOrUpdateWorkload(const String & workload_name, const ASTPtr & ast)
{
std::unique_lock lock{mutex};
if (auto workload_iter = workloads.find(workload_name); workload_iter != workloads.end())
workload_iter->second->updateWorkload(ast);
else
workloads.emplace(workload_name, std::make_shared<Workload>(this, ast));
}
void IOResourceManager::deleteWorkload(const String & workload_name)
{
std::unique_lock lock{mutex};
if (auto workload_iter = workloads.find(workload_name); workload_iter != workloads.end())
workloads.erase(workload_iter);
else
{
// Workload to be deleted does not exist -- do nothing, throwing exceptions from a subscription is pointless
// TODO(serxa): add logging
}
}
void IOResourceManager::createResource(const String & resource_name, const ASTPtr & ast)
{
std::unique_lock lock{mutex};
if (auto resource_iter = resources.find(resource_name); resource_iter != resources.end())
{
// Resource to be created already exist -- do nothing, throwing exceptions from a subscription is pointless
// TODO(serxa): add logging
}
else
{
// Add all workloads into the new resource
auto resource = std::make_shared<Resource>(ast);
for (Workload * workload : topologicallySortedWorkloads())
resource->createNode(NodeInfo(workload->workload_entity, resource_name));
// Attach the resource
resources.emplace(resource_name, resource);
}
}
void IOResourceManager::deleteResource(const String & resource_name)
{
std::unique_lock lock{mutex};
if (auto resource_iter = resources.find(resource_name); resource_iter != resources.end())
{
resources.erase(resource_iter);
}
else
{
// Resource to be deleted does not exist -- do nothing, throwing exceptions from a subscription is pointless
// TODO(serxa): add logging
}
}
/// Detaches the classifier from every resource it was attached to and waits for all detach tasks.
IOResourceManager::Classifier::~Classifier()
{
    // Detach classifier from all resources in parallel (executed in every scheduler thread)
    std::vector<std::future<void>> detach_tasks;
    {
        std::unique_lock lock{mutex};
        detach_tasks.reserve(attachments.size());
        for (auto & name_and_attachment : attachments)
        {
            auto & attachment = name_and_attachment.second;
            detach_tasks.emplace_back(attachment.resource->detachClassifier(std::move(attachment.version)));
            attachment.link.reset(); // Just in case because it is not valid any longer
        }
    }

    // Wait for all tasks to finish (to avoid races in case of exceptions)
    for (auto & task : detach_tasks)
        task.wait();

    // There should not be any exceptions because it just destructs a few objects, but let's rethrow just in case
    for (auto & task : detach_tasks)
        task.get();

    // This unreferences and probably destroys `Resource` objects.
    // NOTE: We cannot do it in the scheduler threads (because thread cannot join itself).
    attachments.clear();
}
/// Enqueues a task into the event queue of this resource that releases `version`.
/// Returns a future that becomes ready (or carries an exception) once the task has run.
std::future<void> IOResourceManager::Resource::detachClassifier(VersionPtr && version)
{
    // Event queue task is std::function, which requires copy semantics, hence the shared promise
    auto shared_promise = std::make_shared<std::promise<void>>();
    auto result = shared_promise->get_future();

    scheduler.event_queue->enqueue([detached_version = std::move(version), promise = std::move(shared_promise)] mutable
    {
        try
        {
            // Unreferences and probably destroys the version and scheduler nodes it owns.
            // The main reason for moving destruction into the scheduler thread is to
            // free memory in the same thread it was allocated to avoid memtrackers drift.
            detached_version.reset();
            promise->set_value();
        }
        catch (...)
        {
            promise->set_exception(std::current_exception());
        }
    });

    return result;
}
/// Returns the link for `resource_name` if this classifier is attached to that resource.
/// Throws RESOURCE_NOT_FOUND otherwise.
/// NOTE(review): the error code is RESOURCE_NOT_FOUND while the message says "Access denied" - confirm which one is intended.
ResourceLink IOResourceManager::Classifier::get(const String & resource_name)
{
    std::unique_lock lock{mutex};

    auto attachment_iter = attachments.find(resource_name);
    if (attachment_iter == attachments.end())
        throw Exception(ErrorCodes::RESOURCE_NOT_FOUND, "Access denied to resource '{}'", resource_name);

    return attachment_iter->second.link;
}
void IOResourceManager::Classifier::attach(const ResourcePtr & resource, const VersionPtr & version, ResourceLink link)
{
std::unique_lock lock{mutex};
chassert(!attachments.contains(resource->getName()));
attachments[resource->getName()] = Attachment{.resource = resource, .version = version, .link = link};
}
/// Enqueues a task into the event queue of this resource that attaches `classifier`
/// to the queue of workload `workload_name` together with the current version of the hierarchy.
/// The returned future carries INVALID_SCHEDULER_NODE if the workload is unknown or has no queue.
/// NOTE: `classifier` and `workload_name` are captured by reference, so the caller must keep them
/// alive until the returned future is ready.
std::future<void> IOResourceManager::Resource::attachClassifier(Classifier & classifier, const String & workload_name)
{
    // Event queue task is std::function, which requires copy semantics, hence the shared promise
    auto shared_promise = std::make_shared<std::promise<void>>();
    auto result = shared_promise->get_future();

    scheduler.event_queue->enqueue([&, this, promise = std::move(shared_promise)] mutable
    {
        try
        {
            auto node_iter = node_for_workload.find(workload_name);
            if (node_iter == node_for_workload.end())
                throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Unable to find workload '{}' for resource '{}'", workload_name, resource_name);

            auto queue = node_iter->second->getQueue();
            if (!queue)
                throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Unable to use workload '{}' that have children for resource '{}'",
                    workload_name, resource_name);

            classifier.attach(shared_from_this(), current_version, ResourceLink{.queue = queue.get()});
            promise->set_value();
        }
        catch (...)
        {
            promise->set_exception(std::current_exception());
        }
    });

    return result;
}
/// Creates a classifier for workload `workload_name` and attaches it to every registered resource.
/// Rethrows the first attachment error, if any, after all attach tasks have finished.
ClassifierPtr IOResourceManager::acquire(const String & workload_name)
{
    auto new_classifier = std::make_shared<Classifier>();

    // Attach classifier to all resources in parallel (executed in every scheduler thread)
    std::vector<std::future<void>> attach_tasks;
    {
        std::unique_lock lock{mutex};
        attach_tasks.reserve(resources.size());
        for (auto & name_and_resource : resources)
            attach_tasks.emplace_back(name_and_resource.second->attachClassifier(*new_classifier, workload_name));
    }

    // Wait for all tasks to finish (to avoid races in case of exceptions)
    for (auto & task : attach_tasks)
        task.wait();

    // Rethrow exceptions if any
    for (auto & task : attach_tasks)
        task.get();

    return new_classifier;
}
/// Calls `visitor` for every scheduler node of every workload node of this resource,
/// passing the resource name, the node path and the node itself.
void IOResourceManager::Resource::forEachResourceNode(IResourceManager::VisitorFunc & visitor)
{
    executeInSchedulerThread([&, this]
    {
        auto visit_scheduler_node = [&] (ISchedulerNode * scheduler_node)
        {
            visitor(resource_name, scheduler_node->getPath(), scheduler_node);
        };

        for (auto & [workload_name, workload_node] : node_for_workload)
            workload_node->forEachSchedulerNode(visit_scheduler_node);
    });
}
/// Calls `visitor` for all scheduler nodes of all resources, one resource after another
/// in the order of resource names.
void IOResourceManager::forEachNode(IResourceManager::VisitorFunc visitor)
{
    // Gather resource upfront to avoid holding mutex for a long time
    std::map<String, ResourcePtr> resources_by_name;
    {
        std::unique_lock lock{mutex};
        for (auto & name_and_resource : resources)
            resources_by_name[name_and_resource.first] = name_and_resource.second;
    }

    /// Run tasks one by one to avoid concurrent calls to visitor
    for (auto & name_and_resource : resources_by_name)
        name_and_resource.second->forEachResourceNode(visitor);
}
/// Appends `workload` to `sorted_workloads` after its parent (recursively), unless it was visited before.
void IOResourceManager::topologicallySortedWorkloadsImpl(Workload * workload, std::unordered_set<Workload *> & visited, std::vector<Workload *> & sorted_workloads)
{
    // `insert` reports whether the workload is seen for the first time
    const bool first_visit = visited.insert(workload).second;
    if (!first_visit)
        return;

    // Recurse into parent (if any)
    String parent_name = workload->getParent();
    if (!parent_name.empty())
    {
        auto parent_iter = workloads.find(parent_name);
        chassert(parent_iter != workloads.end()); // validations check that all parents exist
        topologicallySortedWorkloadsImpl(parent_iter->second.get(), visited, sorted_workloads);
    }

    sorted_workloads.push_back(workload);
}
std::vector<IOResourceManager::Workload *> IOResourceManager::topologicallySortedWorkloads()
{
std::vector<Workload *> sorted_workloads;
std::unordered_set<Workload *> visited;
for (auto & [workload_name, workload] : workloads)
topologicallySortedWorkloadsImpl(workload.get(), visited, sorted_workloads);
return sorted_workloads;
}
}

View File

@ -0,0 +1,272 @@
#pragma once
#include <base/defines.h>
#include <base/scope_guard.h>
#include <Common/Scheduler/SchedulingSettings.h>
#include <Common/Scheduler/IResourceManager.h>
#include <Common/Scheduler/SchedulerRoot.h>
#include <Common/Scheduler/Nodes/UnifiedSchedulerNode.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Parsers/IAST_fwd.h>
#include <boost/core/noncopyable.hpp>
#include <exception>
#include <memory>
#include <mutex>
#include <future>
#include <unordered_set>
namespace DB
{
/*
* Implementation of `IResourceManager` that creates hierarchy of scheduler nodes according to
* workload entities (WORKLOADs and RESOURCEs). It subscribes for updates in IWorkloadEntityStorage and
* creates hierarchy of UnifiedSchedulerNode identical to the hierarchy of WORKLOADs.
* For every RESOURCE an independent hierarchy of scheduler nodes is created.
*
* Manager processes updates of WORKLOADs and RESOURCEs: CREATE/DROP/ALTER.
* When a RESOURCE is created (dropped), the corresponding scheduler node hierarchy is created (destroyed).
* After DROP RESOURCE, parts of the hierarchy might be kept alive while at least one query uses them.
*
* Manager is specific to IO only because it creates scheduler node hierarchies for RESOURCEs having
* WRITE DISK and/or READ DISK definitions. CPU and memory resources are managed separately.
*
* Classifiers are used (1) to access IO resources and (2) to keep shared ownership of scheduling nodes.
* This allows `ResourceRequest` and `ResourceLink` to hold raw pointers as long as
* `ClassifierPtr` is acquired and held.
*
* === RESOURCE ARCHITECTURE ===
* Let's consider how a single resource is implemented. Every workload is represented by corresponding UnifiedSchedulerNode.
* Every UnifiedSchedulerNode manages its own subtree of ISchedulerNode objects (see details in UnifiedSchedulerNode.h)
* UnifiedSchedulerNode for workload w/o children has a queue, which provide a ResourceLink for consumption.
* Parent of the root workload for a resource is SchedulerRoot with its own scheduler thread.
* So every resource has its dedicated thread for processing of resource request and other events (see EventQueue).
*
* Here is an example of SQL and the corresponding hierarchy of scheduler nodes:
* CREATE RESOURCE my_io_resource (...)
* CREATE WORKLOAD all
* CREATE WORKLOAD production PARENT all
* CREATE WORKLOAD development PARENT all
*
* root - SchedulerRoot (with scheduler thread and EventQueue)
* |
* all - UnifiedSchedulerNode
* |
* p0_fair - FairPolicy (part of parent UnifiedSchedulerNode internal structure)
* / \
* production development - UnifiedSchedulerNode
* | |
* queue queue - FifoQueue (part of parent UnifiedSchedulerNode internal structure)
*
* === UPDATING WORKLOADS ===
* Workload may be created, updated or deleted.
* Updating a child of a workload might lead to updating other workloads:
* 1. Workload itself: its structure depends on settings of children workloads
* (e.g. fifo node of a leaf workload is removed when the first child is added;
* and a fair node is inserted after the first two children are added).
* 2. Other children: for them path to root might be changed (e.g. intermediate priority node is inserted)
*
* === VERSION CONTROL ===
* Versions are created on hierarchy updates and hold ownership of nodes that are used through raw pointers.
* A classifier references the version of every resource it uses. An older version references the newer version.
* Here is a diagram explaining version control based on Version objects (for 1 resource):
*
* [nodes] [nodes] [nodes]
* ^ ^ ^
* | | |
* version1 --> version2 -...-> versionN
* ^ ^ ^
* | | |
* old_classifier new_classifier current_version
*
* Previous version should hold reference to a newer version. It is required for proper handling of updates.
* Classifiers that were created for any of old versions may use nodes of newer version due to updateNode().
* It may move a queue to a new position in the hierarchy or create/destroy constraints, thus resource requests
* created by an old classifier may reference constraints of newer versions through `request->constraints` which
* is filled during dequeueRequest().
*
* === THREADS ===
* scheduler thread:
* - one thread per resource
* - uses event_queue (per resource) for processing w/o holding mutex for every scheduler node
* - handle resource requests
* - node activations
* - scheduler hierarchy updates
* query thread:
* - multiple independent threads
* - send resource requests
* - acquire and release classifiers (via scheduler event queues)
* control thread:
* - modify workload and resources through subscription
*
* === SYNCHRONIZATION ===
* List of related sync primitives and their roles:
* IOResourceManager::mutex
* - protects resource manager data structures - resources and workloads
* - serializes control thread actions
* IOResourceManager::Resource::scheduler->event_queue
* - serializes scheduler hierarchy events
* - events are created in control and query threads
* - all events are processed by specific scheduler thread
* - hierarchy-wide actions: requests dequeueing, activations propagation and nodes updates.
* - resource version control management
* FifoQueue::mutex and SemaphoreConstraint::mutex
* - serializes query and scheduler threads on specific node accesses
* - resource request processing: enqueueRequest(), dequeueRequest() and finishRequest()
*/
class IOResourceManager : public IResourceManager
{
public:
    /// Creates a manager on top of the given workload entity storage (kept by reference)
    explicit IOResourceManager(IWorkloadEntityStorage & storage_);

    /// Implements IResourceManager interface
    void updateConfiguration(const Poco::Util::AbstractConfiguration & config) override;
    ClassifierPtr acquire(const String & workload_name) override;
    void forEachNode(VisitorFunc visitor) override;

private:
    // Forward declarations
    struct NodeInfo;
    struct Version;
    class Resource;
    struct Workload;
    class Classifier;

    friend struct Workload;

    using VersionPtr = std::shared_ptr<Version>;
    using ResourcePtr = std::shared_ptr<Resource>;
    using WorkloadPtr = std::shared_ptr<Workload>;

    /// Helper for parsing workload AST for a specific resource
    struct NodeInfo
    {
        String name; // Workload name
        String parent; // Name of parent workload
        SchedulingSettings settings; // Settings specific for a given resource

        NodeInfo(const ASTPtr & ast, const String & resource_name);
    };

    /// Ownership control for scheduler nodes, which could be referenced by raw pointers
    struct Version
    {
        std::vector<SchedulerNodePtr> nodes;
        VersionPtr newer_version; // Keeps the next version alive for as long as this one is referenced
    };

    /// Holds a thread and hierarchy of unified scheduler nodes for specific RESOURCE
    class Resource : public std::enable_shared_from_this<Resource>, boost::noncopyable
    {
    public:
        explicit Resource(const ASTPtr & resource_entity_);
        ~Resource();

        const String & getName() const { return resource_name; }

        /// Hierarchy management
        void createNode(const NodeInfo & info);
        void deleteNode(const NodeInfo & info);
        void updateNode(const NodeInfo & old_info, const NodeInfo & new_info);

        /// Updates a classifier to contain a reference for specified workload
        std::future<void> attachClassifier(Classifier & classifier, const String & workload_name);

        /// Remove classifier reference. This destroys scheduler nodes in proper scheduler thread
        std::future<void> detachClassifier(VersionPtr && version);

        /// Introspection
        void forEachResourceNode(IOResourceManager::VisitorFunc & visitor);

    private:
        void updateCurrentVersion();

        /// Enqueues `task` into the scheduler event queue and blocks the caller until it has been executed.
        /// An exception thrown by `task` is transferred through the promise and rethrown here by `future.get()`.
        template <class Task>
        void executeInSchedulerThread(Task && task)
        {
            std::promise<void> promise;
            auto future = promise.get_future();
            // Capturing by reference is safe: this frame outlives the event because of `future.get()` below
            scheduler.event_queue->enqueue([&]
            {
                try
                {
                    task();
                    promise.set_value();
                }
                catch (...)
                {
                    promise.set_exception(std::current_exception());
                }
            });
            future.get(); // Blocks until execution is done in the scheduler thread
        }

        const ASTPtr resource_entity;
        const String resource_name;
        SchedulerRoot scheduler;

        // TODO(serxa): consider using resource_manager->mutex + scheduler thread for updates and mutex only for reading to avoid slow acquire/release of classifier
        /// These fields should be accessed only by the scheduler thread
        std::unordered_map<String, UnifiedSchedulerNodePtr> node_for_workload;
        UnifiedSchedulerNodePtr root_node;
        VersionPtr current_version;
    };

    /// A WORKLOAD entity together with a back-reference to the manager that owns it
    struct Workload : boost::noncopyable
    {
        IOResourceManager * resource_manager;
        ASTPtr workload_entity;

        Workload(IOResourceManager * resource_manager_, const ASTPtr & workload_entity_);
        ~Workload();

        void updateWorkload(const ASTPtr & new_entity);
        String getParent() const;
    };

    class Classifier : public IClassifier
    {
    public:
        ~Classifier() override;

        /// Implements IClassifier interface
        /// NOTE: It is called from query threads (possibly multiple)
        ResourceLink get(const String & resource_name) override;

        /// Attaches/detaches a specific resource
        /// NOTE: It is called from scheduler threads (possibly multiple)
        void attach(const ResourcePtr & resource, const VersionPtr & version, ResourceLink link);
        void detach(const ResourcePtr & resource);

    private:
        /// Initialized explicitly: no constructor is declared for this class, so without an initializer
        /// the pointer would hold an indeterminate value.
        IOResourceManager * resource_manager = nullptr;
        std::mutex mutex;

        struct Attachment
        {
            ResourcePtr resource;
            VersionPtr version;
            ResourceLink link;
        };

        std::unordered_map<String, Attachment> attachments; // TSA_GUARDED_BY(mutex);
    };

    void createOrUpdateWorkload(const String & workload_name, const ASTPtr & ast);
    void deleteWorkload(const String & workload_name);
    void createResource(const String & resource_name, const ASTPtr & ast);
    void deleteResource(const String & resource_name);

    // Topological sorting of workloads
    void topologicallySortedWorkloadsImpl(Workload * workload, std::unordered_set<Workload *> & visited, std::vector<Workload *> & sorted_workloads);
    std::vector<Workload *> topologicallySortedWorkloads();

    IWorkloadEntityStorage & storage;
    scope_guard workload_change_subscription;
    scope_guard resource_change_subscription;

    std::mutex mutex;
    std::unordered_map<String, WorkloadPtr> workloads; // TSA_GUARDED_BY(mutex);
    std::unordered_map<String, ResourcePtr> resources; // TSA_GUARDED_BY(mutex);
};
}

View File

@ -39,6 +39,16 @@ public:
: ISchedulerNode(event_queue_, config, config_prefix)
{}
/// Constructs the node from an already prepared `SchedulerNodeInfo` (no configuration object involved).
/// Both arguments are passed through to the `ISchedulerNode` base.
explicit PriorityPolicy(EventQueue * event_queue_, const SchedulerNodeInfo & node_info)
: ISchedulerNode(event_queue_, node_info)
{}
/// Returns the type name of this node: "priority"
const String & getTypeName() const override
{
    // Function-local static: the returned reference stays valid after the call
    static const String name_of_type("priority");
    return name_of_type;
}
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))

View File

@ -1,5 +1,6 @@
#pragma once
#include "Common/Scheduler/ISchedulerNode.h"
#include <Common/Scheduler/ISchedulerConstraint.h>
#include <mutex>
@ -24,6 +25,18 @@ public:
, max_cost(config.getInt64(config_prefix + ".max_cost", config.getInt64(config_prefix + ".max_bytes", default_max_cost)))
{}
/// Constructs the constraint from explicit limits instead of a configuration object.
/// `max_requests_` and `max_cost_` are stored as is into `max_requests` and `max_cost`.
SemaphoreConstraint(EventQueue * event_queue_, const SchedulerNodeInfo & info_, Int64 max_requests_, Int64 max_cost_)
: ISchedulerConstraint(event_queue_, info_)
, max_requests(max_requests_)
, max_cost(max_cost_)
{}
/// Returns the type name of this node: "inflight_limit"
const String & getTypeName() const override
{
    // Function-local static: the returned reference stays valid after the call
    static const String name_of_type("inflight_limit");
    return name_of_type;
}
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
@ -69,10 +82,7 @@ public:
if (!request)
return {nullptr, false};
// Request has reference to the first (closest to leaf) `constraint`, which can have `parent_constraint`.
// The former is initialized here dynamically and the latter is initialized once during hierarchy construction.
if (!request->constraint)
request->constraint = this;
request->addConstraint(this);
// Update state on request arrival
std::unique_lock lock(mutex);
@ -87,10 +97,6 @@ public:
void finishRequest(ResourceRequest * request) override
{
// Recursive traverse of parent flow controls in reverse order
if (parent_constraint)
parent_constraint->finishRequest(request);
// Update state on request departure
std::unique_lock lock(mutex);
bool was_active = active();

View File

@ -3,8 +3,6 @@
#include <Common/Scheduler/ISchedulerConstraint.h>
#include <chrono>
#include <mutex>
#include <limits>
#include <utility>
@ -28,12 +26,26 @@ public:
, tokens(max_burst)
{}
/// Constructs the throttler from explicit `max_speed_` and `max_burst_` instead of a configuration object.
/// `last_update` is taken from the event queue clock.
ThrottlerConstraint(EventQueue * event_queue_, const SchedulerNodeInfo & info_, double max_speed_, double max_burst_)
: ISchedulerConstraint(event_queue_, info_)
, max_speed(max_speed_)
, max_burst(max_burst_)
, last_update(event_queue_->now())
// NOTE(review): reads the member `max_burst`, not the parameter; relies on `max_burst` being declared before `tokens` - confirm
, tokens(max_burst)
{}
/// Cancels the postponed event (if any) registered by this node in the event queue.
~ThrottlerConstraint() override
{
// We should cancel event on destruction to avoid dangling references from event queue
event_queue->cancelPostponed(postponed);
}
/// Returns the type name of this node: "bandwidth_limit"
const String & getTypeName() const override
{
    // Function-local static: the returned reference stays valid after the call
    static const String name_of_type("bandwidth_limit");
    return name_of_type;
}
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))
@ -79,10 +91,7 @@ public:
if (!request)
return {nullptr, false};
// Request has reference to the first (closest to leaf) `constraint`, which can have `parent_constraint`.
// The former is initialized here dynamically and the latter is initialized once during hierarchy construction.
if (!request->constraint)
request->constraint = this;
// We don't do `request->addConstraint(this)` because `finishRequest()` is no-op
updateBucket(request->cost);
@ -93,12 +102,8 @@ public:
return {request, active()};
}
void finishRequest(ResourceRequest * request) override
void finishRequest(ResourceRequest *) override
{
// Recursive traverse of parent flow controls in reverse order
if (parent_constraint)
parent_constraint->finishRequest(request);
// NOTE: Token-bucket constraint does not require any action when consumption ends
}

View File

@ -0,0 +1,433 @@
#pragma once
#include <Common/Priority.h>
#include <Common/Scheduler/Nodes/PriorityPolicy.h>
#include <Common/Scheduler/Nodes/FairPolicy.h>
#include <Common/Scheduler/Nodes/ThrottlerConstraint.h>
#include <Common/Scheduler/Nodes/SemaphoreConstraint.h>
#include <Common/Scheduler/ISchedulerQueue.h>
#include <Common/Scheduler/Nodes/FifoQueue.h>
#include <Common/Scheduler/ISchedulerNode.h>
#include <Common/Scheduler/SchedulingSettings.h>
#include <Common/Exception.h>
#include <memory>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
extern const int LOGICAL_ERROR;
}
class UnifiedSchedulerNode;
using UnifiedSchedulerNodePtr = std::shared_ptr<UnifiedSchedulerNode>;
/*
* Unified scheduler node combines multiple nodes internally to provide all available scheduling policies and constraints.
* Whole scheduling hierarchy could "logically" consist of unified nodes only. Physically intermediate "internal" nodes
* are also present. This approach is easier to manipulate at runtime than using multiple types of nodes.
*
* Unified node is capable of updating its internal structure based on:
* 1. Number of children (fifo if =0 or fairness/priority if >0).
* 2. Priorities of its children (for subtree structure).
* 3. `SchedulingSettings` associated with unified node (for throttler and semaphore constraints).
*
* In general, unified node has "internal" subtree with the following structure:
*
* THIS <-- UnifiedSchedulerNode object
* |
* THROTTLER <-- [Optional] Throttling scheduling constraint
* |
* [If no children]------ SEMAPHORE <-- [Optional] Semaphore constraint
* | |
* FIFO PRIORITY <-- [Optional] Scheduling policy distinguishing priorities
* .-------' '-------.
* FAIRNESS[p1] ... FAIRNESS[pN] <-- [Optional] Policies for fairness if priorities are equal
* / \ / \
* CHILD[p1,w1] ... CHILD[p1,wM] CHILD[pN,w1] ... CHILD[pN,wM] <-- Unified children (UnifiedSchedulerNode objects)
*
* NOTE: to distinguish different kinds of children we use the following terms:
* - immediate child: child of unified object (THROTTLER);
* - unified child: leaf of this "internal" subtree (CHILD[p,w]);
* - intermediate node: any child that is not UnifiedSchedulerNode (unified child or `this`)
*/
class UnifiedSchedulerNode : public ISchedulerNode
{
private:
/// Helper function for managing a parent of a node
static void reparent(const SchedulerNodePtr & node, const SchedulerNodePtr & new_parent)
{
reparent(node, new_parent.get());
}
/// Helper function for managing a parent of a node
static void reparent(const SchedulerNodePtr & node, ISchedulerNode * new_parent)
{
chassert(new_parent);
if (new_parent == node->parent)
return;
if (node->parent)
node->parent->removeChild(node.get());
new_parent->attachChild(node);
}
/// Helper function for managing a parent of a node
static void detach(const SchedulerNodePtr & node)
{
if (node->parent)
node->parent->removeChild(node.get());
}
/// A branch of the tree for a specific priority value
struct FairnessBranch {
SchedulerNodePtr root; /// FairPolicy node is used if multiple children with the same priority are attached
std::unordered_map<String, UnifiedSchedulerNodePtr> children; // basename -> child
SchedulerNodePtr getRoot()
{
chassert(!children.empty());
if (root)
return root;
return children.begin()->second; // There should be exactly one child
}
/// Attaches a new child.
/// Returns root node if it has been changed to a different node, otherwise returns null.
[[nodiscard]] SchedulerNodePtr attachUnifiedChild(EventQueue * event_queue_, const UnifiedSchedulerNodePtr & child)
{
if (auto [it, inserted] = children.emplace(child->basename, child); !inserted)
throw Exception(
ErrorCodes::INVALID_SCHEDULER_NODE,
"Can't add another child with the same path: {}",
it->second->getPath());
if (children.size() == 2)
{
// Insert fair node if we have just added the second child
chassert(!root);
root = std::make_shared<FairPolicy>(event_queue_, SchedulerNodeInfo{});
root->info.setPriority(child->info.priority);
root->basename = fmt::format("p{}_fair", child->info.priority.value);
for (auto & [_, node] : children)
reparent(node, root);
return root; // New root has been created
}
else if (children.size() == 1)
return child; // We have added single child so far and it is the new root
else
reparent(child, root);
return {}; // Root is the same
}
};
/// Handles all the children nodes with intermediate fair and/or priority nodes
struct ChildrenBranch
{
SchedulerNodePtr root; /// PriorityPolicy node is used if multiple children with different priority are attached
std::unordered_map<Priority::Value, FairnessBranch> branches; /// Branches for different priority values
/// Attaches a new child.
/// Returns root node if it has been changed to a different node, otherwise returns null.
[[nodiscard]] SchedulerNodePtr attachUnifiedChild(EventQueue * event_queue_, const UnifiedSchedulerNodePtr & child)
{
bool existing_branch = branches.contains(child->info.priority);
auto & child_branch = branches[child->info.priority];
auto branch_root = child_branch.attachUnifiedChild(event_queue_, child);
if (existing_branch)
{
if (branch_root)
{
if (root)
reparent(branch_root, root);
else
return branch_root;
}
return {};
}
else
{
chassert(branch_root);
if (branches.size() == 2)
{
// Insert priority node if we have just added the second branch
chassert(!root);
root = std::make_shared<PriorityPolicy>(event_queue_, SchedulerNodeInfo{});
root->basename = "prio";
for (auto & [_, branch] : branches)
reparent(branch.getRoot(), root);
return root; // New root has been created
}
else if (branches.size() == 1)
return child; // We have added single child so far and it is the new root
else
reparent(child, root);
return {}; // Root is the same
}
}
};
/// Handles degenerate case of zero children (a fifo queue) or delegate to `ChildrenBranch`.
struct QueueOrChildrenBranch
{
SchedulerNodePtr queue; /// FifoQueue node is used if there are no children
ChildrenBranch branch; /// Used if there is at least one child
// Should be called after constructor, before any other methods
[[nodiscard]] SchedulerNodePtr initialize(EventQueue * event_queue_)
{
createQueue(event_queue_);
return queue;
}
/// Attaches a new child.
/// Returns root node if it has been changed to a different node, otherwise returns null.
[[nodiscard]] SchedulerNodePtr attachUnifiedChild(EventQueue * event_queue_, const UnifiedSchedulerNodePtr & child)
{
if (queue)
removeQueue();
return branch.attachUnifiedChild(event_queue_, child);
}
private:
void createQueue(EventQueue * event_queue_)
{
queue = std::make_shared<FifoQueue>(event_queue_, SchedulerNodeInfo{});
queue->basename = "fifo";
}
void removeQueue()
{
// This unified node will not be able to process resource requests any longer
// All remaining resource requests are be aborted on queue destruction
detach(queue);
std::static_pointer_cast<ISchedulerQueue>(queue)->purgeQueue();
queue.reset();
}
};
/// Handles all the nodes under this unified node
/// Specifically handles constraints with `QueueOrChildrenBranch` under it
struct ConstraintsBranch
{
SchedulerNodePtr throttler;
SchedulerNodePtr semaphore;
QueueOrChildrenBranch branch;
SchedulingSettings settings;
// Should be called after constructor, before any other methods
[[nodiscard]] SchedulerNodePtr initialize(EventQueue * event_queue_, const SchedulingSettings & settings_)
{
settings = settings_;
SchedulerNodePtr node = branch.initialize(event_queue_);
if (settings.hasSemaphore())
{
semaphore = std::make_shared<SemaphoreConstraint>(event_queue_, SchedulerNodeInfo{}, settings.max_requests, settings.max_cost);
semaphore->basename = "semaphore";
reparent(node, semaphore);
node = semaphore;
}
if (settings.hasThrottler())
{
throttler = std::make_shared<ThrottlerConstraint>(event_queue_, SchedulerNodeInfo{}, settings.max_speed, settings.max_burst);
throttler->basename = "throttler";
reparent(node, throttler);
node = throttler;
}
return node;
}
/// Attaches a new child.
/// Returns root node if it has been changed to a different node, otherwise returns null.
[[nodiscard]] SchedulerNodePtr attachUnifiedChild(EventQueue * event_queue_, const UnifiedSchedulerNodePtr & child)
{
if (auto branch_root = branch.attachUnifiedChild(event_queue_, child))
{
if (semaphore)
reparent(branch_root, semaphore);
else if (throttler)
reparent(branch_root, throttler);
else
return branch_root;
}
return {};
}
};
public:
explicit UnifiedSchedulerNode(EventQueue * event_queue_, const SchedulingSettings & settings)
: ISchedulerNode(event_queue_, SchedulerNodeInfo(settings.weight, settings.priority))
{
immediate_child = impl.initialize(event_queue, settings);
reparent(immediate_child, this);
}
/// Attaches a unified child as a leaf of internal subtree and insert or update all the intermediate nodes
/// NOTE: Do not confuse with `attachChild()` which is used only for immediate children
void attachUnifiedChild(const UnifiedSchedulerNodePtr & child)
{
if (auto new_child = impl.attachUnifiedChild(event_queue, child))
reparent(new_child, this);
}
/// Detaches unified child and update all the intermediate nodes.
/// Detached child could be safely attached to another parent.
/// NOTE: Do not confuse with `removeChild()` which is used only for immediate children
void detachUnifiedChild(const UnifiedSchedulerNodePtr & child)
{
UNUSED(child); // TODO(serxa): implement detachUnifiedChild()
}
/// Updates intermediate nodes subtree according with new priority (priority is set by the caller beforehand)
/// NOTE: Changing a priority of a unified child may lead to change of its parent.
void updateUnifiedChildPriority(const UnifiedSchedulerNodePtr & child, Priority old_priority, Priority new_priority)
{
UNUSED(child, old_priority, new_priority); // TODO(serxa): implement updateUnifiedChildPriority()
}
/// Updates scheduling settings. Set of constraints might change.
/// NOTE: Caller is responsible for calling `updateUnifiedChildPriority` in parent unified node (if any)
void updateSchedulingSettings(const SchedulingSettings & new_settings)
{
UNUSED(new_settings); // TODO(serxa): implement updateSchedulingSettings()
info.setPriority(new_settings.priority);
info.setWeight(new_settings.weight);
}
/// Returns the queue to be used for resource requests or `nullptr` if it has unified children
std::shared_ptr<ISchedulerQueue> getQueue()
{
return static_pointer_cast<ISchedulerQueue>(impl.branch.queue);
}
/// Collects nodes that could be accessed with raw pointers by resource requests (queue and constraints)
/// NOTE: This is a building block for classifier. Note that due to possible movement of a queue, set of constraints
/// for that queue might change in future, and `request->constraints` might reference nodes not in
/// the initial set of nodes returned by `addRawPointerNodes()`. To avoid destruction of such additional nodes
/// classifier must (indirectly) hold nodes return by `addRawPointerNodes()` for all future versions of
/// all unified nodes. Such a version control is done by `IOResourceManager`.
void addRawPointerNodes(std::vector<SchedulerNodePtr> & nodes)
{
if (impl.throttler)
nodes.push_back(impl.throttler);
if (impl.semaphore)
nodes.push_back(impl.semaphore);
if (impl.branch.queue)
nodes.push_back(impl.branch.queue);
for (auto & [_, branch] : impl.branch.branch.branches)
{
for (auto & [_, child] : branch.children)
child->addRawPointerNodes(nodes);
}
}
bool hasUnifiedChildren() const
{
return impl.branch.queue == nullptr;
}
/// Introspection. Calls a visitor for self and every internal node. Do not recurse into unified children.
void forEachSchedulerNode(std::function<void(ISchedulerNode *)> visitor)
{
visitor(this);
if (impl.throttler)
visitor(impl.throttler.get());
if (impl.semaphore)
visitor(impl.semaphore.get());
if (impl.branch.queue)
visitor(impl.branch.queue.get());
if (impl.branch.branch.root) // priority
visitor(impl.branch.branch.root.get());
for (auto & [_, branch] : impl.branch.branch.branches)
{
if (branch.root) // fairness
visitor(branch.root.get());
}
}
protected: // Hide all the ISchedulerNode interface methods as an implementation details
const String & getTypeName() const override
{
static String type_name("unified");
return type_name;
}
bool equals(ISchedulerNode *) override
{
assert(false);
return false;
}
/// Attaches an immediate child (used through `reparent()`)
void attachChild(const SchedulerNodePtr & child_) override
{
immediate_child = child_;
immediate_child->setParent(this);
// Activate if required
if (immediate_child->isActive())
activateChild(immediate_child.get());
}
/// Removes an immediate child (used through `reparent()`)
void removeChild(ISchedulerNode * child) override
{
if (immediate_child.get() == child)
{
child_active = false; // deactivate
immediate_child->setParent(nullptr); // detach
immediate_child.reset();
}
}
ISchedulerNode * getChild(const String & child_name) override
{
if (immediate_child->basename == child_name)
return immediate_child.get();
else
return nullptr;
}
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
auto [request, child_now_active] = immediate_child->dequeueRequest();
if (!request)
return {nullptr, false};
child_active = child_now_active;
if (!child_active)
busy_periods++;
incrementDequeued(request->cost);
return {request, child_active};
}
bool isActive() override
{
return child_active;
}
/// Shows number of immediate active children (for introspection)
size_t activeChildren() override
{
return child_active;
}
/// Activate an immediate child
void activateChild(ISchedulerNode * child) override
{
if (child == immediate_child.get())
if (!std::exchange(child_active, true) && parent)
parent->activateChild(this);
}
private:
ConstraintsBranch impl;
SchedulerNodePtr immediate_child; // An immediate child (actually the root of the whole subtree)
bool child_active = false;
};
}

View File

@ -1,15 +0,0 @@
#include <Common/Scheduler/Nodes/registerResourceManagers.h>
#include <Common/Scheduler/ResourceManagerFactory.h>
namespace DB
{
void registerDynamicResourceManager(ResourceManagerFactory &);
/// Registers the dynamic resource manager in the `ResourceManagerFactory` singleton
void registerResourceManagers()
{
    registerDynamicResourceManager(ResourceManagerFactory::instance());
}
}

View File

@ -1,8 +0,0 @@
#pragma once
namespace DB
{
void registerResourceManagers();
}

View File

@ -1,5 +1,6 @@
#pragma once
#include "Common/Scheduler/SchedulingSettings.h"
#include <Common/Scheduler/IResourceManager.h>
#include <Common/Scheduler/SchedulerRoot.h>
#include <Common/Scheduler/ResourceGuard.h>
@ -7,17 +8,21 @@
#include <Common/Scheduler/Nodes/PriorityPolicy.h>
#include <Common/Scheduler/Nodes/FifoQueue.h>
#include <Common/Scheduler/Nodes/SemaphoreConstraint.h>
#include <Common/Scheduler/Nodes/UnifiedSchedulerNode.h>
#include <Common/Scheduler/Nodes/registerSchedulerNodes.h>
#include <Common/Scheduler/Nodes/registerResourceManagers.h>
#include <Poco/Util/XMLConfiguration.h>
#include <atomic>
#include <barrier>
#include <exception>
#include <functional>
#include <memory>
#include <unordered_map>
#include <mutex>
#include <set>
#include <sstream>
#include <utility>
namespace DB
{
@ -26,7 +31,7 @@ struct ResourceTestBase
{
ResourceTestBase()
{
[[maybe_unused]] static bool typesRegistered = [] { registerSchedulerNodes(); registerResourceManagers(); return true; }();
[[maybe_unused]] static bool typesRegistered = [] { registerSchedulerNodes(); return true; }();
}
template <class TClass>
@ -37,10 +42,16 @@ struct ResourceTestBase
Poco::AutoPtr config{new Poco::Util::XMLConfiguration(stream)};
String config_prefix = "node";
return add<TClass>(event_queue, root_node, path, std::ref(*config), config_prefix);
}
template <class TClass, class... Args>
static TClass * add(EventQueue * event_queue, SchedulerNodePtr & root_node, const String & path, Args... args)
{
if (path == "/")
{
EXPECT_TRUE(root_node.get() == nullptr);
root_node.reset(new TClass(event_queue, *config, config_prefix));
root_node.reset(new TClass(event_queue, std::forward<Args>(args)...));
return static_cast<TClass *>(root_node.get());
}
@ -65,7 +76,7 @@ struct ResourceTestBase
}
EXPECT_TRUE(!child_name.empty()); // wrong path
SchedulerNodePtr node = std::make_shared<TClass>(event_queue, *config, config_prefix);
SchedulerNodePtr node = std::make_shared<TClass>(event_queue, std::forward<Args>(args)...);
node->basename = child_name;
parent->attachChild(node);
return static_cast<TClass *>(node.get());
@ -107,25 +118,70 @@ class ResourceTestClass : public ResourceTestBase
{
struct Request : public ResourceRequest
{
ResourceTestClass * test;
String name;
Request(ResourceCost cost_, const String & name_)
Request(ResourceTestClass * test_, ResourceCost cost_, const String & name_)
: ResourceRequest(cost_)
, test(test_)
, name(name_)
{}
/// Intentionally empty: this test request does no work when executed
void execute() override
{
}
/// Records the cost of the failed request in the owning test and destroys the request.
/// The exception itself is ignored.
void failed(const std::exception_ptr &) override
{
test->failed_cost += cost;
// The request deletes itself; nothing may touch `this` after this line
delete this;
}
};
public:
/// Calls `dequeue()` with its default argument before the test object is destroyed
~ResourceTestClass()
{
dequeue(); // Just to avoid any leaks of `Request` object
}
/// Adds a node of type `TClass` at `path`, described by the `xml` string.
/// Delegates to `ResourceTestBase::add()` with this test's event queue and root node.
template <class TClass>
void add(const String & path, const String & xml = {})
{
ResourceTestBase::add<TClass>(&event_queue, root_node, path, xml);
}
/// Adds a node of type `TClass` at `path`, passing `args` on to `ResourceTestBase::add()`
/// together with this test's event queue and root node.
/// Arguments are taken as forwarding references, so `std::forward` preserves their value category
/// and no intermediate copy is made on the way.
template <class TClass, class... Args>
void addCustom(const String & path, Args &&... args)
{
    ResourceTestBase::add<TClass>(&event_queue, root_node, path, std::forward<Args>(args)...);
}
/// Creates a unified node without a parent: delegates to the three-argument overload with an empty parent
UnifiedSchedulerNodePtr createUnifiedNode(const String & basename, const SchedulingSettings & settings = {})
{
return createUnifiedNode(basename, {}, settings);
}
/// Creates a unified node named `basename` with the given `settings`.
/// With a non-null `parent` the node is attached to it as a unified child;
/// otherwise it becomes the root of the test hierarchy (the root is expected to be unset).
UnifiedSchedulerNodePtr createUnifiedNode(const String & basename, const UnifiedSchedulerNodePtr & parent, const SchedulingSettings & settings = {})
{
    auto unified = std::make_shared<UnifiedSchedulerNode>(&event_queue, settings);
    unified->basename = basename;

    if (!parent)
    {
        EXPECT_TRUE(root_node.get() == nullptr);
        root_node = unified;
        return unified;
    }

    parent->attachUnifiedChild(unified);
    return unified;
}
/// Enqueues one request per element of `costs` into the queue of a unified node.
/// Requests are named after the unified node (`node->basename`).
void enqueue(const UnifiedSchedulerNodePtr & node, const std::vector<ResourceCost> & costs)
{
enqueueImpl(node->getQueue().get(), costs, node->basename);
}
void enqueue(const String & path, const std::vector<ResourceCost> & costs)
{
ASSERT_TRUE(root_node.get() != nullptr); // root should be initialized first
@ -146,13 +202,14 @@ public:
pos = String::npos;
}
}
ISchedulerQueue * queue = dynamic_cast<ISchedulerQueue *>(node);
ASSERT_TRUE(queue != nullptr); // not a queue
enqueueImpl(dynamic_cast<ISchedulerQueue *>(node), costs);
}
void enqueueImpl(ISchedulerQueue * queue, const std::vector<ResourceCost> & costs, const String & name = {})
{
ASSERT_TRUE(queue != nullptr); // not a queue
for (ResourceCost cost : costs)
{
queue->enqueueRequest(new Request(cost, queue->basename));
}
queue->enqueueRequest(new Request(this, cost, name.empty() ? queue->basename : name));
processEvents(); // to activate queues
}
@ -208,6 +265,12 @@ public:
consumed_cost[name] -= value;
}
/// Checks that the accumulated cost of failed requests equals `value`, then subtracts `value` from it
void failed(ResourceCost value)
{
EXPECT_EQ(failed_cost, value);
failed_cost -= value;
}
void processEvents()
{
while (event_queue.tryProcess()) {}
@ -217,6 +280,7 @@ private:
EventQueue event_queue;
SchedulerNodePtr root_node;
std::unordered_map<String, ResourceCost> consumed_cost;
ResourceCost failed_cost = 0;
};
template <class TManager>

View File

@ -13,6 +13,12 @@ public:
, log(log_)
{}
/// Type name reported by this test-only node: always "fake".
const String & getTypeName() const override
{
    static const String fake_type_name = "fake";
    return fake_type_name;
}
void attachChild(const SchedulerNodePtr & child) override
{
log += " +" + child->basename;

View File

@ -101,6 +101,11 @@ struct MyRequest : public ResourceRequest
if (on_execute)
on_execute();
}
/// Requests in this test are never expected to fail: reaching this callback is a test failure.
void failed(const std::exception_ptr &) override
{
FAIL();
}
};
TEST(SchedulerRoot, Smoke)

View File

@ -0,0 +1,495 @@
#include <chrono>
#include <gtest/gtest.h>
#include <Common/Scheduler/ResourceGuard.h>
#include <Common/Scheduler/ResourceLink.h>
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
#include <Common/Priority.h>
#include <Common/Scheduler/Nodes/FairPolicy.h>
#include <Common/Scheduler/Nodes/UnifiedSchedulerNode.h>
using namespace DB;
using ResourceTest = ResourceTestClass;
/// Minimal scenario: a single UnifiedSchedulerNode is added at "/" with default settings,
/// two requests of cost 10 are enqueued via the path "/fifo", both are dequeued,
/// and 20 units are expected to be consumed under the name "fifo".
TEST(SchedulerUnifiedNode, Smoke)
{
ResourceTest t;
t.addCustom<UnifiedSchedulerNode>("/", SchedulingSettings{});
t.enqueue("/fifo", {10, 10});
t.dequeue(2);
t.consumed("fifo", 20);
}
/// Two siblings with weights 1 and 3 share the parent: while both have pending requests
/// every 4 dequeued requests are expected to split 10/30 between A and B;
/// the final unbounded dequeue drains whatever is left.
TEST(SchedulerUnifiedNode, FairnessWeight)
{
    ResourceTest t;

    auto all = t.createUnifiedNode("all");
    auto a = t.createUnifiedNode("A", all, {.weight = 1.0, .priority = Priority{}});
    auto b = t.createUnifiedNode("B", all, {.weight = 3.0, .priority = Priority{}});

    // Eight requests of cost 10 for each child
    const std::vector<ResourceCost> batch(8, 10);
    t.enqueue(a, batch);
    t.enqueue(b, batch);

    for (int round = 0; round < 2; ++round)
    {
        t.dequeue(4);
        t.consumed("A", 10);
        t.consumed("B", 30);
    }

    t.dequeue();
    t.consumed("A", 60);
    t.consumed("B", 20);
}
/// Three equally configured siblings where B and C run out of requests and are refilled later.
/// The expectations check how consumption is shared while children become empty and active again.
TEST(SchedulerUnifiedNode, FairnessActivation)
{
ResourceTest t;
auto all = t.createUnifiedNode("all");
auto a = t.createUnifiedNode("A", all);
auto b = t.createUnifiedNode("B", all);
auto c = t.createUnifiedNode("C", all);
t.enqueue(a, {10, 10, 10, 10, 10, 10, 10, 10});
t.enqueue(b, {10});
t.enqueue(c, {10, 10});
// First round: each child gets one request served
t.dequeue(3);
t.consumed("A", 10);
t.consumed("B", 10);
t.consumed("C", 10);
// B is empty now; C has one request left, the rest goes to A
t.dequeue(4);
t.consumed("A", 30);
t.consumed("B", 0);
t.consumed("C", 10);
// Refilled B is expected to be served next
t.enqueue(b, {10, 10});
t.dequeue(1);
t.consumed("B", 10);
// Refilled C is expected to be served next
t.enqueue(c, {10, 10});
t.dequeue(1);
t.consumed("C", 10);
t.dequeue(2); // A B or B A
t.consumed("A", 10);
t.consumed("B", 10);
}
/// A always has spare requests while B only submits two per round:
/// each round B is expected to get everything it asked for (20) and A the remainder (40).
TEST(SchedulerUnifiedNode, FairnessMaxMin)
{
    ResourceTest t;

    auto all = t.createUnifiedNode("all");
    auto a = t.createUnifiedNode("A", all);
    auto b = t.createUnifiedNode("B", all);

    t.enqueue(a, {10, 10}); // make sure A is never empty

    const std::vector<ResourceCost> four_requests(4, 10);
    const std::vector<ResourceCost> two_requests(2, 10);
    for (int round = 0; round < 10; ++round)
    {
        t.enqueue(a, four_requests);
        t.enqueue(b, two_requests);

        t.dequeue(6);
        t.consumed("A", 40);
        t.consumed("B", 20);
    }

    // The two spare requests of A enqueued before the loop
    t.dequeue(2);
    t.consumed("A", 20);
}
/// Two-level hierarchy: all -> {X -> {A, B}, Y -> {C, D}}.
/// Each phase enqueues four batches of eight cost-10 requests and then checks, four times in a row,
/// how a dequeue of 8 requests is split between the leaves that have pending work.
TEST(SchedulerUnifiedNode, FairnessHierarchical)
{
    ResourceTest t;

    auto all = t.createUnifiedNode("all");
    auto x = t.createUnifiedNode("X", all);
    auto y = t.createUnifiedNode("Y", all);
    auto a = t.createUnifiedNode("A", x);
    auto b = t.createUnifiedNode("B", x);
    auto c = t.createUnifiedNode("C", y);
    auto d = t.createUnifiedNode("D", y);

    const std::vector<ResourceCost> batch(8, 10);
    using Expectations = std::vector<std::pair<const char *, ResourceCost>>;

    // Four rounds of "dequeue 8 requests, then verify per-leaf consumption"
    auto expect_rounds = [&t](const Expectations & expected)
    {
        for (int round = 0; round < 4; ++round)
        {
            t.dequeue(8);
            for (const auto & [name, cost] : expected)
                t.consumed(name, cost);
        }
    };

    // All four leaves are busy: equal split
    t.enqueue(a, batch);
    t.enqueue(b, batch);
    t.enqueue(c, batch);
    t.enqueue(d, batch);
    expect_rounds({{"A", 20}, {"B", 20}, {"C", 20}, {"D", 20}});

    // B is idle: A gets the whole share of X
    t.enqueue(a, batch);
    t.enqueue(a, batch);
    t.enqueue(c, batch);
    t.enqueue(d, batch);
    expect_rounds({{"A", 40}, {"C", 20}, {"D", 20}});

    // A is idle: B gets the whole share of X
    t.enqueue(b, batch);
    t.enqueue(b, batch);
    t.enqueue(c, batch);
    t.enqueue(d, batch);
    expect_rounds({{"B", 40}, {"C", 20}, {"D", 20}});

    // D is idle: C gets the whole share of Y
    t.enqueue(a, batch);
    t.enqueue(b, batch);
    t.enqueue(c, batch);
    t.enqueue(c, batch);
    expect_rounds({{"A", 20}, {"B", 20}, {"C", 40}});

    // C is idle: D gets the whole share of Y
    t.enqueue(a, batch);
    t.enqueue(b, batch);
    t.enqueue(d, batch);
    t.enqueue(d, batch);
    expect_rounds({{"A", 20}, {"B", 20}, {"D", 40}});

    // Only A and D are busy: one leaf per subtree
    t.enqueue(a, batch);
    t.enqueue(a, batch);
    t.enqueue(d, batch);
    t.enqueue(d, batch);
    expect_rounds({{"A", 40}, {"D", 40}});
}
/// Three siblings with Priority{3}, Priority{2} and Priority{1}.
/// The expectations below show C (Priority{1}) being drained first, then B, then A.
TEST(SchedulerUnifiedNode, Priority)
{
ResourceTest t;
auto all = t.createUnifiedNode("all");
auto a = t.createUnifiedNode("A", all, {.priority = Priority{3}});
auto b = t.createUnifiedNode("B", all, {.priority = Priority{2}});
auto c = t.createUnifiedNode("C", all, {.priority = Priority{1}});
t.enqueue(a, {10, 10, 10});
t.enqueue(b, {10, 10, 10});
t.enqueue(c, {10, 10, 10});
// Only C is served while it has requests
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 0);
t.consumed("C", 20);
// Last request of C, then B starts
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 10);
t.consumed("C", 10);
// Remaining requests of B
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 20);
t.consumed("C", 0);
// Everything that is left belongs to A
t.dequeue();
t.consumed("A", 30);
t.consumed("B", 0);
t.consumed("C", 0);
}
/// Same priority layout as the Priority test, but B and C run dry and are refilled later.
/// The expectations check that a refilled child is served ahead of A as soon as it has requests again.
TEST(SchedulerUnifiedNode, PriorityActivation)
{
ResourceTest t;
auto all = t.createUnifiedNode("all");
auto a = t.createUnifiedNode("A", all, {.priority = Priority{3}});
auto b = t.createUnifiedNode("B", all, {.priority = Priority{2}});
auto c = t.createUnifiedNode("C", all, {.priority = Priority{1}});
t.enqueue(a, {10, 10, 10, 10, 10, 10});
t.enqueue(b, {10});
t.enqueue(c, {10, 10});
// C and B are drained before A gets anything
t.dequeue(3);
t.consumed("A", 0);
t.consumed("B", 10);
t.consumed("C", 20);
// Only A has requests left
t.dequeue(2);
t.consumed("A", 20);
t.consumed("B", 0);
t.consumed("C", 0);
// B is refilled and served ahead of A
t.enqueue(b, {10, 10, 10});
t.dequeue(2);
t.consumed("A", 0);
t.consumed("B", 20);
t.consumed("C", 0);
// C is refilled and served ahead of the last request of B
t.enqueue(c, {10, 10});
t.dequeue(3);
t.consumed("A", 0);
t.consumed("B", 10);
t.consumed("C", 20);
// Back to A
t.dequeue(2);
t.consumed("A", 20);
t.consumed("B", 0);
t.consumed("C", 0);
}
/// Builds a chain all -> L1 -> L2 -> L3 -> L4 one node at a time.
/// After each new node is appended, requests are enqueued into that newest node only
/// and every single dequeue is expected to consume 10 under the newest node's name.
TEST(SchedulerUnifiedNode, List)
{
ResourceTest t;
std::list<UnifiedSchedulerNodePtr> list;
list.push_back(t.createUnifiedNode("all"));
for (int length = 1; length < 5; length++)
{
String name = fmt::format("L{}", length);
// The new node becomes a child of the current tail of the chain
list.push_back(t.createUnifiedNode(name, list.back()));
for (int i = 0; i < 3; i++)
{
// Two requests in, one out: one request stays pending during the inner loop
t.enqueue(list.back(), {10, 10});
t.dequeue(1);
t.consumed(name, 10);
for (int j = 0; j < 3; j++)
{
t.enqueue(list.back(), {10, 10, 10});
t.dequeue(1);
t.consumed(name, 10);
t.dequeue(1);
t.consumed(name, 10);
t.dequeue(1);
t.consumed(name, 10);
}
// Drain the request left pending above
t.dequeue(1);
t.consumed(name, 10);
}
}
}
/// Single node throttled with max_speed = 10 and max_burst = 20.
/// Eight requests of cost 10 are enqueued; 30 are expected to be consumed immediately,
/// then 10 at each of the following seconds, and the last 10 at a far-future time point.
TEST(SchedulerUnifiedNode, ThrottlerLeakyBucket)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
t.process(start, 0);
auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 10.0, .max_burst = 20.0});
t.enqueue(all, {10, 10, 10, 10, 10, 10, 10, 10});
t.process(start + std::chrono::seconds(0));
t.consumed("all", 30); // It is allowed to go below zero for exactly one resource request
t.process(start + std::chrono::seconds(1));
t.consumed("all", 10);
t.process(start + std::chrono::seconds(2));
t.consumed("all", 10);
t.process(start + std::chrono::seconds(3));
t.consumed("all", 10);
t.process(start + std::chrono::seconds(4));
t.consumed("all", 10);
t.process(start + std::chrono::seconds(100500));
t.consumed("all", 10);
}
/// Single node throttled with max_speed = 1 and max_burst = 0.
/// Requests of cost {1, 2, 3, 1, 2, 1} are enqueued up front; `output[i]` is the cost expected
/// to be consumed when events are processed at `start + i` seconds.
TEST(SchedulerUnifiedNode, ThrottlerPacing)
{
    ResourceTest t;
    EventQueue::TimePoint start = std::chrono::system_clock::now();
    t.process(start, 0);

    // Zero burst allows you to send one request of any `size` and then throttle for `size/max_speed` seconds.
    // Useful if outgoing traffic should be "paced", i.e. have the least possible burstiness.
    auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 1.0, .max_burst = 0.0});

    t.enqueue(all, {1, 2, 3, 1, 2, 1});

    const int output[] = {1, 2, 0, 3, 0, 0, 1, 2, 0, 1, 0};

    // Iterate over the expectations directly and keep a signed counter for the time offset,
    // which avoids comparing a signed index with the unsigned result of std::size()
    int second = 0;
    for (const int expected : output)
    {
        t.process(start + std::chrono::seconds(second));
        t.consumed("all", expected);
        ++second;
    }
}
/// Single node throttled with max_speed = 10 and max_burst = 100.
/// Checks how many tokens are available after idle periods of different length
/// and that accumulated tokens are capped by max_burst.
TEST(SchedulerUnifiedNode, ThrottlerBucketFilling)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
t.process(start, 0);
auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 10.0, .max_burst = 100.0});
t.enqueue(all, {100});
t.process(start + std::chrono::seconds(0));
t.consumed("all", 100); // consume all tokens, but it is still active (not negative)
t.process(start + std::chrono::seconds(5));
t.consumed("all", 0); // There was nothing to consume
t.enqueue(all, {10, 10, 10, 10, 10, 10, 10, 10, 10, 10});
t.process(start + std::chrono::seconds(5));
t.consumed("all", 60); // 5 sec * 10 tokens/sec = 50 tokens + 1 extra request to go below zero
t.process(start + std::chrono::seconds(100));
t.consumed("all", 40); // Consume rest
// Long idle period before the next batch
t.process(start + std::chrono::seconds(200));
t.enqueue(all, {95, 1, 1, 1, 1, 1, 1, 1, 1, 1});
t.process(start + std::chrono::seconds(200));
t.consumed("all", 101); // check we cannot consume more than max_burst + 1 request
t.process(start + std::chrono::seconds(100500));
t.consumed("all", 3);
}
/// Throttled parent (max_speed = 10, max_burst = 100) with two children weighted 10 and 90.
/// Every second the consumption of each child is compared with its share of the arrival curve,
/// allowing a per-child tolerance (`max_latencyA` / `max_latencyB`).
TEST(SchedulerUnifiedNode, ThrottlerAndFairness)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
t.process(start, 0);
auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 10.0, .max_burst = 100.0});
auto a = t.createUnifiedNode("A", all, {.weight = 10.0, .priority = Priority{}});
auto b = t.createUnifiedNode("B", all, {.weight = 90.0, .priority = Priority{}});
ResourceCost req_cost = 1;
ResourceCost total_cost = 2000;
// Enqueue `total_cost` worth of unit-cost requests into each child
for (int i = 0; i < total_cost / req_cost; i++)
{
t.enqueue(a, {req_cost});
t.enqueue(b, {req_cost});
}
// Shares that correspond to the weights 10 and 90
double shareA = 0.1;
double shareB = 0.9;
// Bandwidth-latency coupling due to fairness: worst latency is inversely proportional to share
auto max_latencyA = static_cast<ResourceCost>(req_cost * (1.0 + 1.0 / shareA));
auto max_latencyB = static_cast<ResourceCost>(req_cost * (1.0 + 1.0 / shareB));
double consumedA = 0;
double consumedB = 0;
for (int seconds = 0; seconds < 100; seconds++)
{
t.process(start + std::chrono::seconds(seconds));
// Burst (100) + speed (10/s) * elapsed time + one request that may go below zero
double arrival_curve = 100.0 + 10.0 * seconds + req_cost;
t.consumed("A", static_cast<ResourceCost>(arrival_curve * shareA - consumedA), max_latencyA);
t.consumed("B", static_cast<ResourceCost>(arrival_curve * shareB - consumedB), max_latencyB);
consumedA = arrival_curve * shareA;
consumedB = arrival_curve * shareB;
}
}
/// Requests are enqueued into "all" while it has no children; adding the first child is then
/// expected to fail those pending requests (total cost 20). Afterwards the usual max-min
/// fairness scenario is replayed to check that scheduling still works.
TEST(SchedulerUnifiedNode, QueueWithRequestsDestruction)
{
ResourceTest t;
auto all = t.createUnifiedNode("all");
t.enqueue(all, {10, 10}); // enqueue requests to be canceled
// This will destroy the queue and fail both requests
auto a = t.createUnifiedNode("A", all);
t.failed(20);
// Check that everything works fine after destruction
auto b = t.createUnifiedNode("B", all);
t.enqueue(a, {10, 10}); // make sure A is never empty
for (int i = 0; i < 10; i++)
{
t.enqueue(a, {10, 10, 10, 10});
t.enqueue(b, {10, 10});
t.dequeue(6);
t.consumed("A", 40);
t.consumed("B", 20);
}
t.dequeue(2);
t.consumed("A", 20);
}
/// Same scenario as QueueWithRequestsDestruction, plus a separate thread that constructs a
/// ResourceGuard on the queue of "all". The guard's constructor is expected to throw once the
/// queue is destroyed; reaching FAIL() means it returned normally.
TEST(SchedulerUnifiedNode, ResourceGuardException)
{
ResourceTest t;
auto all = t.createUnifiedNode("all");
t.enqueue(all, {10, 10}); // enqueue requests to be canceled
// The lambda keeps its own handle to the queue obtained from all->getQueue()
std::thread consumer([queue = all->getQueue()]
{
ResourceLink link{.queue = queue.get()};
try
{
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), link);
FAIL();
}
catch (...)
{
// Expected path: the exception itself is not inspected
}
});
// NOTE(review): nothing here waits for `consumer` to have enqueued its request before the
// queue is destroyed below — confirm the test does not depend on thread timing.
// This will destroy the queue and fail both requests
auto a = t.createUnifiedNode("A", all);
t.failed(20);
consumer.join();
// Check that everything works fine after destruction
auto b = t.createUnifiedNode("B", all);
t.enqueue(a, {10, 10}); // make sure A is never empty
for (int i = 0; i < 10; i++)
{
t.enqueue(a, {10, 10, 10, 10});
t.enqueue(b, {10, 10});
t.dequeue(6);
t.consumed("A", 40);
t.consumed("B", 20);
}
t.dequeue(2);
t.consumed("A", 20);
}

View File

@ -12,6 +12,7 @@
#include <Common/CurrentMetrics.h>
#include <condition_variable>
#include <exception>
#include <mutex>
@ -34,6 +35,11 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int RESOURCE_ACCESS_DENIED;
}
/*
* Scoped resource guard.
* Waits for resource to be available in constructor and releases resource in destructor
@ -109,12 +115,25 @@ public:
dequeued_cv.notify_one();
}
// This function is executed inside the scheduler thread and wakes the thread that issued this `request`.
// That thread will throw an exception (see `wait()`, which rethrows when `exception` is set).
void failed(const std::exception_ptr & ptr) override
{
std::unique_lock lock(mutex);
chassert(state == Enqueued);
// Reuse the `Dequeued` state so that the predicate in `wait()` is satisfied; the stored
// exception is what distinguishes a failure from a normal dequeue
state = Dequeued;
exception = ptr;
dequeued_cv.notify_one();
}
/// Blocks the calling thread until the request reaches the `Dequeued` state.
/// While waiting, the `scheduled_count` metric is incremented and the wait time is accounted
/// to the `wait_microseconds` profile event.
/// Throws Exception(RESOURCE_ACCESS_DENIED) if the request was failed instead of executed.
void wait()
{
CurrentMetrics::Increment scheduled(metrics->scheduled_count);
auto timer = CurrentThread::getProfileEvents().timer(metrics->wait_microseconds);
std::unique_lock lock(mutex);
dequeued_cv.wait(lock, [this] { return state == Dequeued; });
// `exception` is set by `failed()` under the same mutex
if (exception)
throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "Resource request failed: {}", getExceptionMessage(exception, /* with_stacktrace = */ false));
}
void finish(ResourceCost real_cost_, ResourceLink link_)
@ -151,6 +170,7 @@ public:
std::mutex mutex;
std::condition_variable dequeued_cv;
RequestState state = Finished;
std::exception_ptr exception;
};
/// Creates pending request for resource; blocks while resource is not available (unless `Lock::Defer`)

View File

@ -1,55 +0,0 @@
#pragma once
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/Scheduler/IResourceManager.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <mutex>
#include <unordered_map>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_SCHEDULER_NODE;
}
/// Singleton registry that maps a name to a factory function creating a resource manager.
/// Both registration and lookup are serialized with an internal mutex.
class ResourceManagerFactory : private boost::noncopyable
{
public:
/// Returns the process-wide instance (function-local static).
static ResourceManagerFactory & instance()
{
static ResourceManagerFactory ret;
return ret;
}
/// Creates a new manager using the method registered under `name`.
/// Throws Exception(INVALID_SCHEDULER_NODE) if nothing is registered under that name.
/// NOTE(review): the error text mentions "scheduler node type" although this factory creates resource managers.
ResourceManagerPtr get(const String & name)
{
std::lock_guard lock{mutex};
if (auto iter = methods.find(name); iter != methods.end())
return iter->second();
throw Exception(ErrorCodes::INVALID_SCHEDULER_NODE, "Unknown scheduler node type: {}", name);
}
/// Registers (or overwrites) the creation method for `name`: a default-constructed TDerived in a shared_ptr.
template <class TDerived>
void registerMethod(const String & name)
{
std::lock_guard lock{mutex};
methods[name] = [] ()
{
return std::make_shared<TDerived>();
};
}
private:
std::mutex mutex;
using Method = std::function<ResourceManagerPtr()>;
std::unordered_map<String, Method> methods;
};
}

View File

@ -1,13 +1,42 @@
#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ISchedulerConstraint.h>
#include <Common/Exception.h>
#include <ranges>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void ResourceRequest::finish()
{
if (constraint)
constraint->finishRequest(this);
// Iterate over constraints in reverse order
for (ISchedulerConstraint * constraint : std::ranges::reverse_view(constraints))
{
if (constraint)
constraint->finishRequest(this);
}
}
/// Records `new_constraint` in the first unused (null) slot of the fixed-size `constraints` array.
/// Throws LOGICAL_ERROR if all `ResourceMaxConstraints` slots are already occupied.
void ResourceRequest::addConstraint(ISchedulerConstraint * new_constraint)
{
    for (size_t slot = 0; slot < constraints.size(); ++slot)
    {
        if (constraints[slot] == nullptr)
        {
            constraints[slot] = new_constraint;
            return;
        }
    }

    // TODO(serxa): is it possible to validate it during enqueue of resource request to avoid LOGICAL_ERRORs in the scheduler thread?
    // Possible, but it will not cover the case of moving a queue with requests inside to an invalid position.
    throw Exception(ErrorCodes::LOGICAL_ERROR,
        "Max number of simultaneous workload constraints exceeded ({}). Remove extra constraints before using this workload.",
        ResourceMaxConstraints);
}
}

View File

@ -2,7 +2,9 @@
#include <boost/intrusive/list.hpp>
#include <base/types.h>
#include <array>
#include <limits>
#include <exception>
namespace DB
{
@ -15,6 +17,10 @@ class ISchedulerConstraint;
using ResourceCost = Int64;
constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
// TODO(serxa): validate hierarchy to avoid too many constraints
/// Max number of constraints for a request to pass through (depth of constraints chain)
constexpr size_t ResourceMaxConstraints = 8;
/*
* Request for a resource consumption. The main moving part of the scheduling subsystem.
* Resource requests processing workflow:
@ -49,9 +55,10 @@ public:
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
ResourceCost cost;
/// Scheduler node to be notified on consumption finish
/// Auto-filled during request enqueue/dequeue
ISchedulerConstraint * constraint;
/// Scheduler nodes to be notified on consumption finish
/// Auto-filled during request dequeue
/// Vector is not used to avoid allocations in the scheduler thread
std::array<ISchedulerConstraint *, ResourceMaxConstraints> constraints;
explicit ResourceRequest(ResourceCost cost_ = 1)
{
@ -62,7 +69,8 @@ public:
void reset(ResourceCost cost_)
{
cost = cost_;
constraint = nullptr;
for (auto & constraint : constraints)
constraint = nullptr;
// Note that list_base_hook should be reset independently (by intrusive list)
}
@ -74,11 +82,17 @@ public:
/// (e.g. setting an std::promise or creating a job in a thread pool)
virtual void execute() = 0;
/// Callback to trigger an error in case if resource is unavailable.
virtual void failed(const std::exception_ptr & ptr) = 0;
/// Stop resource consumption and notify resource scheduler.
/// Should be called when resource consumption is finished by consumer.
/// ResourceRequest should not be destructed or reset before calling to `finish()`.
/// WARNING: this function MUST not be called if request was canceled.
/// WARNING: this function MUST not be called if request was canceled or failed.
void finish();
/// Is called from the scheduler thread to fill `constraints` chain
void addConstraint(ISchedulerConstraint * new_constraint);
};
}

View File

@ -95,6 +95,12 @@ public:
}
}
/// Type name reported by this node: always "scheduler".
const String & getTypeName() const override
{
    static const String scheduler_type_name = "scheduler";
    return scheduler_type_name;
}
bool equals(ISchedulerNode * other) override
{
if (!ISchedulerNode::equals(other))

View File

@ -0,0 +1,38 @@
#pragma once
#include <base/types.h>
#include <Common/Priority.h>
#include <limits>
namespace DB
{
/// Plain aggregate of scheduling parameters for one node.
/// NOTE: callers use designated initializers, so the declaration order of the fields matters.
struct SchedulingSettings
{
/// Priority and weight among siblings
double weight = 1.0;
Priority priority;
/// Throttling constraints.
/// Up to 2 independent throttlers: one for average speed and one for peak speed.
static constexpr double default_burst_seconds = 1.0;
double max_speed = 0; // Zero means unlimited
double max_burst = 0; // default is `default_burst_seconds * max_speed`
/// Limits total number of concurrent resource requests that are allowed to consume
static constexpr Int64 default_max_requests = std::numeric_limits<Int64>::max();
Int64 max_requests = default_max_requests;
/// Limits total cost of concurrent resource requests that are allowed to consume
static constexpr Int64 default_max_cost = std::numeric_limits<Int64>::max();
Int64 max_cost = default_max_cost;
/// True if a speed limit is set (`max_speed` is non-zero).
bool hasThrottler() const { return max_speed != 0; }
/// True if either concurrency limit differs from its "unlimited" default.
bool hasSemaphore() const { return max_requests != default_max_requests || max_cost != default_max_cost; }
// TODO(serxa): add helper functions for parsing, printing and validating
};
}

View File

@ -0,0 +1,93 @@
#pragma once
#include <base/types.h>
#include <base/scope_guard.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class IAST;
struct Settings;
/// Kind of a workload entity.
enum class WorkloadEntityType : uint8_t
{
Workload,
Resource,
// Not a real entity kind — presumably a sentinel marking the number of kinds (TODO confirm)
MAX
};
/// Interface for a storage of workload entities (WORKLOAD and RESOURCE).
/// Abstract storage of workload entities, keyed by entity name.
/// Entities are represented by the AST of their CREATE query.
class IWorkloadEntityStorage
{
public:
virtual ~IWorkloadEntityStorage() = default;
/// Whether this storage can replicate entities to another node.
virtual bool isReplicated() const { return false; }
/// Identifier of the replication group; empty by default (non-replicated storage).
virtual String getReplicationID() const { return ""; }
/// Loads all entities. Can be called once - if entities are already loaded the function does nothing.
virtual void loadEntities() = 0;
/// Get entity by name. Throws an exception if no entity is stored under entity_name.
virtual ASTPtr get(const String & entity_name) const = 0;
/// Get entity by name. Returns nullptr if no entity is stored under entity_name.
virtual ASTPtr tryGet(const String & entity_name) const = 0;
/// Check if entity with entity_name is stored.
virtual bool has(const String & entity_name) const = 0;
/// Get all entity names.
virtual std::vector<String> getAllEntityNames() const = 0;
/// Get all entity names of specified type.
virtual std::vector<String> getAllEntityNames(WorkloadEntityType entity_type) const = 0;
/// Get all entities.
virtual std::vector<std::pair<String, ASTPtr>> getAllEntities() const = 0;
/// Returns true if no entities are stored.
virtual bool empty() const = 0;
/// Stops watching. The default implementation does nothing.
virtual void stopWatching() {}
/// Immediately reloads all entities, throws an exception if failed.
virtual void reloadEntities() = 0;
/// Stores an entity.
/// `throw_if_exists` / `replace_if_exists` select what happens when the name is already taken.
/// Returns whether the entity was actually stored.
virtual bool storeEntity(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
ASTPtr create_entity_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) = 0;
/// Removes an entity.
/// `throw_if_not_exists` selects whether a missing entity is an error.
/// Returns whether an entity was actually removed.
virtual bool removeEntity(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
bool throw_if_not_exists) = 0;
/// Callback invoked on entity changes.
using OnChangedHandler = std::function<void(
WorkloadEntityType /* entity_type */,
const String & /* entity_name */,
const ASTPtr & /* new or changed entity, null if removed */)>;
/// Subscribes for all changes of entities of the given type.
/// The returned scope_guard presumably cancels the subscription when destroyed (TODO confirm in implementations).
virtual scope_guard subscribeForChanges(
WorkloadEntityType entity_type,
const OnChangedHandler & handler) = 0;
};
}

View File

@ -0,0 +1,299 @@
#include <Common/Scheduler/Workload/WorkloadEntityDiskStorage.h>
#include <Common/StringUtils.h>
#include <Common/atomicRename.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Core/Settings.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ParserCreateWorkloadQuery.h>
#include <Parsers/ParserCreateResourceQuery.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Logger.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int DIRECTORY_DOESNT_EXIST;
extern const int WORKLOAD_ENTITY_ALREADY_EXISTS;
extern const int UNKNOWN_WORKLOAD_ENTITY;
}
namespace
{
/// Resolves `directory_path` with std::filesystem::weakly_canonical and makes sure
/// the result ends with a path separator, so file names can be appended to it directly.
String makeDirectoryPathCanonical(const String & directory_path)
{
    std::filesystem::path resolved = std::filesystem::weakly_canonical(directory_path);

    // A path that already ends with a separator has no filename component
    if (!resolved.has_filename())
        return resolved;

    resolved += std::filesystem::path::preferred_separator;
    return resolved;
}
}
/// Stores the canonicalized directory path (always ending with a separator) and creates the logger.
/// Nothing is read from disk here; see loadEntities().
WorkloadEntityDiskStorage::WorkloadEntityDiskStorage(const ContextPtr & global_context_, const String & dir_path_)
: WorkloadEntityStorageBase(global_context_)
, dir_path{makeDirectoryPathCanonical(dir_path_)}
, log{getLogger("WorkloadEntityDiskStorage")}
{
}
/// Loads a single entity from its default file location.
/// Returns nullptr if the file does not exist or could not be loaded.
ASTPtr WorkloadEntityDiskStorage::tryLoadEntity(WorkloadEntityType entity_type, const String & entity_name)
{
    const String entity_file_path = getFilePath(entity_type, entity_name);
    return tryLoadEntity(entity_type, entity_name, entity_file_path, /* check_file_exists= */ true);
}
/// Reads the CREATE statement stored in `path` and parses it with the parser matching `entity_type`.
///
/// @param entity_type        Selects the parser (CREATE WORKLOAD or CREATE RESOURCE).
/// @param entity_name        Used for logging only.
/// @param path               File containing the CREATE statement.
/// @param check_file_exists  If true, a missing file is not an error and nullptr is returned silently.
/// @return The parsed AST, or nullptr if the file is missing (with `check_file_exists`),
///         the entity type is not a real type, or any error occurred (the error is logged, not rethrown).
ASTPtr WorkloadEntityDiskStorage::tryLoadEntity(WorkloadEntityType entity_type, const String & entity_name, const String & path, bool check_file_exists)
{
    LOG_DEBUG(log, "Loading workload entity {} from file {}", backQuote(entity_name), path);

    try
    {
        if (check_file_exists && !fs::exists(path))
            return nullptr;

        /// There is .sql file with workload entity creation statement.
        ReadBufferFromFile in(path);

        String entity_create_query;
        readStringUntilEOF(entity_create_query, in);

        // The only difference between entity types is the parser, so the parse call is shared
        auto parse_with = [&](auto & parser) -> ASTPtr
        {
            return parseQuery(
                parser,
                entity_create_query.data(),
                entity_create_query.data() + entity_create_query.size(),
                "",
                0,
                global_context->getSettingsRef().max_parser_depth,
                global_context->getSettingsRef().max_parser_backtracks);
        };

        switch (entity_type)
        {
            case WorkloadEntityType::Workload:
            {
                ParserCreateWorkloadQuery parser;
                return parse_with(parser);
            }
            case WorkloadEntityType::Resource:
            {
                ParserCreateResourceQuery parser;
                return parse_with(parser);
            }
            case WorkloadEntityType::MAX:
                return nullptr;
        }

        // Reached only for a value outside the enumerators; do not fall off the end of a non-void function
        return nullptr;
    }
    catch (...)
    {
        tryLogCurrentException(log, fmt::format("while loading workload entity {} from path {}", backQuote(entity_name), path));
        return nullptr; /// Failed to load this entity, will ignore it
    }
}
/// Loads entities from disk unless a previous load has already completed.
void WorkloadEntityDiskStorage::loadEntities()
{
    if (entities_loaded)
        return;

    loadEntitiesImpl();
}
/// Unconditionally re-reads all entities from disk, regardless of `entities_loaded`.
void WorkloadEntityDiskStorage::reloadEntities()
{
// TODO(serxa): it does not send notifications, maybe better to remove this method completely
loadEntitiesImpl();
}
void WorkloadEntityDiskStorage::loadEntitiesImpl()
{
LOG_INFO(log, "Loading workload entities from {}", dir_path);
if (!std::filesystem::exists(dir_path))
{
LOG_DEBUG(log, "The directory for workload entities ({}) does not exist: nothing to load", dir_path);
return;
}
std::vector<std::pair<String, ASTPtr>> entities_name_and_queries;
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
{
if (it->isDirectory())
continue;
const String & file_name = it.name();
if (startsWith(file_name, "workload_") && endsWith(file_name, ".sql"))
{
size_t prefix_length = strlen("workload_");
size_t suffix_length = strlen(".sql");
String name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length));
if (name.empty())
continue;
ASTPtr ast = tryLoadEntity(WorkloadEntityType::Workload, name, dir_path + it.name(), /* check_file_exists= */ false);
if (ast)
entities_name_and_queries.emplace_back(name, ast);
}
if (startsWith(file_name, "resource_") && endsWith(file_name, ".sql"))
{
size_t prefix_length = strlen("resource_");
size_t suffix_length = strlen(".sql");
String name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length));
if (name.empty())
continue;
ASTPtr ast = tryLoadEntity(WorkloadEntityType::Resource, name, dir_path + it.name(), /* check_file_exists= */ false);
if (ast)
entities_name_and_queries.emplace_back(name, ast);
}
}
setAllEntities(entities_name_and_queries);
entities_loaded = true;
LOG_DEBUG(log, "Workload entities loaded");
}
/// Makes sure `dir_path` exists and is a directory, creating missing parent directories as needed.
/// Throws DIRECTORY_DOESNT_EXIST if that is not the case after the attempt.
void WorkloadEntityDiskStorage::createDirectory()
{
    std::error_code creation_error;
    fs::create_directories(dir_path, creation_error);

    const bool directory_ready = fs::exists(dir_path) && fs::is_directory(dir_path) && !creation_error;
    if (directory_ready)
        return;

    throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST, "Couldn't create directory {} reason: '{}'",
        dir_path, creation_error.message());
}
/// Writes the CREATE statement of an entity to its file in `dir_path`.
///
/// The statement is first written to `<file>.tmp` and then renamed into place, so a partially
/// written file never appears under the final name.
///
/// @param entity_type          Selects the file name prefix.
/// @param entity_name          Name of the entity (escaped to build the file name).
/// @param create_entity_query  AST that is formatted into the file.
/// @param throw_if_exists      Throw WORKLOAD_ENTITY_ALREADY_EXISTS if the file already exists.
/// @param replace_if_exists    Overwrite an existing file instead of leaving it untouched.
/// @param settings             `fsync_metadata` controls whether the file is synced before the rename.
/// @return true if the entity was written; false if the file exists and neither flag applies.
bool WorkloadEntityDiskStorage::storeEntityImpl(
    const ContextPtr & /*current_context*/,
    WorkloadEntityType entity_type,
    const String & entity_name,
    ASTPtr create_entity_query,
    bool throw_if_exists,
    bool replace_if_exists,
    const Settings & settings)
{
    createDirectory();
    String file_path = getFilePath(entity_type, entity_name);
    LOG_DEBUG(log, "Storing workload entity {} to file {}", backQuote(entity_name), file_path);

    if (fs::exists(file_path))
    {
        if (throw_if_exists)
            throw Exception(ErrorCodes::WORKLOAD_ENTITY_ALREADY_EXISTS, "Workload entity '{}' already exists", entity_name);
        else if (!replace_if_exists)
            return false;
    }

    WriteBufferFromOwnString create_statement_buf;
    formatAST(*create_entity_query, create_statement_buf, false);
    writeChar('\n', create_statement_buf);
    String create_statement = create_statement_buf.str();

    String temp_file_path = file_path + ".tmp";

    try
    {
        WriteBufferFromFile out(temp_file_path, create_statement.size());
        writeString(create_statement, out);
        out.next();
        if (settings.fsync_metadata)
            out.sync();
        out.close();

        if (replace_if_exists)
            fs::rename(temp_file_path, file_path);
        else
            renameNoReplace(temp_file_path, file_path);
    }
    catch (...)
    {
        // Best-effort cleanup: use the non-throwing overload so that a failure to remove
        // the temporary file cannot replace the original exception being rethrown
        std::error_code cleanup_error;
        fs::remove(temp_file_path, cleanup_error);
        throw;
    }

    LOG_TRACE(log, "Entity {} stored", backQuote(entity_name));
    return true;
}
/// Deletes the file of an entity.
/// Returns true if a file was removed. If there was no file, either throws
/// UNKNOWN_WORKLOAD_ENTITY (`throw_if_not_exists`) or returns false.
bool WorkloadEntityDiskStorage::removeEntityImpl(
    const ContextPtr & /*current_context*/,
    WorkloadEntityType entity_type,
    const String & entity_name,
    bool throw_if_not_exists)
{
    String file_path = getFilePath(entity_type, entity_name);
    LOG_DEBUG(log, "Removing workload entity {} stored in file {}", backQuote(entity_name), file_path);

    if (fs::remove(file_path))
    {
        LOG_TRACE(log, "Entity {} removed", backQuote(entity_name));
        return true;
    }

    // Nothing was deleted: the entity had no file
    if (throw_if_not_exists)
        throw Exception(ErrorCodes::UNKNOWN_WORKLOAD_ENTITY, "Workload entity '{}' doesn't exist", entity_name);

    return false;
}
/// Builds the path of the file that stores the given entity:
/// `<dir_path><type prefix><escaped entity name>.sql`.
/// Returns an empty string for an entity type that has no file prefix.
String WorkloadEntityDiskStorage::getFilePath(WorkloadEntityType entity_type, const String & entity_name) const
{
    const char * prefix = nullptr;
    switch (entity_type)
    {
        case WorkloadEntityType::Workload:
            prefix = "workload_";
            break;
        case WorkloadEntityType::Resource:
            prefix = "resource_";
            break;
        case WorkloadEntityType::MAX:
            break;
    }

    if (prefix == nullptr)
        return {};

    return dir_path + prefix + escapeForFileName(entity_name) + ".sql";
}
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <Common/Scheduler/Workload/WorkloadEntityStorageBase.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
/// Loads workload entities from a specified folder and stores each entity
/// as a separate `.sql` file in that folder.
class WorkloadEntityDiskStorage : public WorkloadEntityStorageBase
{
public:
/// `dir_path_` is the folder with entity files; it is canonicalized on construction.
WorkloadEntityDiskStorage(const ContextPtr & global_context_, const String & dir_path_);
/// Loads entities from disk if they have not been loaded yet.
void loadEntities() override;
/// Re-reads all entities from disk unconditionally.
void reloadEntities() override;
private:
/// Writes the CREATE statement of an entity to its file.
bool storeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
ASTPtr create_entity_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
/// Deletes the file of an entity.
bool removeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
bool throw_if_not_exists) override;
/// Creates `dir_path` if needed; throws if it cannot be created.
void createDirectory();
/// Scans `dir_path` and replaces the in-memory entities with what was found.
void loadEntitiesImpl();
/// Both overloads return nullptr instead of throwing when an entity cannot be loaded.
ASTPtr tryLoadEntity(WorkloadEntityType entity_type, const String & entity_name);
ASTPtr tryLoadEntity(WorkloadEntityType entity_type, const String & entity_name, const String & file_path, bool check_file_exists);
/// Path of the file that stores the given entity.
String getFilePath(WorkloadEntityType entity_type, const String & entity_name) const;
String dir_path; // canonical path, always ends with a separator
LoggerPtr log;
std::atomic<bool> entities_loaded = false; // set once loadEntitiesImpl() has completed a scan
};
}

View File

@ -0,0 +1,315 @@
#include <Common/Scheduler/Workload/WorkloadEntityStorageBase.h>
#include <boost/container/flat_set.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTCreateResourceQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int WORKLOAD_ENTITY_ALREADY_EXISTS;
extern const int UNKNOWN_WORKLOAD_ENTITY;
}
namespace
{
/// Returns a copy of `create_query` with the `IF NOT EXISTS` and `OR REPLACE` flags cleared,
/// so the stored query describes only the entity itself. `context` is currently unused.
ASTPtr normalizeCreateWorkloadEntityQuery(const IAST & create_query, const ContextPtr & context)
{
    UNUSED(context);

    ASTPtr normalized = create_query.clone();
    IAST * raw = normalized.get();

    if (auto * workload = typeid_cast<ASTCreateWorkloadQuery *>(raw))
    {
        workload->if_not_exists = false;
        workload->or_replace = false;
    }
    else if (auto * resource = typeid_cast<ASTCreateResourceQuery *>(raw))
    {
        resource->if_not_exists = false;
        resource->or_replace = false;
    }

    return normalized;
}
/// Determines the entity type from the dynamic type of a CREATE query AST.
/// Any other AST type is a programming error: asserts in debug builds and returns MAX.
WorkloadEntityType getEntityType(const ASTPtr & ptr)
{
    // Only the success of the cast matters, so no named local is needed
    if (typeid_cast<ASTCreateWorkloadQuery *>(ptr.get()))
        return WorkloadEntityType::Workload;
    if (typeid_cast<ASTCreateResourceQuery *>(ptr.get()))
        return WorkloadEntityType::Resource;
    chassert(false);
    return WorkloadEntityType::MAX;
}
}
/// Keeps the global context for later use; no entities are loaded here.
WorkloadEntityStorageBase::WorkloadEntityStorageBase(ContextPtr global_context_)
: global_context(std::move(global_context_))
{}
/// Returns the stored AST of the entity; throws UNKNOWN_WORKLOAD_ENTITY if there is no such entity.
ASTPtr WorkloadEntityStorageBase::get(const String & entity_name) const
{
    std::lock_guard lock(mutex);

    if (auto found = entities.find(entity_name); found != entities.end())
        return found->second;

    throw Exception(ErrorCodes::UNKNOWN_WORKLOAD_ENTITY,
        "The workload entity name '{}' is not saved",
        entity_name);
}
/// Returns the stored CREATE query for `entity_name`, or a null pointer if it is not stored.
ASTPtr WorkloadEntityStorageBase::tryGet(const std::string & entity_name) const
{
    std::lock_guard lock(mutex);
    const auto found = entities.find(entity_name);
    return found != entities.end() ? found->second : ASTPtr{};
}
bool WorkloadEntityStorageBase::has(const String & entity_name) const
{
return tryGet(entity_name) != nullptr;
}
/// Returns the names of all stored entities, in the iteration order of the underlying map.
std::vector<std::string> WorkloadEntityStorageBase::getAllEntityNames() const
{
    std::lock_guard lock(mutex);

    std::vector<std::string> result;
    result.reserve(entities.size());
    for (const auto & name_and_query : entities)
        result.push_back(name_and_query.first);
    return result;
}
/// Returns the names of the stored entities whose CREATE query is of the requested kind.
std::vector<std::string> WorkloadEntityStorageBase::getAllEntityNames(WorkloadEntityType entity_type) const
{
    std::lock_guard lock(mutex);

    std::vector<std::string> result;
    for (const auto & name_and_query : entities)
    {
        if (getEntityType(name_and_query.second) != entity_type)
            continue;
        result.push_back(name_and_query.first);
    }
    return result;
}
/// Tells whether no entities are stored at the moment of the call.
bool WorkloadEntityStorageBase::empty() const
{
    std::lock_guard lock{mutex};
    const bool nothing_stored = entities.empty();
    return nothing_stored;
}
/// Normalizes the CREATE query, persists it through storeEntityImpl() and, on success,
/// records it in the in-memory map and queues a notification.
/// If the name is already taken: throws WORKLOAD_ENTITY_ALREADY_EXISTS when `throw_if_exists`
/// is set, otherwise returns false unless `replace_if_exists` is set.
/// Returns whatever storeEntityImpl() returned.
bool WorkloadEntityStorageBase::storeEntity(
    const ContextPtr & current_context,
    WorkloadEntityType entity_type,
    const String & entity_name,
    ASTPtr create_entity_query,
    bool throw_if_exists,
    bool replace_if_exists,
    const Settings & settings)
{
    std::lock_guard lock{mutex};

    create_entity_query = normalizeCreateWorkloadEntityQuery(*create_entity_query, global_context);

    const bool already_exists = entities.find(entity_name) != entities.end();
    if (already_exists && throw_if_exists)
        throw Exception(ErrorCodes::WORKLOAD_ENTITY_ALREADY_EXISTS, "Workload entity '{}' already exists", entity_name);
    if (already_exists && !replace_if_exists)
        return false;

    const bool stored = storeEntityImpl(
        current_context,
        entity_type,
        entity_name,
        create_entity_query,
        throw_if_exists,
        replace_if_exists,
        settings);

    if (stored)
    {
        entities[entity_name] = create_entity_query;
        onEntityAdded(entity_type, entity_name, create_entity_query);
    }

    /// Flush the queue even when nothing was stored, as the original flow does.
    sendNotifications();
    return stored;
}
/// Removes an entity through removeEntityImpl() and, on success, erases it from the
/// in-memory map and queues a notification.
/// If the name is unknown: throws UNKNOWN_WORKLOAD_ENTITY when `throw_if_not_exists`
/// is set, otherwise returns false.
/// Returns whatever removeEntityImpl() returned.
bool WorkloadEntityStorageBase::removeEntity(
    const ContextPtr & current_context,
    WorkloadEntityType entity_type,
    const String & entity_name,
    bool throw_if_not_exists)
{
    std::lock_guard lock(mutex);

    if (entities.find(entity_name) == entities.end())
    {
        if (!throw_if_not_exists)
            return false;
        throw Exception(ErrorCodes::UNKNOWN_WORKLOAD_ENTITY, "Workload entity '{}' doesn't exist", entity_name);
    }

    const bool removed = removeEntityImpl(
        current_context,
        entity_type,
        entity_name,
        throw_if_not_exists);

    if (removed)
    {
        /// Erase by key rather than by a saved iterator, exactly as before.
        entities.erase(entity_name);
        onEntityRemoved(entity_type, entity_name);
    }

    sendNotifications();
    return removed;
}
/// Registers `handler` for events about entities of `entity_type`.
/// The returned guard removes the handler again when invoked; it keeps the handler
/// registry alive through its own shared_ptr copy.
scope_guard WorkloadEntityStorageBase::subscribeForChanges(
    WorkloadEntityType entity_type,
    const OnChangedHandler & handler)
{
    const auto type_index = static_cast<size_t>(entity_type);

    std::lock_guard lock{handlers->mutex};
    auto & subscribers = handlers->by_type[type_index];
    /// std::list iterators stay valid across other insertions and erasures, so the iterator can be kept for unsubscription.
    auto handler_it = subscribers.insert(subscribers.end(), handler);

    return [registry = handlers, type_index, handler_it]
    {
        std::lock_guard unsubscribe_lock{registry->mutex};
        registry->by_type[type_index].erase(handler_it);
    };
}
/// Queues an event carrying the new entity; it is delivered by the next sendNotifications() call.
void WorkloadEntityStorageBase::onEntityAdded(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & new_entity)
{
    std::lock_guard lock{queue_mutex};
    queue.push(Event{entity_type, entity_name, new_entity});
}
/// Queues an event carrying the changed entity; it is delivered by the next sendNotifications() call.
void WorkloadEntityStorageBase::onEntityUpdated(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & changed_entity)
{
    std::lock_guard lock{queue_mutex};
    queue.push(Event{entity_type, entity_name, changed_entity});
}
/// Queues a removal event (an event whose `entity` pointer is null); it is delivered by the next sendNotifications() call.
void WorkloadEntityStorageBase::onEntityRemoved(WorkloadEntityType entity_type, const String & entity_name)
{
    std::lock_guard lock{queue_mutex};
    queue.push(Event{entity_type, entity_name, ASTPtr{}});
}
/// Drains the event queue and delivers every event to the handlers subscribed for its entity type.
/// Exceptions thrown by a handler are logged and do not stop delivery to the remaining handlers.
void WorkloadEntityStorageBase::sendNotifications()
{
/// Only one thread can send notification at any time.
std::lock_guard sending_notifications_lock{sending_notifications};
std::unique_lock queue_lock{queue_mutex};
while (!queue.empty())
{
auto event = std::move(queue.front());
queue.pop();
/// Release the queue while handlers run, so new events can be queued in the meantime.
queue_lock.unlock();
/// Copy the handler list under its own mutex; the handlers themselves are invoked without holding it.
std::vector<OnChangedHandler> current_handlers;
{
std::lock_guard handlers_lock{handlers->mutex};
boost::range::copy(handlers->by_type[static_cast<size_t>(event.type)], std::back_inserter(current_handlers));
}
for (const auto & handler : current_handlers)
{
try
{
handler(event.type, event.name, event.entity);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// Re-acquire before re-checking the loop condition.
queue_lock.lock();
}
}
std::unique_lock<std::recursive_mutex> WorkloadEntityStorageBase::getLock() const
{
return std::unique_lock{mutex};
}
/// Replaces the whole in-memory map with normalized copies of `new_entities`.
/// If a name occurs more than once, the last occurrence wins.
void WorkloadEntityStorageBase::setAllEntities(const std::vector<std::pair<String, ASTPtr>> & new_entities)
{
    /// Build the replacement map before taking the lock; only the final swap-in happens under it.
    std::unordered_map<String, ASTPtr> replacement;
    for (const auto & name_and_query : new_entities)
        replacement[name_and_query.first] = normalizeCreateWorkloadEntityQuery(*name_and_query.second, global_context);

    // TODO(serxa): do validation and throw LOGICAL_ERROR if failed

    // Note that notifications are not sent, because it is hard to send notifications in right order to maintain invariants.
    // Another code path using getAllEntities() should be used for initialization

    std::lock_guard lock(mutex);
    entities = std::move(replacement);
}
std::vector<std::pair<String, ASTPtr>> WorkloadEntityStorageBase::getAllEntities() const
{
std::lock_guard lock{mutex};
std::vector<std::pair<String, ASTPtr>> all_entities;
all_entities.reserve(entities.size());
std::copy(entities.begin(), entities.end(), std::back_inserter(all_entities));
return all_entities;
}
// TODO(serxa): add notifications or remove this function
void WorkloadEntityStorageBase::removeAllEntitiesExcept(const Strings & entity_names_to_keep)
{
boost::container::flat_set<std::string_view> names_set_to_keep{entity_names_to_keep.begin(), entity_names_to_keep.end()};
std::lock_guard lock(mutex);
for (auto it = entities.begin(); it != entities.end();)
{
auto current = it++;
if (!names_set_to_keep.contains(current->first))
entities.erase(current);
}
}
}

View File

@ -0,0 +1,109 @@
#pragma once
#include <unordered_map>
#include <list>
#include <mutex>
#include <queue>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST.h>
namespace DB
{
/// Common part of workload entity storages: keeps an in-memory map from entity name to its
/// CREATE query, guards it with a recursive mutex and delivers change events to subscribers.
/// Derived classes provide the actual persistence through storeEntityImpl() / removeEntityImpl().
class WorkloadEntityStorageBase : public IWorkloadEntityStorage
{
public:
explicit WorkloadEntityStorageBase(ContextPtr global_context_);
/// Lookups in the in-memory map. get() throws for an unknown name, tryGet() returns nullptr.
ASTPtr get(const String & entity_name) const override;
ASTPtr tryGet(const String & entity_name) const override;
bool has(const String & entity_name) const override;
std::vector<String> getAllEntityNames() const override;
std::vector<String> getAllEntityNames(WorkloadEntityType entity_type) const override;
std::vector<std::pair<String, ASTPtr>> getAllEntities() const override;
bool empty() const override;
/// Persists the entity through storeEntityImpl() and updates the in-memory map on success.
bool storeEntity(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
ASTPtr create_entity_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
/// Removes the entity through removeEntityImpl() and updates the in-memory map on success.
bool removeEntity(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
bool throw_if_not_exists) override;
/// Registers a handler for events about entities of the given type; the returned guard unregisters it.
virtual scope_guard subscribeForChanges(
WorkloadEntityType entity_type,
const OnChangedHandler & handler) override;
protected:
/// Persistence hooks implemented by derived classes; the return value tells whether the change was applied.
virtual bool storeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
ASTPtr create_entity_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) = 0;
virtual bool removeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
bool throw_if_not_exists) = 0;
/// Locks `mutex` and returns the lock to the caller.
std::unique_lock<std::recursive_mutex> getLock() const;
/// Replaces the whole in-memory map. No notifications are sent.
void setAllEntities(const std::vector<std::pair<String, ASTPtr>> & new_entities);
/// Erases all entities except the listed ones. No notifications are sent.
void removeAllEntitiesExcept(const Strings & entity_names_to_keep);
/// Called by derived class after a new workload entity has been added.
void onEntityAdded(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & new_entity);
/// Called by derived class after a workload entity has been changed.
void onEntityUpdated(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & changed_entity);
/// Called by derived class after a workload entity has been removed.
void onEntityRemoved(WorkloadEntityType entity_type, const String & entity_name);
/// Sends notifications to subscribers about changes in workload entities
/// (added with previous calls onEntityAdded(), onEntityUpdated(), onEntityRemoved()).
void sendNotifications();
/// Subscribed handlers, one list per entity type, guarded by their own mutex.
struct Handlers
{
std::mutex mutex;
std::list<OnChangedHandler> by_type[static_cast<size_t>(WorkloadEntityType::MAX)];
};
/// shared_ptr is here for safety because WorkloadEntityStorageBase can be destroyed before all subscriptions are removed.
std::shared_ptr<Handlers> handlers;
/// A queued change. `entity` is left null for removal events.
struct Event
{
WorkloadEntityType type;
String name;
ASTPtr entity;
};
/// Pending events and the mutex guarding them.
std::queue<Event> queue;
std::mutex queue_mutex;
/// Held for the whole duration of sendNotifications(), so only one thread delivers events at a time.
std::mutex sending_notifications;
/// Guards `entities`.
mutable std::recursive_mutex mutex;
std::unordered_map<String, ASTPtr> entities; // Maps entity name into CREATE entity query
ContextPtr global_context;
};
}

View File

@ -0,0 +1,48 @@
#include <Common/Scheduler/Workload/createWorkloadEntityStorage.h>
#include <Common/Scheduler/Workload/WorkloadEntityDiskStorage.h>
#include <Common/Scheduler/Workload/WorkloadEntityKeeperStorage.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
#include <memory>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
}
/// Creates the workload entity storage selected by the server config.
///  - `workload_path`: directory for the disk-backed storage (default: `<server path>/workload/`).
///  - `workload_zookeeper_path`: reserved for a keeper-backed storage, which is not implemented yet.
/// Throws INVALID_CONFIG_PARAMETER if both keys are specified, or if the keeper path is requested.
std::unique_ptr<IWorkloadEntityStorage> createWorkloadEntityStorage(const ContextMutablePtr & global_context)
{
    const String zookeeper_path_key = "workload_zookeeper_path";
    const String disk_path_key = "workload_path";

    const auto & config = global_context->getConfigRef();
    if (config.has(zookeeper_path_key))
    {
        if (config.has(disk_path_key))
        {
            throw Exception(
                ErrorCodes::INVALID_CONFIG_PARAMETER,
                "'{}' and '{}' must not be both specified in the config",
                zookeeper_path_key,
                disk_path_key);
        }

        // TODO(serxa): create WorkloadEntityKeeperStorage object
        //return std::make_unique<WorkloadEntityKeeperStorage>(global_context, config.getString(zookeeper_path_key));

        /// Report a readable configuration error instead of abort()-ing the whole server on a config value.
        throw Exception(
            ErrorCodes::INVALID_CONFIG_PARAMETER,
            "'{}' is specified in the config, but storing workload entities in ZooKeeper is not supported yet; use '{}' instead",
            zookeeper_path_key,
            disk_path_key);
    }

    /// The trailing empty component makes the default path end with a directory separator.
    String default_path = fs::path{global_context->getPath()} / "workload" / "";
    String path = config.getString(disk_path_key, default_path);
    return std::make_unique<WorkloadEntityDiskStorage>(global_context, path);
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
namespace DB
{
/// Creates the workload entity storage according to the config of `global_context`
/// (keys `workload_path` and `workload_zookeeper_path`; specifying both is a config error).
std::unique_ptr<IWorkloadEntityStorage> createWorkloadEntityStorage(const ContextMutablePtr & global_context);
}

View File

@ -0,0 +1,17 @@
#include <Common/Scheduler/createResourceManager.h>
#include <Common/Scheduler/Nodes/DynamicResourceManager.h>
#include <Common/Scheduler/Nodes/IOResourceManager.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
/// Creates the resource manager: currently always an IOResourceManager built on
/// the workload entity storage of `global_context`.
ResourceManagerPtr createResourceManager(const ContextMutablePtr & global_context)
{
    // TODO(serxa): combine DynamicResourceManager and IOResourceManager to work together, because now old ResourceManager is disabled
    // const auto & config = global_context->getConfigRef();
    auto & entity_storage = global_context->getWorkloadEntityStorage();
    return std::make_shared<IOResourceManager>(entity_storage);
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Common/Scheduler/IResourceManager.h>
namespace DB
{
/// Creates the resource manager on top of the workload entity storage of `global_context`.
ResourceManagerPtr createResourceManager(const ContextMutablePtr & global_context);
}

View File

@ -66,7 +66,6 @@
#include <Access/SettingsConstraintsAndProfileIDs.h>
#include <Access/ExternalAuthenticators.h>
#include <Access/GSSAcceptor.h>
#include <Common/Scheduler/ResourceManagerFactory.h>
#include <Backups/BackupsWorker.h>
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
@ -91,6 +90,8 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/Scheduler/createResourceManager.h>
#include <Common/Scheduler/Workload/createWorkloadEntityStorage.h>
#include <Common/StackTrace.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/Config/ConfigProcessor.h>
@ -274,6 +275,9 @@ struct ContextSharedPart : boost::noncopyable
mutable OnceFlag user_defined_sql_objects_storage_initialized;
mutable std::unique_ptr<IUserDefinedSQLObjectsStorage> user_defined_sql_objects_storage;
mutable OnceFlag workload_entity_storage_initialized;
mutable std::unique_ptr<IWorkloadEntityStorage> workload_entity_storage;
#if USE_NLP
mutable OnceFlag synonyms_extensions_initialized;
mutable std::optional<SynonymsExtensions> synonyms_extensions;
@ -615,6 +619,7 @@ struct ContextSharedPart : boost::noncopyable
SHUTDOWN(log, "dictionaries loader", external_dictionaries_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "UDFs loader", external_user_defined_executable_functions_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "another UDFs storage", user_defined_sql_objects_storage, stopWatching());
SHUTDOWN(log, "workload entity storage", workload_entity_storage, stopWatching());
LOG_TRACE(log, "Shutting down named sessions");
Session::shutdownNamedSessions();
@ -646,6 +651,7 @@ struct ContextSharedPart : boost::noncopyable
std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;
std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader;
std::unique_ptr<IUserDefinedSQLObjectsStorage> delete_user_defined_sql_objects_storage;
std::unique_ptr<IWorkloadEntityStorage> delete_workload_entity_storage;
std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool;
@ -730,6 +736,7 @@ struct ContextSharedPart : boost::noncopyable
delete_external_dictionaries_loader = std::move(external_dictionaries_loader);
delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader);
delete_user_defined_sql_objects_storage = std::move(user_defined_sql_objects_storage);
delete_workload_entity_storage = std::move(workload_entity_storage);
delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
@ -748,6 +755,7 @@ struct ContextSharedPart : boost::noncopyable
delete_external_dictionaries_loader.reset();
delete_external_user_defined_executable_functions_loader.reset();
delete_user_defined_sql_objects_storage.reset();
delete_workload_entity_storage.reset();
delete_ddl_worker.reset();
delete_buffer_flush_schedule_pool.reset();
delete_schedule_pool.reset();
@ -1674,7 +1682,7 @@ std::vector<UUID> Context::getEnabledProfiles() const
ResourceManagerPtr Context::getResourceManager() const
{
callOnce(shared->resource_manager_initialized, [&] {
shared->resource_manager = ResourceManagerFactory::instance().get(getConfigRef().getString("resource_manager", "dynamic"));
shared->resource_manager = createResourceManager(getGlobalContext());
});
return shared->resource_manager;
@ -2909,6 +2917,32 @@ void Context::setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObj
shared->user_defined_sql_objects_storage = std::move(storage);
}
/// Returns the shared workload entity storage, creating it on first use.
const IWorkloadEntityStorage & Context::getWorkloadEntityStorage() const
{
    const auto create_storage = [&]
    {
        shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext());
    };
    callOnce(shared->workload_entity_storage_initialized, create_storage);

    SharedLockGuard lock(shared->mutex);
    return *shared->workload_entity_storage;
}
/// Returns the shared workload entity storage for modification, creating it on first use.
IWorkloadEntityStorage & Context::getWorkloadEntityStorage()
{
    const auto create_storage = [&]
    {
        shared->workload_entity_storage = createWorkloadEntityStorage(getGlobalContext());
    };
    callOnce(shared->workload_entity_storage_initialized, create_storage);

    std::lock_guard lock(shared->mutex);
    return *shared->workload_entity_storage;
}
/// Replaces the shared workload entity storage with `storage`, taking ownership of it.
void Context::setWorkloadEntityStorage(std::unique_ptr<IWorkloadEntityStorage> storage)
{
    std::lock_guard guard{shared->mutex};
    shared->workload_entity_storage = std::move(storage);
}
#if USE_NLP
SynonymsExtensions & Context::getSynonymsExtensions() const

View File

@ -70,6 +70,7 @@ class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalUserDefinedExecutableFunctionsLoader;
class IUserDefinedSQLObjectsStorage;
class IWorkloadEntityStorage;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
@ -880,6 +881,10 @@ public:
void setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage);
void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config);
/// Access to the storage of workload entities (created by CREATE WORKLOAD / CREATE RESOURCE).
const IWorkloadEntityStorage & getWorkloadEntityStorage() const;
IWorkloadEntityStorage & getWorkloadEntityStorage();
/// Replaces the storage; takes ownership of `storage`.
void setWorkloadEntityStorage(std::unique_ptr<IWorkloadEntityStorage> storage);
#if USE_NLP
SynonymsExtensions & getSynonymsExtensions() const;
Lemmatizers & getLemmatizers() const;

View File

@ -0,0 +1,68 @@
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterCreateResourceQuery.h>
#include <Access/ContextAccess.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTCreateResourceQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
/// Executes CREATE [OR REPLACE] RESOURCE [IF NOT EXISTS].
/// Requires CREATE_RESOURCE (plus DROP_RESOURCE for OR REPLACE). With ON CLUSTER the query is
/// forwarded to the distributed DDL executor, unless the storage is replicated, in which
/// case INCORRECT_QUERY is thrown.
BlockIO InterpreterCreateResourceQuery::execute()
{
    auto & query = query_ptr->as<ASTCreateResourceQuery &>();
    const bool or_replace = query.or_replace;

    AccessRightsElements required_access;
    required_access.emplace_back(AccessType::CREATE_RESOURCE);
    if (or_replace)
        required_access.emplace_back(AccessType::DROP_RESOURCE);

    auto context = getContext();

    if (!query.cluster.empty())
    {
        if (context->getWorkloadEntityStorage().isReplicated())
            throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because workload entities are replicated automatically");

        DDLQueryOnClusterParams on_cluster_params;
        on_cluster_params.access_to_check = std::move(required_access);
        return executeDDLQueryOnCluster(query_ptr, context, on_cluster_params);
    }

    context->checkAccess(required_access);

    const auto name = query.getResourceName();
    /// Only a plain CREATE fails on a name clash: IF NOT EXISTS and OR REPLACE both suppress the error.
    const bool throw_if_exists = !(query.if_not_exists || or_replace);

    context->getWorkloadEntityStorage().storeEntity(
        context,
        WorkloadEntityType::Resource,
        name,
        query_ptr,
        throw_if_exists,
        /* replace_if_exists = */ or_replace,
        context->getSettingsRef());

    return {};
}
/// Registers the factory function for InterpreterCreateResourceQuery under its class name.
void registerInterpreterCreateResourceQuery(InterpreterFactory & factory)
{
    factory.registerInterpreter(
        "InterpreterCreateResourceQuery",
        [](const InterpreterFactory::Arguments & args)
        {
            return std::make_unique<InterpreterCreateResourceQuery>(args.query, args.context);
        });
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
/// Interpreter for CREATE RESOURCE queries.
class InterpreterCreateResourceQuery : public IInterpreter, private WithMutableContext
{
public:
    InterpreterCreateResourceQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
        : WithMutableContext(context_)
        , query_ptr(query_ptr_)
    {}

    BlockIO execute() override;

private:
    /// The CREATE RESOURCE query being executed.
    ASTPtr query_ptr;
};
}

View File

@ -0,0 +1,68 @@
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterCreateWorkloadQuery.h>
#include <Access/ContextAccess.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
/// Executes CREATE [OR REPLACE] WORKLOAD [IF NOT EXISTS].
/// Requires CREATE_WORKLOAD (plus DROP_WORKLOAD for OR REPLACE). With ON CLUSTER the query is
/// forwarded to the distributed DDL executor, unless the storage is replicated, in which
/// case INCORRECT_QUERY is thrown.
BlockIO InterpreterCreateWorkloadQuery::execute()
{
    auto & query = query_ptr->as<ASTCreateWorkloadQuery &>();
    const bool or_replace = query.or_replace;

    AccessRightsElements required_access;
    required_access.emplace_back(AccessType::CREATE_WORKLOAD);
    if (or_replace)
        required_access.emplace_back(AccessType::DROP_WORKLOAD);

    auto context = getContext();

    if (!query.cluster.empty())
    {
        if (context->getWorkloadEntityStorage().isReplicated())
            throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because workload entities are replicated automatically");

        DDLQueryOnClusterParams on_cluster_params;
        on_cluster_params.access_to_check = std::move(required_access);
        return executeDDLQueryOnCluster(query_ptr, context, on_cluster_params);
    }

    context->checkAccess(required_access);

    const auto name = query.getWorkloadName();
    /// Only a plain CREATE fails on a name clash: IF NOT EXISTS and OR REPLACE both suppress the error.
    const bool throw_if_exists = !(query.if_not_exists || or_replace);

    context->getWorkloadEntityStorage().storeEntity(
        context,
        WorkloadEntityType::Workload,
        name,
        query_ptr,
        throw_if_exists,
        /* replace_if_exists = */ or_replace,
        context->getSettingsRef());

    return {};
}
/// Registers the factory function for InterpreterCreateWorkloadQuery under its class name.
void registerInterpreterCreateWorkloadQuery(InterpreterFactory & factory)
{
    factory.registerInterpreter(
        "InterpreterCreateWorkloadQuery",
        [](const InterpreterFactory::Arguments & args)
        {
            return std::make_unique<InterpreterCreateWorkloadQuery>(args.query, args.context);
        });
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
/// Interpreter for CREATE WORKLOAD queries.
class InterpreterCreateWorkloadQuery : public IInterpreter, private WithMutableContext
{
public:
    InterpreterCreateWorkloadQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
        : WithMutableContext(context_)
        , query_ptr(query_ptr_)
    {}

    BlockIO execute() override;

private:
    /// The CREATE WORKLOAD query being executed.
    ASTPtr query_ptr;
};
}

View File

@ -0,0 +1,60 @@
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterDropResourceQuery.h>
#include <Access/ContextAccess.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTDropResourceQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
/// Executes DROP RESOURCE [IF EXISTS].
/// Requires DROP_RESOURCE. With ON CLUSTER the query is forwarded to the distributed DDL
/// executor, unless the storage is replicated, in which case INCORRECT_QUERY is thrown.
BlockIO InterpreterDropResourceQuery::execute()
{
    auto & query = query_ptr->as<ASTDropResourceQuery &>();

    AccessRightsElements required_access;
    required_access.emplace_back(AccessType::DROP_RESOURCE);

    auto context = getContext();

    if (!query.cluster.empty())
    {
        if (context->getWorkloadEntityStorage().isReplicated())
            throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because workload entities are replicated automatically");

        DDLQueryOnClusterParams on_cluster_params;
        on_cluster_params.access_to_check = std::move(required_access);
        return executeDDLQueryOnCluster(query_ptr, context, on_cluster_params);
    }

    context->checkAccess(required_access);

    /// Without IF EXISTS a missing resource is an error.
    const bool throw_if_not_exists = !query.if_exists;

    context->getWorkloadEntityStorage().removeEntity(
        context,
        WorkloadEntityType::Resource,
        query.resource_name,
        throw_if_not_exists);

    return {};
}
/// Registers the factory function for InterpreterDropResourceQuery under its class name.
void registerInterpreterDropResourceQuery(InterpreterFactory & factory)
{
    factory.registerInterpreter(
        "InterpreterDropResourceQuery",
        [](const InterpreterFactory::Arguments & args)
        {
            return std::make_unique<InterpreterDropResourceQuery>(args.query, args.context);
        });
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
/// Interpreter for DROP RESOURCE queries.
class InterpreterDropResourceQuery : public IInterpreter, private WithMutableContext
{
public:
    InterpreterDropResourceQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
        : WithMutableContext(context_)
        , query_ptr(query_ptr_)
    {}

    BlockIO execute() override;

private:
    /// The DROP RESOURCE query being executed.
    ASTPtr query_ptr;
};
}

View File

@ -0,0 +1,60 @@
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterDropWorkloadQuery.h>
#include <Access/ContextAccess.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTDropWorkloadQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
/// Executes DROP WORKLOAD [IF EXISTS].
/// Requires DROP_WORKLOAD. With ON CLUSTER the query is forwarded to the distributed DDL
/// executor, unless the storage is replicated, in which case INCORRECT_QUERY is thrown.
BlockIO InterpreterDropWorkloadQuery::execute()
{
    auto & query = query_ptr->as<ASTDropWorkloadQuery &>();

    AccessRightsElements required_access;
    required_access.emplace_back(AccessType::DROP_WORKLOAD);

    auto context = getContext();

    if (!query.cluster.empty())
    {
        if (context->getWorkloadEntityStorage().isReplicated())
            throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because workload entities are replicated automatically");

        DDLQueryOnClusterParams on_cluster_params;
        on_cluster_params.access_to_check = std::move(required_access);
        return executeDDLQueryOnCluster(query_ptr, context, on_cluster_params);
    }

    context->checkAccess(required_access);

    /// Without IF EXISTS a missing workload is an error.
    const bool throw_if_not_exists = !query.if_exists;

    context->getWorkloadEntityStorage().removeEntity(
        context,
        WorkloadEntityType::Workload,
        query.workload_name,
        throw_if_not_exists);

    return {};
}
/// Registers the factory function for InterpreterDropWorkloadQuery under its class name.
void registerInterpreterDropWorkloadQuery(InterpreterFactory & factory)
{
    factory.registerInterpreter(
        "InterpreterDropWorkloadQuery",
        [](const InterpreterFactory::Arguments & args)
        {
            return std::make_unique<InterpreterDropWorkloadQuery>(args.query, args.context);
        });
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <Interpreters/IInterpreter.h>
namespace DB
{
class Context;
/// Interpreter for DROP WORKLOAD queries.
class InterpreterDropWorkloadQuery : public IInterpreter, private WithMutableContext
{
public:
    InterpreterDropWorkloadQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
        : WithMutableContext(context_)
        , query_ptr(query_ptr_)
    {}

    BlockIO execute() override;

private:
    /// The DROP WORKLOAD query being executed.
    ASTPtr query_ptr;
};
}

View File

@ -3,9 +3,13 @@
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTCreateResourceQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDeleteQuery.h>
#include <Parsers/ASTDropFunctionQuery.h>
#include <Parsers/ASTDropWorkloadQuery.h>
#include <Parsers/ASTDropResourceQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTUndropQuery.h>
@ -326,6 +330,22 @@ InterpreterFactory::InterpreterPtr InterpreterFactory::get(ASTPtr & query, Conte
{
interpreter_name = "InterpreterDropFunctionQuery";
}
else if (query->as<ASTCreateWorkloadQuery>())
{
interpreter_name = "InterpreterCreateWorkloadQuery";
}
else if (query->as<ASTDropWorkloadQuery>())
{
interpreter_name = "InterpreterDropWorkloadQuery";
}
else if (query->as<ASTCreateResourceQuery>())
{
interpreter_name = "InterpreterCreateResourceQuery";
}
else if (query->as<ASTDropResourceQuery>())
{
interpreter_name = "InterpreterDropResourceQuery";
}
else if (query->as<ASTCreateIndexQuery>())
{
interpreter_name = "InterpreterCreateIndexQuery";

View File

@ -52,6 +52,10 @@ void registerInterpreterExternalDDLQuery(InterpreterFactory & factory);
void registerInterpreterTransactionControlQuery(InterpreterFactory & factory);
void registerInterpreterCreateFunctionQuery(InterpreterFactory & factory);
void registerInterpreterDropFunctionQuery(InterpreterFactory & factory);
void registerInterpreterCreateWorkloadQuery(InterpreterFactory & factory);
void registerInterpreterDropWorkloadQuery(InterpreterFactory & factory);
void registerInterpreterCreateResourceQuery(InterpreterFactory & factory);
void registerInterpreterDropResourceQuery(InterpreterFactory & factory);
void registerInterpreterCreateIndexQuery(InterpreterFactory & factory);
void registerInterpreterCreateNamedCollectionQuery(InterpreterFactory & factory);
void registerInterpreterDropIndexQuery(InterpreterFactory & factory);
@ -111,6 +115,10 @@ void registerInterpreters()
registerInterpreterTransactionControlQuery(factory);
registerInterpreterCreateFunctionQuery(factory);
registerInterpreterDropFunctionQuery(factory);
registerInterpreterCreateWorkloadQuery(factory);
registerInterpreterDropWorkloadQuery(factory);
registerInterpreterCreateResourceQuery(factory);
registerInterpreterDropResourceQuery(factory);
registerInterpreterCreateIndexQuery(factory);
registerInterpreterCreateNamedCollectionQuery(factory);
registerInterpreterDropIndexQuery(factory);

View File

@ -0,0 +1,47 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTCreateResourceQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
namespace DB
{
/// Returns a copy of the query whose `resource_name` child is cloned as well.
ASTPtr ASTCreateResourceQuery::clone() const
{
    auto copy = std::make_shared<ASTCreateResourceQuery>(*this);

    /// The copy constructor copied the child pointers of the original; rebuild the list with the clone's own node.
    copy->children.clear();
    copy->resource_name = resource_name->clone();
    copy->children.push_back(copy->resource_name);

    return copy;
}
/// Writes the query as `CREATE [OR REPLACE] RESOURCE [IF NOT EXISTS] name [ON CLUSTER ...]`,
/// with highlighting escape sequences when `settings.hilite` is set.
void ASTCreateResourceQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
    /// Yields the escape sequence only when highlighting is requested.
    const auto hl = [&settings](const char * escape) { return settings.hilite ? escape : ""; };

    settings.ostr << hl(hilite_keyword) << "CREATE ";
    if (or_replace)
        settings.ostr << "OR REPLACE ";
    settings.ostr << "RESOURCE ";
    if (if_not_exists)
        settings.ostr << "IF NOT EXISTS ";
    settings.ostr << hl(hilite_none);

    settings.ostr << hl(hilite_identifier) << backQuoteIfNeed(getResourceName()) << hl(hilite_none);

    formatOnCluster(settings);
}
/// Returns the resource name taken from the `resource_name` identifier node.
/// The result stays empty if tryGetIdentifierNameInto() does not fill it.
String ASTCreateResourceQuery::getResourceName() const
{
    String result;
    tryGetIdentifierNameInto(resource_name, result);
    return result;
}
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
/// AST node for `CREATE [OR REPLACE] RESOURCE [IF NOT EXISTS] name [ON CLUSTER ...]`.
class ASTCreateResourceQuery : public IAST, public ASTQueryWithOnCluster
{
public:
/// Identifier node holding the resource name.
ASTPtr resource_name;
// TODO(serxa): add resource definition
/// Modifiers of the CREATE statement.
bool or_replace = false;
bool if_not_exists = false;
String getID(char delim) const override { return "CreateResourceQuery" + (delim + getResourceName()); }
ASTPtr clone() const override;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;
ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams &) const override { return removeOnCluster<ASTCreateResourceQuery>(clone()); }
/// Resource name as a string, extracted from `resource_name`.
String getResourceName() const;
QueryKind getQueryKind() const override { return QueryKind::Create; }
};
}

View File

@ -0,0 +1,67 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
namespace DB
{
/// Returns a deep copy of the query: both the workload name and the optional parent
/// are cloned and registered as children of the copy.
ASTPtr ASTCreateWorkloadQuery::clone() const
{
    auto res = std::make_shared<ASTCreateWorkloadQuery>(*this);
    res->children.clear();

    res->workload_name = workload_name->clone();
    res->children.push_back(res->workload_name);

    /// Without this the copy would keep pointing at the original's parent node
    /// and the parent would be missing from the copy's child list.
    if (workload_parent)
    {
        res->workload_parent = workload_parent->clone();
        res->children.push_back(res->workload_parent);
    }

    // TODO(serxa): clone settings

    return res;
}
/// Writes the textual form of the query into `settings.ostr`:
/// `CREATE [OR REPLACE] WORKLOAD [IF NOT EXISTS] <name>`, then whatever formatOnCluster() emits,
/// then ` IN <parent>` if a parent workload is set.
/// Keywords and identifiers are wrapped in highlighting sequences when `settings.hilite` is set.
void ASTCreateWorkloadQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
    auto & out = settings.ostr;
    const bool highlight = settings.hilite;

    /// Keyword part; the optional clauses collapse to an empty string when their flag is not set.
    out << (highlight ? hilite_keyword : "") << "CREATE "
        << (or_replace ? "OR REPLACE " : "")
        << "WORKLOAD "
        << (if_not_exists ? "IF NOT EXISTS " : "")
        << (highlight ? hilite_none : "");

    /// Workload name, back-quoted only when needed.
    out << (highlight ? hilite_identifier : "")
        << backQuoteIfNeed(getWorkloadName())
        << (highlight ? hilite_none : "");

    formatOnCluster(settings);

    if (!hasParent())
        return;

    /// Optional parent clause.
    out << (highlight ? hilite_keyword : "") << " IN " << (highlight ? hilite_none : "");
    out << (highlight ? hilite_identifier : "")
        << backQuoteIfNeed(getWorkloadParent())
        << (highlight ? hilite_none : "");
}
/// Returns the workload name extracted from the `workload_name` node.
/// The result stays empty if tryGetIdentifierNameInto() does not fill it.
String ASTCreateWorkloadQuery::getWorkloadName() const
{
    String result;
    tryGetIdentifierNameInto(workload_name, result);
    return result;
}
/// Tells whether a parent workload node is attached to this query.
bool ASTCreateWorkloadQuery::hasParent() const
{
    return static_cast<bool>(workload_parent);
}
/// Returns the parent workload name extracted from the `workload_parent` node.
/// The result stays empty if tryGetIdentifierNameInto() does not fill it.
String ASTCreateWorkloadQuery::getWorkloadParent() const
{
    String result;
    tryGetIdentifierNameInto(workload_parent, result);
    return result;
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
/// AST node of the `CREATE [OR REPLACE] WORKLOAD [IF NOT EXISTS] name [IN parent]` query.
class ASTCreateWorkloadQuery : public IAST, public ASTQueryWithOnCluster
{
public:
    /// Node holding the name of the workload.
    ASTPtr workload_name;
    /// Node holding the name of the parent workload; null when no `IN parent` clause was given.
    ASTPtr workload_parent;
    // TODO(serxa): add workload settings (weight and priority should also go inside settings, because they can differ for different resources)

    /// Set when `OR REPLACE` was specified.
    bool or_replace = false;
    /// Set when `IF NOT EXISTS` was specified.
    bool if_not_exists = false;

    /// Node identifier: the fixed prefix, then `delim`, then the workload name.
    String getID(char delim) const override
    {
        String id = "CreateWorkloadQuery";
        id += delim;
        id += getWorkloadName();
        return id;
    }

    ASTPtr clone() const override;

    void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;

    /// Returns a copy of this query passed through removeOnCluster().
    ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams &) const override
    {
        return removeOnCluster<ASTCreateWorkloadQuery>(clone());
    }

    /// Name of the workload as a plain string.
    String getWorkloadName() const;
    /// Whether `workload_parent` is set.
    bool hasParent() const;
    /// Name of the parent workload as a plain string.
    String getWorkloadParent() const;

    QueryKind getQueryKind() const override
    {
        return QueryKind::Create;
    }
};
}

View File

@ -0,0 +1,25 @@
#include <Parsers/ASTDropResourceQuery.h>
#include <Common/quoteString.h>
#include <IO/Operators.h>
namespace DB
{
/// Returns a copy of this node made with the copy constructor.
ASTPtr ASTDropResourceQuery::clone() const
{
    auto copy = std::make_shared<ASTDropResourceQuery>(*this);
    return copy;
}
/// Writes `DROP RESOURCE [IF EXISTS] <name>` into `settings.ostr`, followed by whatever formatOnCluster() emits.
/// Keywords and the resource name are wrapped in highlighting sequences when `settings.hilite` is set.
void ASTDropResourceQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
    auto & out = settings.ostr;
    const bool highlight = settings.hilite;

    /// Keyword part; `IF EXISTS` collapses to an empty string when the flag is not set.
    out << (highlight ? hilite_keyword : "") << "DROP RESOURCE "
        << (if_exists ? "IF EXISTS " : "")
        << (highlight ? hilite_none : "");

    /// Resource name, back-quoted only when needed.
    out << (highlight ? hilite_identifier : "")
        << backQuoteIfNeed(resource_name)
        << (highlight ? hilite_none : "");

    formatOnCluster(settings);
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
/// AST node of the `DROP RESOURCE [IF EXISTS] name` query.
class ASTDropResourceQuery : public IAST, public ASTQueryWithOnCluster
{
public:
    /// Name of the resource to drop.
    String resource_name;

    /// Set when `IF EXISTS` was specified.
    bool if_exists = false;

    /// Node identifier; the delimiter argument is not used.
    String getID(char) const override
    {
        return "DropResourceQuery";
    }

    ASTPtr clone() const override;

    void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;

    /// Returns a copy of this query passed through removeOnCluster().
    ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams &) const override
    {
        return removeOnCluster<ASTDropResourceQuery>(clone());
    }

    QueryKind getQueryKind() const override
    {
        return QueryKind::Drop;
    }
};
}

View File

@ -0,0 +1,25 @@
#include <Parsers/ASTDropWorkloadQuery.h>
#include <Common/quoteString.h>
#include <IO/Operators.h>
namespace DB
{
/// Returns a copy of this node made with the copy constructor.
ASTPtr ASTDropWorkloadQuery::clone() const
{
    auto copy = std::make_shared<ASTDropWorkloadQuery>(*this);
    return copy;
}
/// Writes `DROP WORKLOAD [IF EXISTS] <name>` into `settings.ostr`, followed by whatever formatOnCluster() emits.
/// Keywords and the workload name are wrapped in highlighting sequences when `settings.hilite` is set.
void ASTDropWorkloadQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
    auto & out = settings.ostr;
    const bool highlight = settings.hilite;

    /// Keyword part; `IF EXISTS` collapses to an empty string when the flag is not set.
    out << (highlight ? hilite_keyword : "") << "DROP WORKLOAD "
        << (if_exists ? "IF EXISTS " : "")
        << (highlight ? hilite_none : "");

    /// Workload name, back-quoted only when needed.
    out << (highlight ? hilite_identifier : "")
        << backQuoteIfNeed(workload_name)
        << (highlight ? hilite_none : "");

    formatOnCluster(settings);
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
/// AST node of the `DROP WORKLOAD [IF EXISTS] name` query.
class ASTDropWorkloadQuery : public IAST, public ASTQueryWithOnCluster
{
public:
    /// Name of the workload to drop.
    String workload_name;

    /// Set when `IF EXISTS` was specified.
    bool if_exists = false;

    /// Node identifier; the delimiter argument is not used.
    String getID(char) const override
    {
        return "DropWorkloadQuery";
    }

    ASTPtr clone() const override;

    void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;

    /// Returns a copy of this query passed through removeOnCluster().
    ASTPtr getRewrittenASTWithoutOnCluster(const WithoutOnClusterASTRewriteParams &) const override
    {
        return removeOnCluster<ASTDropWorkloadQuery>(clone());
    }

    QueryKind getQueryKind() const override
    {
        return QueryKind::Drop;
    }
};
}

View File

@ -408,6 +408,7 @@ namespace DB
MR_MACROS(REPLACE, "REPLACE") \
MR_MACROS(RESET_SETTING, "RESET SETTING") \
MR_MACROS(RESET_AUTHENTICATION_METHODS_TO_NEW, "RESET AUTHENTICATION METHODS TO NEW") \
MR_MACROS(RESOURCE, "RESOURCE") \
MR_MACROS(RESPECT_NULLS, "RESPECT NULLS") \
MR_MACROS(RESTORE, "RESTORE") \
MR_MACROS(RESTRICT, "RESTRICT") \
@ -520,6 +521,7 @@ namespace DB
MR_MACROS(WHEN, "WHEN") \
MR_MACROS(WHERE, "WHERE") \
MR_MACROS(WINDOW, "WINDOW") \
MR_MACROS(WORKLOAD, "WORKLOAD") \
MR_MACROS(QUALIFY, "QUALIFY") \
MR_MACROS(WITH_ADMIN_OPTION, "WITH ADMIN OPTION") \
MR_MACROS(WITH_CHECK, "WITH CHECK") \

View File

@ -0,0 +1,62 @@
#include <Parsers/ParserCreateResourceQuery.h>
#include <Parsers/ASTCreateResourceQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
namespace DB
{
/// Parses `CREATE [OR REPLACE] RESOURCE [IF NOT EXISTS] name [ON <cluster clause>]`.
/// `IF NOT EXISTS` is only attempted when `OR REPLACE` was not given.
/// On success stores an ASTCreateResourceQuery in `node` and returns true; otherwise returns false.
bool ParserCreateResourceQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
    // TODO(serxa): parse resource definition

    if (!ParserKeyword{Keyword::CREATE}.ignore(pos, expected))
        return false;

    const bool or_replace = ParserKeyword{Keyword::OR_REPLACE}.ignore(pos, expected);

    if (!ParserKeyword{Keyword::RESOURCE}.ignore(pos, expected))
        return false;

    const bool if_not_exists = !or_replace && ParserKeyword{Keyword::IF_NOT_EXISTS}.ignore(pos, expected);

    ParserIdentifier identifier_p;
    ASTPtr name_ast;
    if (!identifier_p.parse(pos, name_ast, expected))
        return false;

    /// After `ON`, the rest of the cluster clause is mandatory.
    String cluster;
    if (ParserKeyword{Keyword::ON}.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster, expected))
        return false;

    auto query = std::make_shared<ASTCreateResourceQuery>();
    query->resource_name = name_ast;
    query->children.push_back(name_ast);
    query->or_replace = or_replace;
    query->if_not_exists = if_not_exists;
    query->cluster = std::move(cluster);

    node = query;
    return true;
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include "IParserBase.h"
namespace DB
{
/// Parser of the CREATE RESOURCE query, e.g.:
/// CREATE RESOURCE cache_io (WRITE DISK s3diskWithCache, READ DISK s3diskWithCache)
class ParserCreateResourceQuery : public IParserBase
{
protected:
    const char * getName() const override
    {
        return "CREATE RESOURCE query";
    }

    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -0,0 +1,76 @@
#include <Parsers/ParserCreateWorkloadQuery.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
namespace DB
{
/// Parses `CREATE [OR REPLACE] WORKLOAD [IF NOT EXISTS] name [ON <cluster clause>] [IN parent]`.
/// `IF NOT EXISTS` is only attempted when `OR REPLACE` was not given.
/// On success stores an ASTCreateWorkloadQuery in `node` and returns true; otherwise returns false.
bool ParserCreateWorkloadQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
    // TODO(serxa): parse workload settings

    if (!ParserKeyword{Keyword::CREATE}.ignore(pos, expected))
        return false;

    const bool or_replace = ParserKeyword{Keyword::OR_REPLACE}.ignore(pos, expected);

    if (!ParserKeyword{Keyword::WORKLOAD}.ignore(pos, expected))
        return false;

    const bool if_not_exists = !or_replace && ParserKeyword{Keyword::IF_NOT_EXISTS}.ignore(pos, expected);

    /// The same identifier parser is used for the workload name and for its parent.
    ParserIdentifier identifier_p;
    ASTPtr name_ast;
    if (!identifier_p.parse(pos, name_ast, expected))
        return false;

    /// After `ON`, the rest of the cluster clause is mandatory.
    String cluster;
    if (ParserKeyword{Keyword::ON}.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster, expected))
        return false;

    /// After `IN`, the parent name is mandatory.
    ASTPtr parent_ast;
    if (ParserKeyword{Keyword::IN}.ignore(pos, expected) && !identifier_p.parse(pos, parent_ast, expected))
        return false;

    auto query = std::make_shared<ASTCreateWorkloadQuery>();
    query->workload_name = name_ast;
    query->children.push_back(name_ast);
    if (parent_ast)
    {
        query->workload_parent = parent_ast;
        query->children.push_back(parent_ast);
    }
    query->or_replace = or_replace;
    query->if_not_exists = if_not_exists;
    query->cluster = std::move(cluster);

    node = query;
    return true;
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include "IParserBase.h"
namespace DB
{
/// Parser of the CREATE WORKLOAD query, e.g.:
/// CREATE WORKLOAD production IN all SETTINGS weight = 3, max_speed = '1G' FOR network_read, max_speed = '2G' FOR network_write
class ParserCreateWorkloadQuery : public IParserBase
{
protected:
    const char * getName() const override
    {
        return "CREATE WORKLOAD query";
    }

    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -0,0 +1,52 @@
#include <Parsers/ASTDropResourceQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserDropResourceQuery.h>
namespace DB
{
/// Parses `DROP RESOURCE [IF EXISTS] name [ON <cluster clause>]`.
/// On success stores an ASTDropResourceQuery in `node` and returns true; otherwise returns false.
bool ParserDropResourceQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
    if (!ParserKeyword{Keyword::DROP}.ignore(pos, expected))
        return false;

    if (!ParserKeyword{Keyword::RESOURCE}.ignore(pos, expected))
        return false;

    const bool if_exists = ParserKeyword{Keyword::IF_EXISTS}.ignore(pos, expected);

    ParserIdentifier identifier_p;
    ASTPtr name_ast;
    if (!identifier_p.parse(pos, name_ast, expected))
        return false;

    /// After `ON`, the rest of the cluster clause is mandatory.
    String cluster;
    if (ParserKeyword{Keyword::ON}.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster, expected))
        return false;

    auto query = std::make_shared<ASTDropResourceQuery>();
    query->if_exists = if_exists;
    query->cluster = std::move(cluster);
    /// The drop query keeps the name as a plain string rather than as an AST node.
    query->resource_name = name_ast->as<ASTIdentifier &>().name();

    node = query;
    return true;
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include "IParserBase.h"
namespace DB
{
/// Parser of the DROP RESOURCE query, e.g.:
/// DROP RESOURCE resource1
class ParserDropResourceQuery : public IParserBase
{
protected:
    const char * getName() const override
    {
        return "DROP RESOURCE query";
    }

    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -0,0 +1,52 @@
#include <Parsers/ASTDropWorkloadQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserDropWorkloadQuery.h>
namespace DB
{
/// Parses `DROP WORKLOAD [IF EXISTS] name [ON <cluster clause>]`.
/// On success stores an ASTDropWorkloadQuery in `node` and returns true; otherwise returns false.
bool ParserDropWorkloadQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
    if (!ParserKeyword{Keyword::DROP}.ignore(pos, expected))
        return false;

    if (!ParserKeyword{Keyword::WORKLOAD}.ignore(pos, expected))
        return false;

    const bool if_exists = ParserKeyword{Keyword::IF_EXISTS}.ignore(pos, expected);

    ParserIdentifier identifier_p;
    ASTPtr name_ast;
    if (!identifier_p.parse(pos, name_ast, expected))
        return false;

    /// After `ON`, the rest of the cluster clause is mandatory.
    String cluster;
    if (ParserKeyword{Keyword::ON}.ignore(pos, expected) && !ASTQueryWithOnCluster::parse(pos, cluster, expected))
        return false;

    auto query = std::make_shared<ASTDropWorkloadQuery>();
    query->if_exists = if_exists;
    query->cluster = std::move(cluster);
    /// The drop query keeps the name as a plain string rather than as an AST node.
    query->workload_name = name_ast->as<ASTIdentifier &>().name();

    node = query;
    return true;
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include "IParserBase.h"
namespace DB
{
/// Parser of the DROP WORKLOAD query, e.g.:
/// DROP WORKLOAD workload1
class ParserDropWorkloadQuery : public IParserBase
{
protected:
    const char * getName() const override
    {
        return "DROP WORKLOAD query";
    }

    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -1,8 +1,12 @@
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Parsers/ParserCreateWorkloadQuery.h>
#include <Parsers/ParserCreateResourceQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserCreateIndexQuery.h>
#include <Parsers/ParserDropFunctionQuery.h>
#include <Parsers/ParserDropWorkloadQuery.h>
#include <Parsers/ParserDropResourceQuery.h>
#include <Parsers/ParserDropIndexQuery.h>
#include <Parsers/ParserDropNamedCollectionQuery.h>
#include <Parsers/ParserAlterNamedCollectionQuery.h>
@ -48,6 +52,10 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserCreateSettingsProfileQuery create_settings_profile_p;
ParserCreateFunctionQuery create_function_p;
ParserDropFunctionQuery drop_function_p;
ParserCreateWorkloadQuery create_workload_p;
ParserDropWorkloadQuery drop_workload_p;
ParserCreateResourceQuery create_resource_p;
ParserDropResourceQuery drop_resource_p;
ParserCreateNamedCollectionQuery create_named_collection_p;
ParserDropNamedCollectionQuery drop_named_collection_p;
ParserAlterNamedCollectionQuery alter_named_collection_p;
@ -74,6 +82,10 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| create_function_p.parse(pos, node, expected)
|| drop_function_p.parse(pos, node, expected)
|| create_workload_p.parse(pos, node, expected)
|| drop_workload_p.parse(pos, node, expected)
|| create_resource_p.parse(pos, node, expected)
|| drop_resource_p.parse(pos, node, expected)
|| create_named_collection_p.parse(pos, node, expected)
|| drop_named_collection_p.parse(pos, node, expected)
|| alter_named_collection_p.parse(pos, node, expected)

View File

@ -84,12 +84,12 @@ ColumnsDescription StorageSystemScheduler::getColumnsDescription()
void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
{
context->getResourceManager()->forEachNode([&] (const String & resource, const String & path, const String & type, const SchedulerNodePtr & node)
context->getResourceManager()->forEachNode([&] (const String & resource, const String & path, ISchedulerNode * node)
{
size_t i = 0;
res_columns[i++]->insert(resource);
res_columns[i++]->insert(path);
res_columns[i++]->insert(type);
res_columns[i++]->insert(node->getTypeName());
res_columns[i++]->insert(node->info.weight);
res_columns[i++]->insert(node->info.priority.value);
res_columns[i++]->insert(node->isActive());
@ -118,23 +118,23 @@ void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr c
if (auto * parent = dynamic_cast<FairPolicy *>(node->parent))
{
if (auto value = parent->getChildVRuntime(node.get()))
if (auto value = parent->getChildVRuntime(node))
vruntime = *value;
}
if (auto * ptr = dynamic_cast<FairPolicy *>(node.get()))
if (auto * ptr = dynamic_cast<FairPolicy *>(node))
system_vruntime = ptr->getSystemVRuntime();
if (auto * ptr = dynamic_cast<FifoQueue *>(node.get()))
if (auto * ptr = dynamic_cast<FifoQueue *>(node))
std::tie(queue_length, queue_cost) = ptr->getQueueLengthAndCost();
if (auto * ptr = dynamic_cast<ISchedulerQueue *>(node.get()))
if (auto * ptr = dynamic_cast<ISchedulerQueue *>(node))
budget = ptr->getBudget();
if (auto * ptr = dynamic_cast<ISchedulerConstraint *>(node.get()))
if (auto * ptr = dynamic_cast<ISchedulerConstraint *>(node))
is_satisfied = ptr->isSatisfied();
if (auto * ptr = dynamic_cast<SemaphoreConstraint *>(node.get()))
if (auto * ptr = dynamic_cast<SemaphoreConstraint *>(node))
{
std::tie(inflight_requests, inflight_cost) = ptr->getInflights();
std::tie(max_requests, max_cost) = ptr->getLimits();
}
if (auto * ptr = dynamic_cast<ThrottlerConstraint *>(node.get()))
if (auto * ptr = dynamic_cast<ThrottlerConstraint *>(node))
{
std::tie(max_speed, max_burst) = ptr->getParams();
throttling_us = ptr->getThrottlingDuration().count() / 1000;

View File

@ -0,0 +1,48 @@
#include <DataTypes/DataTypeString.h>
#include <Interpreters/Context.h>
#include <Parsers/queryToString.h>
#include <Storages/System/StorageSystemWorkloads.h>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
namespace DB
{
/// Describes the columns of the table: three String columns `name`, `parent` and `create_query`.
ColumnsDescription StorageSystemWorkloads::getColumnsDescription()
{
    ColumnsDescription description
    {
        {"name", std::make_shared<DataTypeString>(), "The name of the workload."},
        {"parent", std::make_shared<DataTypeString>(), "The name of the parent workload."},
        {"create_query", std::make_shared<DataTypeString>(), "CREATE query of the workload."},
    };
    return description;
}
/// Emits one row per workload known to the workload entity storage of `context`:
/// the workload name, the name of its parent and the text of its CREATE query.
void StorageSystemWorkloads::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
{
    const auto & entity_storage = context->getWorkloadEntityStorage();

    for (const auto & entity_name : entity_storage.getAllEntityNames(WorkloadEntityType::Workload))
    {
        auto create_ast = entity_storage.get(entity_name);
        auto & create_query = typeid_cast<ASTCreateWorkloadQuery &>(*create_ast);

        /// Column order must match getColumnsDescription(): name, parent, create_query.
        size_t i = 0;
        res_columns[i++]->insert(entity_name);
        res_columns[i++]->insert(create_query.getWorkloadParent());
        res_columns[i++]->insert(queryToString(create_ast));
    }
}
/// Backup hook of the table. Currently does nothing: all arguments are ignored and no backup entries are added.
/// The commented-out call below sketches the intended delegation to the storage.
void StorageSystemWorkloads::backupData(BackupEntriesCollector & /*backup_entries_collector*/, const String & /*data_path_in_backup*/, const std::optional<ASTs> & /* partitions */)
{
// TODO(serxa): add backup for workloads and resources
// storage.backup(backup_entries_collector, data_path_in_backup);
}
/// Restore hook of the table. Currently does nothing: all arguments are ignored and nothing is restored.
/// The commented-out call below sketches the intended delegation to the storage.
void StorageSystemWorkloads::restoreDataFromBackup(RestorerFromBackup & /*restorer*/, const String & /*data_path_in_backup*/, const std::optional<ASTs> & /* partitions */)
{
// TODO(serxa): add restore for workloads and resources
// storage.restore(restorer, data_path_in_backup);
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
/// Implements the `workloads` system table, which lists all existing workloads.
class StorageSystemWorkloads final : public IStorageSystemOneBlock
{
public:
    std::string getName() const override
    {
        return "SystemWorkloads";
    }

    /// Column layout of the table.
    static ColumnsDescription getColumnsDescription();

    void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

    void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;

protected:
    using IStorageSystemOneBlock::IStorageSystemOneBlock;

    /// Fills `res_columns` with one row per workload.
    void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
};
}

View File

@ -23,6 +23,7 @@
#include <Storages/System/StorageSystemEvents.h>
#include <Storages/System/StorageSystemFormats.h>
#include <Storages/System/StorageSystemFunctions.h>
#include <Storages/System/StorageSystemWorkloads.h>
#include <Storages/System/StorageSystemGraphite.h>
#include <Storages/System/StorageSystemMacros.h>
#include <Storages/System/StorageSystemMerges.h>
@ -229,6 +230,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attachNoDescription<StorageSystemS3Queue>(context, system_database, "s3queue", "Contains in-memory state of S3Queue metadata and currently processed rows per file.");
attach<StorageSystemDashboards>(context, system_database, "dashboards", "Contains queries used by /dashboard page accessible though HTTP interface. This table can be useful for monitoring and troubleshooting. The table contains a row for every chart in a dashboard.");
attach<StorageSystemViewRefreshes>(context, system_database, "view_refreshes", "Lists all Refreshable Materialized Views of current server.");
attach<StorageSystemWorkloads>(context, system_database, "workloads", "Contains a list of all currently existing workloads.");
if (has_zookeeper)
{

View File

@ -0,0 +1,5 @@
all CREATE WORKLOAD `all`
development all CREATE WORKLOAD development IN `all`
production all CREATE WORKLOAD production IN `all`
all CREATE WORKLOAD `all`
all CREATE WORKLOAD `all`

View File

@ -0,0 +1,11 @@
-- Tags: no-parallel
-- Do not run this test in parallel: the `all` workload might affect the execution of other queries.

-- Create a workload without a parent and list the contents of system.workloads.
CREATE OR REPLACE WORKLOAD all;
SELECT name, parent, create_query FROM system.workloads;
-- Add two workloads whose parent is `all` (one of them with IF NOT EXISTS) and list again.
CREATE WORKLOAD IF NOT EXISTS production IN all;
CREATE WORKLOAD development IN all;
SELECT name, parent, create_query FROM system.workloads;
-- Drop the two child workloads (one of them with IF EXISTS) and list again.
DROP WORKLOAD IF EXISTS production;
DROP WORKLOAD development;
SELECT name, parent, create_query FROM system.workloads;
-- Clean up the remaining workload.
DROP WORKLOAD all;