diff --git a/src/Common/PoolBase.h b/src/Common/PoolBase.h index ef35002c45a..d6fc1656eca 100644 --- a/src/Common/PoolBase.h +++ b/src/Common/PoolBase.h @@ -1,11 +1,9 @@ #pragma once -#include #include -#include -#include -#include +#include #include +#include #include #include @@ -17,6 +15,14 @@ namespace ProfileEvents extern const Event ConnectionPoolIsFullMicroseconds; } +namespace DB +{ + namespace ErrorCodes + { + extern const int LOGICAL_ERROR; + } +} + /** A class from which you can inherit and get a pool of something. Used for database connection pools. * Descendant class must provide a method for creating a new object to place in the pool. */ @@ -29,22 +35,6 @@ public: using ObjectPtr = std::shared_ptr; using Ptr = std::shared_ptr>; - enum class BehaviourOnLimit - { - /** - * Default behaviour - when limit on pool size is reached, callers will wait until object will be returned back in pool. - */ - Wait, - - /** - * If no free objects in pool - allocate a new object, but not store it in pool. - * This behaviour is needed when we simply don't want to waste time waiting or if we cannot guarantee that query could be processed using fixed amount of connections. - * For example, when we read from table on s3, one GetObject request corresponds to the whole FileSystemCache segment. This segments are shared between different - * reading tasks, so in general case connection could be taken from pool by one task and returned back by another one. And these tasks are processed completely independently. - */ - AllocateNewBypassingPool, - }; - private: /** The object with the flag, whether it is currently used. */ @@ -99,53 +89,37 @@ public: Object & operator*() && = delete; const Object & operator*() const && = delete; - Object * operator->() & { return castToObjectPtr(); } - const Object * operator->() const & { return castToObjectPtr(); } - Object & operator*() & { return *castToObjectPtr(); } - const Object & operator*() const & { return *castToObjectPtr(); } + Object * operator->() & { return &*data->data.object; } + const Object * operator->() const & { return &*data->data.object; } + Object & operator*() & { return *data->data.object; } + const Object & operator*() const & { return *data->data.object; } /** * Expire an object to make it reallocated later. */ void expire() { - if (data.index() == 1) - std::get<1>(data)->data.is_expired = true; + data->data.is_expired = true; } - bool isNull() const { return data.index() == 0 ? !std::get<0>(data) : !std::get<1>(data); } + bool isNull() const { return data == nullptr; } + + PoolBase * getPool() const + { + if (!data) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Attempt to get pool from uninitialized entry"); + return &data->data.pool; + } private: - /** - * Plain object will be stored instead of PoolEntryHelper if fallback was made in get() (see BehaviourOnLimit::AllocateNewBypassingPool). - */ - std::variant> data; + std::shared_ptr data; - explicit Entry(ObjectPtr && object) : data(std::move(object)) { } - - explicit Entry(PooledObject & object) : data(std::make_shared(object)) { } - - auto castToObjectPtr() const - { - return std::visit( - [](const auto & ptr) - { - using T = std::decay_t; - if constexpr (std::is_same_v) - return ptr.get(); - else - return ptr->data.object.get(); - }, - data); - } + explicit Entry(PooledObject & object) : data(std::make_shared(object)) {} }; virtual ~PoolBase() = default; - /** Allocates the object. - * If 'behaviour_on_limit' is Wait - wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. - * If 'behaviour_on_limit' is AllocateNewBypassingPool and there is no free object - a new object will be created but not stored in the pool. - */ + /** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */ Entry get(Poco::Timespan::TimeDiff timeout) { std::unique_lock lock(mutex); @@ -176,9 +150,6 @@ public: return Entry(*items.back()); } - if (behaviour_on_limit == BehaviourOnLimit::AllocateNewBypassingPool) - return Entry(allocObject()); - Stopwatch blocked; if (timeout < 0) { @@ -213,8 +184,6 @@ private: /** The maximum size of the pool. */ unsigned max_items; - BehaviourOnLimit behaviour_on_limit; - /** Pool. */ Objects items; @@ -225,8 +194,8 @@ private: protected: LoggerPtr log; - PoolBase(unsigned max_items_, LoggerPtr log_, BehaviourOnLimit behaviour_on_limit_ = BehaviourOnLimit::Wait) - : max_items(max_items_), behaviour_on_limit(behaviour_on_limit_), log(log_) + PoolBase(unsigned max_items_, LoggerPtr log_) + : max_items(max_items_), log(log_) { items.reserve(max_items); }