Add custom ugradable lock.

This commit is contained in:
Nikolai Kochetov 2021-10-29 15:34:55 +03:00
parent 6914efe2db
commit a482ed3db3
4 changed files with 179 additions and 8 deletions

View File

@ -4,7 +4,7 @@
#include <Processors/Executors/PollingQueue.h>
#include <Processors/Executors/ThreadsQueue.h>
#include <Processors/Executors/TasksQueue.h>
#include <boost/thread/shared_mutex.hpp>
#include <Processors/Executors/UpgradableLock.h>
#include <stack>
namespace DB
@ -47,7 +47,7 @@ public:
using Queue = std::queue<ExecutingGraph::Node *>;
/// Data used by stopping pipeline task.
boost::shared_mutex stopping_pipeline_mutex;
UpgradableMutex stopping_pipeline_mutex;
void finish();
bool isFinished() const { return finished; }

View File

@ -10,7 +10,6 @@
#include <Interpreters/ProcessList.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <base/scope_guard_safe.h>
#include <boost/thread/locks.hpp>
#ifndef NDEBUG
#include <Common/Stopwatch.h>
@ -126,7 +125,7 @@ bool PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
return true;
}
bool PipelineExecutor::prepareProcessor(UInt64 pid, Queue & queue, Queue & async_queue, boost::upgrade_lock<boost::shared_mutex> & pipeline_lock)
bool PipelineExecutor::prepareProcessor(UInt64 pid, Queue & queue, Queue & async_queue, UpgradableMutex::ReadGuard & pipeline_lock)
{
std::stack<ExecutingGraph::Edge *> updated_edges;
Stack updated_processors;
@ -268,7 +267,7 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Queue & queue, Queue & async
if (need_expand_pipeline)
{
{
boost::upgrade_to_unique_lock lock(pipeline_lock);
UpgradableMutex::WriteGuard lock(pipeline_lock);
if (!expandPipeline(updated_processors, pid))
return false;
}
@ -426,7 +425,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yie
Queue async_queue;
{
boost::upgrade_lock<boost::shared_mutex> pipeline_read_lock(tasks.stopping_pipeline_mutex);
UpgradableMutex::ReadGuard pipeline_read_lock(tasks.stopping_pipeline_mutex);
/// Prepare processor after execution.
if (!prepareProcessor(context.getProcessorID(), queue, async_queue, pipeline_read_lock))
@ -465,7 +464,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
Queue queue;
Queue async_queue;
boost::upgrade_lock<boost::shared_mutex> pipeline_read_lock(tasks.stopping_pipeline_mutex);
UpgradableMutex::ReadGuard pipeline_read_lock(tasks.stopping_pipeline_mutex);
while (!stack.empty())
{

View File

@ -74,7 +74,7 @@ private:
/// Prepare processor with pid number.
/// Check parents and children of current processor and push them to stacks if they also need to be prepared.
/// If processor wants to be expanded, ExpandPipelineTask from thread_number's execution context will be used.
bool prepareProcessor(UInt64 pid, Queue & queue, Queue & async_queue, boost::upgrade_lock<boost::shared_mutex> & pipeline_lock);
bool prepareProcessor(UInt64 pid, Queue & queue, Queue & async_queue, UpgradableMutex::ReadGuard & pipeline_lock);
void initializeExecution(size_t num_threads); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.

View File

@ -0,0 +1,172 @@
#pragma once
#include <atomic>
#include <list>
#include <mutex>
#include <condition_variable>
namespace DB
{
/// RWLock which allows to upgrade read lock to write lock.
/// Read locks should be fast if there is no write lock.
///
/// Newly created write lock waits for all active read locks.
/// Newly created read lock waits for all write locks. Starvation is possible.
///
/// Mutex must live longer than locks.
/// Read lock must live longer than corresponding write lock.
///
/// For every write lock, a new internal state is created inside mutex.
/// This state is not deallocated until the destruction of mutex itself.
///
/// Usage example:
///
/// UpgradableMutex mutex;
/// {
/// UpgradableMutex::ReadLock read_lock(mutex);
/// ...
/// {
/// UpgradableMutex::WriteLock write_lock(read_lock);
/// ...
/// }
/// ...
/// }
class UpgradableMutex
{
private:
/// Implementation idea
///
/// ----------- (read scope)
/// ++num_readers
/// ** wait for active writer (in loop, starvation is possible here) **
///
/// =========== (write scope)
/// ** create new State **
/// ** wait for active writer (in loop, starvation is possible here) **
/// ** wait for all active readers **
///
/// ** notify all waiting readers for the current state.
/// =========== (end write scope)
///
/// --num_readers
/// ** notify current active writer **
/// ----------- (end read scope)
struct State
{
size_t num_waiting = 0;
bool is_done = false;
std::mutex mutex;
std::condition_variable read_condvar;
std::condition_variable write_condvar;
void wait() noexcept
{
std::unique_lock lock(mutex);
++num_waiting;
write_condvar.notify_one();
while (!is_done)
read_condvar.wait(lock);
}
void lock(std::atomic_size_t & num_locked) noexcept
{
/// Note : num_locked is an atomic
/// which can change it's value without locked mutex.
/// We support an invariant that after changing num_locked value,
/// UpgradableMutex::write_state is checked, and in case of active
/// write lock, we always notify it's write condvar.
std::unique_lock lock(mutex);
++num_waiting;
while (num_locked.load() < num_waiting)
write_condvar.wait(lock);
}
void unlock() noexcept
{
{
std::unique_lock lock(mutex);
is_done = true;
}
read_condvar.notify_all();
}
};
std::atomic_size_t num_readers = 0;
std::list<State> states;
std::mutex states_mutex;
std::atomic<State *> write_state{nullptr};
void lock() noexcept
{
++num_readers;
while (auto * state = write_state.load())
state->wait();
}
void unlock() noexcept
{
--num_readers;
while (auto * state = write_state.load())
state->write_condvar.notify_one();
}
State * allocState()
{
std::lock_guard guard(states_mutex);
return &states.emplace_back();
}
void upgrade(State & state) noexcept
{
State * expected = nullptr;
/// Only change nullptr -> state is possible.
while (!write_state.compare_exchange_strong(expected, &state))
{
expected->wait();
expected = nullptr;
}
state.lock(num_readers);
}
void degrade(State & state) noexcept
{
write_state = nullptr;
state.unlock();
}
public:
class ReadGuard
{
public:
explicit ReadGuard(UpgradableMutex & lock_) : lock(lock_) { lock.lock(); }
~ReadGuard() { lock.unlock(); }
UpgradableMutex & lock;
};
class WriteGuard
{
public:
explicit WriteGuard(ReadGuard & read_guard_) : read_guard(read_guard_)
{
state = read_guard.lock.allocState();
read_guard.lock.upgrade(*state);
}
~WriteGuard()
{
if (state)
read_guard.lock.degrade(*state);
}
private:
ReadGuard & read_guard;
State * state = nullptr;
};
};
}