Use the generic class ext::scope_guard for subscriptions in IAccessStorage instead of a special class.

This commit is contained in:
Vitaly Baranov 2020-01-29 18:51:12 +03:00
parent b6b7feef44
commit 6a8a69fd0c
10 changed files with 53 additions and 147 deletions

View File

@ -299,44 +299,24 @@ std::vector<UUID> IAccessStorage::tryUpdate(const std::vector<UUID> & ids, const
}
IAccessStorage::SubscriptionPtr IAccessStorage::subscribeForChanges(std::type_index type, const OnChangedHandler & handler) const
ext::scope_guard IAccessStorage::subscribeForChanges(std::type_index type, const OnChangedHandler & handler) const
{
return subscribeForChangesImpl(type, handler);
}
IAccessStorage::SubscriptionPtr IAccessStorage::subscribeForChanges(const UUID & id, const OnChangedHandler & handler) const
ext::scope_guard IAccessStorage::subscribeForChanges(const UUID & id, const OnChangedHandler & handler) const
{
return subscribeForChangesImpl(id, handler);
}
IAccessStorage::SubscriptionPtr IAccessStorage::subscribeForChanges(const std::vector<UUID> & ids, const OnChangedHandler & handler) const
ext::scope_guard IAccessStorage::subscribeForChanges(const std::vector<UUID> & ids, const OnChangedHandler & handler) const
{
if (ids.empty())
return nullptr;
if (ids.size() == 1)
return subscribeForChangesImpl(ids[0], handler);
std::vector<SubscriptionPtr> subscriptions;
subscriptions.reserve(ids.size());
ext::scope_guard subscriptions;
for (const auto & id : ids)
{
auto subscription = subscribeForChangesImpl(id, handler);
if (subscription)
subscriptions.push_back(std::move(subscription));
}
class SubscriptionImpl : public Subscription
{
public:
SubscriptionImpl(std::vector<SubscriptionPtr> subscriptions_)
: subscriptions(std::move(subscriptions_)) {}
private:
std::vector<SubscriptionPtr> subscriptions;
};
return std::make_unique<SubscriptionImpl>(std::move(subscriptions));
subscriptions.join(subscribeForChangesImpl(id, handler));
return subscriptions;
}

View File

@ -3,6 +3,7 @@
#include <Access/IAccessEntity.h>
#include <Core/Types.h>
#include <Core/UUID.h>
#include <ext/scope_guard.h>
#include <functional>
#include <optional>
#include <vector>
@ -109,26 +110,19 @@ public:
/// Updates multiple entities in the storage. Returns the list of successfully updated.
std::vector<UUID> tryUpdate(const std::vector<UUID> & ids, const UpdateFunc & update_func);
class Subscription
{
public:
virtual ~Subscription() {}
};
using SubscriptionPtr = std::unique_ptr<Subscription>;
using OnChangedHandler = std::function<void(const UUID & /* id */, const AccessEntityPtr & /* new or changed entity, null if removed */)>;
/// Subscribes for all changes.
/// Can return nullptr if cannot subscribe (identifier not found) or if it doesn't make sense (the storage is read-only).
SubscriptionPtr subscribeForChanges(std::type_index type, const OnChangedHandler & handler) const;
ext::scope_guard subscribeForChanges(std::type_index type, const OnChangedHandler & handler) const;
template <typename EntityType>
SubscriptionPtr subscribeForChanges(OnChangedHandler handler) const { return subscribeForChanges(typeid(EntityType), handler); }
ext::scope_guard subscribeForChanges(OnChangedHandler handler) const { return subscribeForChanges(typeid(EntityType), handler); }
/// Subscribes for changes of a specific entry.
/// Can return nullptr if cannot subscribe (identifier not found) or if it doesn't make sense (the storage is read-only).
SubscriptionPtr subscribeForChanges(const UUID & id, const OnChangedHandler & handler) const;
SubscriptionPtr subscribeForChanges(const std::vector<UUID> & ids, const OnChangedHandler & handler) const;
ext::scope_guard subscribeForChanges(const UUID & id, const OnChangedHandler & handler) const;
ext::scope_guard subscribeForChanges(const std::vector<UUID> & ids, const OnChangedHandler & handler) const;
bool hasSubscription(std::type_index type) const;
bool hasSubscription(const UUID & id) const;
@ -142,8 +136,8 @@ protected:
virtual UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) = 0;
virtual void removeImpl(const UUID & id) = 0;
virtual void updateImpl(const UUID & id, const UpdateFunc & update_func) = 0;
virtual SubscriptionPtr subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const = 0;
virtual SubscriptionPtr subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const = 0;
virtual ext::scope_guard subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const = 0;
virtual ext::scope_guard subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const = 0;
virtual bool hasSubscriptionImpl(const UUID & id) const = 0;
virtual bool hasSubscriptionImpl(std::type_index type) const = 0;

View File

@ -6,7 +6,7 @@
namespace DB
{
MemoryAccessStorage::MemoryAccessStorage(const String & storage_name_)
: IAccessStorage(storage_name_), shared_ptr_to_this{std::make_shared<const MemoryAccessStorage *>(this)}
: IAccessStorage(storage_name_)
{
}
@ -256,85 +256,38 @@ void MemoryAccessStorage::prepareNotifications(const Entry & entry, bool remove,
}
IAccessStorage::SubscriptionPtr MemoryAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
ext::scope_guard MemoryAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
{
class SubscriptionImpl : public Subscription
std::lock_guard lock{mutex};
auto handler_it = handlers_by_type.emplace(type, handler);
return [this, handler_it]
{
public:
SubscriptionImpl(
const MemoryAccessStorage & storage_,
std::type_index type_,
const OnChangedHandler & handler_)
: storage_weak(storage_.shared_ptr_to_this)
{
std::lock_guard lock{storage_.mutex};
handler_it = storage_.handlers_by_type.emplace(type_, handler_);
}
~SubscriptionImpl() override
{
auto storage = storage_weak.lock();
if (storage)
{
std::lock_guard lock{(*storage)->mutex};
(*storage)->handlers_by_type.erase(handler_it);
}
}
private:
std::weak_ptr<const MemoryAccessStorage *> storage_weak;
std::unordered_multimap<std::type_index, OnChangedHandler>::iterator handler_it;
std::lock_guard lock2{mutex};
handlers_by_type.erase(handler_it);
};
return std::make_unique<SubscriptionImpl>(*this, type, handler);
}
IAccessStorage::SubscriptionPtr MemoryAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
ext::scope_guard MemoryAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
{
class SubscriptionImpl : public Subscription
std::lock_guard lock{mutex};
auto it = entries.find(id);
if (it == entries.end())
return {};
const Entry & entry = it->second;
auto handler_it = entry.handlers_by_id.insert(entry.handlers_by_id.end(), handler);
return [this, id, handler_it]
{
public:
SubscriptionImpl(
const MemoryAccessStorage & storage_,
const UUID & id_,
const OnChangedHandler & handler_)
: storage_weak(storage_.shared_ptr_to_this),
id(id_)
std::lock_guard lock2{mutex};
auto it2 = entries.find(id);
if (it2 != entries.end())
{
std::lock_guard lock{storage_.mutex};
auto it = storage_.entries.find(id);
if (it == storage_.entries.end())
{
storage_weak.reset();
return;
}
const Entry & entry = it->second;
handler_it = entry.handlers_by_id.insert(entry.handlers_by_id.end(), handler_);
const Entry & entry2 = it2->second;
entry2.handlers_by_id.erase(handler_it);
}
~SubscriptionImpl() override
{
auto storage = storage_weak.lock();
if (storage)
{
std::lock_guard lock{(*storage)->mutex};
auto it = (*storage)->entries.find(id);
if (it != (*storage)->entries.end())
{
const Entry & entry = it->second;
entry.handlers_by_id.erase(handler_it);
}
}
}
private:
std::weak_ptr<const MemoryAccessStorage *> storage_weak;
UUID id;
std::list<OnChangedHandler>::iterator handler_it;
};
return std::make_unique<SubscriptionImpl>(*this, id, handler);
}

View File

@ -29,8 +29,8 @@ private:
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
void removeImpl(const UUID & id) override;
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;
SubscriptionPtr subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
SubscriptionPtr subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const override;
ext::scope_guard subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
ext::scope_guard subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const override;
bool hasSubscriptionImpl(const UUID & id) const override;
bool hasSubscriptionImpl(std::type_index type) const override;
@ -60,6 +60,5 @@ private:
std::unordered_map<UUID, Entry> entries; /// We want to search entries both by ID and by the pair of name and type.
std::unordered_map<NameTypePair, Entry *, Hash> names; /// and by the pair of name and type.
mutable std::unordered_multimap<std::type_index, OnChangedHandler> handlers_by_type;
std::shared_ptr<const MemoryAccessStorage *> shared_ptr_to_this; /// We need weak pointers to `this` to implement subscriptions.
};
}

View File

@ -185,41 +185,21 @@ void MultipleAccessStorage::updateImpl(const UUID & id, const UpdateFunc & updat
}
IAccessStorage::SubscriptionPtr MultipleAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
ext::scope_guard MultipleAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
{
auto storage = findStorage(id);
if (!storage)
return nullptr;
return {};
return storage->subscribeForChanges(id, handler);
}
IAccessStorage::SubscriptionPtr MultipleAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
ext::scope_guard MultipleAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
{
std::vector<SubscriptionPtr> subscriptions;
ext::scope_guard subscriptions;
for (const auto & nested_storage : nested_storages)
{
auto subscription = nested_storage->subscribeForChanges(type, handler);
if (subscription)
subscriptions.emplace_back(std::move(subscription));
}
if (subscriptions.empty())
return nullptr;
if (subscriptions.size() == 1)
return std::move(subscriptions[0]);
class SubscriptionImpl : public Subscription
{
public:
SubscriptionImpl(std::vector<SubscriptionPtr> subscriptions_)
: subscriptions(std::move(subscriptions_)) {}
private:
std::vector<SubscriptionPtr> subscriptions;
};
return std::make_unique<SubscriptionImpl>(std::move(subscriptions));
subscriptions.join(nested_storage->subscribeForChanges(type, handler));
return subscriptions;
}

View File

@ -38,8 +38,8 @@ protected:
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
void removeImpl(const UUID & id) override;
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;
SubscriptionPtr subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
SubscriptionPtr subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const override;
ext::scope_guard subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
ext::scope_guard subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const override;
bool hasSubscriptionImpl(const UUID & id) const override;
bool hasSubscriptionImpl(std::type_index type) const override;

View File

@ -1,7 +1,7 @@
#pragma once
#include <Access/QuotaContext.h>
#include <Access/IAccessStorage.h>
#include <ext/scope_guard.h>
#include <memory>
#include <mutex>
#include <unordered_map>
@ -56,7 +56,7 @@ private:
mutable std::mutex mutex;
std::unordered_map<UUID /* quota id */, QuotaInfo> all_quotas;
bool all_quotas_read = false;
IAccessStorage::SubscriptionPtr subscription;
ext::scope_guard subscription;
std::vector<std::weak_ptr<QuotaContext>> contexts;
};
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Access/RowPolicyContext.h>
#include <Access/IAccessStorage.h>
#include <ext/scope_guard.h>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
@ -46,7 +46,7 @@ private:
const AccessControlManager & access_control_manager;
std::unordered_map<UUID, PolicyInfo> all_policies;
bool all_policies_read = false;
IAccessStorage::SubscriptionPtr subscription;
ext::scope_guard subscription;
std::vector<std::weak_ptr<RowPolicyContext>> contexts;
std::mutex mutex;
};

View File

@ -250,13 +250,13 @@ void UsersConfigAccessStorage::updateImpl(const UUID & id, const UpdateFunc &)
}
IAccessStorage::SubscriptionPtr UsersConfigAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
ext::scope_guard UsersConfigAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
{
return memory_storage.subscribeForChanges(id, handler);
}
IAccessStorage::SubscriptionPtr UsersConfigAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
ext::scope_guard UsersConfigAccessStorage::subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const
{
return memory_storage.subscribeForChanges(type, handler);
}

View File

@ -32,8 +32,8 @@ private:
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
void removeImpl(const UUID & id) override;
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;
SubscriptionPtr subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
SubscriptionPtr subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const override;
ext::scope_guard subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
ext::scope_guard subscribeForChangesImpl(std::type_index type, const OnChangedHandler & handler) const override;
bool hasSubscriptionImpl(const UUID & id) const override;
bool hasSubscriptionImpl(std::type_index type) const override;