mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
add tests for max_speed and max_bust
This commit is contained in:
parent
86515e1bce
commit
3ff86a4347
@ -1,9 +1,9 @@
|
||||
#include <chrono>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "Common/Priority.h"
|
||||
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
|
||||
|
||||
#include <Common/Priority.h>
|
||||
#include <Common/Scheduler/Nodes/FairPolicy.h>
|
||||
#include <Common/Scheduler/Nodes/UnifiedSchedulerNode.h>
|
||||
|
||||
@ -301,3 +301,122 @@ TEST(SchedulerUnifiedNode, List)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(SchedulerUnifiedNode, ThrottlerLeakyBucket)
|
||||
{
|
||||
ResourceTest t;
|
||||
EventQueue::TimePoint start = std::chrono::system_clock::now();
|
||||
t.process(start, 0);
|
||||
|
||||
auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 10.0, .max_burst = 20.0});
|
||||
|
||||
t.enqueue(all, {10, 10, 10, 10, 10, 10, 10, 10});
|
||||
|
||||
t.process(start + std::chrono::seconds(0));
|
||||
t.consumed("all", 30); // It is allowed to go below zero for exactly one resource request
|
||||
|
||||
t.process(start + std::chrono::seconds(1));
|
||||
t.consumed("all", 10);
|
||||
|
||||
t.process(start + std::chrono::seconds(2));
|
||||
t.consumed("all", 10);
|
||||
|
||||
t.process(start + std::chrono::seconds(3));
|
||||
t.consumed("all", 10);
|
||||
|
||||
t.process(start + std::chrono::seconds(4));
|
||||
t.consumed("all", 10);
|
||||
|
||||
t.process(start + std::chrono::seconds(100500));
|
||||
t.consumed("all", 10);
|
||||
}
|
||||
|
||||
TEST(SchedulerUnifiedNode, ThrottlerPacing)
|
||||
{
|
||||
ResourceTest t;
|
||||
EventQueue::TimePoint start = std::chrono::system_clock::now();
|
||||
t.process(start, 0);
|
||||
|
||||
// Zero burst allows you to send one request of any `size` and than throttle for `size/max_speed` seconds.
|
||||
// Useful if outgoing traffic should be "paced", i.e. have the least possible burstiness.
|
||||
auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 1.0, .max_burst = 0.0});
|
||||
|
||||
t.enqueue(all, {1, 2, 3, 1, 2, 1});
|
||||
int output[] = {1, 2, 0, 3, 0, 0, 1, 2, 0, 1, 0};
|
||||
for (int i = 0; i < std::size(output); i++)
|
||||
{
|
||||
t.process(start + std::chrono::seconds(i));
|
||||
t.consumed("all", output[i]);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(SchedulerUnifiedNode, ThrottlerBucketFilling)
|
||||
{
|
||||
ResourceTest t;
|
||||
EventQueue::TimePoint start = std::chrono::system_clock::now();
|
||||
t.process(start, 0);
|
||||
|
||||
auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 10.0, .max_burst = 100.0});
|
||||
|
||||
t.enqueue(all, {100});
|
||||
|
||||
t.process(start + std::chrono::seconds(0));
|
||||
t.consumed("all", 100); // consume all tokens, but it is still active (not negative)
|
||||
|
||||
t.process(start + std::chrono::seconds(5));
|
||||
t.consumed("all", 0); // There was nothing to consume
|
||||
|
||||
t.enqueue(all, {10, 10, 10, 10, 10, 10, 10, 10, 10, 10});
|
||||
t.process(start + std::chrono::seconds(5));
|
||||
t.consumed("all", 60); // 5 sec * 10 tokens/sec = 50 tokens + 1 extra request to go below zero
|
||||
|
||||
t.process(start + std::chrono::seconds(100));
|
||||
t.consumed("all", 40); // Consume rest
|
||||
|
||||
t.process(start + std::chrono::seconds(200));
|
||||
|
||||
t.enqueue(all, {95, 1, 1, 1, 1, 1, 1, 1, 1, 1});
|
||||
t.process(start + std::chrono::seconds(200));
|
||||
t.consumed("all", 101); // check we cannot consume more than max_burst + 1 request
|
||||
|
||||
t.process(start + std::chrono::seconds(100500));
|
||||
t.consumed("all", 3);
|
||||
}
|
||||
|
||||
TEST(SchedulerUnifiedNode, ThrottlerAndFairness)
|
||||
{
|
||||
ResourceTest t;
|
||||
EventQueue::TimePoint start = std::chrono::system_clock::now();
|
||||
t.process(start, 0);
|
||||
|
||||
auto all = t.createUnifiedNode("all", {.priority = Priority{}, .max_speed = 10.0, .max_burst = 100.0});
|
||||
auto a = t.createUnifiedNode("A", all, {.weight = 10.0, .priority = Priority{}});
|
||||
auto b = t.createUnifiedNode("B", all, {.weight = 90.0, .priority = Priority{}});
|
||||
|
||||
ResourceCost req_cost = 1;
|
||||
ResourceCost total_cost = 2000;
|
||||
for (int i = 0; i < total_cost / req_cost; i++)
|
||||
{
|
||||
t.enqueue(a, {req_cost});
|
||||
t.enqueue(b, {req_cost});
|
||||
}
|
||||
|
||||
double shareA = 0.1;
|
||||
double shareB = 0.9;
|
||||
|
||||
// Bandwidth-latency coupling due to fairness: worst latency is inversely proportional to share
|
||||
auto max_latencyA = static_cast<ResourceCost>(req_cost * (1.0 + 1.0 / shareA));
|
||||
auto max_latencyB = static_cast<ResourceCost>(req_cost * (1.0 + 1.0 / shareB));
|
||||
|
||||
double consumedA = 0;
|
||||
double consumedB = 0;
|
||||
for (int seconds = 0; seconds < 100; seconds++)
|
||||
{
|
||||
t.process(start + std::chrono::seconds(seconds));
|
||||
double arrival_curve = 100.0 + 10.0 * seconds + req_cost;
|
||||
t.consumed("A", static_cast<ResourceCost>(arrival_curve * shareA - consumedA), max_latencyA);
|
||||
t.consumed("B", static_cast<ResourceCost>(arrival_curve * shareB - consumedB), max_latencyB);
|
||||
consumedA = arrival_curve * shareA;
|
||||
consumedB = arrival_curve * shareB;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user