diff --git a/docs/en/operations/system-tables/scheduler.md b/docs/en/operations/system-tables/scheduler.md index c4de7f76fdc..953db4c28f2 100644 --- a/docs/en/operations/system-tables/scheduler.md +++ b/docs/en/operations/system-tables/scheduler.md @@ -26,9 +26,7 @@ priority: 0 is_active: 0 active_children: 0 dequeued_requests: 67 -canceled_requests: 0 dequeued_cost: 4692272 -canceled_cost: 0 busy_periods: 63 vruntime: 938454.1999999989 system_vruntime: ᴺᵁᴸᴸ @@ -56,9 +54,7 @@ Columns: - `is_active` (`UInt8`) - Whether this node is currently active - has resource requests to be dequeued and constraints satisfied. - `active_children` (`UInt64`) - The number of children in active state. - `dequeued_requests` (`UInt64`) - The total number of resource requests dequeued from this node. -- `canceled_requests` (`UInt64`) - The total number of resource requests canceled from this node. - `dequeued_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests dequeued from this node. -- `canceled_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests canceled from this node. - `busy_periods` (`UInt64`) - The total number of deactivations of this node. - `vruntime` (`Nullable(Float64)`) - For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner. - `system_vruntime` (`Nullable(Float64)`) - For `fair` nodes only. Virtual runtime showing `vruntime` of the last processed resource request. Used during child activation as the new value of `vruntime`. diff --git a/src/Common/Scheduler/ISchedulerNode.h b/src/Common/Scheduler/ISchedulerNode.h index 20c1f4332da..804026d7bf4 100644 --- a/src/Common/Scheduler/ISchedulerNode.h +++ b/src/Common/Scheduler/ISchedulerNode.h @@ -387,9 +387,7 @@ public: /// Introspection std::atomic dequeued_requests{0}; - std::atomic canceled_requests{0}; std::atomic dequeued_cost{0}; - std::atomic canceled_cost{0}; std::atomic busy_periods{0}; }; diff --git a/src/Common/Scheduler/ISchedulerQueue.h b/src/Common/Scheduler/ISchedulerQueue.h index 532f4bf6c63..cbe63bd304a 100644 --- a/src/Common/Scheduler/ISchedulerQueue.h +++ b/src/Common/Scheduler/ISchedulerQueue.h @@ -50,12 +50,6 @@ public: /// Should be called outside of scheduling subsystem, implementation must be thread-safe. virtual void enqueueRequest(ResourceRequest * request) = 0; - /// Cancel previously enqueued request. - /// Returns `false` and does nothing given unknown or already executed request. - /// Returns `true` if requests has been found and canceled. - /// Should be called outside of scheduling subsystem, implementation must be thread-safe. - virtual bool cancelRequest(ResourceRequest * request) = 0; - /// For introspection ResourceCost getBudget() const { diff --git a/src/Common/Scheduler/Nodes/FairPolicy.h b/src/Common/Scheduler/Nodes/FairPolicy.h index ce2bf729a04..c0e187e6fa9 100644 --- a/src/Common/Scheduler/Nodes/FairPolicy.h +++ b/src/Common/Scheduler/Nodes/FairPolicy.h @@ -134,65 +134,56 @@ public: std::pair dequeueRequest() override { - // Cycle is required to do deactivations in the case of canceled requests, when dequeueRequest returns `nullptr` - while (true) + if (heap_size == 0) + return {nullptr, false}; + + // Recursively pull request from child + auto [request, child_active] = items.front().child->dequeueRequest(); + assert(request != nullptr); + std::pop_heap(items.begin(), items.begin() + heap_size); + Item & current = items[heap_size - 1]; + + // SFQ fairness invariant: system vruntime equals last served request start-time + assert(current.vruntime >= system_vruntime); + system_vruntime = current.vruntime; + + // By definition vruntime is amount of consumed resource (cost) divided by weight + current.vruntime += double(request->cost) / current.child->info.weight; + max_vruntime = std::max(max_vruntime, current.vruntime); + + if (child_active) // Put active child back in heap after vruntime update { - if (heap_size == 0) - return {nullptr, false}; - - // Recursively pull request from child - auto [request, child_active] = items.front().child->dequeueRequest(); - std::pop_heap(items.begin(), items.begin() + heap_size); - Item & current = items[heap_size - 1]; - - if (request) - { - // SFQ fairness invariant: system vruntime equals last served request start-time - assert(current.vruntime >= system_vruntime); - system_vruntime = current.vruntime; - - // By definition vruntime is amount of consumed resource (cost) divided by weight - current.vruntime += double(request->cost) / current.child->info.weight; - max_vruntime = std::max(max_vruntime, current.vruntime); - } - - if (child_active) // Put active child back in heap after vruntime update - { - std::push_heap(items.begin(), items.begin() + heap_size); - } - else // Deactivate child if it is empty, but remember it's vruntime for latter activations - { - heap_size--; - - // Store index of this inactive child in `parent.idx` - // This enables O(1) search of inactive children instead of O(n) - current.child->info.parent.idx = heap_size; - } - - // Reset any difference between children on busy period end - if (heap_size == 0) - { - // Reset vtime to zero to avoid floating-point error accumulation, - // but do not reset too often, because it's O(N) - UInt64 ns = clock_gettime_ns(); - if (last_reset_ns + 1000000000 < ns) - { - last_reset_ns = ns; - for (Item & item : items) - item.vruntime = 0; - max_vruntime = 0; - } - system_vruntime = max_vruntime; - busy_periods++; - } - - if (request) - { - dequeued_requests++; - dequeued_cost += request->cost; - return {request, heap_size > 0}; - } + std::push_heap(items.begin(), items.begin() + heap_size); } + else // Deactivate child if it is empty, but remember it's vruntime for latter activations + { + heap_size--; + + // Store index of this inactive child in `parent.idx` + // This enables O(1) search of inactive children instead of O(n) + current.child->info.parent.idx = heap_size; + } + + // Reset any difference between children on busy period end + if (heap_size == 0) + { + // Reset vtime to zero to avoid floating-point error accumulation, + // but do not reset too often, because it's O(N) + UInt64 ns = clock_gettime_ns(); + if (last_reset_ns + 1000000000 < ns) + { + last_reset_ns = ns; + for (Item & item : items) + item.vruntime = 0; + max_vruntime = 0; + } + system_vruntime = max_vruntime; + busy_periods++; + } + + dequeued_requests++; + dequeued_cost += request->cost; + return {request, heap_size > 0}; } bool isActive() override diff --git a/src/Common/Scheduler/Nodes/FifoQueue.h b/src/Common/Scheduler/Nodes/FifoQueue.h index 45ed32343ff..38ae902bc2f 100644 --- a/src/Common/Scheduler/Nodes/FifoQueue.h +++ b/src/Common/Scheduler/Nodes/FifoQueue.h @@ -39,7 +39,8 @@ public: void enqueueRequest(ResourceRequest * request) override { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); + request->enqueue_ns = clock_gettime_ns(); queue_cost += request->cost; bool was_empty = requests.empty(); requests.push_back(request); @@ -49,7 +50,7 @@ public: std::pair dequeueRequest() override { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); if (requests.empty()) return {nullptr, false}; ResourceRequest * result = requests.front(); @@ -62,29 +63,9 @@ public: return {result, !requests.empty()}; } - bool cancelRequest(ResourceRequest * request) override - { - std::lock_guard lock(mutex); - // TODO(serxa): reimplement queue as intrusive list of ResourceRequest to make this O(1) instead of O(N) - for (auto i = requests.begin(), e = requests.end(); i != e; ++i) - { - if (*i == request) - { - requests.erase(i); - if (requests.empty()) - busy_periods++; - queue_cost -= request->cost; - canceled_requests++; - canceled_cost += request->cost; - return true; - } - } - return false; - } - bool isActive() override { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); return !requests.empty(); } @@ -117,14 +98,14 @@ public: std::pair getQueueLengthAndCost() { - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); return {requests.size(), queue_cost}; } private: std::mutex mutex; Int64 queue_cost = 0; - std::deque requests; // TODO(serxa): reimplement it using intrusive list to avoid allocations/deallocations and O(N) during cancel + std::deque requests; }; } diff --git a/src/Common/Scheduler/Nodes/PriorityPolicy.h b/src/Common/Scheduler/Nodes/PriorityPolicy.h index 9b4cfc37f8c..6d6b15bd063 100644 --- a/src/Common/Scheduler/Nodes/PriorityPolicy.h +++ b/src/Common/Scheduler/Nodes/PriorityPolicy.h @@ -102,31 +102,25 @@ public: std::pair dequeueRequest() override { - // Cycle is required to do deactivations in the case of canceled requests, when dequeueRequest returns `nullptr` - while (true) + if (items.empty()) + return {nullptr, false}; + + // Recursively pull request from child + auto [request, child_active] = items.front().child->dequeueRequest(); + assert(request != nullptr); + + // Deactivate child if it is empty + if (!child_active) { + std::pop_heap(items.begin(), items.end()); + items.pop_back(); if (items.empty()) - return {nullptr, false}; - - // Recursively pull request from child - auto [request, child_active] = items.front().child->dequeueRequest(); - - // Deactivate child if it is empty - if (!child_active) - { - std::pop_heap(items.begin(), items.end()); - items.pop_back(); - if (items.empty()) - busy_periods++; - } - - if (request) - { - dequeued_requests++; - dequeued_cost += request->cost; - return {request, !items.empty()}; - } + busy_periods++; } + + dequeued_requests++; + dequeued_cost += request->cost; + return {request, !items.empty()}; } bool isActive() override diff --git a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp index cdf09776077..961a3b6f713 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_dynamic_resource_manager.cpp @@ -38,6 +38,7 @@ TEST(SchedulerDynamicResourceManager, Smoke) { ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking); gA.lock(); + gA.setFailure(); gA.unlock(); ResourceGuard gB(cB->get("res1")); diff --git a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp index e76639a4b01..9fefbc02cbd 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_resource_scheduler.cpp @@ -4,7 +4,6 @@ #include -#include #include using namespace DB; @@ -74,22 +73,6 @@ struct ResourceHolder } }; -struct MyRequest : public ResourceRequest -{ - std::function on_execute; - - explicit MyRequest(ResourceCost cost_, std::function on_execute_) - : ResourceRequest(cost_) - , on_execute(on_execute_) - {} - - void execute() override - { - if (on_execute) - on_execute(); - } -}; - TEST(SchedulerRoot, Smoke) { ResourceTest t; @@ -128,49 +111,3 @@ TEST(SchedulerRoot, Smoke) EXPECT_TRUE(fc2->requests.contains(&rg.request)); } } - -TEST(SchedulerRoot, Cancel) -{ - ResourceTest t; - - ResourceHolder r1(t); - auto * fc1 = r1.add("/", "1"); - r1.add("/prio"); - auto a = r1.addQueue("/prio/A", "1"); - auto b = r1.addQueue("/prio/B", "2"); - r1.registerResource(); - - std::barrier sync(2); - std::thread consumer1([&] - { - std::barrier destruct_sync(2); - MyRequest request(1,[&] - { - sync.arrive_and_wait(); // (A) - EXPECT_TRUE(fc1->requests.contains(&request)); - sync.arrive_and_wait(); // (B) - request.finish(); - destruct_sync.arrive_and_wait(); // (C) - }); - a.queue->enqueueRequest(&request); - destruct_sync.arrive_and_wait(); // (C) - }); - - std::thread consumer2([&] - { - MyRequest request(1,[&] - { - FAIL() << "This request must be canceled, but instead executes"; - }); - sync.arrive_and_wait(); // (A) wait for request of consumer1 to be inside execute, so that constraint is in violated state and our request will not be executed immediately - b.queue->enqueueRequest(&request); - bool canceled = b.queue->cancelRequest(&request); - EXPECT_TRUE(canceled); - sync.arrive_and_wait(); // (B) release request of consumer1 to be finished - }); - - consumer1.join(); - consumer2.join(); - - EXPECT_TRUE(fc1->requests.empty()); -} diff --git a/src/Common/Scheduler/ResourceGuard.h b/src/Common/Scheduler/ResourceGuard.h index 50f665a384b..dca4041b176 100644 --- a/src/Common/Scheduler/ResourceGuard.h +++ b/src/Common/Scheduler/ResourceGuard.h @@ -71,7 +71,8 @@ public: // lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread chassert(state == Dequeued); state = Finished; - ResourceRequest::finish(); + if (constraint) + constraint->finishRequest(this); } static Request & local() @@ -125,6 +126,12 @@ public: } } + /// Mark request as unsuccessful; by default request is considered to be successful + void setFailure() + { + request.successful = false; + } + ResourceLink link; Request & request; }; diff --git a/src/Common/Scheduler/ResourceRequest.cpp b/src/Common/Scheduler/ResourceRequest.cpp deleted file mode 100644 index 26e8084cdfa..00000000000 --- a/src/Common/Scheduler/ResourceRequest.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include -#include - -namespace DB -{ - -void ResourceRequest::finish() -{ - if (constraint) - constraint->finishRequest(this); -} - -} diff --git a/src/Common/Scheduler/ResourceRequest.h b/src/Common/Scheduler/ResourceRequest.h index f3153ad382c..3d2230746f9 100644 --- a/src/Common/Scheduler/ResourceRequest.h +++ b/src/Common/Scheduler/ResourceRequest.h @@ -14,6 +14,9 @@ class ISchedulerConstraint; using ResourceCost = Int64; constexpr ResourceCost ResourceCostMax = std::numeric_limits::max(); +/// Timestamps (nanoseconds since epoch) +using ResourceNs = UInt64; + /* * Request for a resource consumption. The main moving part of the scheduling subsystem. * Resource requests processing workflow: @@ -28,7 +31,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits::max(); * 3) Scheduler calls ISchedulerNode::dequeueRequest() that returns the request. * 4) Callback ResourceRequest::execute() is called to provide access to the resource. * 5) The resource consumption is happening outside of the scheduling subsystem. - * 6) ResourceRequest::finish() is called when consumption is finished. + * 6) request->constraint->finishRequest() is called when consumption is finished. * * Steps (5) and (6) can be omitted if constraint is not used by the resource. * @@ -36,10 +39,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits::max(); * Request ownership is done outside of the scheduling subsystem. * After (6) request can be destructed safely. * - * Request can also be canceled before (3) using ISchedulerQueue::cancelRequest(). - * Returning false means it is too late for request to be canceled. It should be processed in a regular way. - * Returning true means successful cancel and therefore steps (4) and (5) are not going to happen - * and step (6) MUST be omitted. + * Request cancelling is not supported yet. */ class ResourceRequest { @@ -48,20 +48,32 @@ public: /// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it) ResourceCost cost; + /// Request outcome + /// Should be filled during resource consumption + bool successful; + /// Scheduler node to be notified on consumption finish /// Auto-filled during request enqueue/dequeue ISchedulerConstraint * constraint; + /// Timestamps for introspection + ResourceNs enqueue_ns; + ResourceNs execute_ns; + ResourceNs finish_ns; + explicit ResourceRequest(ResourceCost cost_ = 1) { reset(cost_); } - /// ResourceRequest object may be reused again after reset() void reset(ResourceCost cost_) { cost = cost_; + successful = true; constraint = nullptr; + enqueue_ns = 0; + execute_ns = 0; + finish_ns = 0; } virtual ~ResourceRequest() = default; @@ -71,12 +83,6 @@ public: /// just triggering start of a consumption, not doing the consumption itself /// (e.g. setting an std::promise or creating a job in a thread pool) virtual void execute() = 0; - - /// Stop resource consumption and notify resource scheduler. - /// Should be called when resource consumption is finished by consumer. - /// ResourceRequest should not be destructed or reset before calling to `finish()`. - /// WARNING: this function MUST not be called if request was canceled. - void finish(); }; } diff --git a/src/Common/Scheduler/SchedulerRoot.h b/src/Common/Scheduler/SchedulerRoot.h index ab3f702a422..3a23a8df834 100644 --- a/src/Common/Scheduler/SchedulerRoot.h +++ b/src/Common/Scheduler/SchedulerRoot.h @@ -145,27 +145,22 @@ public: std::pair dequeueRequest() override { - while (true) - { - if (current == nullptr) // No active resources - return {nullptr, false}; + if (current == nullptr) // No active resources + return {nullptr, false}; - // Dequeue request from current resource - auto [request, resource_active] = current->root->dequeueRequest(); + // Dequeue request from current resource + auto [request, resource_active] = current->root->dequeueRequest(); + assert(request != nullptr); - // Deactivate resource if required - if (!resource_active) - deactivate(current); - else - current = current->next; // Just move round-robin pointer + // Deactivate resource if required + if (!resource_active) + deactivate(current); + else + current = current->next; // Just move round-robin pointer - if (request == nullptr) // Possible in case of request cancel, just retry - continue; - - dequeued_requests++; - dequeued_cost += request->cost; - return {request, current != nullptr}; - } + dequeued_requests++; + dequeued_cost += request->cost; + return {request, current != nullptr}; } bool isActive() override @@ -250,6 +245,7 @@ private: void execute(ResourceRequest * request) { + request->execute_ns = clock_gettime_ns(); request->execute(); } diff --git a/src/Storages/System/StorageSystemScheduler.cpp b/src/Storages/System/StorageSystemScheduler.cpp index 633bac5d285..ba07d44dbf9 100644 --- a/src/Storages/System/StorageSystemScheduler.cpp +++ b/src/Storages/System/StorageSystemScheduler.cpp @@ -30,9 +30,7 @@ ColumnsDescription StorageSystemScheduler::getColumnsDescription() {"is_active", std::make_shared(), "Whether this node is currently active - has resource requests to be dequeued and constraints satisfied."}, {"active_children", std::make_shared(), "The number of children in active state."}, {"dequeued_requests", std::make_shared(), "The total number of resource requests dequeued from this node."}, - {"canceled_requests", std::make_shared(), "The total number of resource requests canceled from this node."}, {"dequeued_cost", std::make_shared(), "The sum of costs (e.g. size in bytes) of all requests dequeued from this node."}, - {"canceled_cost", std::make_shared(), "The sum of costs (e.g. size in bytes) of all requests canceled from this node."}, {"busy_periods", std::make_shared(), "The total number of deactivations of this node."}, {"vruntime", std::make_shared(std::make_shared()), "For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner."}, @@ -95,9 +93,7 @@ void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr c res_columns[i++]->insert(node->isActive()); res_columns[i++]->insert(node->activeChildren()); res_columns[i++]->insert(node->dequeued_requests.load()); - res_columns[i++]->insert(node->canceled_requests.load()); res_columns[i++]->insert(node->dequeued_cost.load()); - res_columns[i++]->insert(node->canceled_cost.load()); res_columns[i++]->insert(node->busy_periods.load()); Field vruntime;