implement workload entity storage based on keeper

This commit is contained in:
serxa 2024-10-14 21:17:01 +00:00
parent 912d59d2c8
commit bec2db7b79
13 changed files with 852 additions and 230 deletions

View File

@ -112,7 +112,7 @@ public:
}
private:
bool storeEntityImpl(
WorkloadEntityStorageBase::OperationResult storeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
@ -122,17 +122,17 @@ private:
const Settings & settings) override
{
UNUSED(current_context, entity_type, entity_name, create_entity_query, throw_if_exists, replace_if_exists, settings);
return true;
return OperationResult::Ok;
}
bool removeEntityImpl(
WorkloadEntityStorageBase::OperationResult removeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
bool throw_if_not_exists) override
{
UNUSED(current_context, entity_type, entity_name, throw_if_not_exists);
return true;
return OperationResult::Ok;
}
};

View File

@ -198,7 +198,7 @@ void WorkloadEntityDiskStorage::createDirectory()
}
bool WorkloadEntityDiskStorage::storeEntityImpl(
WorkloadEntityStorageBase::OperationResult WorkloadEntityDiskStorage::storeEntityImpl(
const ContextPtr & /*current_context*/,
WorkloadEntityType entity_type,
const String & entity_name,
@ -216,7 +216,7 @@ bool WorkloadEntityDiskStorage::storeEntityImpl(
if (throw_if_exists)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists", entity_name);
else if (!replace_if_exists)
return false;
return OperationResult::Failed;
}
WriteBufferFromOwnString create_statement_buf;
@ -247,11 +247,11 @@ bool WorkloadEntityDiskStorage::storeEntityImpl(
}
LOG_TRACE(log, "Entity {} stored", backQuote(entity_name));
return true;
return OperationResult::Ok;
}
bool WorkloadEntityDiskStorage::removeEntityImpl(
WorkloadEntityStorageBase::OperationResult WorkloadEntityDiskStorage::removeEntityImpl(
const ContextPtr & /*current_context*/,
WorkloadEntityType entity_type,
const String & entity_name,
@ -267,11 +267,11 @@ bool WorkloadEntityDiskStorage::removeEntityImpl(
if (throw_if_not_exists)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' doesn't exist", entity_name);
else
return false;
return OperationResult::Failed;
}
LOG_TRACE(log, "Entity {} removed", backQuote(entity_name));
return true;
return OperationResult::Ok;
}

View File

@ -16,7 +16,7 @@ public:
void loadEntities() override;
private:
bool storeEntityImpl(
OperationResult storeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
@ -25,7 +25,7 @@ private:
bool replace_if_exists,
const Settings & settings) override;
bool removeEntityImpl(
OperationResult removeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,

View File

@ -0,0 +1,274 @@
#include <Common/Scheduler/Workload/WorkloadEntityKeeperStorage.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTCreateResourceQuery.h>
#include <Parsers/ParserCreateWorkloadEntity.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <base/sleep.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <Core/Settings.h>
namespace DB
{
namespace Setting
{
extern const SettingsUInt64 max_parser_backtracks;
extern const SettingsUInt64 max_parser_depth;
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
/// Creates a Keeper-backed storage of workload entities rooted at `zookeeper_path_`.
/// The stored path is normalized: one trailing '/' is removed and a leading '/' is added if missing.
/// Throws BAD_ARGUMENTS if `zookeeper_path_` is empty.
WorkloadEntityKeeperStorage::WorkloadEntityKeeperStorage(
    const ContextPtr & global_context_, const String & zookeeper_path_)
    : WorkloadEntityStorageBase(global_context_)
    , zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }}
    , zookeeper_path{zookeeper_path_}
    , watch_queue{std::make_shared<ConcurrentBoundedQueue<bool>>(std::numeric_limits<size_t>::max())}
    , log{getLogger("WorkloadEntityKeeperStorage")}
{
    if (zookeeper_path.empty())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "ZooKeeper path must be non-empty");

    if (zookeeper_path.back() == '/')
        zookeeper_path.pop_back();

    /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
    /// The emptiness check covers the input "/": removing its trailing slash leaves an empty string,
    /// and calling front() on an empty string is not allowed.
    if (zookeeper_path.empty() || zookeeper_path.front() != '/')
        zookeeper_path = "/" + zookeeper_path;
}
/// Stops the watching thread (if it was started) when the storage is destroyed.
WorkloadEntityKeeperStorage::~WorkloadEntityKeeperStorage()
{
/// NOTE(review): SCOPE_EXIT_SAFE presumably prevents exceptions from escaping the destructor — confirm against Common/scope_guard_safe.h.
SCOPE_EXIT_SAFE(stopWatchingThread());
}
/// Starts the background thread running processWatchQueue().
/// Only the caller that flips `watching_flag` from false to true creates the thread; later calls do nothing.
void WorkloadEntityKeeperStorage::startWatchingThread()
{
    const bool already_watching = watching_flag.exchange(true);
    if (already_watching)
        return;

    watching_thread = ThreadFromGlobalPool(&WorkloadEntityKeeperStorage::processWatchQueue, this);
}
/// Stops the background watching thread.
/// Only the caller that flips `watching_flag` from true to false finishes the queue and joins the thread.
void WorkloadEntityKeeperStorage::stopWatchingThread()
{
    const bool was_watching = watching_flag.exchange(false);
    if (!was_watching)
        return;

    /// Finish the queue before joining so that the thread is not left waiting on it.
    watch_queue->finish();

    if (watching_thread.joinable())
        watching_thread.join();
}
/// Returns the [Zoo]Keeper client from the caching getter.
/// When the getter reports a new session, the path is synced, the root node is (re)created
/// and all entities are re-read before the client is returned.
zkutil::ZooKeeperPtr WorkloadEntityKeeperStorage::getZooKeeper()
{
    auto [zookeeper, session_status] = zookeeper_getter.getZooKeeper();

    const bool is_new_session = (session_status == zkutil::ZooKeeperCachingGetter::SessionStatus::New);
    if (!is_new_session)
        return zookeeper;

    /// It's possible that we connected to different [Zoo]Keeper instance
    /// so we may read a bit stale state.
    zookeeper->sync(zookeeper_path);

    createRootNodes(zookeeper);
    refreshAllEntities(zookeeper);

    return zookeeper;
}
void WorkloadEntityKeeperStorage::loadEntities()
{
/// loadEntities() is called at start from Server::main(), so it's better not to stop here on no connection to ZooKeeper or any other error.
/// However the watching thread must be started anyway in case the connection will be established later.
if (!entities_loaded)
{
try
{
refreshAllEntities(getZooKeeper());
startWatchingThread();
}
catch (...)
{
tryLogCurrentException(log, "Failed to load workload entities");
}
}
startWatchingThread();
}
/// Body of the watching thread.
/// While `watching_flag` is set, it waits (up to 10 seconds per iteration) for a notification in `watch_queue`
/// and re-reads all entities when one arrives. On any exception the cached session is reset and
/// the loop continues after a 5 second pause.
void WorkloadEntityKeeperStorage::processWatchQueue()
{
    LOG_DEBUG(log, "Started watching thread");
    setThreadName("WrkldEntWatch");

    while (watching_flag)
    {
        try
        {
            /// Re-initialize ZooKeeper session if expired
            getZooKeeper();

            bool notification = false;
            const bool got_notification = watch_queue->tryPop(notification, /* timeout_ms: */ 10000);
            if (got_notification)
                refreshAllEntities(getZooKeeper());
        }
        catch (...)
        {
            tryLogCurrentException(log, "Will try to restart watching thread after error");
            zookeeper_getter.resetCache();
            sleepForSeconds(5);
        }
    }

    LOG_DEBUG(log, "Stopped watching thread");
}
/// Public override (see the `stopWatching() override` declaration in the header): simply delegates to stopWatchingThread().
void WorkloadEntityKeeperStorage::stopWatching()
{
stopWatchingThread();
}
/// Makes sure the node at `zookeeper_path` exists, creating it with empty data when it is missing.
void WorkloadEntityKeeperStorage::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper)
{
/// NOTE(review): presumably creates the parent nodes of `zookeeper_path` but not the node itself — confirm against zkutil::ZooKeeper.
zookeeper->createAncestors(zookeeper_path);
// If node does not exist we consider it to be equal to empty node: no workload entities
zookeeper->createIfNotExists(zookeeper_path, "");
}
/// Writes the full serialized entity set, with the given CREATE applied, to the single Keeper node.
/// The write is conditional on `current_version`. If it does not succeed, all entities are re-read
/// and `Retry` is returned; otherwise the new node version is remembered and `Ok` is returned.
WorkloadEntityStorageBase::OperationResult WorkloadEntityKeeperStorage::storeEntityImpl(
    const ContextPtr & /*current_context*/,
    WorkloadEntityType entity_type,
    const String & entity_name,
    ASTPtr create_entity_query,
    bool /*throw_if_exists*/,
    bool /*replace_if_exists*/,
    const Settings &)
{
    LOG_DEBUG(log, "Storing workload entity {}", backQuote(entity_name));

    String serialized = serializeAllEntities(Event{entity_type, entity_name, create_entity_query});
    auto keeper = getZooKeeper();

    Coordination::Stat node_stat;
    const auto set_result = keeper->trySet(zookeeper_path, serialized, current_version, &node_stat);

    if (set_result == Coordination::Error::ZOK)
    {
        current_version = node_stat.version;
        LOG_DEBUG(log, "Workload entity {} stored", backQuote(entity_name));
        return OperationResult::Ok;
    }

    /// The conditional write did not go through: reload the state and let the caller redo its validations.
    refreshAllEntities(keeper);
    return OperationResult::Retry;
}
/// Writes the full serialized entity set, with the given entity removed, to the single Keeper node.
/// The write is conditional on `current_version`. If it does not succeed, all entities are re-read
/// and `Retry` is returned; otherwise the new node version is remembered and `Ok` is returned.
WorkloadEntityStorageBase::OperationResult WorkloadEntityKeeperStorage::removeEntityImpl(
    const ContextPtr & /*current_context*/,
    WorkloadEntityType entity_type,
    const String & entity_name,
    bool /*throw_if_not_exists*/)
{
    LOG_DEBUG(log, "Removing workload entity {}", backQuote(entity_name));

    /// An event without an entity AST denotes removal.
    String serialized = serializeAllEntities(Event{entity_type, entity_name, {}});
    auto keeper = getZooKeeper();

    Coordination::Stat node_stat;
    const auto set_result = keeper->trySet(zookeeper_path, serialized, current_version, &node_stat);

    if (set_result == Coordination::Error::ZOK)
    {
        current_version = node_stat.version;
        LOG_DEBUG(log, "Workload entity {} removed", backQuote(entity_name));
        return OperationResult::Ok;
    }

    /// The conditional write did not go through: reload the state and let the caller redo its validations.
    refreshAllEntities(keeper);
    return OperationResult::Retry;
}
/// Reads the data and version of the node at `zookeeper_path` and registers a watch on it.
/// If the node does not exist, it is created first and then read.
/// Returns {node data, node version}.
std::pair<String, Int32> WorkloadEntityKeeperStorage::getDataAndSetWatch(const zkutil::ZooKeeperPtr & zookeeper)
{
    /// The callback holds its own reference to the queue instead of capturing `this`.
    const auto on_node_event = [queue = watch_queue](const Coordination::WatchResponse & response)
    {
        if (response.type != Coordination::Event::CHANGED)
            return;

        /// `inserted` can be false if `watch_queue` was already finalized (which happens when stopWatching() is called).
        [[maybe_unused]] bool inserted = queue->emplace(true);
    };

    Coordination::Stat stat;
    String data;

    if (zookeeper->tryGetWatch(zookeeper_path, data, &stat, on_node_event))
        return {data, stat.version};

    createRootNodes(zookeeper);
    data = zookeeper->getWatch(zookeeper_path, &stat, on_node_event);
    return {data, stat.version};
}
/// Drops pending watch notifications, re-reads every entity via refreshEntities() and marks the storage as loaded.
void WorkloadEntityKeeperStorage::refreshAllEntities(const zkutil::ZooKeeperPtr & zookeeper)
{
/// It doesn't make sense to keep the old watch events because we will reread everything in this function.
watch_queue->clear();
refreshEntities(zookeeper);
/// Set only after refreshEntities() returned without throwing.
entities_loaded = true;
}
/// Re-reads the node at `zookeeper_path` (setting a watch on it), parses its content as a sequence of
/// CREATE WORKLOAD / CREATE RESOURCE queries separated by whitespace and ';', replaces the in-memory
/// entities with the parsed ones and remembers the node version.
/// Throws LOGICAL_ERROR if a parsed query is neither a workload nor a resource definition.
void WorkloadEntityKeeperStorage::refreshEntities(const zkutil::ZooKeeperPtr & zookeeper)
{
    LOG_DEBUG(log, "Refreshing workload entities");
    auto [data, version] = getDataAndSetWatch(zookeeper);

    ASTs queries;
    ParserCreateWorkloadEntity parser;
    const char * pos = data.data(); /// parser moves pos from the begin to the end of current query
    const char * end = pos + data.size();
    while (pos < end)
    {
        queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS));

        /// Skip separators between queries; never look at bytes at or past `end`.
        while (pos < end && (isWhitespaceASCII(*pos) || *pos == ';'))
            ++pos;
    }

    /// Read & parse all SQL entities from data we just read from ZooKeeper
    std::vector<std::pair<String, ASTPtr>> new_entities;
    new_entities.reserve(queries.size());
    for (const auto & query : queries)
    {
        if (auto * create_workload_query = query->as<ASTCreateWorkloadQuery>())
            new_entities.emplace_back(create_workload_query->getWorkloadName(), query);
        else if (auto * create_resource_query = query->as<ASTCreateResourceQuery>())
            new_entities.emplace_back(create_resource_query->getResourceName(), query);
        else
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid workload entity query in keeper storage: {}", query->getID());
    }

    setAllEntities(new_entities);
    /// The version is updated only after the in-memory state has been replaced.
    current_version = version;

    LOG_DEBUG(log, "Workload entities refreshing is done");
}
}

View File

@ -1 +1,70 @@
#pragma once
#include <Common/Scheduler/Workload/WorkloadEntityStorageBase.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/ZooKeeperCachingGetter.h>
namespace DB
{
/// Loads RESOURCE and WORKLOAD sql objects from Keeper.
/// All entities are kept in a single Keeper node at `zookeeper_path`; a background thread
/// re-reads them when a watch on that node fires.
class WorkloadEntityKeeperStorage : public WorkloadEntityStorageBase
{
public:
    /// `zookeeper_path_` must be non-empty; the constructor throws otherwise.
    WorkloadEntityKeeperStorage(const ContextPtr & global_context_, const String & zookeeper_path_);
    ~WorkloadEntityKeeperStorage() override;

    bool isReplicated() const override { return true; }
    String getReplicationID() const override { return zookeeper_path; }

    /// Loads entities from Keeper and starts the watching thread.
    void loadEntities() override;
    /// Stops the watching thread.
    void stopWatching() override;

private:
    OperationResult storeEntityImpl(
        const ContextPtr & current_context,
        WorkloadEntityType entity_type,
        const String & entity_name,
        ASTPtr create_entity_query,
        bool throw_if_exists,
        bool replace_if_exists,
        const Settings & settings) override;

    OperationResult removeEntityImpl(
        const ContextPtr & current_context,
        WorkloadEntityType entity_type,
        const String & entity_name,
        bool throw_if_not_exists) override;

    /// Body of the watching thread.
    void processWatchQueue();

    zkutil::ZooKeeperPtr getZooKeeper();

    void startWatchingThread();
    void stopWatchingThread();

    void createRootNodes(const zkutil::ZooKeeperPtr & zookeeper);
    /// Returns {node data, node version}.
    std::pair<String, Int32> getDataAndSetWatch(const zkutil::ZooKeeperPtr & zookeeper);
    void refreshAllEntities(const zkutil::ZooKeeperPtr & zookeeper); // TODO(serxa): get rid of it
    void refreshEntities(const zkutil::ZooKeeperPtr & zookeeper);

    zkutil::ZooKeeperCachingGetter zookeeper_getter;
    String zookeeper_path;

    /// Version of the Keeper node that the in-memory state corresponds to.
    /// Atomic because the watching thread updates it after a refresh while
    /// store/remove operations on other threads read it for their conditional writes.
    std::atomic<Int32> current_version = 0;

    ThreadFromGlobalPool watching_thread;
    std::atomic<bool> entities_loaded = false;
    std::atomic<bool> watching_flag = false;

    std::shared_ptr<ConcurrentBoundedQueue<bool>> watch_queue; // TODO(serxa): rework it into something that is not a queue

    LoggerPtr log;
};
}

View File

@ -5,6 +5,8 @@
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTCreateResourceQuery.h>
#include <Parsers/formatAST.h>
#include <IO/WriteBufferFromString.h>
#include <boost/container/flat_set.hpp>
#include <boost/range/algorithm/copy.hpp>
@ -13,7 +15,6 @@
#include <queue>
#include <unordered_set>
namespace DB
{
@ -26,6 +27,7 @@ namespace ErrorCodes
namespace
{
/// Removes details from a CREATE query to be used as workload entity definition
ASTPtr normalizeCreateWorkloadEntityQuery(const IAST & create_query)
{
auto ptr = create_query.clone();
@ -42,6 +44,7 @@ ASTPtr normalizeCreateWorkloadEntityQuery(const IAST & create_query)
return ptr;
}
/// Returns a type of a workload entity `ptr`
WorkloadEntityType getEntityType(const ASTPtr & ptr)
{
if (auto * res = typeid_cast<ASTCreateWorkloadQuery *>(ptr.get()))
@ -52,12 +55,38 @@ WorkloadEntityType getEntityType(const ASTPtr & ptr)
return WorkloadEntityType::MAX;
}
/// Compares two workload entity definitions.
/// Two workloads are equal when their name, parent and settings changes are equal;
/// two resources are equal when their name and operations are equal.
/// Any other combination (different kinds, or neither a workload nor a resource) is not equal.
bool entityEquals(const ASTPtr & lhs, const ASTPtr & rhs)
{
    if (auto * lhs_workload = typeid_cast<ASTCreateWorkloadQuery *>(lhs.get()))
    {
        auto * rhs_workload = typeid_cast<ASTCreateWorkloadQuery *>(rhs.get());
        return rhs_workload != nullptr
            && lhs_workload->getWorkloadName() == rhs_workload->getWorkloadName()
            && lhs_workload->getWorkloadParent() == rhs_workload->getWorkloadParent()
            && lhs_workload->changes == rhs_workload->changes;
    }

    if (auto * lhs_resource = typeid_cast<ASTCreateResourceQuery *>(lhs.get()))
    {
        auto * rhs_resource = typeid_cast<ASTCreateResourceQuery *>(rhs.get());
        return rhs_resource != nullptr
            && lhs_resource->getResourceName() == rhs_resource->getResourceName()
            && lhs_resource->operations == rhs_resource->operations;
    }

    return false;
}
/// Workload entities could reference each other.
/// This enum defines all possible reference types
enum class ReferenceType
{
Parent, ForResource
Parent, // Source workload references target workload as a parent
ForResource // Source workload references target resource in its `SETTINGS x = y FOR resource` clause
};
void forEachReference(const ASTPtr & source_entity, std::function<void(String, String, ReferenceType)> func)
/// Runs a `func` callback for every reference from `source` to `target`.
/// This function is the source of truth defining what `target` references are stored in a workload `source_entity`
void forEachReference(
const ASTPtr & source_entity,
std::function<void(const String & target, const String & source, ReferenceType type)> func)
{
if (auto * res = typeid_cast<ASTCreateWorkloadQuery *>(source_entity.get()))
{
@ -82,6 +111,7 @@ void forEachReference(const ASTPtr & source_entity, std::function<void(String, S
}
}
/// Helper for recursive DFS
void topologicallySortedWorkloadsImpl(const String & name, const ASTPtr & ast, const std::unordered_map<String, ASTPtr> & workloads, std::unordered_set<String> & visited, std::vector<std::pair<String, ASTPtr>> & sorted_workloads)
{
if (visited.contains(name))
@ -101,6 +131,7 @@ void topologicallySortedWorkloadsImpl(const String & name, const ASTPtr & ast, c
sorted_workloads.emplace_back(name, ast);
}
/// Returns pairs {workload_name, create_workload_ast} in an order that respects the child-parent relation (parent first, then children)
std::vector<std::pair<String, ASTPtr>> topologicallySortedWorkloads(const std::unordered_map<String, ASTPtr> & workloads)
{
std::vector<std::pair<String, ASTPtr>> sorted_workloads;
@ -110,6 +141,143 @@ std::vector<std::pair<String, ASTPtr>> topologicallySortedWorkloads(const std::u
return sorted_workloads;
}
/// Helper for recursive DFS.
/// Appends `name` to `result` only after everything it depends on has been appended (post-order).
/// `visited` guarantees that every node is processed once, which also stops recursion on cycles.
void topologicallySortedDependenciesImpl(
    const String & name,
    const std::unordered_map<String, std::unordered_set<String>> & dependencies,
    std::unordered_set<String> & visited,
    std::vector<String> & result)
{
    if (!visited.insert(name).second)
        return; // Already processed (or currently being processed)

    if (auto it = dependencies.find(name); it != dependencies.end())
    {
        for (const String & dep : it->second)
            topologicallySortedDependenciesImpl(dep, dependencies, visited, result);
    }

    result.emplace_back(name);
}

/// Returns nodes in topological order that respect `dependencies` (key is node name, value is set of dependencies):
/// every node appears in the result after all the nodes it depends on.
/// Nodes that only occur as a dependency (not as a key) are included as well.
std::vector<String> topologicallySortedDependencies(const std::unordered_map<String, std::unordered_set<String>> & dependencies)
{
    std::unordered_set<String> visited; // Set to track visited nodes
    std::vector<String> result; // Result to store nodes in topologically sorted order
    result.reserve(dependencies.size());

    // Perform DFS for each node in the graph
    for (const auto & [name, _] : dependencies)
        topologicallySortedDependenciesImpl(name, dependencies, visited, result);

    // DFS post-order already places dependencies before the nodes that depend on them,
    // so the result must NOT be reversed.
    return result;
}
/// Represents a change of a workload entity (WORKLOAD or RESOURCE)
struct EntityChange
{
    String name; /// Name of entity
    ASTPtr before; /// Entity before change (CREATE if not set)
    ASTPtr after; /// Entity after change (DROP if not set)

    /// Translates the change into storage events:
    ///  - a single removal event when `after` is not set;
    ///  - a single creation event when `before` is not set;
    ///  - for a replacement, one event when the entity type is kept, or removal + creation when it changes.
    std::vector<IWorkloadEntityStorage::Event> toEvents() const
    {
        if (!after)
            return {{getEntityType(before), name, {}}};

        if (!before)
            return {{getEntityType(after), name, after}};

        const auto old_type = getEntityType(before);
        const auto new_type = getEntityType(after);

        if (old_type == new_type)
            return {{new_type, name, after}};

        // If type changed, we have to remove an old entity and add a new one
        return {{old_type, name, {}}, {new_type, name, after}};
    }
};
/// Returns `changes` ordered for execution.
/// Every intermediate state during execution will be consistent (i.e. all references will be valid)
/// NOTE: It does not validate changes, any problem will be detected during execution.
/// NOTE: There will be no error if valid order does not exist.
/// NOTE: Every change is present in the result, including changes that are not related to any other change.
std::vector<EntityChange> topologicallySortedChanges(const std::vector<EntityChange> & changes)
{
    // Construct map from entity name into entity change
    std::unordered_map<String, const EntityChange *> change_by_name;
    for (const auto & change : changes)
        change_by_name[change.name] = &change;

    // Construct references maps (before changes and after changes)
    std::unordered_map<String, std::unordered_set<String>> old_sources; // Key is target. Value is set of names of source entities.
    std::unordered_map<String, std::unordered_set<String>> new_targets; // Key is source. Value is set of names of target entities.
    for (const auto & change : changes)
    {
        if (change.before)
        {
            forEachReference(change.before,
                [&] (const String & target, const String & source, ReferenceType)
                {
                    old_sources[target].insert(source);
                });
        }
        if (change.after)
        {
            forEachReference(change.after,
                [&] (const String & target, const String & source, ReferenceType)
                {
                    new_targets[source].insert(target);
                });
        }
    }

    // There are consistency rules that regulate order in which changes must be applied (see below).
    // Construct DAG of dependencies between changes.
    std::unordered_map<String, std::unordered_set<String>> dependencies; // Key is entity name. Value is set of names of entity that should be changed first.
    for (const auto & change : changes)
    {
        // Make sure that every change has a node in the graph, even if it has no dependencies at all.
        // Otherwise such a change would never be visited by the sort and would be lost.
        auto & change_dependencies = dependencies.try_emplace(change.name).first->second;

        for (const auto & event : change.toEvents())
        {
            if (!event.entity) // DROP
            {
                // Rule 1: Entity can only be removed after all existing references to it are removed as well.
                if (auto sources_it = old_sources.find(event.name); sources_it != old_sources.end())
                {
                    for (const String & source : sources_it->second)
                    {
                        if (change_by_name.contains(source))
                            change_dependencies.insert(source);
                    }
                }
            }
            else // CREATE || CREATE OR REPLACE
            {
                // Rule 2: Entity can only be created after all entities it references are created as well.
                if (auto targets_it = new_targets.find(event.name); targets_it != new_targets.end())
                {
                    for (const String & target : targets_it->second)
                    {
                        if (auto it = change_by_name.find(target); it != change_by_name.end())
                        {
                            const EntityChange & target_change = *it->second;
                            // If target is creating, it should be created first.
                            // (But if target is updating, there is no dependency).
                            if (!target_change.before)
                                change_dependencies.insert(target);
                        }
                    }
                }
            }
        }
    }

    // Topological sort of changes to respect consistency rules
    std::vector<EntityChange> result;
    result.reserve(changes.size());
    for (const String & name : topologicallySortedDependencies(dependencies))
        result.push_back(*change_by_name.at(name)); // Every graph node is a name of some change by construction
    return result;
}
}
WorkloadEntityStorageBase::WorkloadEntityStorageBase(ContextPtr global_context_)
@ -130,7 +298,7 @@ ASTPtr WorkloadEntityStorageBase::get(const String & entity_name) const
return it->second;
}
ASTPtr WorkloadEntityStorageBase::tryGet(const std::string & entity_name) const
ASTPtr WorkloadEntityStorageBase::tryGet(const String & entity_name) const
{
std::lock_guard lock(mutex);
@ -146,9 +314,9 @@ bool WorkloadEntityStorageBase::has(const String & entity_name) const
return tryGet(entity_name) != nullptr;
}
std::vector<std::string> WorkloadEntityStorageBase::getAllEntityNames() const
std::vector<String> WorkloadEntityStorageBase::getAllEntityNames() const
{
std::vector<std::string> entity_names;
std::vector<String> entity_names;
std::lock_guard lock(mutex);
entity_names.reserve(entities.size());
@ -159,9 +327,9 @@ std::vector<std::string> WorkloadEntityStorageBase::getAllEntityNames() const
return entity_names;
}
std::vector<std::string> WorkloadEntityStorageBase::getAllEntityNames(WorkloadEntityType entity_type) const
std::vector<String> WorkloadEntityStorageBase::getAllEntityNames(WorkloadEntityType entity_type) const
{
std::vector<std::string> entity_names;
std::vector<String> entity_names;
std::lock_guard lock(mutex);
for (const auto & [name, entity] : entities)
@ -195,110 +363,101 @@ bool WorkloadEntityStorageBase::storeEntity(
auto * workload = typeid_cast<ASTCreateWorkloadQuery *>(create_entity_query.get());
auto * resource = typeid_cast<ASTCreateResourceQuery *>(create_entity_query.get());
std::unique_lock lock{mutex};
ASTPtr old_entity; // entity to be REPLACED
if (auto it = entities.find(entity_name); it != entities.end())
while (true)
{
if (throw_if_exists)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists", entity_name);
else if (!replace_if_exists)
return false;
else
old_entity = it->second;
}
std::unique_lock lock{mutex};
// Validate CREATE OR REPLACE
if (old_entity)
{
auto * old_workload = typeid_cast<ASTCreateWorkloadQuery *>(old_entity.get());
auto * old_resource = typeid_cast<ASTCreateResourceQuery *>(old_entity.get());
if (workload && !old_workload)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a workload", entity_name);
if (resource && !old_resource)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a resource", entity_name);
if (workload && !old_workload->hasParent() && workload->hasParent())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "It is not allowed to remove root workload");
}
std::optional<String> new_root_name;
// Validate workload
if (workload)
{
if (!workload->hasParent())
ASTPtr old_entity; // entity to be REPLACED
if (auto it = entities.find(entity_name); it != entities.end())
{
if (!root_name.empty() && root_name != workload->getWorkloadName())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The second root is not allowed. You should probably add 'PARENT {}' clause.", root_name);
new_root_name = workload->getWorkloadName();
if (throw_if_exists)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists", entity_name);
else if (!replace_if_exists)
return false;
else
old_entity = it->second;
}
SchedulingSettings validator;
validator.updateFromChanges(workload->changes);
}
forEachReference(create_entity_query,
[this, workload] (const String & target, const String & source, ReferenceType type)
// Validate CREATE OR REPLACE
if (old_entity)
{
if (auto it = entities.find(target); it == entities.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' references another workload entity '{}' that doesn't exist", source, target);
auto * old_workload = typeid_cast<ASTCreateWorkloadQuery *>(old_entity.get());
auto * old_resource = typeid_cast<ASTCreateResourceQuery *>(old_entity.get());
if (workload && !old_workload)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a workload", entity_name);
if (resource && !old_resource)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' already exists, but it is not a resource", entity_name);
if (workload && !old_workload->hasParent() && workload->hasParent())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "It is not allowed to remove root workload");
}
switch (type)
// Validate workload
if (workload)
{
if (!workload->hasParent())
{
case ReferenceType::Parent:
{
if (typeid_cast<ASTCreateWorkloadQuery *>(entities[target].get()) == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload parent should reference another workload, not '{}'.", target);
break;
}
case ReferenceType::ForResource:
{
if (typeid_cast<ASTCreateResourceQuery *>(entities[target].get()) == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload settings should reference resource in FOR clause, not '{}'.", target);
// Validate that we could parse the settings for specific resource
SchedulingSettings validator;
validator.updateFromChanges(workload->changes, target);
break;
}
if (!root_name.empty() && root_name != workload->getWorkloadName())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The second root is not allowed. You should probably add 'PARENT {}' clause.", root_name);
}
// Detect reference cycles.
// The only way to create a cycle is to add an edge that will be a part of a new cycle.
// We are going to add an edge: `source` -> `target`, so we ensure there is no path back `target` -> `source`.
if (isIndirectlyReferenced(source, target))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity cycles are not allowed");
});
SchedulingSettings validator;
validator.updateFromChanges(workload->changes);
}
bool stored = storeEntityImpl(
current_context,
entity_type,
entity_name,
create_entity_query,
throw_if_exists,
replace_if_exists,
settings);
forEachReference(create_entity_query,
[this, workload] (const String & target, const String & source, ReferenceType type)
{
if (auto it = entities.find(target); it == entities.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' references another workload entity '{}' that doesn't exist", source, target);
if (stored)
{
if (new_root_name)
root_name = *new_root_name;
switch (type)
{
case ReferenceType::Parent:
{
if (typeid_cast<ASTCreateWorkloadQuery *>(entities[target].get()) == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload parent should reference another workload, not '{}'.", target);
break;
}
case ReferenceType::ForResource:
{
if (typeid_cast<ASTCreateResourceQuery *>(entities[target].get()) == nullptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload settings should reference resource in FOR clause, not '{}'.", target);
// Remove references of a replaced entity (only for CREATE OR REPLACE)
removeReferences(old_entity);
// Validate that we could parse the settings for specific resource
SchedulingSettings validator;
validator.updateFromChanges(workload->changes, target);
break;
}
}
// Insert references of created entity
insertReferences(create_entity_query);
// Detect reference cycles.
// The only way to create a cycle is to add an edge that will be a part of a new cycle.
// We are going to add an edge: `source` -> `target`, so we ensure there is no path back `target` -> `source`.
if (isIndirectlyReferenced(source, target))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity cycles are not allowed");
});
// Store in memory
entities[entity_name] = create_entity_query;
auto result = storeEntityImpl(
current_context,
entity_type,
entity_name,
create_entity_query,
throw_if_exists,
replace_if_exists,
settings);
// Process notifications
onEntityAdded(entity_type, entity_name, create_entity_query);
unlockAndNotify(lock);
if (result == OperationResult::Retry)
continue; // Entities were updated, we need to rerun all the validations
if (result == OperationResult::Ok)
{
Event event{entity_type, entity_name, create_entity_query};
applyEvent(lock, event);
unlockAndNotify(lock, {std::move(event)});
}
return result == OperationResult::Ok;
}
return stored;
}
bool WorkloadEntityStorageBase::removeEntity(
@ -307,47 +466,44 @@ bool WorkloadEntityStorageBase::removeEntity(
const String & entity_name,
bool throw_if_not_exists)
{
std::unique_lock lock(mutex);
auto it = entities.find(entity_name);
if (it == entities.end())
while (true)
{
if (throw_if_not_exists)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' doesn't exist", entity_name);
else
return false;
std::unique_lock lock(mutex);
auto it = entities.find(entity_name);
if (it == entities.end())
{
if (throw_if_not_exists)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' doesn't exist", entity_name);
else
return false;
}
if (auto reference_it = references.find(entity_name); reference_it != references.end())
{
String names;
for (const String & name : reference_it->second)
names += " " + name;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' cannot be dropped. It is referenced by:{}", entity_name, names);
}
auto result = removeEntityImpl(
current_context,
entity_type,
entity_name,
throw_if_not_exists);
if (result == OperationResult::Retry)
continue; // Entities were updated, we need to rerun all the validations
if (result == OperationResult::Ok)
{
Event event{entity_type, entity_name, {}};
applyEvent(lock, event);
unlockAndNotify(lock, {std::move(event)});
}
return result == OperationResult::Ok;
}
if (auto reference_it = references.find(entity_name); reference_it != references.end())
{
String names;
for (const String & name : reference_it->second)
names += " " + name;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Workload entity '{}' cannot be dropped. It is referenced by:{}", entity_name, names);
}
bool removed = removeEntityImpl(
current_context,
entity_type,
entity_name,
throw_if_not_exists);
if (removed)
{
if (entity_name == root_name)
root_name.clear();
// Clean up references
removeReferences(it->second);
// Remove from memory
entities.erase(it);
// Process notifications
onEntityRemoved(entity_type, entity_name);
unlockAndNotify(lock);
}
return removed;
}
scope_guard WorkloadEntityStorageBase::getAllEntitiesAndSubscribe(const OnChangedHandler & handler)
@ -357,9 +513,7 @@ scope_guard WorkloadEntityStorageBase::getAllEntitiesAndSubscribe(const OnChange
std::vector<Event> current_state;
{
std::unique_lock lock{mutex};
chassert(queue.empty());
makeEventsForAllEntities(lock);
current_state = std::move(queue);
current_state = orderEntities(entities);
std::lock_guard lock2{handlers->mutex};
handlers->list.push_back(handler);
@ -377,41 +531,30 @@ scope_guard WorkloadEntityStorageBase::getAllEntitiesAndSubscribe(const OnChange
return result;
}
void WorkloadEntityStorageBase::onEntityAdded(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & new_entity)
/// Delivers the events in `tx` to every handler currently registered in `handlers->list`.
/// Returns immediately (without touching `lock`) when `tx` is empty.
/// Otherwise the handler list is copied under `handlers->mutex`, `lock` is released,
/// and only then are the handlers invoked.
void WorkloadEntityStorageBase::unlockAndNotify(
std::unique_lock<std::recursive_mutex> & lock,
std::vector<Event> tx)
{
queue.push_back(Event{.type = entity_type, .name = entity_name, .entity = new_entity});
}
// Nothing to report to subscribers
if (tx.empty())
return;
void WorkloadEntityStorageBase::onEntityRemoved(WorkloadEntityType entity_type, const String & entity_name)
{
queue.push_back(Event{.type = entity_type, .name = entity_name, .entity = {}});
}
void WorkloadEntityStorageBase::unlockAndNotify(std::unique_lock<std::recursive_mutex> & mutex_lock)
{
/// Only one thread can send notification at any time, that is why we need `mutex_lock`
if (!queue.empty())
std::vector<OnChangedHandler> current_handlers;
{
auto events = std::move(queue);
std::lock_guard handlers_lock{handlers->mutex};
boost::range::copy(handlers->list, std::back_inserter(current_handlers));
}
std::vector<OnChangedHandler> current_handlers;
// Release the storage lock before calling handlers, so they run on the copied list without it
lock.unlock();
for (const auto & handler : current_handlers)
{
try
{
std::lock_guard handlers_lock{handlers->mutex};
boost::range::copy(handlers->list, std::back_inserter(current_handlers));
handler(tx);
}
mutex_lock.unlock();
for (const auto & handler : current_handlers)
// An exception from one handler is logged and does not stop delivery to the remaining handlers
catch (...)
{
try
{
handler(events);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
@ -421,52 +564,84 @@ std::unique_lock<std::recursive_mutex> WorkloadEntityStorageBase::getLock() cons
return std::unique_lock{mutex};
}
/// Brings the in-memory `entities` in line with the given full set of definitions.
/// Every incoming query is normalized first. Then, under `mutex`, the difference between the
/// current and the new set is computed, sorted topologically, applied event by event with
/// applyEvent(), and finally handed to subscribers in one batch via unlockAndNotify().
void WorkloadEntityStorageBase::setAllEntities(const std::vector<std::pair<String, ASTPtr>> & new_entities)
void WorkloadEntityStorageBase::setAllEntities(const std::vector<std::pair<String, ASTPtr>> & raw_new_entities)
{
std::unordered_map<String, ASTPtr> normalized_entities;
for (const auto & [entity_name, create_query] : new_entities)
normalized_entities[entity_name] = normalizeCreateWorkloadEntityQuery(*create_query);
// TODO(serxa): do validation and throw LOGICAL_ERROR if failed
std::unordered_map<String, ASTPtr> new_entities;
for (const auto & [entity_name, create_query] : raw_new_entities)
new_entities[entity_name] = normalizeCreateWorkloadEntityQuery(*create_query);
std::unique_lock lock(mutex);
chassert(entities.empty()); // TODO(serxa): keeper storage could do full refresh, so we should support it here
entities = std::move(normalized_entities);
for (const auto & [entity_name, entity] : entities)
insertReferences(entity);
// Quick check to avoid extra work
// Fill vector of `changes` based on difference between current `entities` and `new_entities`
std::vector<EntityChange> changes;
for (const auto & [entity_name, entity] : entities)
{
std::lock_guard lock2(handlers->mutex);
if (handlers->list.empty())
return;
if (auto it = new_entities.find(entity_name); it != new_entities.end())
{
if (!entityEquals(entity, it->second))
changes.emplace_back(entity_name, entity, it->second); // Update entities that are present in both `new_entities` and `entities` but differ
}
else
changes.emplace_back(entity_name, entity, ASTPtr{}); // Remove entities that are not present in `new_entities`
}
for (const auto & [entity_name, entity] : new_entities)
{
if (!entities.contains(entity_name))
changes.emplace_back(entity_name, ASTPtr{}, entity); // Create entities that are only present in `new_entities`
}
makeEventsForAllEntities(lock);
unlockAndNotify(lock);
// Sort `changes` to respect consistency of references and apply them one by one.
std::vector<Event> tx;
for (const auto & change : topologicallySortedChanges(changes))
{
// A single change may expand into several events; each is applied in memory and recorded in `tx`
for (const auto & event : change.toEvents())
{
// TODO(serxa): do validation and throw LOGICAL_ERROR if failed
applyEvent(lock, event);
tx.push_back(event);
}
}
// Notify subscribers
unlockAndNotify(lock, tx);
}
void WorkloadEntityStorageBase::makeEventsForAllEntities(std::unique_lock<std::recursive_mutex> &)
/// Applies a single event to the in-memory state (`entities`, `references`, `root_name`).
/// An event carrying a definition creates or replaces the entity; an event with an empty
/// `entity` drops it. The unnamed lock parameter is not used in the body.
void WorkloadEntityStorageBase::applyEvent(
std::unique_lock<std::recursive_mutex> &,
const Event & event)
{
std::unordered_map<String, ASTPtr> workloads;
std::unordered_map<String, ASTPtr> resources;
for (auto & [entity_name, ast] : entities)
if (event.entity) // CREATE || CREATE OR REPLACE
{
if (typeid_cast<ASTCreateWorkloadQuery *>(ast.get()))
workloads.emplace(entity_name, ast);
else if (typeid_cast<ASTCreateResourceQuery *>(ast.get()))
resources.emplace(entity_name, ast);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid workload entity type '{}'", ast->getID());
auto * workload = typeid_cast<ASTCreateWorkloadQuery *>(event.entity.get());
// A workload without a parent is the root: remember its name
if (workload && !workload->hasParent())
root_name = workload->getWorkloadName();
// Remove references of a replaced entity (only for CREATE OR REPLACE)
if (auto it = entities.find(event.name); it != entities.end())
removeReferences(it->second);
// Insert references of created entity
insertReferences(event.entity);
// Store in memory
entities[event.name] = event.entity;
}
else // DROP
{
auto it = entities.find(event.name);
// The entity being dropped is expected to exist; `it` is dereferenced below
chassert(it != entities.end());
// Resources should be created first because workloads could reference them
for (auto & [entity_name, ast] : resources)
onEntityAdded(WorkloadEntityType::Resource, entity_name, ast);
// Dropping the root workload leaves the storage without a root
if (event.name == root_name)
root_name.clear();
// Workloads should be created in an order such that children are created only after its parent is created
for (auto & [entity_name, ast] : topologicallySortedWorkloads(workloads))
onEntityAdded(WorkloadEntityType::Workload, entity_name, ast);
// Clean up references
removeReferences(it->second);
// Remove from memory
entities.erase(it);
}
}
std::vector<std::pair<String, ASTPtr>> WorkloadEntityStorageBase::getAllEntities() const
@ -528,4 +703,59 @@ void WorkloadEntityStorageBase::removeReferences(const ASTPtr & entity)
});
}
std::vector<WorkloadEntityStorageBase::Event> WorkloadEntityStorageBase::orderEntities(
const std::unordered_map<String, ASTPtr> & all_entities,
std::optional<Event> change)
{
std::vector<Event> result;
std::unordered_map<String, ASTPtr> workloads;
for (auto & [entity_name, ast] : all_entities)
{
if (typeid_cast<ASTCreateWorkloadQuery *>(ast.get()))
{
if (change && change->name == entity_name)
continue; // Skip this workload if it is removed or updated
workloads.emplace(entity_name, ast);
}
else if (typeid_cast<ASTCreateResourceQuery *>(ast.get()))
{
if (change && change->name == entity_name)
continue; // Skip this resource if it is removed or updated
// Resources should go first because workloads could reference them
result.emplace_back(WorkloadEntityType::Resource, entity_name, ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid workload entity type '{}'", ast->getID());
}
// Introduce new entity described by `change`
if (change && change->entity)
{
if (change->type == WorkloadEntityType::Workload)
workloads.emplace(change->name, change->entity);
else if (change->type == WorkloadEntityType::Resource)
result.emplace_back(WorkloadEntityType::Resource, change->name, change->entity);
}
// Workloads should go in an order such that children are enlisted only after its parent
for (auto & [entity_name, ast] : topologicallySortedWorkloads(workloads))
result.emplace_back(WorkloadEntityType::Workload, entity_name, ast);
return result;
}
/// Serializes the entities stored in memory, with the optional `change` applied on top,
/// into a multiline string: each CREATE query is formatted and followed by ";\n".
/// Entities are written in the order produced by orderEntities().
String WorkloadEntityStorageBase::serializeAllEntities(std::optional<Event> change)
{
    // Acquire `mutex` for real: a default-constructed unique_lock owns no mutex and guards nothing.
    // `mutex` is recursive, so a caller that already holds it on this thread can still call in.
    std::unique_lock<std::recursive_mutex> lock{mutex};
    auto ordered_entities = orderEntities(entities, change);

    WriteBufferFromOwnString buf;
    for (const auto & event : ordered_entities)
    {
        formatAST(*event.entity, buf, false, true);
        buf.write(";\n", 2);
    }
    return buf.str();
}
}

View File

@ -49,7 +49,14 @@ public:
const OnChangedHandler & handler) override;
protected:
virtual bool storeEntityImpl(
/// Outcome reported by storeEntityImpl() and removeEntityImpl().
/// NOTE(review): presumably `Failed` means the operation was not performed and `Retry` asks
/// the caller to repeat it — confirm against storeEntity()/removeEntity() in the base class.
enum class OperationResult
{
Ok,
Failed,
Retry
};
virtual OperationResult storeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
@ -58,7 +65,7 @@ protected:
bool replace_if_exists,
const Settings & settings) = 0;
virtual bool removeEntityImpl(
virtual OperationResult removeEntityImpl(
const ContextPtr & current_context,
WorkloadEntityType entity_type,
const String & entity_name,
@ -66,18 +73,21 @@ protected:
std::unique_lock<std::recursive_mutex> getLock() const;
/// Replaces current `entities` with `new_entities` and notifies subscribers.
/// Note that subscribers will be notified with a sequence of events.
/// It is guaranteed that all intermediate states (between every pair of consecutive events)
/// will be consistent (all references between entities will be valid)
void setAllEntities(const std::vector<std::pair<String, ASTPtr>> & new_entities);
void makeEventsForAllEntities(std::unique_lock<std::recursive_mutex> & lock);
/// Called by derived class after a new workload entity has been added.
void onEntityAdded(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & new_entity);
/// Serialize `entities` stored in memory plus one optional `change` into multiline string
String serializeAllEntities(std::optional<Event> change = {});
/// Called by derived class after a workload entity has been removed.
void onEntityRemoved(WorkloadEntityType entity_type, const String & entity_name);
private:
/// Change state in memory
void applyEvent(std::unique_lock<std::recursive_mutex> & lock, const Event & event);
/// Sends notifications to subscribers about changes in workload entities
/// (added with previous calls onEntityAdded(), onEntityRemoved()).
void unlockAndNotify(std::unique_lock<std::recursive_mutex> & lock);
/// Notify subscribers about changes described by vector of events `tx`
void unlockAndNotify(std::unique_lock<std::recursive_mutex> & lock, std::vector<Event> tx);
/// Return true iff `references` has a path from `source` to `target`
bool isIndirectlyReferenced(const String & target, const String & source);
@ -88,6 +98,11 @@ protected:
/// Removes references that are described by `entity` from `references`
void removeReferences(const ASTPtr & entity);
/// Returns `all_entities` as an ordered vector of events: resources first, then workloads
/// with every parent ahead of its children. If `change` is given, the entity with the same
/// name is replaced by the definition in `change`, or left out when `change` has none.
std::vector<Event> orderEntities(
    const std::unordered_map<String, ASTPtr> & all_entities,
    std::optional<Event> change = {});
struct Handlers
{
std::mutex mutex;
@ -96,15 +111,14 @@ protected:
/// shared_ptr is here for safety because WorkloadEntityStorageBase can be destroyed before all subscriptions are removed.
std::shared_ptr<Handlers> handlers;
std::vector<Event> queue;
mutable std::recursive_mutex mutex;
std::unordered_map<String, ASTPtr> entities; /// Maps entity name into CREATE entity query
// Validation
std::unordered_map<String, std::unordered_set<String>> references; /// Keep track of references between entities. Key is target. Values is set of sources
std::unordered_map<String, std::unordered_set<String>> references; /// Keep track of references between entities. Key is target. Value is set of sources
String root_name; /// current root workload name
protected:
ContextPtr global_context;
};

View File

@ -34,15 +34,12 @@ std::unique_ptr<IWorkloadEntityStorage> createWorkloadEntityStorage(const Contex
zookeeper_path_key,
disk_path_key);
}
abort(); // TODO(serxa): create WorkloadEntityKeeperStorage object
//return std::make_unique<WorkloadEntityKeeperStorage>(global_context, config.getString(zookeeper_path_key));
}
else
{
String default_path = fs::path{global_context->getPath()} / "workload" / "";
String path = config.getString(disk_path_key, default_path);
return std::make_unique<WorkloadEntityDiskStorage>(global_context, path);
return std::make_unique<WorkloadEntityKeeperStorage>(global_context, config.getString(zookeeper_path_key));
}
String default_path = fs::path{global_context->getPath()} / "workload" / "";
String path = config.getString(disk_path_key, default_path);
return std::make_unique<WorkloadEntityDiskStorage>(global_context, path);
}
}

View File

@ -19,6 +19,9 @@ public:
{
AccessMode mode;
String disk;
/// Two operations are equal when both the access mode and the disk name match.
friend bool operator ==(const Operation & lhs, const Operation & rhs) { return lhs.mode == rhs.mode && lhs.disk == rhs.disk; }
/// Defined as the negation of operator== so the two comparisons cannot disagree.
friend bool operator !=(const Operation & lhs, const Operation & rhs) { return !(lhs == rhs); }
};
using Operations = std::vector<Operation>;

View File

@ -0,0 +1,16 @@
#include <Parsers/ParserCreateWorkloadEntity.h>
#include <Parsers/ParserCreateWorkloadQuery.h>
#include <Parsers/ParserCreateResourceQuery.h>
namespace DB
{
/// Accepts either a CREATE WORKLOAD or a CREATE RESOURCE query.
/// The workload parser is tried first; the resource parser runs only if the first one fails.
bool ParserCreateWorkloadEntity::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserCreateWorkloadQuery workload_parser;
    ParserCreateResourceQuery resource_parser;

    if (workload_parser.parse(pos, node, expected))
        return true;
    return resource_parser.parse(pos, node, expected);
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Parser that accepts both kinds of workload entity definitions:
/// CREATE WORKLOAD and CREATE RESOURCE queries.
class ParserCreateWorkloadEntity : public IParserBase
{
protected:
    /// Human-readable name of what this parser expects.
    const char * getName() const override
    {
        return "CREATE workload entity query";
    }

    /// Implemented in the .cpp file by delegating to the workload and resource parsers.
    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -1,4 +1,5 @@
<clickhouse>
<workload_zookeeper_path>/clickhouse/workload/definitions.sql</workload_zookeeper_path>
<storage_configuration>
<disks>
<s3>

View File

@ -24,6 +24,7 @@ node = cluster.add_instance(
"configs/workloads.xml.default",
],
with_minio=True,
with_zookeeper=True,
)