manager support for CREATE OR REPLACE

This commit is contained in:
serxa 2024-10-07 11:21:14 +00:00
parent 93d0ed126a
commit b3b0e4fef6
3 changed files with 90 additions and 26 deletions

View File

@ -137,7 +137,7 @@ void IOResourceManager::Resource::updateNode(const NodeInfo & old_info, const No
throw Exception(ErrorCodes::LOGICAL_ERROR, "Updating a name of workload '{}' to '{}' is not allowed in resource '{}'",
old_info.name, new_info.name, resource_name);
if (old_info.parent != new_info.parent && (old_info.parent.empty() || old_info.parent.empty()))
if (old_info.parent != new_info.parent && (old_info.parent.empty() || new_info.parent.empty()))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Workload '{}' invalid update of parent from '{}' to '{}' in resource '{}'",
old_info.name, old_info.parent, new_info.parent, resource_name);
@ -157,22 +157,20 @@ void IOResourceManager::Resource::updateNode(const NodeInfo & old_info, const No
{
auto node = node_for_workload[old_info.name];
bool detached = false;
if (old_info.parent != new_info.parent)
if (UnifiedSchedulerNode::updateRequiresDetach(old_info.parent, new_info.parent, old_info.settings, new_info.settings))
{
node_for_workload[old_info.parent]->detachUnifiedChild(node);
if (!old_info.parent.empty())
node_for_workload[old_info.parent]->detachUnifiedChild(node);
detached = true;
}
node->updateSchedulingSettings(new_info.settings);
if (!detached && !old_info.parent.empty() && old_info.settings.priority != new_info.settings.priority)
node_for_workload[old_info.parent]->updateUnifiedChildPriority(
node,
old_info.settings.priority,
new_info.settings.priority);
if (detached)
node_for_workload[new_info.parent]->attachUnifiedChild(node);
{
if (!new_info.parent.empty())
node_for_workload[new_info.parent]->attachUnifiedChild(node);
}
updateCurrentVersion();
});
}
@ -268,7 +266,7 @@ IOResourceManager::IOResourceManager(IWorkloadEntityStorage & storage_)
case WorkloadEntityType::Resource:
{
if (entity)
createResource(entity_name, entity);
createOrUpdateResource(entity_name, entity);
else
deleteResource(entity_name);
break;
@ -315,14 +313,11 @@ void IOResourceManager::deleteWorkload(const String & workload_name)
}
}
void IOResourceManager::createResource(const String & resource_name, const ASTPtr & ast)
void IOResourceManager::createOrUpdateResource(const String & resource_name, const ASTPtr & ast)
{
std::unique_lock lock{mutex};
if (auto resource_iter = resources.find(resource_name); resource_iter != resources.end())
{
// Resource to be created already exist -- do nothing, throwing exceptions from a subscription is pointless
// TODO(serxa): add logging
}
resource_iter->second->updateResource(ast);
else
{
// Add all workloads into the new resource
@ -420,6 +415,12 @@ void IOResourceManager::Classifier::attach(const ResourcePtr & resource, const V
attachments[resource->getName()] = Attachment{.resource = resource, .version = version, .link = link};
}
void IOResourceManager::Resource::updateResource(const ASTPtr & new_resource_entity)
{
chassert(getEntityName(new_resource_entity) == resource_name);
resource_entity = new_resource_entity;
}
std::future<void> IOResourceManager::Resource::attachClassifier(Classifier & classifier, const String & workload_name)
{
auto attach_promise = std::make_shared<std::promise<void>>(); // event queue task is std::function, which requires copy semantics

View File

@ -173,6 +173,9 @@ private:
void deleteNode(const NodeInfo & info);
void updateNode(const NodeInfo & old_info, const NodeInfo & new_info);
/// Updates resource entity
void updateResource(const ASTPtr & new_resource_entity);
/// Updates a classifier to contain a reference for specified workload
std::future<void> attachClassifier(Classifier & classifier, const String & workload_name);
@ -205,7 +208,7 @@ private:
future.get(); // Blocks until execution is done in the scheduler thread
}
const ASTPtr resource_entity;
ASTPtr resource_entity;
const String resource_name;
SchedulerRoot scheduler;
@ -256,7 +259,7 @@ private:
void createOrUpdateWorkload(const String & workload_name, const ASTPtr & ast);
void deleteWorkload(const String & workload_name);
void createResource(const String & resource_name, const ASTPtr & ast);
void createOrUpdateResource(const String & resource_name, const ASTPtr & ast);
void deleteResource(const String & resource_name);
// Topological sorting of worklaods

View File

@ -160,6 +160,14 @@ private:
// Returns true iff there are no unified children attached
bool empty() const { return branches.empty(); }
SchedulerNodePtr getRoot()
{
chassert(!branches.empty());
if (root)
return root;
return branches.begin()->second.getRoot(); // There should be exactly one child-branch
}
/// Attaches a new child.
/// Returns root node if it has been changed to a different node, otherwise returns null.
[[nodiscard]] SchedulerNodePtr attachUnifiedChild(EventQueue * event_queue_, const UnifiedSchedulerNodePtr & child)
@ -244,6 +252,14 @@ private:
SchedulerNodePtr queue; /// FifoQueue node is used if there are no children
ChildrenBranch branch; /// Used if there is at least one child
SchedulerNodePtr getRoot()
{
if (queue)
return queue;
else
return branch.getRoot();
}
// Should be called after constructor, before any other methods
[[nodiscard]] SchedulerNodePtr initialize(EventQueue * event_queue_)
{
@ -354,6 +370,52 @@ private:
}
return {};
}
/// Detaches a child.
/// Returns root node if it has been changed to a different node, otherwise returns null.
[[nodiscard]] SchedulerNodePtr updateSchedulingSettings(EventQueue * event_queue_, const SchedulingSettings & new_settings)
{
SchedulerNodePtr node = branch.getRoot();
if (!settings.hasSemaphore() && new_settings.hasSemaphore()) // Add semaphore
{
semaphore = std::make_shared<SemaphoreConstraint>(event_queue_, SchedulerNodeInfo{}, new_settings.max_requests, new_settings.max_cost);
semaphore->basename = "semaphore";
reparent(node, semaphore);
node = semaphore;
}
else if (settings.hasSemaphore() && !new_settings.hasSemaphore()) // Remove semaphore
{
detach(semaphore);
semaphore.reset();
}
else if (settings.hasSemaphore() && new_settings.hasSemaphore()) // Update semaphore
{
static_cast<SemaphoreConstraint&>(*semaphore).updateConstraints(semaphore, new_settings.max_requests, new_settings.max_cost);
node = semaphore;
}
if (!settings.hasThrottler() && new_settings.hasThrottler()) // Add throttler
{
throttler = std::make_shared<ThrottlerConstraint>(event_queue_, SchedulerNodeInfo{}, new_settings.max_speed, new_settings.max_burst);
throttler->basename = "throttler";
reparent(node, throttler);
node = throttler;
}
else if (settings.hasThrottler() && !new_settings.hasThrottler()) // Remove throttler
{
detach(throttler);
throttler.reset();
}
else if (settings.hasThrottler() && new_settings.hasThrottler()) // Update throttler
{
static_cast<ThrottlerConstraint&>(*throttler).updateConstraints(new_settings.max_speed, new_settings.max_burst);
node = throttler;
}
settings = new_settings;
return node;
}
};
public:
@ -388,20 +450,19 @@ public:
reparent(new_child, this);
}
/// Updates intermediate nodes subtree according with new priority (priority is set by the caller beforehand)
/// NOTE: Changing a priority of a unified child may lead to change of its parent.
void updateUnifiedChildPriority(const UnifiedSchedulerNodePtr & child, Priority old_priority, Priority new_priority)
static bool updateRequiresDetach(const String & old_parent, const String & new_parent, const SchedulingSettings & old_settings, const SchedulingSettings & new_settings)
{
UNUSED(child, old_priority, new_priority); // TODO(serxa): implement updateUnifiedChildPriority()
return old_parent != new_parent || old_settings.priority != new_settings.priority;
}
/// Updates scheduling settings. Set of constraints might change.
/// NOTE: Caller is responsible for calling `updateUnifiedChildPriority` in parent unified node (if any)
/// NOTE: Caller is responsible for detaching and attaching if `updateRequiresDetach` returns true
void updateSchedulingSettings(const SchedulingSettings & new_settings)
{
UNUSED(new_settings); // TODO(serxa): implement updateSchedulingSettings()
info.setPriority(new_settings.priority);
info.setWeight(new_settings.weight);
if (auto new_child = impl.updateSchedulingSettings(event_queue, new_settings))
reparent(new_child, this);
}
/// Returns the queue to be used for resource requests or `nullptr` if it has unified children
@ -418,8 +479,7 @@ public:
/// all unified nodes. Such a version control is done by `IOResourceManager`.
void addRawPointerNodes(std::vector<SchedulerNodePtr> & nodes)
{
if (impl.throttler)
nodes.push_back(impl.throttler);
// NOTE: `impl.throttler` could be skipped, because ThrottlerConstraint does not call `request->addConstraint()`
if (impl.semaphore)
nodes.push_back(impl.semaphore);
if (impl.branch.queue)