diff --git a/src/Common/Scheduler/Nodes/IOResourceManager.cpp b/src/Common/Scheduler/Nodes/IOResourceManager.cpp index cf67bf2dfcb..101a0fa4c32 100644 --- a/src/Common/Scheduler/Nodes/IOResourceManager.cpp +++ b/src/Common/Scheduler/Nodes/IOResourceManager.cpp @@ -137,7 +137,7 @@ void IOResourceManager::Resource::updateNode(const NodeInfo & old_info, const No throw Exception(ErrorCodes::LOGICAL_ERROR, "Updating a name of workload '{}' to '{}' is not allowed in resource '{}'", old_info.name, new_info.name, resource_name); - if (old_info.parent != new_info.parent && (old_info.parent.empty() || old_info.parent.empty())) + if (old_info.parent != new_info.parent && (old_info.parent.empty() || new_info.parent.empty())) throw Exception(ErrorCodes::LOGICAL_ERROR, "Workload '{}' invalid update of parent from '{}' to '{}' in resource '{}'", old_info.name, old_info.parent, new_info.parent, resource_name); @@ -157,22 +157,20 @@ void IOResourceManager::Resource::updateNode(const NodeInfo & old_info, const No { auto node = node_for_workload[old_info.name]; bool detached = false; - if (old_info.parent != new_info.parent) + if (UnifiedSchedulerNode::updateRequiresDetach(old_info.parent, new_info.parent, old_info.settings, new_info.settings)) { - node_for_workload[old_info.parent]->detachUnifiedChild(node); + if (!old_info.parent.empty()) + node_for_workload[old_info.parent]->detachUnifiedChild(node); detached = true; } node->updateSchedulingSettings(new_info.settings); - if (!detached && !old_info.parent.empty() && old_info.settings.priority != new_info.settings.priority) - node_for_workload[old_info.parent]->updateUnifiedChildPriority( - node, - old_info.settings.priority, - new_info.settings.priority); if (detached) - node_for_workload[new_info.parent]->attachUnifiedChild(node); - + { + if (!new_info.parent.empty()) + node_for_workload[new_info.parent]->attachUnifiedChild(node); + } updateCurrentVersion(); }); } @@ -268,7 +266,7 @@ IOResourceManager::IOResourceManager(IWorkloadEntityStorage & storage_) case WorkloadEntityType::Resource: { if (entity) - createResource(entity_name, entity); + createOrUpdateResource(entity_name, entity); else deleteResource(entity_name); break; @@ -315,14 +313,11 @@ void IOResourceManager::deleteWorkload(const String & workload_name) } } -void IOResourceManager::createResource(const String & resource_name, const ASTPtr & ast) +void IOResourceManager::createOrUpdateResource(const String & resource_name, const ASTPtr & ast) { std::unique_lock lock{mutex}; if (auto resource_iter = resources.find(resource_name); resource_iter != resources.end()) - { - // Resource to be created already exist -- do nothing, throwing exceptions from a subscription is pointless - // TODO(serxa): add logging - } + resource_iter->second->updateResource(ast); else { // Add all workloads into the new resource @@ -420,6 +415,12 @@ void IOResourceManager::Classifier::attach(const ResourcePtr & resource, const V attachments[resource->getName()] = Attachment{.resource = resource, .version = version, .link = link}; } +void IOResourceManager::Resource::updateResource(const ASTPtr & new_resource_entity) +{ + chassert(getEntityName(new_resource_entity) == resource_name); + resource_entity = new_resource_entity; +} + std::future IOResourceManager::Resource::attachClassifier(Classifier & classifier, const String & workload_name) { auto attach_promise = std::make_shared>(); // event queue task is std::function, which requires copy semantics diff --git a/src/Common/Scheduler/Nodes/IOResourceManager.h b/src/Common/Scheduler/Nodes/IOResourceManager.h index f4871379456..dc57b985455 100644 --- a/src/Common/Scheduler/Nodes/IOResourceManager.h +++ b/src/Common/Scheduler/Nodes/IOResourceManager.h @@ -173,6 +173,9 @@ private: void deleteNode(const NodeInfo & info); void updateNode(const NodeInfo & old_info, const NodeInfo & new_info); + /// Updates resource entity + void updateResource(const ASTPtr & new_resource_entity); + /// Updates a classifier to contain a reference for specified workload std::future attachClassifier(Classifier & classifier, const String & workload_name); @@ -205,7 +208,7 @@ private: future.get(); // Blocks until execution is done in the scheduler thread } - const ASTPtr resource_entity; + ASTPtr resource_entity; const String resource_name; SchedulerRoot scheduler; @@ -256,7 +259,7 @@ private: void createOrUpdateWorkload(const String & workload_name, const ASTPtr & ast); void deleteWorkload(const String & workload_name); - void createResource(const String & resource_name, const ASTPtr & ast); + void createOrUpdateResource(const String & resource_name, const ASTPtr & ast); void deleteResource(const String & resource_name); // Topological sorting of worklaods diff --git a/src/Common/Scheduler/Nodes/UnifiedSchedulerNode.h b/src/Common/Scheduler/Nodes/UnifiedSchedulerNode.h index 2de5131efbb..f0ec17a8dca 100644 --- a/src/Common/Scheduler/Nodes/UnifiedSchedulerNode.h +++ b/src/Common/Scheduler/Nodes/UnifiedSchedulerNode.h @@ -160,6 +160,14 @@ private: // Returns true iff there are no unified children attached bool empty() const { return branches.empty(); } + SchedulerNodePtr getRoot() + { + chassert(!branches.empty()); + if (root) + return root; + return branches.begin()->second.getRoot(); // There should be exactly one child-branch + } + /// Attaches a new child. /// Returns root node if it has been changed to a different node, otherwise returns null. [[nodiscard]] SchedulerNodePtr attachUnifiedChild(EventQueue * event_queue_, const UnifiedSchedulerNodePtr & child) @@ -244,6 +252,14 @@ private: SchedulerNodePtr queue; /// FifoQueue node is used if there are no children ChildrenBranch branch; /// Used if there is at least one child + SchedulerNodePtr getRoot() + { + if (queue) + return queue; + else + return branch.getRoot(); + } + // Should be called after constructor, before any other methods [[nodiscard]] SchedulerNodePtr initialize(EventQueue * event_queue_) { @@ -354,6 +370,52 @@ private: } return {}; } + + /// Detaches a child. + /// Returns root node if it has been changed to a different node, otherwise returns null. + [[nodiscard]] SchedulerNodePtr updateSchedulingSettings(EventQueue * event_queue_, const SchedulingSettings & new_settings) + { + SchedulerNodePtr node = branch.getRoot(); + + if (!settings.hasSemaphore() && new_settings.hasSemaphore()) // Add semaphore + { + semaphore = std::make_shared(event_queue_, SchedulerNodeInfo{}, new_settings.max_requests, new_settings.max_cost); + semaphore->basename = "semaphore"; + reparent(node, semaphore); + node = semaphore; + } + else if (settings.hasSemaphore() && !new_settings.hasSemaphore()) // Remove semaphore + { + detach(semaphore); + semaphore.reset(); + } + else if (settings.hasSemaphore() && new_settings.hasSemaphore()) // Update semaphore + { + static_cast(*semaphore).updateConstraints(semaphore, new_settings.max_requests, new_settings.max_cost); + node = semaphore; + } + + if (!settings.hasThrottler() && new_settings.hasThrottler()) // Add throttler + { + throttler = std::make_shared(event_queue_, SchedulerNodeInfo{}, new_settings.max_speed, new_settings.max_burst); + throttler->basename = "throttler"; + reparent(node, throttler); + node = throttler; + } + else if (settings.hasThrottler() && !new_settings.hasThrottler()) // Remove throttler + { + detach(throttler); + throttler.reset(); + } + else if (settings.hasThrottler() && new_settings.hasThrottler()) // Update throttler + { + static_cast(*throttler).updateConstraints(new_settings.max_speed, new_settings.max_burst); + node = throttler; + } + + settings = new_settings; + return node; + } }; public: @@ -388,20 +450,19 @@ public: reparent(new_child, this); } - /// Updates intermediate nodes subtree according with new priority (priority is set by the caller beforehand) - /// NOTE: Changing a priority of a unified child may lead to change of its parent. - void updateUnifiedChildPriority(const UnifiedSchedulerNodePtr & child, Priority old_priority, Priority new_priority) + static bool updateRequiresDetach(const String & old_parent, const String & new_parent, const SchedulingSettings & old_settings, const SchedulingSettings & new_settings) { - UNUSED(child, old_priority, new_priority); // TODO(serxa): implement updateUnifiedChildPriority() + return old_parent != new_parent || old_settings.priority != new_settings.priority; } /// Updates scheduling settings. Set of constraints might change. - /// NOTE: Caller is responsible for calling `updateUnifiedChildPriority` in parent unified node (if any) + /// NOTE: Caller is responsible for detaching and attaching if `updateRequiresDetach` returns true void updateSchedulingSettings(const SchedulingSettings & new_settings) { - UNUSED(new_settings); // TODO(serxa): implement updateSchedulingSettings() info.setPriority(new_settings.priority); info.setWeight(new_settings.weight); + if (auto new_child = impl.updateSchedulingSettings(event_queue, new_settings)) + reparent(new_child, this); } /// Returns the queue to be used for resource requests or `nullptr` if it has unified children @@ -418,8 +479,7 @@ public: /// all unified nodes. Such a version control is done by `IOResourceManager`. void addRawPointerNodes(std::vector & nodes) { - if (impl.throttler) - nodes.push_back(impl.throttler); + // NOTE: `impl.throttler` could be skipped, because ThrottlerConstraint does not call `request->addConstraint()` if (impl.semaphore) nodes.push_back(impl.semaphore); if (impl.branch.queue)