Added documentation

This commit is contained in:
Maksim Kita 2021-03-04 14:58:36 +03:00
parent aa0778933c
commit 9173544003
4 changed files with 137 additions and 79 deletions

View File

@ -8,25 +8,83 @@
#include <Common/MoveOrCopyIfThrow.h>
/// TODO: Add documentation
/** Pool for limited size objects that cannot be used from different threads simultaneously.
* The main use case is to have fixed size of objects that can be reused in difference threads during their lifetime
* and have to be initialized on demand.
* Two main properies of pool are allocated objects size and borrowed objects size.
* Allocated objects size is size of objects that are currently allocated by the pool.
* Borrowed objects size is size of objects that are borrowed from clients.
* If max_size == 0 then pool has unlimited size and objects will be allocated without limit.
*
* Pool provides following strategy for borrowing object:
* If max_size == 0 then pool has unlimited size and objects will be allocated without limit.
* 1. If pool has objects that can be increase borrowed objects size and return it.
* 2. If pool allocatedObjectsSize is lower than max objects size or pool has unlimited size
* allocate new object, increase borrowed objects size and return it.
* 3. If pool is full wait on condition variable with or without timeout until some object
* will be returned to the pool.
*/
template <typename T>
class BorrowedObjectPool final
{
public:
explicit BorrowedObjectPool(size_t max_size_) : max_size(max_size_) {}
/// Borrow object from pool. If pull is full and all objects were borrowed
/// then calling thread will wait until some object will be returned into pool.
template <typename FactoryFunc>
T borrowObject(FactoryFunc && func)
void borrowObject(T & dest, FactoryFunc && func)
{
return borrowObjectImpl<NoTimeoutStrategy>(std::forward<FactoryFunc>(func), NoTimeoutStrategy());
std::unique_lock<std::mutex> lock(objects_mutex);
if (!objects.empty())
{
dest = borrowFromObjects();
return;
}
bool has_unlimited_size = (max_size == 0);
if (unlikely(has_unlimited_size) || allocated_objects_size < max_size)
{
dest = allocateObjectForBorrowing(std::forward<FactoryFunc>(func));
return;
}
condition_variable.wait(lock, [this] { return !objects.empty(); });
dest = borrowFromObjects();
}
/// Same as borrowObject function, but wait with timeout.
/// Returns true if object was borrowed during timeout.
template <typename FactoryFunc>
T tryBorrowObject(FactoryFunc && func, size_t timeout_in_milliseconds = 0)
bool tryBorrowObject(T & dest, FactoryFunc && func, size_t timeout_in_milliseconds = 0)
{
return borrowObjectImpl<WaitTimeoutStrategy>(std::forward<FactoryFunc>(func), WaitTimeoutStrategy{timeout_in_milliseconds});
std::unique_lock<std::mutex> lock(objects_mutex);
if (!objects.empty())
{
dest = borrowFromObjects();
return true;
}
bool has_unlimited_size = (max_size == 0);
if (unlikely(has_unlimited_size) || allocated_objects_size < max_size)
{
dest = allocateObjectForBorrowing(std::forward<FactoryFunc>(func));
return true;
}
bool wait_result = condition_variable.wait_for(lock, std::chrono::milliseconds(timeout_in_milliseconds), [this] { return !objects.empty(); });
if (wait_result)
dest = borrowFromObjects();
return wait_result;
}
/// Return object into pool. Client must return same object that was borrowed.
ALWAYS_INLINE inline void returnObject(T && object_to_return)
{
std::unique_lock<std::mutex> lck(objects_mutex);
@ -37,12 +95,28 @@ public:
condition_variable.notify_one();
}
/// Max pool size
ALWAYS_INLINE inline size_t maxSize() const
{
return max_size;
}
/// Allocated objects size by the pool. If allocatedObjectsSize == maxSize then pool is full.
ALWAYS_INLINE inline size_t allocatedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
return allocated_objects_size;
}
/// Returns allocatedObjectsSize == maxSize
ALWAYS_INLINE inline bool isFull() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
return allocated_objects_size == max_size;
}
/// Borrowed objects size. If borrowedObjectsSize == allocatedObjectsSize and pool is full.
/// Then client will wait during borrowObject function call.
ALWAYS_INLINE inline size_t borrowedObjectsSize() const
{
std::unique_lock<std::mutex> lock(objects_mutex);
@ -51,34 +125,6 @@ public:
private:
struct NoTimeoutStrategy {};
struct WaitTimeoutStrategy { size_t timeout_in_milliseconds; };
template <typename TimeoutStrategy, typename FactoryFunc>
ALWAYS_INLINE inline T borrowObjectImpl(FactoryFunc && func, TimeoutStrategy strategy [[maybe_unused]])
{
std::unique_lock<std::mutex> lock(objects_mutex);
if (!objects.empty())
return borrowFromObjects();
bool has_unlimited_size = (max_size == 0);
if (unlikely(has_unlimited_size))
return allocateObjectForBorrowing(std::forward<FactoryFunc>(func));
if (allocated_objects_size < max_size)
return allocateObjectForBorrowing(std::forward<FactoryFunc>(func));
if constexpr (std::is_same_v<TimeoutStrategy, WaitTimeoutStrategy>)
condition_variable.wait_for(lock, std::chrono::milliseconds(strategy.timeout_in_milliseconds), [this] { return !objects.empty(); });
else
condition_variable.wait(lock, [this] { return !objects.empty(); });
return borrowFromObjects();
}
template <typename FactoryFunc>
ALWAYS_INLINE inline T allocateObjectForBorrowing(FactoryFunc && func)
{

View File

@ -58,12 +58,19 @@ ShellCommand::~ShellCommand()
if (wait_called)
return;
if (shouldTerminateProcess())
if (destructor_strategy.terminate_in_destructor)
{
LOG_TRACE(getLogger(), "Will kill shell command pid {} with SIGTERM", pid);
int retcode = kill(pid, SIGTERM);
if (retcode != 0)
LOG_WARNING(getLogger(), "Cannot kill shell command pid {} errno '{}'", pid, errnoToString(retcode));
size_t try_wait_timeout = destructor_strategy.wait_for_normal_exit_before_termination_seconds;
bool process_terminated_normally = tryWaitProcessWithTimeout(try_wait_timeout);
if (!process_terminated_normally)
{
LOG_TRACE(getLogger(), "Will kill shell command pid {} with SIGTERM", pid);
int retcode = kill(pid, SIGTERM);
if (retcode != 0)
LOG_WARNING(getLogger(), "Cannot kill shell command pid {} errno '{}'", pid, errnoToString(retcode));
}
}
else
{
@ -78,52 +85,59 @@ ShellCommand::~ShellCommand()
}
}
bool ShellCommand::shouldTerminateProcess()
bool ShellCommand::tryWaitProcessWithTimeout(size_t timeout_in_seconds)
{
if (!destructor_strategy.terminate_in_destructor)
return false;
int status = 0;
size_t wait_before_signal_seconds = destructor_strategy.wait_for_normal_exit_before_termination_seconds;
LOG_TRACE(getLogger(), "Try wait for shell command pid ({}) with timeout ({})", pid, timeout_in_seconds);
if (wait_before_signal_seconds > 0)
wait_called = true;
struct timespec interval {.tv_sec = 1, .tv_nsec = 0};
try
{
LOG_TRACE(getLogger(), "Wait for shell command pid ({}) before termination with timeout ({})", pid, wait_before_signal_seconds);
in.close();
out.close();
err.close();
struct timespec interval{.tv_sec = 1, .tv_nsec = 0};
try
if (timeout_in_seconds == 0)
{
in.close();
out.close();
err.close();
/// If there is no timeout before signal try to waitpid 1 time without block so we can avoid sending
/// signal if process is already terminated normally finished.
int status = 0;
while (wait_before_signal_seconds != 0)
{
int waitpid_res = waitpid(pid, &status, WNOHANG);
if (waitpid_res == 0)
{
--wait_before_signal_seconds;
nanosleep(&interval, nullptr);
continue;
}
else if (waitpid_res == -1 && errno != EINTR)
return true;
else
return false;
}
return true;
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
return process_terminated_normally;
}
catch (...)
/// If timeout is positive try waitpid without block in loop until
/// process is normally terminated or waitpid return error
while (timeout_in_seconds != 0)
{
return true;
int waitpid_res = waitpid(pid, &status, WNOHANG);
bool process_terminated_normally = (waitpid_res == pid);
if (process_terminated_normally)
return true;
else if (waitpid_res == 0)
{
--timeout_in_seconds;
nanosleep(&interval, nullptr);
continue;
}
else if (waitpid_res == -1 && errno != EINTR)
return false;
}
}
else
return true;
catch (...)
{
return false;
}
return false;
}
void ShellCommand::logCommand(const char * filename, char * const argv[])

View File

@ -47,7 +47,7 @@ private:
ShellCommand(pid_t pid_, int & in_fd_, int & out_fd_, int & err_fd_, ShellCommandDestructorStrategy destructor_strategy_);
bool shouldTerminateProcess();
bool tryWaitProcessWithTimeout(size_t timeout_in_seconds);
static Poco::Logger * getLogger();

View File

@ -217,16 +217,14 @@ BlockInputStreamPtr ExecutablePoolDictionarySource::loadKeys(const Columns & key
BlockInputStreamPtr ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
{
std::cerr << "ExecutablePoolDictionarySource::getStreamForBlock borrow object start " << std::endl;
std::cerr << "Borrowed objects size " << process_pool->borrowedObjectsSize() << " allocated objects size " << process_pool->allocatedObjectsSize() << std::endl;
std::unique_ptr<ShellCommand> process = process_pool->tryBorrowObject([this]()
std::unique_ptr<ShellCommand> process;
process_pool->borrowObject(process, [this]()
{
bool terminate_in_destructor = true;
ShellCommandDestructorStrategy strategy { terminate_in_destructor, command_termination_timeout };
auto shell_command = ShellCommand::execute(command, false, strategy);
return shell_command;
}, 5000);
});
size_t rows_to_read = block.rows();
auto read_stream = context.getInputFormat(format, process->out, sample_block, rows_to_read);