improve workload entities subscription model

This commit is contained in:
serxa 2024-09-22 00:13:55 +00:00
parent b60d1427a9
commit 36b8481793
7 changed files with 149 additions and 115 deletions

View File

@ -1,4 +1,3 @@
#include "Common/Scheduler/IResourceManager.h"
#include <Common/Scheduler/Nodes/IOResourceManager.h>
#include <Common/Scheduler/Nodes/FifoQueue.h>
@ -231,34 +230,34 @@ String IOResourceManager::Workload::getParent() const
IOResourceManager::IOResourceManager(IWorkloadEntityStorage & storage_)
: storage(storage_)
{
workload_change_subscription = storage.subscribeForChanges(WorkloadEntityType::Workload, [this] (
WorkloadEntityType,
const String & entity_name,
const ASTPtr & entity)
subscription = storage.getAllEntitiesAndSubscribe(
[this] (const std::vector<IWorkloadEntityStorage::Event> & events)
{
try
{
if (entity)
createOrUpdateWorkload(entity_name, entity);
else
deleteWorkload(entity_name);
}
catch (...)
{
// TODO(serxa): handle CRUD errors
}
});
resource_change_subscription = storage.subscribeForChanges(WorkloadEntityType::Resource, [this] (
WorkloadEntityType,
const String & entity_name,
const ASTPtr & entity /* new or changed entity, null if removed */)
{
try
{
if (entity)
createResource(entity_name, entity);
else
deleteResource(entity_name);
for (auto [entity_type, entity_name, entity] : events)
{
switch (entity_type)
{
case WorkloadEntityType::Workload:
{
if (entity)
createOrUpdateWorkload(entity_name, entity);
else
deleteWorkload(entity_name);
break;
}
case WorkloadEntityType::Resource:
{
if (entity)
createResource(entity_name, entity);
else
deleteResource(entity_name);
break;
}
case WorkloadEntityType::MAX: break;
}
}
}
catch (...)
{
@ -269,8 +268,7 @@ IOResourceManager::IOResourceManager(IWorkloadEntityStorage & storage_)
IOResourceManager::~IOResourceManager()
{
resource_change_subscription.reset();
workload_change_subscription.reset();
subscription.reset();
resources.clear();
workloads.clear();
}

View File

@ -262,8 +262,7 @@ private:
std::vector<Workload *> topologicallySortedWorkloads();
IWorkloadEntityStorage & storage;
scope_guard workload_change_subscription;
scope_guard resource_change_subscription;
scope_guard subscription;
std::mutex mutex;
std::unordered_map<String, WorkloadPtr> workloads; // TSA_GUARDED_BY(mutex);

View File

@ -59,9 +59,6 @@ public:
/// Stops watching.
virtual void stopWatching() {}
/// Immediately reloads all entities, throws an exception if failed.
virtual void reloadEntities() = 0;
/// Stores an entity.
virtual bool storeEntity(
const ContextPtr & current_context,
@ -79,15 +76,16 @@ public:
const String & entity_name,
bool throw_if_not_exists) = 0;
using OnChangedHandler = std::function<void(
WorkloadEntityType /* entity_type */,
const String & /* entity_name */,
const ASTPtr & /* new or changed entity, null if removed */)>;
/// Describes one change of a workload entity; handlers receive these in batches (see OnChangedHandler).
struct Event
{
WorkloadEntityType type; /// type of the entity this event refers to
String name; /// name of the entity this event refers to
ASTPtr entity; /// new or changed entity, null if removed
};
using OnChangedHandler = std::function<void(const std::vector<Event> &)>;
/// Subscribes for all changes.
virtual scope_guard subscribeForChanges(
WorkloadEntityType entity_type,
const OnChangedHandler & handler) = 0;
/// Gets all current entities, passes them through `handler` and subscribes for all later changes.
virtual scope_guard getAllEntitiesAndSubscribe(const OnChangedHandler & handler) = 0;
};
}

View File

@ -126,13 +126,6 @@ void WorkloadEntityDiskStorage::loadEntities()
}
void WorkloadEntityDiskStorage::reloadEntities()
{
// TODO(serxa): it does not send notifications, maybe better to remove this method completely
loadEntitiesImpl();
}
void WorkloadEntityDiskStorage::loadEntitiesImpl()
{
LOG_INFO(log, "Loading workload entities from {}", dir_path);

View File

@ -13,11 +13,8 @@ class WorkloadEntityDiskStorage : public WorkloadEntityStorageBase
{
public:
WorkloadEntityDiskStorage(const ContextPtr & global_context_, const String & dir_path_);
void loadEntities() override;
void reloadEntities() override;
private:
bool storeEntityImpl(
const ContextPtr & current_context,

View File

@ -8,6 +8,10 @@
#include <Parsers/ASTCreateWorkloadQuery.h>
#include <Parsers/ASTCreateResourceQuery.h>
#include <mutex>
#include <unordered_set>
namespace DB
{
@ -15,6 +19,7 @@ namespace ErrorCodes
{
extern const int WORKLOAD_ENTITY_ALREADY_EXISTS;
extern const int UNKNOWN_WORKLOAD_ENTITY;
extern const int LOGICAL_ERROR;
}
namespace
@ -47,6 +52,34 @@ WorkloadEntityType getEntityType(const ASTPtr & ptr)
return WorkloadEntityType::MAX;
}
void topologicallySortedWorkloadsImpl(const String & name, const ASTPtr & ast, const std::unordered_map<String, ASTPtr> & workloads, std::unordered_set<String> & visited, std::vector<std::pair<String, ASTPtr>> & sorted_workloads)
{
if (visited.contains(name))
return;
visited.insert(name);
// Recurse into parent (if any)
String parent = typeid_cast<ASTCreateWorkloadQuery *>(ast.get())->getWorkloadParent();
if (!parent.empty())
{
auto parent_iter = workloads.find(parent);
if (parent_iter == workloads.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Workload metadata inconsistency: Workload '{}' parent '{}' does not exist. This must be fixed manually.", name, parent);
topologicallySortedWorkloadsImpl(parent, parent_iter->second, workloads, visited, sorted_workloads);
}
sorted_workloads.emplace_back(name, ast);
}
/// Returns every workload from `workloads` (name -> CREATE query) as a flat list in which each
/// workload is placed after its parent (given that parents do not form a cycle).
/// Exceptions thrown by topologicallySortedWorkloadsImpl() propagate to the caller.
std::vector<std::pair<String, ASTPtr>> topologicallySortedWorkloads(const std::unordered_map<String, ASTPtr> & workloads)
{
    std::unordered_set<String> seen;
    std::vector<std::pair<String, ASTPtr>> ordered;

    // Start a traversal from each workload; names already emitted via some child are skipped by the helper.
    for (const auto & entry : workloads)
        topologicallySortedWorkloadsImpl(entry.first, entry.second, workloads, seen, ordered);

    return ordered;
}
}
WorkloadEntityStorageBase::WorkloadEntityStorageBase(ContextPtr global_context_)
@ -125,7 +158,7 @@ bool WorkloadEntityStorageBase::storeEntity(
bool replace_if_exists,
const Settings & settings)
{
std::lock_guard lock{mutex};
std::unique_lock lock{mutex};
create_entity_query = normalizeCreateWorkloadEntityQuery(*create_entity_query, global_context);
@ -153,7 +186,7 @@ bool WorkloadEntityStorageBase::storeEntity(
onEntityAdded(entity_type, entity_name, create_entity_query);
}
sendNotifications();
unlockAndNotify(lock);
return stored;
}
@ -164,7 +197,7 @@ bool WorkloadEntityStorageBase::removeEntity(
const String & entity_name,
bool throw_if_not_exists)
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
auto it = entities.find(entity_name);
if (it == entities.end())
{
@ -186,88 +219,79 @@ bool WorkloadEntityStorageBase::removeEntity(
onEntityRemoved(entity_type, entity_name);
}
sendNotifications();
unlockAndNotify(lock);
return removed;
}
scope_guard WorkloadEntityStorageBase::subscribeForChanges(
WorkloadEntityType entity_type,
const OnChangedHandler & handler)
scope_guard WorkloadEntityStorageBase::getAllEntitiesAndSubscribe(const OnChangedHandler & handler)
{
std::lock_guard lock{handlers->mutex};
auto & list = handlers->by_type[static_cast<size_t>(entity_type)];
list.push_back(handler);
auto handler_it = std::prev(list.end());
scope_guard result;
return [my_handlers = handlers, entity_type, handler_it]
std::vector<Event> current_state;
{
std::lock_guard lock2{my_handlers->mutex};
auto & list2 = my_handlers->by_type[static_cast<size_t>(entity_type)];
list2.erase(handler_it);
};
std::unique_lock lock{mutex};
chassert(queue.empty());
makeEventsForAllEntities(lock);
current_state = std::move(queue);
std::lock_guard lock2{handlers->mutex};
handlers->list.push_back(handler);
auto handler_it = std::prev(handlers->list.end());
result = [my_handlers = handlers, handler_it]
{
std::lock_guard lock3{my_handlers->mutex};
my_handlers->list.erase(handler_it);
};
}
// When you subscribe you get all the entities back to your handler immediately if already loaded, or later when loaded
handler(current_state);
return result;
}
void WorkloadEntityStorageBase::onEntityAdded(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & new_entity)
{
std::lock_guard lock{queue_mutex};
Event event;
event.name = entity_name;
event.type = entity_type;
event.entity = new_entity;
queue.push(std::move(event));
queue.push_back(Event{.type = entity_type, .name = entity_name, .entity = new_entity});
}
void WorkloadEntityStorageBase::onEntityUpdated(WorkloadEntityType entity_type, const String & entity_name, const ASTPtr & changed_entity)
{
std::lock_guard lock{queue_mutex};
Event event;
event.name = entity_name;
event.type = entity_type;
event.entity = changed_entity;
queue.push(std::move(event));
queue.push_back(Event{.type = entity_type, .name = entity_name, .entity = changed_entity});
}
void WorkloadEntityStorageBase::onEntityRemoved(WorkloadEntityType entity_type, const String & entity_name)
{
std::lock_guard lock{queue_mutex};
Event event;
event.name = entity_name;
event.type = entity_type;
queue.push(std::move(event));
queue.push_back(Event{.type = entity_type, .name = entity_name, .entity = {}});
}
void WorkloadEntityStorageBase::sendNotifications()
void WorkloadEntityStorageBase::unlockAndNotify(std::unique_lock<std::recursive_mutex> & mutex_lock)
{
/// Only one thread can send notification at any time.
std::lock_guard sending_notifications_lock{sending_notifications};
std::unique_lock queue_lock{queue_mutex};
while (!queue.empty())
/// Only one thread can send notification at any time, that is why we need `mutex_lock`
if (!queue.empty())
{
auto event = std::move(queue.front());
queue.pop();
queue_lock.unlock();
auto events = std::move(queue);
std::vector<OnChangedHandler> current_handlers;
{
std::lock_guard handlers_lock{handlers->mutex};
boost::range::copy(handlers->by_type[static_cast<size_t>(event.type)], std::back_inserter(current_handlers));
boost::range::copy(handlers->list, std::back_inserter(current_handlers));
}
mutex_lock.unlock();
for (const auto & handler : current_handlers)
{
try
{
handler(event.type, event.name, event.entity);
handler(events);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
queue_lock.lock();
}
}
@ -276,21 +300,54 @@ std::unique_lock<std::recursive_mutex> WorkloadEntityStorageBase::getLock() cons
return std::unique_lock{mutex};
}
void WorkloadEntityStorageBase::setAllEntities(const std::vector<std::pair<String, ASTPtr>> & new_entities)
{
std::unordered_map<String, ASTPtr> normalized_entities;
for (const auto & [entity_name, create_query] : new_entities)
normalized_entities[entity_name] = normalizeCreateWorkloadEntityQuery(*create_query, global_context);
// TODO(serxa): do validation and throw LOGICAL_ERROR if failed
// Note that notifications are not sent, because it is hard to send notifications in right order to maintain invariants.
// Another code path using getAllEntities() should be used for initialization
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
chassert(entities.empty());
entities = std::move(normalized_entities);
// Quick check to avoid extra work
{
std::lock_guard lock2(handlers->mutex);
if (handlers->list.empty())
return;
}
makeEventsForAllEntities(lock);
unlockAndNotify(lock);
}
void WorkloadEntityStorageBase::makeEventsForAllEntities(std::unique_lock<std::recursive_mutex> &)
{
std::unordered_map<String, ASTPtr> workloads;
std::unordered_map<String, ASTPtr> resources;
for (auto & [entity_name, ast] : entities)
{
if (typeid_cast<ASTCreateWorkloadQuery *>(ast.get()))
workloads.emplace(entity_name, ast);
else if (typeid_cast<ASTCreateResourceQuery *>(ast.get()))
resources.emplace(entity_name, ast);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid workload entity type '{}'", ast->getID());
}
for (auto & [entity_name, ast] : topologicallySortedWorkloads(workloads))
onEntityAdded(WorkloadEntityType::Workload, entity_name, ast);
for (auto & [entity_name, ast] : resources)
onEntityAdded(WorkloadEntityType::Resource, entity_name, ast);
}
std::vector<std::pair<String, ASTPtr>> WorkloadEntityStorageBase::getAllEntities() const
{
std::lock_guard lock{mutex};

View File

@ -3,7 +3,6 @@
#include <unordered_map>
#include <list>
#include <mutex>
#include <queue>
#include <Common/Scheduler/Workload/IWorkloadEntityStorage.h>
#include <Interpreters/Context_fwd.h>
@ -45,8 +44,7 @@ public:
const String & entity_name,
bool throw_if_not_exists) override;
virtual scope_guard subscribeForChanges(
WorkloadEntityType entity_type,
virtual scope_guard getAllEntitiesAndSubscribe(
const OnChangedHandler & handler) override;
protected:
@ -66,7 +64,9 @@ protected:
bool throw_if_not_exists) = 0;
std::unique_lock<std::recursive_mutex> getLock() const;
void setAllEntities(const std::vector<std::pair<String, ASTPtr>> & new_entities);
void makeEventsForAllEntities(std::unique_lock<std::recursive_mutex> & lock);
void removeAllEntitiesExcept(const Strings & entity_names_to_keep);
/// Called by derived class after a new workload entity has been added.
@ -80,25 +80,17 @@ protected:
/// Sends notifications to subscribers about changes in workload entities
/// (added with previous calls onEntityAdded(), onEntityUpdated(), onEntityRemoved()).
void sendNotifications();
void unlockAndNotify(std::unique_lock<std::recursive_mutex> & lock);
struct Handlers
{
std::mutex mutex;
std::list<OnChangedHandler> by_type[static_cast<size_t>(WorkloadEntityType::MAX)];
std::list<OnChangedHandler> list;
};
/// shared_ptr is here for safety because WorkloadEntityStorageBase can be destroyed before all subscriptions are removed.
std::shared_ptr<Handlers> handlers;
struct Event
{
WorkloadEntityType type;
String name;
ASTPtr entity;
};
std::queue<Event> queue;
std::mutex queue_mutex;
std::mutex sending_notifications;
std::vector<Event> queue;
mutable std::recursive_mutex mutex;
std::unordered_map<String, ASTPtr> entities; // Maps entity name into CREATE entity query