Merge pull request #61866 from azat/pool-remove-AllocateNewBypassingPool

Remove PoolBase::AllocateNewBypassingPool
This commit is contained in:
Alexey Milovidov 2024-03-26 11:02:44 +03:00 committed by GitHub
commit 76ed7e40fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,11 +1,9 @@
#pragma once #pragma once
#include <condition_variable>
#include <mutex> #include <mutex>
#include <type_traits> #include <condition_variable>
#include <variant>
#include <boost/noncopyable.hpp>
#include <Poco/Timespan.h> #include <Poco/Timespan.h>
#include <boost/noncopyable.hpp>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/Exception.h> #include <Common/Exception.h>
@ -17,6 +15,14 @@ namespace ProfileEvents
extern const Event ConnectionPoolIsFullMicroseconds; extern const Event ConnectionPoolIsFullMicroseconds;
} }
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/** A class from which you can inherit and get a pool of something. Used for database connection pools. /** A class from which you can inherit and get a pool of something. Used for database connection pools.
* Descendant class must provide a method for creating a new object to place in the pool. * Descendant class must provide a method for creating a new object to place in the pool.
*/ */
@ -29,22 +35,6 @@ public:
using ObjectPtr = std::shared_ptr<Object>; using ObjectPtr = std::shared_ptr<Object>;
using Ptr = std::shared_ptr<PoolBase<TObject>>; using Ptr = std::shared_ptr<PoolBase<TObject>>;
enum class BehaviourOnLimit
{
/**
* Default behaviour - when limit on pool size is reached, callers will wait until object will be returned back in pool.
*/
Wait,
/**
* If no free objects in pool - allocate a new object, but not store it in pool.
* This behaviour is needed when we simply don't want to waste time waiting or if we cannot guarantee that query could be processed using fixed amount of connections.
* For example, when we read from table on s3, one GetObject request corresponds to the whole FileSystemCache segment. This segments are shared between different
* reading tasks, so in general case connection could be taken from pool by one task and returned back by another one. And these tasks are processed completely independently.
*/
AllocateNewBypassingPool,
};
private: private:
/** The object with the flag, whether it is currently used. */ /** The object with the flag, whether it is currently used. */
@ -99,53 +89,37 @@ public:
Object & operator*() && = delete; Object & operator*() && = delete;
const Object & operator*() const && = delete; const Object & operator*() const && = delete;
Object * operator->() & { return castToObjectPtr(); } Object * operator->() & { return &*data->data.object; }
const Object * operator->() const & { return castToObjectPtr(); } const Object * operator->() const & { return &*data->data.object; }
Object & operator*() & { return *castToObjectPtr(); } Object & operator*() & { return *data->data.object; }
const Object & operator*() const & { return *castToObjectPtr(); } const Object & operator*() const & { return *data->data.object; }
/** /**
* Expire an object to make it reallocated later. * Expire an object to make it reallocated later.
*/ */
void expire() void expire()
{ {
if (data.index() == 1) data->data.is_expired = true;
std::get<1>(data)->data.is_expired = true;
} }
bool isNull() const { return data.index() == 0 ? !std::get<0>(data) : !std::get<1>(data); } bool isNull() const { return data == nullptr; }
PoolBase * getPool() const
{
if (!data)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Attempt to get pool from uninitialized entry");
return &data->data.pool;
}
private: private:
/** std::shared_ptr<PoolEntryHelper> data;
* Plain object will be stored instead of PoolEntryHelper if fallback was made in get() (see BehaviourOnLimit::AllocateNewBypassingPool).
*/
std::variant<ObjectPtr, std::shared_ptr<PoolEntryHelper>> data;
explicit Entry(ObjectPtr && object) : data(std::move(object)) { } explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) {}
explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) { }
auto castToObjectPtr() const
{
return std::visit(
[](const auto & ptr)
{
using T = std::decay_t<decltype(ptr)>;
if constexpr (std::is_same_v<ObjectPtr, T>)
return ptr.get();
else
return ptr->data.object.get();
},
data);
}
}; };
virtual ~PoolBase() = default; virtual ~PoolBase() = default;
/** Allocates the object. /** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
* If 'behaviour_on_limit' is Wait - wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite.
* If 'behaviour_on_limit' is AllocateNewBypassingPool and there is no free object - a new object will be created but not stored in the pool.
*/
Entry get(Poco::Timespan::TimeDiff timeout) Entry get(Poco::Timespan::TimeDiff timeout)
{ {
std::unique_lock lock(mutex); std::unique_lock lock(mutex);
@ -176,9 +150,6 @@ public:
return Entry(*items.back()); return Entry(*items.back());
} }
if (behaviour_on_limit == BehaviourOnLimit::AllocateNewBypassingPool)
return Entry(allocObject());
Stopwatch blocked; Stopwatch blocked;
if (timeout < 0) if (timeout < 0)
{ {
@ -213,8 +184,6 @@ private:
/** The maximum size of the pool. */ /** The maximum size of the pool. */
unsigned max_items; unsigned max_items;
BehaviourOnLimit behaviour_on_limit;
/** Pool. */ /** Pool. */
Objects items; Objects items;
@ -225,8 +194,8 @@ private:
protected: protected:
LoggerPtr log; LoggerPtr log;
PoolBase(unsigned max_items_, LoggerPtr log_, BehaviourOnLimit behaviour_on_limit_ = BehaviourOnLimit::Wait) PoolBase(unsigned max_items_, LoggerPtr log_)
: max_items(max_items_), behaviour_on_limit(behaviour_on_limit_), log(log_) : max_items(max_items_), log(log_)
{ {
items.reserve(max_items); items.reserve(max_items);
} }