add SETTINGS clause in CREATE WORKLOAD query

This commit is contained in:
serxa 2024-09-23 09:51:16 +00:00
parent 823271ddd6
commit 956b40ec24
6 changed files with 219 additions and 51 deletions

View File

@ -43,8 +43,7 @@ IOResourceManager::NodeInfo::NodeInfo(const ASTPtr & ast, const String & resourc
auto * create = typeid_cast<ASTCreateWorkloadQuery *>(ast.get());
name = create->getWorkloadName();
parent = create->getWorkloadParent();
// TODO(serxa): parse workload settings specifically for `resource_name`
UNUSED(resource_name);
settings.updateFromAST(create->settings, resource_name);
}
IOResourceManager::Resource::Resource(const ASTPtr & resource_entity_)
@ -205,21 +204,45 @@ IOResourceManager::Workload::Workload(IOResourceManager * resource_manager_, con
: resource_manager(resource_manager_)
, workload_entity(workload_entity_)
{
for (auto & [resource_name, resource] : resource_manager->resources)
resource->createNode(NodeInfo(workload_entity, resource_name));
try
{
for (auto & [resource_name, resource] : resource_manager->resources)
resource->createNode(NodeInfo(workload_entity, resource_name));
}
catch (...)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error in IOResourceManager: {}",
getCurrentExceptionMessage(/* with_stacktrace = */ true));
}
}
IOResourceManager::Workload::~Workload()
{
for (auto & [resource_name, resource] : resource_manager->resources)
resource->deleteNode(NodeInfo(workload_entity, resource_name));
try
{
for (auto & [resource_name, resource] : resource_manager->resources)
resource->deleteNode(NodeInfo(workload_entity, resource_name));
}
catch (...)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error in IOResourceManager: {}",
getCurrentExceptionMessage(/* with_stacktrace = */ true));
}
}
void IOResourceManager::Workload::updateWorkload(const ASTPtr & new_entity)
{
for (auto & [resource_name, resource] : resource_manager->resources)
resource->updateNode(NodeInfo(workload_entity, resource_name), NodeInfo(new_entity, resource_name));
workload_entity = new_entity;
try
{
for (auto & [resource_name, resource] : resource_manager->resources)
resource->updateNode(NodeInfo(workload_entity, resource_name), NodeInfo(new_entity, resource_name));
workload_entity = new_entity;
}
catch (...)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error in IOResourceManager: {}",
getCurrentExceptionMessage(/* with_stacktrace = */ true));
}
}
String IOResourceManager::Workload::getParent() const
@ -233,36 +256,29 @@ IOResourceManager::IOResourceManager(IWorkloadEntityStorage & storage_)
subscription = storage.getAllEntitiesAndSubscribe(
[this] (const std::vector<IWorkloadEntityStorage::Event> & events)
{
try
for (auto [entity_type, entity_name, entity] : events)
{
for (auto [entity_type, entity_name, entity] : events)
switch (entity_type)
{
switch (entity_type)
case WorkloadEntityType::Workload:
{
case WorkloadEntityType::Workload:
{
if (entity)
createOrUpdateWorkload(entity_name, entity);
else
deleteWorkload(entity_name);
break;
}
case WorkloadEntityType::Resource:
{
if (entity)
createResource(entity_name, entity);
else
deleteResource(entity_name);
break;
}
case WorkloadEntityType::MAX: break;
if (entity)
createOrUpdateWorkload(entity_name, entity);
else
deleteWorkload(entity_name);
break;
}
case WorkloadEntityType::Resource:
{
if (entity)
createResource(entity_name, entity);
else
deleteResource(entity_name);
break;
}
case WorkloadEntityType::MAX: break;
}
}
catch (...)
{
// TODO(serxa): handle CRUD errors
}
});
}

View File

@ -0,0 +1,83 @@
#include <Common/Scheduler/SchedulingSettings.h>
#include <Parsers/ASTSetQuery.h>
namespace DB
{
void SchedulingSettings::updateFromAST(const ASTPtr & settings, const String & resource_name)
{
UNUSED(resource_name); // TODO(serxa): read resource specific settings from AST
if (auto * set = typeid_cast<ASTSetQuery *>(settings.get()))
{
std::optional<Float64> new_weight;
std::optional<Priority> new_priority;
std::optional<Float64> new_max_speed;
std::optional<Float64> new_max_burst;
std::optional<Int64> new_max_requests;
std::optional<Int64> new_max_cost;
// Read changed setting values
for (const auto & [name, value] : set->changes)
{
// TODO(serxa): we should validate workloads with this function before storing in WorkloadEntityStorage
// TODO(serxa): and probably we should add and persist version in filename for future changes
if (name == "weight")
new_weight = value.safeGet<Float64>();
else if (name == "priority")
new_priority = Priority{value.safeGet<Priority::Value>()};
else if (name == "max_speed")
new_max_speed = value.safeGet<Float64>();
else if (name == "max_burst")
new_max_burst = value.safeGet<Float64>();
else if (name == "max_requests")
new_max_requests = value.safeGet<Float64>();
else if (name == "max_cost")
new_max_cost = value.safeGet<Float64>();
}
// Read setting to be reset to default values
static SchedulingSettings default_settings;
bool reset_max_burst = false;
for (const String & name : set->default_settings)
{
if (name == "weight")
new_weight = default_settings.weight;
else if (name == "priority")
new_priority = default_settings.priority;
else if (name == "max_speed")
new_max_speed = default_settings.max_speed;
else if (name == "max_burst")
reset_max_burst = true;
else if (name == "max_requests")
new_max_requests = default_settings.max_requests;
else if (name == "max_cost")
new_max_cost = default_settings.max_cost;
}
if (reset_max_burst)
new_max_burst = default_burst_seconds * (new_max_speed ? *new_max_speed : max_speed);
// Save new values into the `this` object
// Leave previous value intentionally for ALTER query to be able to skip not mentioned setting value
if (new_weight)
weight = *new_weight;
if (new_priority)
priority = *new_priority;
if (new_max_speed)
{
max_speed = *new_max_speed;
// We always set max_burst if max_speed is changed.
// This is done for users to be able to ignore more advanced max_burst setting and rely only on max_speed
if (!new_max_burst)
max_burst = default_burst_seconds * max_speed;
}
if (new_max_burst)
max_burst = *new_max_burst;
if (new_max_requests)
max_requests = *new_max_requests;
if (new_max_cost)
max_cost = *new_max_cost;
}
}
}

View File

@ -3,6 +3,7 @@
#include <base/types.h>
#include <Common/Priority.h>
#include <Parsers/IAST_fwd.h>
#include <limits>
@ -12,14 +13,14 @@ namespace DB
struct SchedulingSettings
{
/// Priority and weight among siblings
double weight = 1.0;
Float64 weight = 1.0;
Priority priority;
/// Throttling constraints.
/// Up to 2 independent throttlers: one for average speed and one for peek speed.
static constexpr double default_burst_seconds = 1.0;
double max_speed = 0; // Zero means unlimited
double max_burst = 0; // default is `default_burst_seconds * max_speed`
static constexpr Float64 default_burst_seconds = 1.0;
Float64 max_speed = 0; // Zero means unlimited
Float64 max_burst = 0; // default is `default_burst_seconds * max_speed`
/// Limits total number of concurrent resource requests that are allowed to consume
static constexpr Int64 default_max_requests = std::numeric_limits<Int64>::max();
@ -32,7 +33,7 @@ struct SchedulingSettings
bool hasThrottler() const { return max_speed != 0; }
bool hasSemaphore() const { return max_requests != default_max_requests || max_cost != default_max_cost; }
// TODO(serxa): add helper functions for parsing, printing and validating
void updateFromAST(const ASTPtr & settings, const String & resource_name);
};
}

View File

@ -15,33 +15,49 @@ ASTPtr ASTCreateWorkloadQuery::clone() const
res->workload_name = workload_name->clone();
res->children.push_back(res->workload_name);
// TODO(serxa): clone settings
if (workload_parent)
{
res->workload_parent = workload_parent->clone();
res->children.push_back(res->workload_parent);
}
if (settings)
{
res->settings = settings->clone();
res->children.push_back(res->settings);
}
return res;
}
void ASTCreateWorkloadQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
void ASTCreateWorkloadQuery::formatImpl(const IAST::FormatSettings & format_settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "CREATE ";
format_settings.ostr << (format_settings.hilite ? hilite_keyword : "") << "CREATE ";
if (or_replace)
settings.ostr << "OR REPLACE ";
format_settings.ostr << "OR REPLACE ";
settings.ostr << "WORKLOAD ";
format_settings.ostr << "WORKLOAD ";
if (if_not_exists)
settings.ostr << "IF NOT EXISTS ";
format_settings.ostr << "IF NOT EXISTS ";
settings.ostr << (settings.hilite ? hilite_none : "");
format_settings.ostr << (format_settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(getWorkloadName()) << (settings.hilite ? hilite_none : "");
format_settings.ostr << (format_settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(getWorkloadName()) << (format_settings.hilite ? hilite_none : "");
formatOnCluster(settings);
formatOnCluster(format_settings);
if (hasParent())
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN " << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(getWorkloadParent()) << (settings.hilite ? hilite_none : "");
format_settings.ostr << (format_settings.hilite ? hilite_keyword : "") << " IN " << (format_settings.hilite ? hilite_none : "");
format_settings.ostr << (format_settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(getWorkloadParent()) << (format_settings.hilite ? hilite_none : "");
}
if (settings)
{
format_settings.ostr << ' ' << (format_settings.hilite ? hilite_keyword : "") << "SETTINGS" << (format_settings.hilite ? hilite_none : "") << ' ';
settings->format(format_settings);
}
}

View File

@ -12,7 +12,7 @@ class ASTCreateWorkloadQuery : public IAST, public ASTQueryWithOnCluster
public:
ASTPtr workload_name;
ASTPtr workload_parent;
// TODO(serxa): add workload settings (weight and priority should also go inside settings, because they can differ for different resources)
ASTPtr settings;
bool or_replace = false;
bool if_not_exists = false;

View File

@ -2,13 +2,61 @@
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserSetQuery.h>
#include <Common/SettingsChanges.h>
namespace DB
{
namespace
{
bool parseSettings(IParser::Pos & pos, Expected & expected, ASTPtr & settings)
{
return IParserBase::wrapParseImpl(pos, [&]
{
if (!ParserKeyword(Keyword::SETTINGS).ignore(pos, expected))
return false;
SettingsChanges settings_changes;
auto parse_setting = [&]
{
SettingChange setting;
if (ParserSetQuery::parseNameValuePair(setting, pos, expected))
{
settings_changes.push_back(std::move(setting));
// TODO(serxa): parse optional clause: [FOR resource_name]
return true;
}
return false;
};
if (!ParserList::parseUtil(pos, expected, parse_setting, false))
return false;
ASTPtr res_settings;
if (!settings_changes.empty())
{
auto settings_changes_ast = std::make_shared<ASTSetQuery>();
settings_changes_ast->changes = std::move(settings_changes);
settings_changes_ast->is_standalone = false;
res_settings = settings_changes_ast;
}
settings = std::move(res_settings);
return true;
});
}
}
bool ParserCreateWorkloadQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_create(Keyword::CREATE);
@ -18,7 +66,6 @@ bool ParserCreateWorkloadQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
ParserIdentifier workload_name_p;
ParserKeyword s_on(Keyword::ON);
ParserKeyword s_in(Keyword::IN);
// TODO(serxa): parse workload settings
ASTPtr workload_name;
ASTPtr workload_parent;
@ -54,6 +101,9 @@ bool ParserCreateWorkloadQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
return false;
}
ASTPtr settings;
parseSettings(pos, expected, settings);
auto create_workload_query = std::make_shared<ASTCreateWorkloadQuery>();
node = create_workload_query;
@ -70,6 +120,8 @@ bool ParserCreateWorkloadQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
create_workload_query->if_not_exists = if_not_exists;
create_workload_query->cluster = std::move(cluster_str);
create_workload_query->settings = std::move(settings);
return true;
}