diff --git a/src/Common/Scheduler/Nodes/tests/gtest_io_resource_manager.cpp b/src/Common/Scheduler/Nodes/tests/gtest_io_resource_manager.cpp index 93c8439bdae..15cd6436c47 100644 --- a/src/Common/Scheduler/Nodes/tests/gtest_io_resource_manager.cpp +++ b/src/Common/Scheduler/Nodes/tests/gtest_io_resource_manager.cpp @@ -112,7 +112,7 @@ public: } private: - bool storeEntityImpl( + WorkloadEntityStorageBase::OperationResult storeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, @@ -122,17 +122,17 @@ private: const Settings & settings) override { UNUSED(current_context, entity_type, entity_name, create_entity_query, throw_if_exists, replace_if_exists, settings); - return true; + return OperationResult::Ok; } - bool removeEntityImpl( + WorkloadEntityStorageBase::OperationResult removeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, bool throw_if_not_exists) override { UNUSED(current_context, entity_type, entity_name, throw_if_not_exists); - return true; + return OperationResult::Ok; } }; diff --git a/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.cpp b/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.cpp index 190b2928fe0..0e67074c84b 100644 --- a/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.cpp +++ b/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.cpp @@ -198,7 +198,7 @@ void WorkloadEntityDiskStorage::createDirectory() } -bool WorkloadEntityDiskStorage::storeEntityImpl( +WorkloadEntityStorageBase::OperationResult WorkloadEntityDiskStorage::storeEntityImpl( const ContextPtr & /*current_context*/, WorkloadEntityType entity_type, const String & entity_name, @@ -216,7 +216,7 @@ bool WorkloadEntityDiskStorage::storeEntityImpl( if (throw_if_exists) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists", entity_name); else if (!replace_if_exists) - return false; + return OperationResult::Failed; } WriteBufferFromOwnString create_statement_buf; @@ -247,11 +247,11 @@ bool WorkloadEntityDiskStorage::storeEntityImpl( } LOG_TRACE(log, "Entity {} stored", backQuote(entity_name)); - return true; + return OperationResult::Ok; } -bool WorkloadEntityDiskStorage::removeEntityImpl( +WorkloadEntityStorageBase::OperationResult WorkloadEntityDiskStorage::removeEntityImpl( const ContextPtr & /*current_context*/, WorkloadEntityType entity_type, const String & entity_name, @@ -267,11 +267,11 @@ bool WorkloadEntityDiskStorage::removeEntityImpl( if (throw_if_not_exists) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' doesn't exist", entity_name); else - return false; + return OperationResult::Failed; } LOG_TRACE(log, "Entity {} removed", backQuote(entity_name)); - return true; + return OperationResult::Ok; } diff --git a/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.h b/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.h index ceb736372ae..b60a5075a02 100644 --- a/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.h +++ b/src/Common/Scheduler/Workload/WorkloadEntityDiskStorage.h @@ -16,7 +16,7 @@ public: void loadEntities() override; private: - bool storeEntityImpl( + OperationResult storeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, @@ -25,7 +25,7 @@ private: bool replace_if_exists, const Settings & settings) override; - bool removeEntityImpl( + OperationResult removeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, diff --git a/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.cpp b/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.cpp index e69de29bb2d..37d1cc568ec 100644 --- a/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.cpp +++ b/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.cpp @@ -0,0 +1,274 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace Setting +{ +extern const SettingsUInt64 max_parser_backtracks; +extern const SettingsUInt64 max_parser_depth; +} + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; +} + +WorkloadEntityKeeperStorage::WorkloadEntityKeeperStorage( + const ContextPtr & global_context_, const String & zookeeper_path_) + : WorkloadEntityStorageBase(global_context_) + , zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }} + , zookeeper_path{zookeeper_path_} + , watch_queue{std::make_shared>(std::numeric_limits::max())} + , log{getLogger("WorkloadEntityKeeperStorage")} +{ + if (zookeeper_path.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path must be non-empty"); + + if (zookeeper_path.back() == '/') + zookeeper_path.resize(zookeeper_path.size() - 1); + + /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it. + if (zookeeper_path.front() != '/') + zookeeper_path = "/" + zookeeper_path; +} + +WorkloadEntityKeeperStorage::~WorkloadEntityKeeperStorage() +{ + SCOPE_EXIT_SAFE(stopWatchingThread()); +} + +void WorkloadEntityKeeperStorage::startWatchingThread() +{ + if (!watching_flag.exchange(true)) + watching_thread = ThreadFromGlobalPool(&WorkloadEntityKeeperStorage::processWatchQueue, this); +} + +void WorkloadEntityKeeperStorage::stopWatchingThread() +{ + if (watching_flag.exchange(false)) + { + watch_queue->finish(); + if (watching_thread.joinable()) + watching_thread.join(); + } +} + +zkutil::ZooKeeperPtr WorkloadEntityKeeperStorage::getZooKeeper() +{ + auto [zookeeper, session_status] = zookeeper_getter.getZooKeeper(); + + if (session_status == zkutil::ZooKeeperCachingGetter::SessionStatus::New) + { + /// It's possible that we connected to different [Zoo]Keeper instance + /// so we may read a bit stale state. + zookeeper->sync(zookeeper_path); + + createRootNodes(zookeeper); + refreshAllEntities(zookeeper); + } + + return zookeeper; +} + +void WorkloadEntityKeeperStorage::loadEntities() +{ + /// loadEntities() is called at start from Server::main(), so it's better not to stop here on no connection to ZooKeeper or any other error. + /// However the watching thread must be started anyway in case the connection will be established later. + if (!entities_loaded) + { + try + { + refreshAllEntities(getZooKeeper()); + startWatchingThread(); + } + catch (...) + { + tryLogCurrentException(log, "Failed to load workload entities"); + } + } + startWatchingThread(); +} + + +void WorkloadEntityKeeperStorage::processWatchQueue() +{ + LOG_DEBUG(log, "Started watching thread"); + setThreadName("WrkldEntWatch"); + + while (watching_flag) + { + try + { + /// Re-initialize ZooKeeper session if expired + getZooKeeper(); + + bool queued = false; + if (!watch_queue->tryPop(queued, /* timeout_ms: */ 10000)) + continue; + + refreshAllEntities(getZooKeeper()); + } + catch (...) + { + tryLogCurrentException(log, "Will try to restart watching thread after error"); + zookeeper_getter.resetCache(); + sleepForSeconds(5); + } + } + + LOG_DEBUG(log, "Stopped watching thread"); +} + + +void WorkloadEntityKeeperStorage::stopWatching() +{ + stopWatchingThread(); +} + +void WorkloadEntityKeeperStorage::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper) +{ + zookeeper->createAncestors(zookeeper_path); + // If node does not exist we consider it to be equal to empty node: no workload entities + zookeeper->createIfNotExists(zookeeper_path, ""); +} + +WorkloadEntityStorageBase::OperationResult WorkloadEntityKeeperStorage::storeEntityImpl( + const ContextPtr & /*current_context*/, + WorkloadEntityType entity_type, + const String & entity_name, + ASTPtr create_entity_query, + bool /*throw_if_exists*/, + bool /*replace_if_exists*/, + const Settings &) +{ + LOG_DEBUG(log, "Storing workload entity {}", backQuote(entity_name)); + + String new_data = serializeAllEntities(Event{entity_type, entity_name, create_entity_query}); + auto zookeeper = getZooKeeper(); + + Coordination::Stat stat; + auto code = zookeeper->trySet(zookeeper_path, new_data, current_version, &stat); + if (code != Coordination::Error::ZOK) + { + refreshAllEntities(zookeeper); + return OperationResult::Retry; + } + + current_version = stat.version; + + LOG_DEBUG(log, "Workload entity {} stored", backQuote(entity_name)); + + return OperationResult::Ok; +} + + +WorkloadEntityStorageBase::OperationResult WorkloadEntityKeeperStorage::removeEntityImpl( + const ContextPtr & /*current_context*/, + WorkloadEntityType entity_type, + const String & entity_name, + bool /*throw_if_not_exists*/) +{ + LOG_DEBUG(log, "Removing workload entity {}", backQuote(entity_name)); + + String new_data = serializeAllEntities(Event{entity_type, entity_name, {}}); + auto zookeeper = getZooKeeper(); + + Coordination::Stat stat; + auto code = zookeeper->trySet(zookeeper_path, new_data, current_version, &stat); + if (code != Coordination::Error::ZOK) + { + refreshAllEntities(zookeeper); + return OperationResult::Retry; + } + + current_version = stat.version; + + LOG_DEBUG(log, "Workload entity {} removed", backQuote(entity_name)); + + return OperationResult::Ok; +} + +std::pair WorkloadEntityKeeperStorage::getDataAndSetWatch(const zkutil::ZooKeeperPtr & zookeeper) +{ + const auto data_watcher = [my_watch_queue = watch_queue](const Coordination::WatchResponse & response) + { + if (response.type == Coordination::Event::CHANGED) + { + [[maybe_unused]] bool inserted = my_watch_queue->emplace(true); + /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called). + } + }; + + Coordination::Stat stat; + String data; + bool exists = zookeeper->tryGetWatch(zookeeper_path, data, &stat, data_watcher); + if (!exists) + { + createRootNodes(zookeeper); + data = zookeeper->getWatch(zookeeper_path, &stat, data_watcher); + } + return {data, stat.version}; +} + +void WorkloadEntityKeeperStorage::refreshAllEntities(const zkutil::ZooKeeperPtr & zookeeper) +{ + /// It doesn't make sense to keep the old watch events because we will reread everything in this function. + watch_queue->clear(); + + refreshEntities(zookeeper); + entities_loaded = true; +} + +void WorkloadEntityKeeperStorage::refreshEntities(const zkutil::ZooKeeperPtr & zookeeper) +{ + LOG_DEBUG(log, "Refreshing workload entities"); + auto [data, version] = getDataAndSetWatch(zookeeper); + + ASTs queries; + ParserCreateWorkloadEntity parser; + const char * begin = data.data(); /// begin of current query + const char * pos = begin; /// parser moves pos from begin to the end of current query + const char * end = begin + data.size(); + while (pos < end) + { + queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS)); + while (isWhitespaceASCII(*pos) || *pos == ';') + ++pos; + } + + /// Read & parse all SQL entities from data we just read from ZooKeeper + std::vector> new_entities; + for (const auto & query : queries) + { + if (auto * create_workload_query = query->as()) + new_entities.emplace_back(create_workload_query->getWorkloadName(), query); + else if (auto * create_resource_query = query->as()) + new_entities.emplace_back(create_resource_query->getResourceName(), query); + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid workload entity query in keeper storage: {}", query->getID()); + } + + setAllEntities(new_entities); + current_version = version; + + LOG_DEBUG(log, "Workload entities refreshing is done"); +} + +} + diff --git a/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.h b/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.h index 6f70f09beec..523be850d8d 100644 --- a/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.h +++ b/src/Common/Scheduler/Workload/WorkloadEntityKeeperStorage.h @@ -1 +1,70 @@ #pragma once + +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +/// Loads RESOURCE and WORKLOAD sql objects from Keeper. +class WorkloadEntityKeeperStorage : public WorkloadEntityStorageBase +{ +public: + WorkloadEntityKeeperStorage(const ContextPtr & global_context_, const String & zookeeper_path_); + ~WorkloadEntityKeeperStorage() override; + + bool isReplicated() const override { return true; } + String getReplicationID() const override { return zookeeper_path; } + + void loadEntities() override; + void stopWatching() override; + +private: + OperationResult storeEntityImpl( + const ContextPtr & current_context, + WorkloadEntityType entity_type, + const String & entity_name, + ASTPtr create_entity_query, + bool throw_if_exists, + bool replace_if_exists, + const Settings & settings) override; + + OperationResult removeEntityImpl( + const ContextPtr & current_context, + WorkloadEntityType entity_type, + const String & entity_name, + bool throw_if_not_exists) override; + + void processWatchQueue(); + + zkutil::ZooKeeperPtr getZooKeeper(); + + void startWatchingThread(); + void stopWatchingThread(); + + void createRootNodes(const zkutil::ZooKeeperPtr & zookeeper); + + std::pair getDataAndSetWatch(const zkutil::ZooKeeperPtr & zookeeper); + + void refreshAllEntities(const zkutil::ZooKeeperPtr & zookeeper); // TODO(serxa): get rid of it + void refreshEntities(const zkutil::ZooKeeperPtr & zookeeper); + + zkutil::ZooKeeperCachingGetter zookeeper_getter; + String zookeeper_path; + Int32 current_version = 0; + + ThreadFromGlobalPool watching_thread; + std::atomic entities_loaded = false; + std::atomic watching_flag = false; + + std::shared_ptr> watch_queue; // TODO(serxa): rework it into something that is not a queue + + LoggerPtr log; +}; + +} diff --git a/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.cpp b/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.cpp index 060bbbd6f87..0cd872f4890 100644 --- a/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.cpp +++ b/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include @@ -13,7 +15,6 @@ #include #include - namespace DB { @@ -26,6 +27,7 @@ namespace ErrorCodes namespace { +/// Removes details from a CREATE query to be used as workload entity definition ASTPtr normalizeCreateWorkloadEntityQuery(const IAST & create_query) { auto ptr = create_query.clone(); @@ -42,6 +44,7 @@ ASTPtr normalizeCreateWorkloadEntityQuery(const IAST & create_query) return ptr; } +/// Returns a type of a workload entity `ptr` WorkloadEntityType getEntityType(const ASTPtr & ptr) { if (auto * res = typeid_cast(ptr.get())) @@ -52,12 +55,38 @@ WorkloadEntityType getEntityType(const ASTPtr & ptr) return WorkloadEntityType::MAX; } +bool entityEquals(const ASTPtr & lhs, const ASTPtr & rhs) +{ + if (auto * a = typeid_cast(lhs.get())) + { + if (auto * b = typeid_cast(rhs.get())) + { + return std::forward_as_tuple(a->getWorkloadName(), a->getWorkloadParent(), a->changes) + == std::forward_as_tuple(b->getWorkloadName(), b->getWorkloadParent(), b->changes); + } + } + if (auto * a = typeid_cast(lhs.get())) + { + if (auto * b = typeid_cast(rhs.get())) + return std::forward_as_tuple(a->getResourceName(), a->operations) + == std::forward_as_tuple(b->getResourceName(), b->operations); + } + return false; +} + +/// Workload entities could reference each other. +/// This enum defines all possible reference types enum class ReferenceType { - Parent, ForResource + Parent, // Source workload references target workload as a parent + ForResource // Source workload references target resource in its `SETTINGS x = y FOR resource` clause }; -void forEachReference(const ASTPtr & source_entity, std::function func) +/// Runs a `func` callback for every reference from `source` to `target`. +/// This function is the source of truth defining what `target` references are stored in a workload `source_entity` +void forEachReference( + const ASTPtr & source_entity, + std::function func) { if (auto * res = typeid_cast(source_entity.get())) { @@ -82,6 +111,7 @@ void forEachReference(const ASTPtr & source_entity, std::function & workloads, std::unordered_set & visited, std::vector> & sorted_workloads) { if (visited.contains(name)) @@ -101,6 +131,7 @@ void topologicallySortedWorkloadsImpl(const String & name, const ASTPtr & ast, c sorted_workloads.emplace_back(name, ast); } +/// Returns pairs {worload_name, create_workload_ast} in order that respect child-parent relation (parent first, then children) std::vector> topologicallySortedWorkloads(const std::unordered_map & workloads) { std::vector> sorted_workloads; @@ -110,6 +141,143 @@ std::vector> topologicallySortedWorkloads(const std::u return sorted_workloads; } +/// Helper for recursive DFS +void topologicallySortedDependenciesImpl( + const String & name, + const std::unordered_map> & dependencies, + std::unordered_set & visited, + std::vector & result) +{ + if (visited.contains(name)) + return; + visited.insert(name); + + if (auto it = dependencies.find(name); it != dependencies.end()) + { + for (const String & dep : it->second) + topologicallySortedDependenciesImpl(dep, dependencies, visited, result); + } + + result.emplace_back(name); +} + +/// Returns nodes in topological order that respect `dependencies` (key is node name, value is set of dependencies) +std::vector topologicallySortedDependencies(const std::unordered_map> & dependencies) { + std::unordered_set visited; // Set to track visited nodes + std::vector result; // Result to store nodes in topologically sorted order + + // Perform DFS for each node in the graph + for (const auto & [name, _] : dependencies) + topologicallySortedDependenciesImpl(name, dependencies, visited, result); + + // Reverse the result to get the correct topological order + std::reverse(result.begin(), result.end()); + + return result; +} + +/// Represents a change of a workload entity (WORKLOAD or RESOURCE) +struct EntityChange +{ + String name; /// Name of entity + ASTPtr before; /// Entity before change (CREATE if not set) + ASTPtr after; /// Entity after change (DROP if not set) + + std::vector toEvents() const + { + if (!after) + return {{getEntityType(before), name, {}}}; + else if (!before) + return {{getEntityType(after), name, after}}; + else + { + auto type_before = getEntityType(before); + auto type_after = getEntityType(after); + // If type changed, we have to remove an old entity and add a new one + if (type_before != type_after) + return {{type_before, name, {}}, {type_after, name, after}}; + else + return {{type_after, name, after}}; + } + } +}; + +/// Returns `changes` ordered for execution. +/// Every intemediate state during execution will be consistent (i.e. all references will be valid) +/// NOTE: It does not validate changes, any problem will be detected during execution. +/// NOTE: There will be no error if valid order does not exist. +std::vector topologicallySortedChanges(const std::vector & changes) +{ + // Construct map from entity name into entity change + std::unordered_map change_by_name; + for (const auto & change : changes) + change_by_name[change.name] = &change; + + // Construct references maps (before changes and after changes) + std::unordered_map> old_sources; // Key is target. Value is set of names of source entities. + std::unordered_map> new_targets; // Key is source. Value is set of names of target entities. + for (const auto & change : changes) + { + if (change.before) + { + forEachReference(change.before, + [&] (const String & target, const String & source, ReferenceType) + { + old_sources[target].insert(source); + }); + } + if (change.after) + { + forEachReference(change.after, + [&] (const String & target, const String & source, ReferenceType) + { + new_targets[source].insert(target); + }); + } + } + + // There are consistency rules that regulate order in which changes must be applied (see below). + // Construct DAG of dependencies between changes. + std::unordered_map> dependencies; // Key is entity name. Value is set of names of entity that should be changed first. + for (const auto & change : changes) + { + for (const auto & event : change.toEvents()) + { + if (!event.entity) // DROP + { + // Rule 1: Entity can only be removed after all existing references to it are removed as well. + for (const String & source : old_sources[event.name]) + { + if (change_by_name.contains(source)) + dependencies[event.name].insert(source); + } + } + else // CREATE || CREATE OR REPLACE + { + // Rule 2: Entity can only be created after all entities it references are created as well. + for (const String & target : new_targets[event.name]) + { + if (auto it = change_by_name.find(target); it != change_by_name.end()) + { + const EntityChange & target_change = *it->second; + // If target is creating, it should be created first. + // (But if target is updating, there is no dependency). + if (!target_change.before) + dependencies[event.name].insert(target); + } + } + } + } + } + + // Topological sort of changes to respect consistency rules + std::vector result; + for (const String & name : topologicallySortedDependencies(dependencies)) + result.push_back(*change_by_name[name]); + + return result; +} + } WorkloadEntityStorageBase::WorkloadEntityStorageBase(ContextPtr global_context_) @@ -130,7 +298,7 @@ ASTPtr WorkloadEntityStorageBase::get(const String & entity_name) const return it->second; } -ASTPtr WorkloadEntityStorageBase::tryGet(const std::string & entity_name) const +ASTPtr WorkloadEntityStorageBase::tryGet(const String & entity_name) const { std::lock_guard lock(mutex); @@ -146,9 +314,9 @@ bool WorkloadEntityStorageBase::has(const String & entity_name) const return tryGet(entity_name) != nullptr; } -std::vector WorkloadEntityStorageBase::getAllEntityNames() const +std::vector WorkloadEntityStorageBase::getAllEntityNames() const { - std::vector entity_names; + std::vector entity_names; std::lock_guard lock(mutex); entity_names.reserve(entities.size()); @@ -159,9 +327,9 @@ std::vector WorkloadEntityStorageBase::getAllEntityNames() const return entity_names; } -std::vector WorkloadEntityStorageBase::getAllEntityNames(WorkloadEntityType entity_type) const +std::vector WorkloadEntityStorageBase::getAllEntityNames(WorkloadEntityType entity_type) const { - std::vector entity_names; + std::vector entity_names; std::lock_guard lock(mutex); for (const auto & [name, entity] : entities) @@ -195,110 +363,101 @@ bool WorkloadEntityStorageBase::storeEntity( auto * workload = typeid_cast(create_entity_query.get()); auto * resource = typeid_cast(create_entity_query.get()); - std::unique_lock lock{mutex}; - - ASTPtr old_entity; // entity to be REPLACED - if (auto it = entities.find(entity_name); it != entities.end()) + while (true) { - if (throw_if_exists) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists", entity_name); - else if (!replace_if_exists) - return false; - else - old_entity = it->second; - } + std::unique_lock lock{mutex}; - // Validate CREATE OR REPLACE - if (old_entity) - { - auto * old_workload = typeid_cast(old_entity.get()); - auto * old_resource = typeid_cast(old_entity.get()); - if (workload && !old_workload) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a workload", entity_name); - if (resource && !old_resource) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a resource", entity_name); - if (workload && !old_workload->hasParent() && workload->hasParent()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "It is not allowed to remove root workload"); - } - - std::optional new_root_name; - - // Validate workload - if (workload) - { - if (!workload->hasParent()) + ASTPtr old_entity; // entity to be REPLACED + if (auto it = entities.find(entity_name); it != entities.end()) { - if (!root_name.empty() && root_name != workload->getWorkloadName()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "The second root is not allowed. You should probably add 'PARENT {}' clause.", root_name); - new_root_name = workload->getWorkloadName(); + if (throw_if_exists) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists", entity_name); + else if (!replace_if_exists) + return false; + else + old_entity = it->second; } - SchedulingSettings validator; - validator.updateFromChanges(workload->changes); - } - - forEachReference(create_entity_query, - [this, workload] (const String & target, const String & source, ReferenceType type) + // Validate CREATE OR REPLACE + if (old_entity) { - if (auto it = entities.find(target); it == entities.end()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' references another workload entity '{}' that doesn't exist", source, target); + auto * old_workload = typeid_cast(old_entity.get()); + auto * old_resource = typeid_cast(old_entity.get()); + if (workload && !old_workload) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a workload", entity_name); + if (resource && !old_resource) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a resource", entity_name); + if (workload && !old_workload->hasParent() && workload->hasParent()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "It is not allowed to remove root workload"); + } - switch (type) + // Validate workload + if (workload) + { + if (!workload->hasParent()) { - case ReferenceType::Parent: - { - if (typeid_cast(entities[target].get()) == nullptr) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload parent should reference another workload, not '{}'.", target); - break; - } - case ReferenceType::ForResource: - { - if (typeid_cast(entities[target].get()) == nullptr) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload settings should reference resource in FOR clause, not '{}'.", target); - - // Validate that we could parse the settings for specific resource - SchedulingSettings validator; - validator.updateFromChanges(workload->changes, target); - break; - } + if (!root_name.empty() && root_name != workload->getWorkloadName()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The second root is not allowed. You should probably add 'PARENT {}' clause.", root_name); } - // Detect reference cycles. - // The only way to create a cycle is to add an edge that will be a part of a new cycle. - // We are going to add an edge: `source` -> `target`, so we ensure there is no path back `target` -> `source`. - if (isIndirectlyReferenced(source, target)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity cycles are not allowed"); - }); + SchedulingSettings validator; + validator.updateFromChanges(workload->changes); + } - bool stored = storeEntityImpl( - current_context, - entity_type, - entity_name, - create_entity_query, - throw_if_exists, - replace_if_exists, - settings); + forEachReference(create_entity_query, + [this, workload] (const String & target, const String & source, ReferenceType type) + { + if (auto it = entities.find(target); it == entities.end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' references another workload entity '{}' that doesn't exist", source, target); - if (stored) - { - if (new_root_name) - root_name = *new_root_name; + switch (type) + { + case ReferenceType::Parent: + { + if (typeid_cast(entities[target].get()) == nullptr) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload parent should reference another workload, not '{}'.", target); + break; + } + case ReferenceType::ForResource: + { + if (typeid_cast(entities[target].get()) == nullptr) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload settings should reference resource in FOR clause, not '{}'.", target); - // Remove references of a replaced entity (only for CREATE OR REPLACE) - removeReferences(old_entity); + // Validate that we could parse the settings for specific resource + SchedulingSettings validator; + validator.updateFromChanges(workload->changes, target); + break; + } + } - // Insert references of created entity - insertReferences(create_entity_query); + // Detect reference cycles. + // The only way to create a cycle is to add an edge that will be a part of a new cycle. + // We are going to add an edge: `source` -> `target`, so we ensure there is no path back `target` -> `source`. + if (isIndirectlyReferenced(source, target)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity cycles are not allowed"); + }); - // Store in memory - entities[entity_name] = create_entity_query; + auto result = storeEntityImpl( + current_context, + entity_type, + entity_name, + create_entity_query, + throw_if_exists, + replace_if_exists, + settings); - // Process notifications - onEntityAdded(entity_type, entity_name, create_entity_query); - unlockAndNotify(lock); + if (result == OperationResult::Retry) + continue; // Entities were updated, we need to rerun all the validations + + if (result == OperationResult::Ok) + { + Event event{entity_type, entity_name, create_entity_query}; + applyEvent(lock, event); + unlockAndNotify(lock, {std::move(event)}); + } + + return result == OperationResult::Ok; } - - return stored; } bool WorkloadEntityStorageBase::removeEntity( @@ -307,47 +466,44 @@ bool WorkloadEntityStorageBase::removeEntity( const String & entity_name, bool throw_if_not_exists) { - std::unique_lock lock(mutex); - auto it = entities.find(entity_name); - if (it == entities.end()) + while (true) { - if (throw_if_not_exists) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' doesn't exist", entity_name); - else - return false; + std::unique_lock lock(mutex); + auto it = entities.find(entity_name); + if (it == entities.end()) + { + if (throw_if_not_exists) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' doesn't exist", entity_name); + else + return false; + } + + if (auto reference_it = references.find(entity_name); reference_it != references.end()) + { + String names; + for (const String & name : reference_it->second) + names += " " + name; + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' cannot be dropped. It is referenced by:{}", entity_name, names); + } + + auto result = removeEntityImpl( + current_context, + entity_type, + entity_name, + throw_if_not_exists); + + if (result == OperationResult::Retry) + continue; // Entities were updated, we need to rerun all the validations + + if (result == OperationResult::Ok) + { + Event event{entity_type, entity_name, {}}; + applyEvent(lock, event); + unlockAndNotify(lock, {std::move(event)}); + } + + return result == OperationResult::Ok; } - - if (auto reference_it = references.find(entity_name); reference_it != references.end()) - { - String names; - for (const String & name : reference_it->second) - names += " " + name; - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' cannot be dropped. It is referenced by:{}", entity_name, names); - } - - bool removed = removeEntityImpl( - current_context, - entity_type, - entity_name, - throw_if_not_exists); - - if (removed) - { - if (entity_name == root_name) - root_name.clear(); - - // Clean up references - removeReferences(it->second); - - // Remove from memory - entities.erase(it); - - // Process notifications - onEntityRemoved(entity_type, entity_name); - unlockAndNotify(lock); - } - - return removed; } scope_guard WorkloadEntityStorageBase::getAllEntitiesAndSubscribe(const OnChangedHandler & handler) @@ -357,9 +513,7 @@ scope_guard WorkloadEntityStorageBase::getAllEntitiesAndSubscribe(const OnChange std::vector current_state; { std::unique_lock lock{mutex}; - chassert(queue.empty()); - makeEventsForAllEntities(lock); - current_state = std::move(queue); + current_state = orderEntities(entities); std::lock_guard lock2{handlers->mutex}; handlers->list.push_back(handler); @@ -377,41 +531,30 @@ scope_guard WorkloadEntityStorageBase::getAllEntitiesAndSubscribe(const OnChange return result; } -void WorkloadEntityStorageBase::onEntityAdded(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & new_entity) +void WorkloadEntityStorageBase::unlockAndNotify( + std::unique_lock & lock, + std::vector tx) { - queue.push_back(Event{.type = entity_type, .name = entity_name, .entity = new_entity}); -} + if (tx.empty()) + return; -void WorkloadEntityStorageBase::onEntityRemoved(WorkloadEntityType entity_type, const String & entity_name) -{ - queue.push_back(Event{.type = entity_type, .name = entity_name, .entity = {}}); -} - -void WorkloadEntityStorageBase::unlockAndNotify(std::unique_lock & mutex_lock) -{ - /// Only one thread can send notification at any time, that is why we need `mutex_lock` - if (!queue.empty()) + std::vector current_handlers; { - auto events = std::move(queue); + std::lock_guard handlers_lock{handlers->mutex}; + boost::range::copy(handlers->list, std::back_inserter(current_handlers)); + } - std::vector current_handlers; + lock.unlock(); + + for (const auto & handler : current_handlers) + { + try { - std::lock_guard handlers_lock{handlers->mutex}; - boost::range::copy(handlers->list, std::back_inserter(current_handlers)); + handler(tx); } - - mutex_lock.unlock(); - - for (const auto & handler : current_handlers) + catch (...) { - try - { - handler(events); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + tryLogCurrentException(__PRETTY_FUNCTION__); } } } @@ -421,52 +564,84 @@ std::unique_lock WorkloadEntityStorageBase::getLock() cons return std::unique_lock{mutex}; } -void WorkloadEntityStorageBase::setAllEntities(const std::vector> & new_entities) +void WorkloadEntityStorageBase::setAllEntities(const std::vector> & raw_new_entities) { - std::unordered_map normalized_entities; - for (const auto & [entity_name, create_query] : new_entities) - normalized_entities[entity_name] = normalizeCreateWorkloadEntityQuery(*create_query); - - // TODO(serxa): do validation and throw LOGICAL_ERROR if failed + std::unordered_map new_entities; + for (const auto & [entity_name, create_query] : raw_new_entities) + new_entities[entity_name] = normalizeCreateWorkloadEntityQuery(*create_query); std::unique_lock lock(mutex); - chassert(entities.empty()); // TODO(serxa): keeper storage could do full refresh, so we should support it here - entities = std::move(normalized_entities); - for (const auto & [entity_name, entity] : entities) - insertReferences(entity); - // Quick check to avoid extra work + // Fill vector of `changes` based on difference between current `entities` and `new_entities` + std::vector changes; + for (const auto & [entity_name, entity] : entities) { - std::lock_guard lock2(handlers->mutex); - if (handlers->list.empty()) - return; + if (auto it = new_entities.find(entity_name); it != new_entities.end()) + { + if (!entityEquals(entity, it->second)) + changes.emplace_back(entity_name, entity, it->second); // Remove entities that are not present in `new_entities` + } + else + changes.emplace_back(entity_name, entity, ASTPtr{}); // Update entities that are present in both `new_entities` and `entities` + } + for (const auto & [entity_name, entity] : new_entities) + { + if (!entities.contains(entity_name)) + changes.emplace_back(entity_name, ASTPtr{}, entity); // Create entities that are only present in `new_entities` } - makeEventsForAllEntities(lock); - unlockAndNotify(lock); + // Sort `changes` to respect consistency of references and apply them one by one. + std::vector tx; + for (const auto & change : topologicallySortedChanges(changes)) + { + for (const auto & event : change.toEvents()) + { + // TODO(serxa): do validation and throw LOGICAL_ERROR if failed + applyEvent(lock, event); + tx.push_back(event); + } + } + + // Notify subscribers + unlockAndNotify(lock, tx); } -void WorkloadEntityStorageBase::makeEventsForAllEntities(std::unique_lock &) +void WorkloadEntityStorageBase::applyEvent( + std::unique_lock &, + const Event & event) { - std::unordered_map workloads; - std::unordered_map resources; - for (auto & [entity_name, ast] : entities) + if (event.entity) // CREATE || CREATE OR REPLACE { - if (typeid_cast(ast.get())) - workloads.emplace(entity_name, ast); - else if (typeid_cast(ast.get())) - resources.emplace(entity_name, ast); - else - throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid workload entity type '{}'", ast->getID()); + auto * workload = typeid_cast(event.entity.get()); + + // Validate workload + if (workload && !workload->hasParent()) + root_name = workload->getWorkloadName(); + + // Remove references of a replaced entity (only for CREATE OR REPLACE) + if (auto it = entities.find(event.name); it != entities.end()) + removeReferences(it->second); + + // Insert references of created entity + insertReferences(event.entity); + + // Store in memory + entities[event.name] = event.entity; } + else // DROP + { + auto it = entities.find(event.name); + chassert(it != entities.end()); - // Resources should be created first because workloads could reference them - for (auto & [entity_name, ast] : resources) - onEntityAdded(WorkloadEntityType::Resource, entity_name, ast); + if (event.name == root_name) + root_name.clear(); - // Workloads should be created in an order such that children are created only after its parent is created - for (auto & [entity_name, ast] : topologicallySortedWorkloads(workloads)) - onEntityAdded(WorkloadEntityType::Workload, entity_name, ast); + // Clean up references + removeReferences(it->second); + + // Remove from memory + entities.erase(it); + } } std::vector> WorkloadEntityStorageBase::getAllEntities() const @@ -528,4 +703,59 @@ void WorkloadEntityStorageBase::removeReferences(const ASTPtr & entity) }); } +std::vector WorkloadEntityStorageBase::orderEntities( + const std::unordered_map & all_entities, + std::optional change) +{ + std::vector result; + + std::unordered_map workloads; + for (auto & [entity_name, ast] : all_entities) + { + if (typeid_cast(ast.get())) + { + if (change && change->name == entity_name) + continue; // Skip this workload if it is removed or updated + workloads.emplace(entity_name, ast); + } + else if (typeid_cast(ast.get())) + { + if (change && change->name == entity_name) + continue; // Skip this resource if it is removed or updated + // Resources should go first because workloads could reference them + result.emplace_back(WorkloadEntityType::Resource, entity_name, ast); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid workload entity type '{}'", ast->getID()); + } + + // Introduce new entity described by `change` + if (change && change->entity) + { + if (change->type == WorkloadEntityType::Workload) + workloads.emplace(change->name, change->entity); + else if (change->type == WorkloadEntityType::Resource) + result.emplace_back(WorkloadEntityType::Resource, change->name, change->entity); + } + + // Workloads should go in an order such that children are enlisted only after its parent + for (auto & [entity_name, ast] : topologicallySortedWorkloads(workloads)) + result.emplace_back(WorkloadEntityType::Workload, entity_name, ast); + + return result; +} + +String WorkloadEntityStorageBase::serializeAllEntities(std::optional change) +{ + std::unique_lock lock; + auto ordered_entities = orderEntities(entities, change); + WriteBufferFromOwnString buf; + for (const auto & event : ordered_entities) + { + formatAST(*event.entity, buf, false, true); + buf.write(";\n", 2); + } + return buf.str(); +} + } diff --git a/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.h b/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.h index e1f43181a0c..905c80610c2 100644 --- a/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.h +++ b/src/Common/Scheduler/Workload/WorkloadEntityStorageBase.h @@ -49,7 +49,14 @@ public: const OnChangedHandler & handler) override; protected: - virtual bool storeEntityImpl( + enum class OperationResult + { + Ok, + Failed, + Retry + }; + + virtual OperationResult storeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, @@ -58,7 +65,7 @@ protected: bool replace_if_exists, const Settings & settings) = 0; - virtual bool removeEntityImpl( + virtual OperationResult removeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, @@ -66,18 +73,21 @@ protected: std::unique_lock getLock() const; + /// Replace current `entities` with `new_entities` and notifies subscribers. + /// Note that subscribers will be notified with a sequence of events. + /// It is guaranteed that all itermediate states (between every pair of consecutive events) + /// will be consistent (all references between entities will be valid) void setAllEntities(const std::vector> & new_entities); - void makeEventsForAllEntities(std::unique_lock & lock); - /// Called by derived class after a new workload entity has been added. - void onEntityAdded(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & new_entity); + /// Serialize `entities` stored in memory plus one optional `change` into multiline string + String serializeAllEntities(std::optional change = {}); - /// Called by derived class after an workload entity has been removed. - void onEntityRemoved(WorkloadEntityType entity_type, const String & entity_name); +private: + /// Change state in memory + void applyEvent(std::unique_lock & lock, const Event & event); - /// Sends notifications to subscribers about changes in workload entities - /// (added with previous calls onEntityAdded(), onEntityRemoved()). - void unlockAndNotify(std::unique_lock & lock); + /// Notify subscribers about changes describe by vector of events `tx` + void unlockAndNotify(std::unique_lock & lock, std::vector tx); /// Return true iff `references` has a path from `source` to `target` bool isIndirectlyReferenced(const String & target, const String & source); @@ -88,6 +98,11 @@ protected: /// Removes references that are described by `entity` from `references` void removeReferences(const ASTPtr & entity); + /// Returns an ordered vector of `entities` + std::vector orderEntities( + const std::unordered_map & all_entitites, + std::optional change = {}); + struct Handlers { std::mutex mutex; @@ -96,15 +111,14 @@ protected: /// shared_ptr is here for safety because WorkloadEntityStorageBase can be destroyed before all subscriptions are removed. std::shared_ptr handlers; - std::vector queue; - mutable std::recursive_mutex mutex; std::unordered_map entities; /// Maps entity name into CREATE entity query // Validation - std::unordered_map> references; /// Keep track of references between entities. Key is target. Values is set of sources + std::unordered_map> references; /// Keep track of references between entities. Key is target. Value is set of sources String root_name; /// current root workload name +protected: ContextPtr global_context; }; diff --git a/src/Common/Scheduler/Workload/createWorkloadEntityStorage.cpp b/src/Common/Scheduler/Workload/createWorkloadEntityStorage.cpp index 8475fe21455..5dc1265e31d 100644 --- a/src/Common/Scheduler/Workload/createWorkloadEntityStorage.cpp +++ b/src/Common/Scheduler/Workload/createWorkloadEntityStorage.cpp @@ -34,15 +34,12 @@ std::unique_ptr createWorkloadEntityStorage(const Contex zookeeper_path_key, disk_path_key); } - abort(); // TODO(serxa): create WorkloadEntityKeeperStorage object - //return std::make_unique(global_context, config.getString(zookeeper_path_key)); - } - else - { - String default_path = fs::path{global_context->getPath()} / "workload" / ""; - String path = config.getString(disk_path_key, default_path); - return std::make_unique(global_context, path); + return std::make_unique(global_context, config.getString(zookeeper_path_key)); } + + String default_path = fs::path{global_context->getPath()} / "workload" / ""; + String path = config.getString(disk_path_key, default_path); + return std::make_unique(global_context, path); } } diff --git a/src/Parsers/ASTCreateResourceQuery.h b/src/Parsers/ASTCreateResourceQuery.h index b05176837bc..f1c762e5bcd 100644 --- a/src/Parsers/ASTCreateResourceQuery.h +++ b/src/Parsers/ASTCreateResourceQuery.h @@ -19,6 +19,9 @@ public: { AccessMode mode; String disk; + + friend bool operator ==(const Operation & lhs, const Operation & rhs) { return lhs.mode == rhs.mode && lhs.disk == rhs.disk; } + friend bool operator !=(const Operation & lhs, const Operation & rhs) { return !(lhs == rhs); } }; using Operations = std::vector; diff --git a/src/Parsers/ParserCreateWorkloadEntity.cpp b/src/Parsers/ParserCreateWorkloadEntity.cpp new file mode 100644 index 00000000000..013210a6d87 --- /dev/null +++ b/src/Parsers/ParserCreateWorkloadEntity.cpp @@ -0,0 +1,16 @@ +#include +#include +#include + +namespace DB +{ + +bool ParserCreateWorkloadEntity::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + ParserCreateWorkloadQuery create_workload_p; + ParserCreateResourceQuery create_resource_p; + + return create_workload_p.parse(pos, node, expected) || create_resource_p.parse(pos, node, expected); +} + +} diff --git a/src/Parsers/ParserCreateWorkloadEntity.h b/src/Parsers/ParserCreateWorkloadEntity.h new file mode 100644 index 00000000000..1e7b78b3ccc --- /dev/null +++ b/src/Parsers/ParserCreateWorkloadEntity.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ + +/// Special parser for the CREATE WORKLOAD and CREATE RESOURCE queries. +class ParserCreateWorkloadEntity : public IParserBase +{ +protected: + const char * getName() const override { return "CREATE workload entity query"; } + + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + +} diff --git a/tests/integration/test_scheduler/configs/storage_configuration.xml b/tests/integration/test_scheduler/configs/storage_configuration.xml index 16cdf4a5b15..9498044c836 100644 --- a/tests/integration/test_scheduler/configs/storage_configuration.xml +++ b/tests/integration/test_scheduler/configs/storage_configuration.xml @@ -1,4 +1,5 @@ + /clickhouse/workload/definitions.sql diff --git a/tests/integration/test_scheduler/test.py b/tests/integration/test_scheduler/test.py index b78376bffe2..40c5f7e11ed 100644 --- a/tests/integration/test_scheduler/test.py +++ b/tests/integration/test_scheduler/test.py @@ -24,6 +24,7 @@ node = cluster.add_instance( "configs/workloads.xml.default", ], with_minio=True, + with_zookeeper=True, )