Fixes and tests for the DROP WORKLOAD query

This commit is contained in:
serxa 2024-10-02 17:54:43 +00:00
parent 809f0ee0a2
commit 7722a5e4fa
8 changed files with 115 additions and 13 deletions

View File

@ -52,6 +52,13 @@ public:
: ISchedulerNode(event_queue_, info_)
{}
~FairPolicy() override
{
    // Detach every remaining child so none of them keeps a dangling
    // `parent` pointer back into this policy after it is destroyed.
    while (!children.empty())
    {
        auto * first_child = children.begin()->second.get();
        removeChild(first_child);
    }
}
const String & getTypeName() const override
{
static String type_name("fair");

View File

@ -125,6 +125,8 @@ void IOResourceManager::Resource::deleteNode(const NodeInfo & info)
root_node.reset();
}
node_for_workload.erase(info.name);
updateCurrentVersion();
});
}

View File

@ -43,6 +43,13 @@ public:
: ISchedulerNode(event_queue_, node_info)
{}
~PriorityPolicy() override
{
    // Detach every remaining child so none of them keeps a dangling
    // `parent` pointer back into this policy after it is destroyed.
    while (!children.empty())
    {
        auto * first_child = children.begin()->second.get();
        removeChild(first_child);
    }
}
const String & getTypeName() const override
{
static String type_name("priority");

View File

@ -31,6 +31,13 @@ public:
, max_cost(max_cost_)
{}
~SemaphoreConstraint() override
{
    // Detach the child (if any) so its `parent` pointer does not dangle
    // once this constraint is destroyed.
    if (child != nullptr)
        removeChild(child.get());
}
const String & getTypeName() const override
{
static String type_name("inflight_limit");

View File

@ -38,6 +38,10 @@ public:
{
// We should cancel event on destruction to avoid dangling references from event queue
event_queue->cancelPostponed(postponed);
// We need to clear `parent` in child to avoid dangling reference
if (child)
removeChild(child.get());
}
const String & getTypeName() const override

View File

@ -67,6 +67,7 @@ private:
/// Helper function for managing a parent of a node
static void reparent(const SchedulerNodePtr & node, ISchedulerNode * new_parent)
{
chassert(node);
chassert(new_parent);
if (new_parent == node->parent)
return;
@ -139,7 +140,8 @@ private:
{
// Remove fair if the only child has left
chassert(root);
root.reset(); // it will be still alive because it is attached to hierarchy for now
detach(root);
root.reset();
return children.begin()->second; // The last child is a new root now
}
else if (children.empty())
@ -216,7 +218,8 @@ private:
{
// Remove priority node if the only child-branch has left
chassert(root);
root.reset(); // it will be still alive because it is attached to hierarchy for now
detach(root);
root.reset();
return branches.begin()->second.getRoot(); // The last child-branch is a new root now
}
else if (branches.empty())
@ -361,6 +364,13 @@ public:
reparent(immediate_child, this);
}
~UnifiedSchedulerNode() override
{
    // Detach the immediate child (if any) so its `parent` pointer does not
    // dangle once this node is destroyed.
    if (immediate_child != nullptr)
        removeChild(immediate_child.get());
}
/// Attaches a unified child as a leaf of internal subtree and insert or update all the intermediate nodes
/// NOTE: Do not confuse with `attachChild()` which is used only for immediate children
void attachUnifiedChild(const UnifiedSchedulerNodePtr & child)

View File

@ -31,24 +31,24 @@ namespace ErrorCodes
class SchedulerRoot : public ISchedulerNode
{
private:
struct TResource
struct Resource
{
SchedulerNodePtr root;
// Intrusive cyclic list of active resources
TResource * next = nullptr;
TResource * prev = nullptr;
Resource * next = nullptr;
Resource * prev = nullptr;
explicit TResource(const SchedulerNodePtr & root_)
explicit Resource(const SchedulerNodePtr & root_)
: root(root_)
{
root->info.parent.ptr = this;
}
// Get pointer stored by ctor in info
static TResource * get(SchedulerNodeInfo & info)
static Resource * get(SchedulerNodeInfo & info)
{
return reinterpret_cast<TResource *>(info.parent.ptr);
return reinterpret_cast<Resource *>(info.parent.ptr);
}
};
@ -60,6 +60,8 @@ public:
~SchedulerRoot() override
{
    stop();
    // Detach all resource roots so they do not keep dangling `parent`
    // pointers into this scheduler after it is destroyed.
    while (!children.empty())
    {
        auto * resource_node = children.begin()->first;
        removeChild(resource_node);
    }
}
/// Runs separate scheduler thread
@ -185,7 +187,7 @@ public:
void activateChild(ISchedulerNode * child) override
{
activate(TResource::get(child->info));
activate(Resource::get(child->info));
}
void setParent(ISchedulerNode *) override
@ -194,7 +196,7 @@ public:
}
private:
void activate(TResource * value)
void activate(Resource * value)
{
assert(value->next == nullptr && value->prev == nullptr);
if (current == nullptr) // No active children
@ -212,7 +214,7 @@ private:
}
}
void deactivate(TResource * value)
void deactivate(Resource * value)
{
if (value->next == nullptr)
return; // Already deactivated
@ -257,8 +259,8 @@ private:
request->execute();
}
TResource * current = nullptr; // round-robin pointer
std::unordered_map<ISchedulerNode *, TResource> children; // resources by pointer
Resource * current = nullptr; // round-robin pointer
std::unordered_map<ISchedulerNode *, Resource> children; // resources by pointer
std::atomic<bool> stop_flag = false;
EventQueue events;
ThreadFromGlobalPool scheduler;

View File

@ -5,6 +5,7 @@
import time
import threading
import pytest
import random
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@ -647,6 +648,68 @@ def test_create_workload():
do_checks()
def test_workload_hierarchy_changes():
    """Stress the workload scheduler hierarchy with create/drop sequences.

    Builds two IO resources, then runs a fixed sequence of CREATE/DROP
    WORKLOAD queries that grows and shrinks the workload tree. On each
    iteration the sequence is interrupted at a random point by creating and
    dropping an extra resource, which forces the scheduler to rebuild its
    node hierarchy mid-sequence.

    NOTE(review): the random split point is intentionally unseeded to cover
    different interruption points across runs; it is printed so a failing
    run can be reproduced (pytest shows captured stdout on failure).
    """
    node.query("create resource io_write (write disk s3_no_resource);")
    node.query("create resource io_read (read disk s3_no_resource);")
    queries = [
        "create workload all;",
        "create workload X in all settings priority = 0;",
        "create workload Y in all settings priority = 1;",
        "create workload A1 in X settings priority = -1;",
        "create workload B1 in X settings priority = 1;",
        "create workload C1 in Y settings priority = -1;",
        "create workload D1 in Y settings priority = 1;",
        "create workload A2 in X settings priority = -1;",
        "create workload B2 in X settings priority = 1;",
        "create workload C2 in Y settings priority = -1;",
        "create workload D2 in Y settings priority = 1;",
        "drop workload A1;",
        "drop workload A2;",
        "drop workload B1;",
        "drop workload B2;",
        "drop workload C1;",
        "drop workload C2;",
        "drop workload D1;",
        "drop workload D2;",
        "create workload Z in all;",
        "create workload A1 in Z settings priority = -1;",
        "create workload A2 in Z settings priority = -1;",
        "create workload A3 in Z settings priority = -1;",
        "create workload B1 in Z settings priority = 1;",
        "create workload B2 in Z settings priority = 1;",
        "create workload B3 in Z settings priority = 1;",
        "create workload C1 in X settings priority = -1;",
        "create workload C2 in X settings priority = -1;",
        "create workload C3 in X settings priority = -1;",
        "create workload D1 in X settings priority = 1;",
        "create workload D2 in X settings priority = 1;",
        "create workload D3 in X settings priority = 1;",
        "drop workload A1;",
        "drop workload B1;",
        "drop workload C1;",
        "drop workload D1;",
        "drop workload A2;",
        "drop workload B2;",
        "drop workload C2;",
        "drop workload D2;",
        "drop workload A3;",
        "drop workload B3;",
        "drop workload C3;",
        "drop workload D3;",
        "drop workload X;",
        "drop workload Y;",
        "drop workload Z;",
        "drop workload all;",
    ]
    for iteration in range(3):
        # Interrupt the sequence somewhere strictly inside it, never
        # before the first query or after the last one.
        split_idx = random.randint(1, len(queries) - 2)
        # Log the split point so a failing iteration can be reproduced.
        print(f"iteration={iteration} split_idx={split_idx}")
        for query_idx in range(0, split_idx):
            node.query(queries[query_idx])
        node.query(
            "create resource io_test (write disk non_existent_disk, read disk non_existent_disk);"
        )
        node.query("drop resource io_test;")
        for query_idx in range(split_idx, len(queries)):
            node.query(queries[query_idx])
def test_resource_read_and_write():
node.query(