Merge pull request #19587 from ClickHouse/refactor-pipeline-executor

Refactor pipeline executor
commit 6623c609cf
Nikolai Kochetov, 2021-11-12 11:49:05 +03:00, committed by GitHub
21 changed files with 951 additions and 682 deletions
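In short: the scheduling logic that previously lived inside PipelineExecutor is split into dedicated components, and the old ExpandPipelineTask rendezvous (which stopped all processing threads) is replaced by a read lock that is upgraded to a write lock only when a processor expands the pipeline. A rough map of the new pieces, abridged from the files below:

// ExecutingGraph          - now owns the Processors list and the traversal logic:
//                           initializeExecution(), updateNode(), expandPipeline(), cancel()
// ExecutionThreadContext  - per-thread state: the current node, async tasks, wait()/wakeUp()
// ExecutorTasks           - task_queue / threads_queue / async_task_queue scheduling:
//                           init(), fill(), tryGetTask(), pushTasks(), finish()
// UpgradableMutex         - an RWLock whose read lock can be upgraded to a write lock
//                           (new file UpgradableLock.h)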

View File

@@ -30,6 +30,7 @@
#include <Processors/ISink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <pcg_random.hpp>
#include <base/scope_guard.h>
#include <Common/config_version.h>
#include <Common/config.h>

View File

@@ -11,6 +11,7 @@
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <base/logger_useful.h>
namespace
{

View File

@@ -21,6 +21,7 @@
#include <Common/FieldVisitorHash.h>
#include <Access/Common/AccessFlags.h>
#include <Formats/FormatFactory.h>
#include <base/logger_useful.h>
namespace DB

View File

@@ -20,6 +20,7 @@
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <filesystem>
#include <base/logger_useful.h>
namespace fs = std::filesystem;

View File

@@ -1,7 +1,9 @@
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Poco/Event.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
#include <base/scope_guard_safe.h>
#include <iostream>

View File

@@ -1,4 +1,6 @@
#include <Processors/Executors/ExecutingGraph.h>
#include <stack>
#include <Common/Stopwatch.h>
namespace DB
{
@@ -8,7 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
ExecutingGraph::ExecutingGraph(const Processors & processors)
ExecutingGraph::ExecutingGraph(Processors & processors_) : processors(processors_)
{
uint64_t num_processors = processors.size();
nodes.reserve(num_processors);
@@ -88,9 +90,36 @@ bool ExecutingGraph::addEdges(uint64_t node)
return was_edge_added;
}
std::vector<uint64_t> ExecutingGraph::expandPipeline(const Processors & processors)
bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
{
auto & cur_node = *nodes[pid];
Processors new_processors;
try
{
new_processors = cur_node.processor->expandPipeline();
}
catch (...)
{
cur_node.exception = std::current_exception();
return false;
}
{
std::lock_guard guard(processors_mutex);
processors.insert(processors.end(), new_processors.begin(), new_processors.end());
}
uint64_t num_processors = processors.size();
std::vector<uint64_t> back_edges_sizes(num_processors, 0);
std::vector<uint64_t> direct_edge_sizes(num_processors, 0);
for (uint64_t node = 0; node < nodes.size(); ++node)
{
direct_edge_sizes[node] = nodes[node]->direct_edges.size();
back_edges_sizes[node] = nodes[node]->back_edges.size();
}
nodes.reserve(num_processors);
while (nodes.size() < num_processors)
@@ -112,7 +141,226 @@ std::vector<uint64_t> ExecutingGraph::expandPipeline(const Processors & processors)
updated_nodes.push_back(node);
}
return updated_nodes;
for (auto updated_node : updated_nodes)
{
auto & node = *nodes[updated_node];
size_t num_direct_edges = node.direct_edges.size();
size_t num_back_edges = node.back_edges.size();
std::lock_guard guard(node.status_mutex);
for (uint64_t edge = back_edges_sizes[updated_node]; edge < num_back_edges; ++edge)
node.updated_input_ports.emplace_back(edge);
for (uint64_t edge = direct_edge_sizes[updated_node]; edge < num_direct_edges; ++edge)
node.updated_output_ports.emplace_back(edge);
if (node.status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
stack.push(updated_node);
}
}
return true;
}
void ExecutingGraph::initializeExecution(Queue & queue)
{
std::stack<uint64_t> stack;
/// Add childless processors to stack.
uint64_t num_processors = nodes.size();
for (uint64_t proc = 0; proc < num_processors; ++proc)
{
if (nodes[proc]->direct_edges.empty())
{
stack.push(proc);
/// do not lock the mutex, as this function is executed in a single thread
nodes[proc]->status = ExecutingGraph::ExecStatus::Preparing;
}
}
Queue async_queue;
while (!stack.empty())
{
uint64_t proc = stack.top();
stack.pop();
updateNode(proc, queue, async_queue);
if (!async_queue.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Async is only possible after work() call. Processor {}",
async_queue.front()->processor->getName());
}
}
bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue)
{
std::stack<Edge *> updated_edges;
std::stack<uint64_t> updated_processors;
updated_processors.push(pid);
UpgradableMutex::ReadGuard read_lock(nodes_mutex);
while (!updated_processors.empty() || !updated_edges.empty())
{
std::optional<std::unique_lock<std::mutex>> stack_top_lock;
if (updated_processors.empty())
{
auto * edge = updated_edges.top();
updated_edges.pop();
/// Here we have ownership of the edge, but the node can be accessed concurrently.
auto & node = *nodes[edge->to];
std::unique_lock lock(node.status_mutex);
ExecutingGraph::ExecStatus status = node.status;
if (status != ExecutingGraph::ExecStatus::Finished)
{
if (edge->backward)
node.updated_output_ports.push_back(edge->output_port_number);
else
node.updated_input_ports.push_back(edge->input_port_number);
if (status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
updated_processors.push(edge->to);
stack_top_lock = std::move(lock);
}
else
nodes[edge->to]->processor->onUpdatePorts();
}
}
if (!updated_processors.empty())
{
pid = updated_processors.top();
updated_processors.pop();
/// In this method we have ownership of the node.
auto & node = *nodes[pid];
bool need_expand_pipeline = false;
if (!stack_top_lock)
stack_top_lock.emplace(node.status_mutex);
{
#ifndef NDEBUG
Stopwatch watch;
#endif
std::unique_lock<std::mutex> lock(std::move(*stack_top_lock));
try
{
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
}
catch (...)
{
node.exception = std::current_exception();
return false;
}
#ifndef NDEBUG
node.preparation_time_ns += watch.elapsed();
#endif
node.updated_input_ports.clear();
node.updated_output_ports.clear();
switch (node.last_processor_status)
{
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
node.status = ExecutingGraph::ExecStatus::Idle;
break;
}
case IProcessor::Status::Finished:
{
node.status = ExecutingGraph::ExecStatus::Finished;
break;
}
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
case IProcessor::Status::Async:
{
node.status = ExecutingGraph::ExecStatus::Executing;
async_queue.push(&node);
break;
}
case IProcessor::Status::ExpandPipeline:
{
need_expand_pipeline = true;
break;
}
}
if (!need_expand_pipeline)
{
/// If you wonder why edges are pushed in reverse order,
/// it is because updated_edges is a stack, and we prefer to take
/// input ports from the stack first, and then output ports, both in order.
///
/// Actually, there should be no difference in which order we process edges.
/// However, some tests are sensitive to it (e.g. something like SELECT 1 UNION ALL 2).
/// Let's not break this behaviour for now.
for (auto it = node.post_updated_output_ports.rbegin(); it != node.post_updated_output_ports.rend(); ++it)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
updated_edges.push(edge);
edge->update_info.trigger();
}
for (auto it = node.post_updated_input_ports.rbegin(); it != node.post_updated_input_ports.rend(); ++it)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(*it);
updated_edges.push(edge);
edge->update_info.trigger();
}
node.post_updated_input_ports.clear();
node.post_updated_output_ports.clear();
}
}
if (need_expand_pipeline)
{
{
UpgradableMutex::WriteGuard lock(read_lock);
if (!expandPipeline(updated_processors, pid))
return false;
}
/// Add itself back to be prepared again.
updated_processors.push(pid);
}
}
}
return true;
}
void ExecutingGraph::cancel()
{
std::lock_guard guard(processors_mutex);
for (auto & processor : processors)
processor->cancel();
}
}

View File

@@ -1,7 +1,10 @@
#pragma once
#include <Processors/Port.h>
#include <Processors/IProcessor.h>
#include <Processors/Executors/UpgradableLock.h>
#include <mutex>
#include <queue>
#include <stack>
namespace DB
{
@@ -81,8 +84,7 @@
ExecStatus status = ExecStatus::Idle;
std::mutex status_mutex;
/// Job and exception. Job calls processor->work() inside and catch exception.
std::function<void()> job;
/// Exception which happened after processor execution.
std::exception_ptr exception;
/// Last state for profiling.
@@ -112,6 +114,7 @@
}
};
using Queue = std::queue<Node *>;
using NodePtr = std::unique_ptr<Node>;
using Nodes = std::vector<NodePtr>;
Nodes nodes;
@@ -120,12 +123,19 @@ public:
using ProcessorsMap = std::unordered_map<const IProcessor *, uint64_t>;
ProcessorsMap processors_map;
explicit ExecutingGraph(const Processors & processors);
explicit ExecutingGraph(Processors & processors_);
/// Update graph after processor returned ExpandPipeline status.
/// Processors should already contain newly-added processors.
/// Returns newly-added nodes and nodes whose edges were modified.
std::vector<uint64_t> expandPipeline(const Processors & processors);
const Processors & getProcessors() const { return processors; }
/// Traverse the graph for the first time to update all the childless nodes.
void initializeExecution(Queue & queue);
/// Update the processor with the given pid number (calls IProcessor::prepare).
/// Check parents and children of the current processor and push them onto the stacks if they also need to be updated.
/// If the processor wants to expand the pipeline, the lock is upgraded to get write access to the pipeline.
bool updateNode(uint64_t pid, Queue & queue, Queue & async_queue);
void cancel();
private:
/// Add single edge to edges list. Check processor is known.
@@ -134,6 +144,15 @@ private:
/// Append new edges for a node. It is called for a new node or when new ports were added after ExpandPipeline.
/// Returns true if new edge was added.
bool addEdges(uint64_t node);
/// Update graph after processor (pid) returned ExpandPipeline status.
/// All new nodes and nodes with updated ports are pushed onto the stack.
bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
Processors & processors;
std::mutex processors_mutex;
UpgradableMutex nodes_mutex;
};
}
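To make the division of labour concrete, below is a condensed, single-threaded sketch of how this API is meant to be driven (simplified from PipelineExecutor in this commit; return-value checks, exception paths, and the async queue are omitted):

ExecutingGraph graph(processors);
ExecutingGraph::Queue queue;
ExecutingGraph::Queue async_queue;
graph.initializeExecution(queue);       // childless processors are prepared first
while (!queue.empty())
{
    ExecutingGraph::Node * node = queue.front();
    queue.pop();
    node->processor->work();            // execute one step of the processor
    ExecutingGraph::Queue ready;
    graph.updateNode(node->processors_id, ready, async_queue);  // re-prepare it and its neighbours
    while (!ready.empty())              // newly ready nodes go back into the driving queue
    {
        queue.push(ready.front());
        ready.pop();
    }
}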

View File

@@ -0,0 +1,107 @@
#include <Processors/Executors/ExecutionThreadContext.h>
#include <Common/Stopwatch.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int QUOTA_EXPIRED;
extern const int QUERY_WAS_CANCELLED;
}
void ExecutionThreadContext::wait(std::atomic_bool & finished)
{
std::unique_lock lock(mutex);
condvar.wait(lock, [&]
{
return finished || wake_flag;
});
wake_flag = false;
}
void ExecutionThreadContext::wakeUp()
{
std::lock_guard guard(mutex);
wake_flag = true;
condvar.notify_one();
}
static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception)
{
/// Don't add additional info to limits and quota exceptions, and in case of kill query (to pass tests).
return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES
&& exception.code() != ErrorCodes::QUOTA_EXPIRED
&& exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}
static void executeJob(IProcessor * processor)
{
try
{
processor->work();
}
catch (Exception & exception)
{
if (checkCanAddAdditionalInfoToException(exception))
exception.addMessage("While executing " + processor->getName());
throw;
}
}
bool ExecutionThreadContext::executeTask()
{
#ifndef NDEBUG
Stopwatch execution_time_watch;
#endif
try
{
executeJob(node->processor);
++node->num_executed_jobs;
}
catch (...)
{
node->exception = std::current_exception();
}
#ifndef NDEBUG
execution_time_ns += execution_time_watch.elapsed();
#endif
return node->exception == nullptr;
}
void ExecutionThreadContext::rethrowExceptionIfHas()
{
if (exception)
std::rethrow_exception(exception);
}
ExecutingGraph::Node * ExecutionThreadContext::tryPopAsyncTask()
{
ExecutingGraph::Node * task = nullptr;
if (!async_tasks.empty())
{
task = async_tasks.front();
async_tasks.pop();
if (async_tasks.empty())
has_async_tasks = false;
}
return task;
}
void ExecutionThreadContext::pushAsyncTask(ExecutingGraph::Node * async_task)
{
async_tasks.push(async_task);
has_async_tasks = true;
}
}
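The wait()/wakeUp() pair above is the standard flag-plus-condvar handshake: the flag is set under the mutex so a notification that fires before the consumer blocks cannot be lost, and the wait predicate also protects against spurious wakeups. A standalone sketch of the same pattern (illustrative only, not the real class):

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

int main()
{
    std::mutex mutex;
    std::condition_variable condvar;
    bool wake_flag = false;            // protected by mutex
    std::atomic_bool finished{false};

    std::thread worker([&]
    {
        std::unique_lock lock(mutex);
        condvar.wait(lock, [&] { return finished || wake_flag; });
        wake_flag = false;             // consume the notification
        std::cout << "worker woke up\n";
    });

    {
        std::lock_guard guard(mutex);  // setting the flag under the mutex prevents a lost wakeup
        wake_flag = true;
    }
    condvar.notify_one();
    worker.join();
}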

View File

@@ -0,0 +1,61 @@
#pragma once
#include <Processors/Executors/ExecutingGraph.h>
#include <queue>
#include <condition_variable>
namespace DB
{
/// Context for each executing thread of PipelineExecutor.
class ExecutionThreadContext
{
private:
/// A queue of async tasks. A task is added here after it has been waited for.
std::queue<ExecutingGraph::Node *> async_tasks;
std::atomic_bool has_async_tasks = false;
/// These objects are used to wait for the next available task.
std::condition_variable condvar;
std::mutex mutex;
bool wake_flag = false;
/// Currently processing node.
ExecutingGraph::Node * node = nullptr;
/// Exception from executing thread itself.
std::exception_ptr exception;
public:
#ifndef NDEBUG
/// Time for different processing stages.
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
UInt64 wait_time_ns = 0;
#endif
const size_t thread_number;
void wait(std::atomic_bool & finished);
void wakeUp();
/// Methods to access/change currently executing task.
bool hasTask() const { return node != nullptr; }
void setTask(ExecutingGraph::Node * task) { node = task; }
bool executeTask();
uint64_t getProcessorID() const { return node->processors_id; }
/// Methods to manage async tasks.
ExecutingGraph::Node * tryPopAsyncTask();
void pushAsyncTask(ExecutingGraph::Node * async_task);
bool hasAsyncTasks() const { return has_async_tasks; }
std::unique_lock<std::mutex> lockStatus() const { return std::unique_lock(node->status_mutex); }
void setException(std::exception_ptr exception_) { exception = std::move(exception_); }
void rethrowExceptionIfHas();
explicit ExecutionThreadContext(size_t thread_number_) : thread_number(thread_number_) {}
};
}

View File

@@ -0,0 +1,193 @@
#include <Processors/Executors/ExecutorTasks.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void ExecutorTasks::finish()
{
{
std::lock_guard lock(mutex);
finished = true;
async_task_queue.finish();
}
std::lock_guard guard(executor_contexts_mutex);
for (auto & context : executor_contexts)
context->wakeUp();
}
void ExecutorTasks::rethrowFirstThreadException()
{
for (auto & executor_context : executor_contexts)
executor_context->rethrowExceptionIfHas();
}
void ExecutorTasks::tryGetTask(ExecutionThreadContext & context)
{
{
std::unique_lock lock(mutex);
if (auto * async_task = context.tryPopAsyncTask())
{
context.setTask(async_task);
--num_waiting_async_tasks;
}
else if (!task_queue.empty())
context.setTask(task_queue.pop(context.thread_number));
if (context.hasTask())
{
if (!task_queue.empty() && !threads_queue.empty())
{
size_t next_thread = context.thread_number + 1 == num_threads ? 0 : (context.thread_number + 1);
auto thread_to_wake = task_queue.getAnyThreadWithTasks(next_thread);
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.popAny();
lock.unlock();
executor_contexts[thread_to_wake]->wakeUp();
}
return;
}
if (threads_queue.size() + 1 == num_threads && async_task_queue.empty() && num_waiting_async_tasks == 0)
{
lock.unlock();
finish();
return;
}
#if defined(OS_LINUX)
if (num_threads == 1)
{
/// If we execute in single thread, wait for async tasks here.
auto res = async_task_queue.wait(lock);
if (!res)
{
if (finished)
return;
throw Exception("Empty task was returned from async task queue", ErrorCodes::LOGICAL_ERROR);
}
context.setTask(static_cast<ExecutingGraph::Node *>(res.data));
return;
}
#endif
threads_queue.push(context.thread_number);
}
context.wait(finished);
}
void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context)
{
context.setTask(nullptr);
/// Take a local task from the queue if there is one.
if (!queue.empty() && !context.hasAsyncTasks())
{
context.setTask(queue.front());
queue.pop();
}
if (!queue.empty() || !async_queue.empty())
{
std::unique_lock lock(mutex);
#if defined(OS_LINUX)
while (!async_queue.empty() && !finished)
{
int fd = async_queue.front()->processor->schedule();
async_task_queue.addTask(context.thread_number, async_queue.front(), fd);
async_queue.pop();
}
#endif
while (!queue.empty() && !finished)
{
task_queue.push(queue.front(), context.thread_number);
queue.pop();
}
if (!threads_queue.empty() && !task_queue.empty() && !finished)
{
size_t next_thread = context.thread_number + 1 == num_threads ? 0 : (context.thread_number + 1);
auto thread_to_wake = task_queue.getAnyThreadWithTasks(next_thread);
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.popAny();
lock.unlock();
executor_contexts[thread_to_wake]->wakeUp();
}
}
}
void ExecutorTasks::init(size_t num_threads_)
{
num_threads = num_threads_;
threads_queue.init(num_threads);
task_queue.init(num_threads);
{
std::lock_guard guard(executor_contexts_mutex);
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i));
}
}
void ExecutorTasks::fill(Queue & queue)
{
std::lock_guard lock(mutex);
size_t next_thread = 0;
while (!queue.empty())
{
task_queue.push(queue.front(), next_thread);
queue.pop();
++next_thread;
if (next_thread >= num_threads)
next_thread = 0;
}
}
void ExecutorTasks::processAsyncTasks()
{
#if defined(OS_LINUX)
{
/// Wait for async tasks.
std::unique_lock lock(mutex);
while (auto task = async_task_queue.wait(lock))
{
auto * node = static_cast<ExecutingGraph::Node *>(task.data);
executor_contexts[task.thread_num]->pushAsyncTask(node);
++num_waiting_async_tasks;
if (threads_queue.has(task.thread_num))
{
threads_queue.pop(task.thread_num);
executor_contexts[task.thread_num]->wakeUp();
}
}
}
#endif
}
}

View File

@@ -0,0 +1,64 @@
#pragma once
#include <Processors/Executors/ExecutionThreadContext.h>
#include <Processors/Executors/PollingQueue.h>
#include <Processors/Executors/ThreadsQueue.h>
#include <Processors/Executors/TasksQueue.h>
#include <stack>
namespace DB
{
/// Manage tasks which are ready for execution. Used in PipelineExecutor.
class ExecutorTasks
{
/// If query is finished (or cancelled).
std::atomic_bool finished = false;
/// Contexts for every executing thread.
std::vector<std::unique_ptr<ExecutionThreadContext>> executor_contexts;
/// This mutex protects only executor_contexts vector. Needed to avoid race between init() and finish().
std::mutex executor_contexts_mutex;
/// Common mutex for all the following fields.
std::mutex mutex;
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors that need to be prepared. Preparing status is already set for them.
TaskQueue<ExecutingGraph::Node> task_queue;
/// Queue which stores tasks where processors returned Async status after prepare.
/// If multiple threads are used, the main thread will wait for async tasks.
/// With a single thread, it will wait for async tasks only when task_queue is empty.
PollingQueue async_task_queue;
size_t num_threads = 0;
/// This is the total number of async tasks that have been waited on but are not executed yet.
/// sum(executor_contexts[i].async_tasks.size())
size_t num_waiting_async_tasks = 0;
/// A set of currently waiting threads.
ThreadsQueue threads_queue;
public:
using Stack = std::stack<UInt64>;
using Queue = std::queue<ExecutingGraph::Node *>;
void finish();
bool isFinished() const { return finished; }
void rethrowFirstThreadException();
void tryGetTask(ExecutionThreadContext & context);
void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
void init(size_t num_threads_);
void fill(Queue & queue);
void processAsyncTasks();
ExecutionThreadContext & getThreadContext(size_t thread_num) { return *executor_contexts[thread_num]; }
};
}
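Putting it together, the worker-thread loop in PipelineExecutor now reduces to the calls declared above. A condensed sketch (cancellation and exception handling omitted; graph and tasks are the executor's members):

// Setup (PipelineExecutor::initializeExecution):
ExecutorTasks::Queue queue;
graph->initializeExecution(queue);     // collect the initially ready nodes
tasks.init(num_threads);
tasks.fill(queue);                     // distribute them round-robin over the thread queues

// Per worker thread (PipelineExecutor::executeStepImpl):
auto & context = tasks.getThreadContext(thread_num);
while (!tasks.isFinished())
{
    tasks.tryGetTask(context);         // may sleep via context.wait() until another thread wakes it
    while (context.hasTask())
    {
        if (!context.executeTask())    // runs IProcessor::work(); returns false if it threw
            break;                     // the real loop calls cancel() here
        ExecutorTasks::Queue ready, async;
        graph->updateNode(context.getProcessorID(), ready, async);
        tasks.pushTasks(ready, async, context);  // keep one task locally, publish the rest
    }
}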

View File

@@ -5,6 +5,7 @@
#include <Common/setThreadName.h>
#include <Common/MemoryTracker.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Executors/ExecutingGraph.h>
#include <QueryPipeline/printPipeline.h>
#include <Processors/ISource.h>
#include <Interpreters/ProcessList.h>
@@ -21,26 +22,12 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int QUOTA_EXPIRED;
extern const int QUERY_WAS_CANCELLED;
}
static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception)
{
/// Don't add additional info to limits and quota exceptions, and in case of kill query (to pass tests).
return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES
&& exception.code() != ErrorCodes::QUOTA_EXPIRED
&& exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}
PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
: processors(processors_)
, cancelled(false)
, finished(false)
, num_processing_executors(0)
, expand_pipeline_task(nullptr)
, process_list_element(elem)
PipelineExecutor::PipelineExecutor(Processors & processors, QueryStatus * elem)
: process_list_element(elem)
{
try
{
@@ -67,334 +54,21 @@ PipelineExecutor::~PipelineExecutor()
process_list_element->removePipelineExecutor(this);
}
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
const Processors & PipelineExecutor::getProcessors() const
{
UInt64 num_processors = processors.size();
for (UInt64 proc = 0; proc < num_processors; ++proc)
{
if (graph->nodes[proc]->direct_edges.empty())
{
stack.push(proc);
/// do not lock mutex, as this function is executed in single thread
graph->nodes[proc]->status = ExecutingGraph::ExecStatus::Preparing;
}
}
}
static void executeJob(IProcessor * processor)
{
try
{
processor->work();
}
catch (Exception & exception)
{
if (checkCanAddAdditionalInfoToException(exception))
exception.addMessage("While executing " + processor->getName());
throw;
}
}
void PipelineExecutor::addJob(ExecutingGraph::Node * execution_state)
{
auto job = [execution_state]()
{
try
{
// Stopwatch watch;
executeJob(execution_state->processor);
// execution_state->execution_time_ns += watch.elapsed();
++execution_state->num_executed_jobs;
}
catch (...)
{
execution_state->exception = std::current_exception();
}
};
execution_state->job = std::move(job);
}
bool PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
{
auto & cur_node = *graph->nodes[pid];
Processors new_processors;
try
{
new_processors = cur_node.processor->expandPipeline();
}
catch (...)
{
cur_node.exception = std::current_exception();
return false;
}
{
std::lock_guard guard(processors_mutex);
processors.insert(processors.end(), new_processors.begin(), new_processors.end());
}
uint64_t num_processors = processors.size();
std::vector<uint64_t> back_edges_sizes(num_processors, 0);
std::vector<uint64_t> direct_edge_sizes(num_processors, 0);
for (uint64_t node = 0; node < graph->nodes.size(); ++node)
{
direct_edge_sizes[node] = graph->nodes[node]->direct_edges.size();
back_edges_sizes[node] = graph->nodes[node]->back_edges.size();
}
auto updated_nodes = graph->expandPipeline(processors);
for (auto updated_node : updated_nodes)
{
auto & node = *graph->nodes[updated_node];
size_t num_direct_edges = node.direct_edges.size();
size_t num_back_edges = node.back_edges.size();
std::lock_guard guard(node.status_mutex);
for (uint64_t edge = back_edges_sizes[updated_node]; edge < num_back_edges; ++edge)
node.updated_input_ports.emplace_back(edge);
for (uint64_t edge = direct_edge_sizes[updated_node]; edge < num_direct_edges; ++edge)
node.updated_output_ports.emplace_back(edge);
if (node.status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
stack.push(updated_node);
}
}
return true;
}
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, size_t thread_number)
{
/// In this method we have ownership on edge, but node can be concurrently accessed.
auto & node = *graph->nodes[edge.to];
std::unique_lock lock(node.status_mutex);
ExecutingGraph::ExecStatus status = node.status;
if (status == ExecutingGraph::ExecStatus::Finished)
return true;
if (edge.backward)
node.updated_output_ports.push_back(edge.output_port_number);
else
node.updated_input_ports.push_back(edge.input_port_number);
if (status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
return prepareProcessor(edge.to, thread_number, queue, async_queue, std::move(lock));
}
else
graph->nodes[edge.to]->processor->onUpdatePorts();
return true;
}
bool PipelineExecutor::prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock)
{
/// In this method we have ownership on node.
auto & node = *graph->nodes[pid];
bool need_expand_pipeline = false;
std::vector<ExecutingGraph::Edge *> updated_back_edges;
std::vector<ExecutingGraph::Edge *> updated_direct_edges;
{
#ifndef NDEBUG
Stopwatch watch;
#endif
std::unique_lock<std::mutex> lock(std::move(node_lock));
try
{
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
}
catch (...)
{
node.exception = std::current_exception();
return false;
}
#ifndef NDEBUG
node.preparation_time_ns += watch.elapsed();
#endif
node.updated_input_ports.clear();
node.updated_output_ports.clear();
switch (node.last_processor_status)
{
case IProcessor::Status::NeedData:
case IProcessor::Status::PortFull:
{
node.status = ExecutingGraph::ExecStatus::Idle;
break;
}
case IProcessor::Status::Finished:
{
node.status = ExecutingGraph::ExecStatus::Finished;
break;
}
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
case IProcessor::Status::Async:
{
node.status = ExecutingGraph::ExecStatus::Executing;
async_queue.push(&node);
break;
}
case IProcessor::Status::ExpandPipeline:
{
need_expand_pipeline = true;
break;
}
}
{
for (auto & edge_id : node.post_updated_input_ports)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(edge_id);
updated_back_edges.emplace_back(edge);
edge->update_info.trigger();
}
for (auto & edge_id : node.post_updated_output_ports)
{
auto * edge = static_cast<ExecutingGraph::Edge *>(edge_id);
updated_direct_edges.emplace_back(edge);
edge->update_info.trigger();
}
node.post_updated_input_ports.clear();
node.post_updated_output_ports.clear();
}
}
{
for (auto & edge : updated_direct_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
return false;
}
for (auto & edge : updated_back_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
return false;
}
}
if (need_expand_pipeline)
{
Stack stack;
executor_contexts[thread_number]->task_list.emplace_back(&node, &stack);
ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
ExpandPipelineTask * expected = nullptr;
while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
{
if (!doExpandPipeline(expected, true))
return false;
expected = nullptr;
}
if (!doExpandPipeline(desired, true))
return false;
/// Add itself back to be prepared again.
stack.push(pid);
while (!stack.empty())
{
auto item = stack.top();
if (!prepareProcessor(item, thread_number, queue, async_queue, std::unique_lock<std::mutex>(graph->nodes[item]->status_mutex)))
return false;
stack.pop();
}
}
return true;
}
bool PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processing)
{
std::unique_lock lock(task->mutex);
if (processing)
++task->num_waiting_processing_threads;
task->condvar.wait(lock, [&]()
{
return task->num_waiting_processing_threads >= num_processing_executors || expand_pipeline_task != task;
});
bool result = true;
/// After condvar.wait() task may point to trash. Can change it only if it is still in expand_pipeline_task.
if (expand_pipeline_task == task)
{
result = expandPipeline(*task->stack, task->node_to_expand->processors_id);
expand_pipeline_task = nullptr;
lock.unlock();
task->condvar.notify_all();
}
return result;
return graph->getProcessors();
}
void PipelineExecutor::cancel()
{
cancelled = true;
finish();
std::lock_guard guard(processors_mutex);
for (auto & processor : processors)
processor->cancel();
graph->cancel();
}
void PipelineExecutor::finish()
{
{
std::lock_guard lock(task_queue_mutex);
finished = true;
async_task_queue.finish();
}
std::lock_guard guard(executor_contexts_mutex);
for (auto & context : executor_contexts)
{
{
std::lock_guard lock(context->mutex);
context->wake_flag = true;
}
context->condvar.notify_one();
}
tasks.finish();
}
void PipelineExecutor::execute(size_t num_threads)
@@ -412,9 +86,7 @@ void PipelineExecutor::execute(size_t num_threads)
std::rethrow_exception(node->exception);
/// Exception which happened in executing thread, but not at processor.
for (auto & executor_context : executor_contexts)
if (executor_context->exception)
std::rethrow_exception(executor_context->exception);
tasks.rethrowFirstThreadException();
}
catch (...)
{
@@ -437,9 +109,9 @@ bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
return true;
}
executeStepImpl(0, 1, yield_flag);
executeStepImpl(0, yield_flag);
if (!finished)
if (!tasks.isFinished())
return true;
/// Execution can be stopped because of exception. Check and rethrow if any.
@@ -475,138 +147,47 @@ void PipelineExecutor::finalizeExecution()
throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
}
void PipelineExecutor::wakeUpExecutor(size_t thread_num)
void PipelineExecutor::executeSingleThread(size_t thread_num)
{
std::lock_guard guard(executor_contexts[thread_num]->mutex);
executor_contexts[thread_num]->wake_flag = true;
executor_contexts[thread_num]->condvar.notify_one();
}
void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads)
{
executeStepImpl(thread_num, num_threads);
executeStepImpl(thread_num);
#ifndef NDEBUG
auto & context = executor_contexts[thread_num];
LOG_TRACE(log, "Thread finished. Total time: {} sec. Execution time: {} sec. Processing time: {} sec. Wait time: {} sec.", (context->total_time_ns / 1e9), (context->execution_time_ns / 1e9), (context->processing_time_ns / 1e9), (context->wait_time_ns / 1e9));
auto & context = tasks.getThreadContext(thread_num);
LOG_TRACE(log,
"Thread finished. Total time: {} sec. Execution time: {} sec. Processing time: {} sec. Wait time: {} sec.",
(context.total_time_ns / 1e9),
(context.execution_time_ns / 1e9),
(context.processing_time_ns / 1e9),
(context.wait_time_ns / 1e9));
#endif
}
void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, std::atomic_bool * yield_flag)
void PipelineExecutor::executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag)
{
#ifndef NDEBUG
Stopwatch total_time_watch;
#endif
auto & context = executor_contexts[thread_num];
auto & node = context->node;
// auto & node = tasks.getNode(thread_num);
auto & context = tasks.getThreadContext(thread_num);
bool yield = false;
while (!finished && !yield)
while (!tasks.isFinished() && !yield)
{
/// First, find any processor to execute.
/// Just traverse graph and prepare any processor.
while (!finished && node == nullptr)
while (!tasks.isFinished() && !context.hasTask())
tasks.tryGetTask(context);
while (context.hasTask() && !yield)
{
{
std::unique_lock lock(task_queue_mutex);
if (!context->async_tasks.empty())
{
node = context->async_tasks.front();
context->async_tasks.pop();
--num_waiting_async_tasks;
if (context->async_tasks.empty())
context->has_async_tasks = false;
}
else if (!task_queue.empty())
node = task_queue.pop(thread_num);
if (node)
{
if (!task_queue.empty() && !threads_queue.empty())
{
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.popAny();
lock.unlock();
wakeUpExecutor(thread_to_wake);
}
break;
}
if (threads_queue.size() + 1 == num_threads && async_task_queue.empty() && num_waiting_async_tasks == 0)
{
lock.unlock();
finish();
break;
}
#if defined(OS_LINUX)
if (num_threads == 1)
{
/// If we execute in single thread, wait for async tasks here.
auto res = async_task_queue.wait(lock);
if (!res)
{
/// The query had been cancelled (finished is also set)
if (finished)
break;
throw Exception("Empty task was returned from async task queue", ErrorCodes::LOGICAL_ERROR);
}
node = static_cast<ExecutingGraph::Node *>(res.data);
break;
}
#endif
threads_queue.push(thread_num);
}
{
std::unique_lock lock(context->mutex);
context->condvar.wait(lock, [&]
{
return finished || context->wake_flag;
});
context->wake_flag = false;
}
}
if (finished)
break;
while (node && !yield)
{
if (finished)
if (tasks.isFinished())
break;
addJob(node);
{
#ifndef NDEBUG
Stopwatch execution_time_watch;
#endif
node->job();
#ifndef NDEBUG
context->execution_time_ns += execution_time_watch.elapsed();
#endif
}
if (node->exception)
if (!context.executeTask())
cancel();
if (finished)
if (tasks.isFinished())
break;
#ifndef NDEBUG
@@ -618,67 +199,16 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, std::atomic_bool * yield_flag)
Queue queue;
Queue async_queue;
++num_processing_executors;
while (auto * task = expand_pipeline_task.load())
doExpandPipeline(task, true);
/// Prepare processor after execution.
{
auto lock = std::unique_lock<std::mutex>(node->status_mutex);
if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock)))
finish();
}
node = nullptr;
/// Take local task from queue if has one.
if (!queue.empty() && !context->has_async_tasks)
{
node = queue.front();
queue.pop();
}
if (!graph->updateNode(context.getProcessorID(), queue, async_queue))
finish();
/// Push other tasks to global queue.
if (!queue.empty() || !async_queue.empty())
{
std::unique_lock lock(task_queue_mutex);
#if defined(OS_LINUX)
while (!async_queue.empty() && !finished)
{
async_task_queue.addTask(thread_num, async_queue.front(), async_queue.front()->processor->schedule());
async_queue.pop();
}
#endif
while (!queue.empty() && !finished)
{
task_queue.push(queue.front(), thread_num);
queue.pop();
}
if (!threads_queue.empty() && !task_queue.empty() && !finished)
{
auto thread_to_wake = task_queue.getAnyThreadWithTasks(thread_num + 1 == num_threads ? 0 : (thread_num + 1));
if (threads_queue.has(thread_to_wake))
threads_queue.pop(thread_to_wake);
else
thread_to_wake = threads_queue.popAny();
lock.unlock();
wakeUpExecutor(thread_to_wake);
}
}
--num_processing_executors;
while (auto * task = expand_pipeline_task.load())
doExpandPipeline(task, false);
tasks.pushTasks(queue, async_queue, context);
}
#ifndef NDEBUG
context->processing_time_ns += processing_time_watch.elapsed();
context.processing_time_ns += processing_time_watch.elapsed();
#endif
/// We have executed single processor. Check if we need to yield execution.
@@ -688,8 +218,8 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, std::atomic_bool * yield_flag)
}
#ifndef NDEBUG
context->total_time_ns += total_time_watch.elapsed();
context->wait_time_ns = context->total_time_ns - context->execution_time_ns - context->processing_time_ns;
context.total_time_ns += total_time_watch.elapsed();
context.wait_time_ns = context.total_time_ns - context.execution_time_ns - context.processing_time_ns;
#endif
}
@@ -697,49 +227,11 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
{
is_execution_initialized = true;
threads_queue.init(num_threads);
task_queue.init(num_threads);
Queue queue;
graph->initializeExecution(queue);
{
std::lock_guard guard(executor_contexts_mutex);
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
}
Stack stack;
addChildlessProcessorsToStack(stack);
{
std::lock_guard lock(task_queue_mutex);
Queue queue;
Queue async_queue;
size_t next_thread = 0;
while (!stack.empty())
{
UInt64 proc = stack.top();
stack.pop();
prepareProcessor(proc, 0, queue, async_queue, std::unique_lock<std::mutex>(graph->nodes[proc]->status_mutex));
while (!queue.empty())
{
task_queue.push(queue.front(), next_thread);
queue.pop();
++next_thread;
if (next_thread >= num_threads)
next_thread = 0;
}
while (!async_queue.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Async is only possible after work() call. Processor {}",
async_queue.front()->processor->getName());
}
}
tasks.init(num_threads);
tasks.fill(queue);
}
void PipelineExecutor::executeImpl(size_t num_threads)
@@ -771,7 +263,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
for (size_t i = 0; i < num_threads; ++i)
{
threads.emplace_back([this, thread_group, thread_num = i, num_threads]
threads.emplace_back([this, thread_group, thread_num = i]
{
/// ThreadStatus thread_status;
@@ -787,43 +279,25 @@ void PipelineExecutor::executeImpl(size_t num_threads)
try
{
executeSingleThread(thread_num, num_threads);
executeSingleThread(thread_num);
}
catch (...)
{
/// In case of exception from executor itself, stop other threads.
finish();
executor_contexts[thread_num]->exception = std::current_exception();
tasks.getThreadContext(thread_num).setException(std::current_exception());
}
});
}
#if defined(OS_LINUX)
{
/// Wait for async tasks.
std::unique_lock lock(task_queue_mutex);
while (auto task = async_task_queue.wait(lock))
{
auto * node = static_cast<ExecutingGraph::Node *>(task.data);
executor_contexts[task.thread_num]->async_tasks.push(node);
executor_contexts[task.thread_num]->has_async_tasks = true;
++num_waiting_async_tasks;
if (threads_queue.has(task.thread_num))
{
threads_queue.pop(task.thread_num);
wakeUpExecutor(task.thread_num);
}
}
}
#endif
tasks.processAsyncTasks();
for (auto & thread : threads)
if (thread.joinable())
thread.join();
}
else
executeSingleThread(0, num_threads);
executeSingleThread(0);
finished_flag = true;
}
@@ -858,7 +332,7 @@ String PipelineExecutor::dumpPipeline() const
}
WriteBufferFromOwnString out;
printPipeline(processors, statuses, out);
printPipeline(graph->getProcessors(), statuses, out);
out.finalize();
return out.str();

View File

@@ -1,11 +1,7 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/Executors/PollingQueue.h>
#include <Processors/Executors/ThreadsQueue.h>
#include <Processors/Executors/TasksQueue.h>
#include <Processors/Executors/ExecutingGraph.h>
#include <Common/ThreadPool.h>
#include <Processors/Executors/ExecutorTasks.h>
#include <Common/EventCounter.h>
#include <base/logger_useful.h>
@@ -30,7 +26,7 @@ public:
/// During pipeline execution new processors can appear. They will be added to existing set.
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors_, QueryStatus * elem = nullptr);
explicit PipelineExecutor(Processors & processors, QueryStatus * elem = nullptr);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once.
@@ -42,121 +38,36 @@ public:
/// Return true if execution should be continued.
bool executeStep(std::atomic_bool * yield_flag = nullptr);
const Processors & getProcessors() const { return processors; }
const Processors & getProcessors() const;
/// Cancel execution. May be called from another thread.
void cancel();
private:
Processors & processors;
std::mutex processors_mutex;
ExecutingGraphPtr graph;
ExecutorTasks tasks;
using Stack = std::stack<UInt64>;
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.
/// Stores processors need to be prepared. Preparing status is already set for them.
TaskQueue<ExecutingGraph::Node> task_queue;
/// Queue which stores tasks where processors returned Async status after prepare.
/// If multiple threads are using, main thread will wait for async tasks.
/// For single thread, will wait for async tasks only when task_queue is empty.
PollingQueue async_task_queue;
size_t num_waiting_async_tasks = 0;
ThreadsQueue threads_queue;
std::mutex task_queue_mutex;
/// Flag that checks that initializeExecution was called.
bool is_execution_initialized = false;
std::atomic_bool cancelled;
std::atomic_bool finished;
std::atomic_bool cancelled = false;
Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");
/// Things to stop execution to expand pipeline.
struct ExpandPipelineTask
{
ExecutingGraph::Node * node_to_expand;
Stack * stack;
size_t num_waiting_processing_threads = 0;
std::mutex mutex;
std::condition_variable condvar;
ExpandPipelineTask(ExecutingGraph::Node * node_to_expand_, Stack * stack_)
: node_to_expand(node_to_expand_), stack(stack_) {}
};
std::atomic<size_t> num_processing_executors;
std::atomic<ExpandPipelineTask *> expand_pipeline_task;
/// Context for each thread.
struct ExecutorContext
{
/// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
/// This can be solved by using an atomic shared ptr.
std::list<ExpandPipelineTask> task_list;
std::queue<ExecutingGraph::Node *> async_tasks;
std::atomic_bool has_async_tasks = false;
std::condition_variable condvar;
std::mutex mutex;
bool wake_flag = false;
/// Currently processing node.
ExecutingGraph::Node * node = nullptr;
/// Exception from executing thread itself.
std::exception_ptr exception;
#ifndef NDEBUG
/// Time for different processing stages.
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
UInt64 wait_time_ns = 0;
#endif
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
std::mutex executor_contexts_mutex;
/// Processor ptr -> node number
using ProcessorsMap = std::unordered_map<const IProcessor *, UInt64>;
ProcessorsMap processors_map;
/// Now it's used to check if query was killed.
QueryStatus * const process_list_element = nullptr;
/// Graph related methods.
bool expandPipeline(Stack & stack, UInt64 pid);
using Queue = std::queue<ExecutingGraph::Node *>;
/// Pipeline execution related methods.
void addChildlessProcessorsToStack(Stack & stack);
bool tryAddProcessorToStackIfUpdated(ExecutingGraph::Edge & edge, Queue & queue, Queue & async_queue, size_t thread_number);
static void addJob(ExecutingGraph::Node * execution_state);
// TODO: void addAsyncJob(UInt64 pid);
/// Prepare processor with pid number.
/// Check parents and children of current processor and push them to stacks if they also need to be prepared.
/// If processor wants to be expanded, ExpandPipelineTask from thread_number's execution context will be used.
bool prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, Queue & async_queue, std::unique_lock<std::mutex> node_lock);
bool doExpandPipeline(ExpandPipelineTask * task, bool processing);
/// Continue executor (in case there are tasks in queue).
void wakeUpExecutor(size_t thread_num);
void initializeExecution(size_t num_threads); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.
/// Methods connected to execution.
void executeImpl(size_t num_threads);
void executeStepImpl(size_t thread_num, size_t num_threads, std::atomic_bool * yield_flag = nullptr);
void executeSingleThread(size_t thread_num, size_t num_threads);
void executeStepImpl(size_t thread_num, std::atomic_bool * yield_flag = nullptr);
void executeSingleThread(size_t thread_num);
void finish();
String dumpPipeline() const;

View File

@@ -4,8 +4,10 @@
#include <QueryPipeline/QueryPipeline.h>
#include <iostream>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
#include <base/scope_guard_safe.h>
#include <Poco/Event.h>
namespace DB
{

View File

@@ -1,6 +1,7 @@
#pragma once
#include <vector>
#include <queue>
#include <Common/Exception.h>
namespace DB
{

View File

@@ -1,5 +1,6 @@
#pragma once
#include <Common/Exception.h>
#include <base/defines.h>
namespace DB
{
namespace ErrorCodes

View File

@@ -0,0 +1,175 @@
#pragma once
#include <atomic>
#include <cassert>
#include <list>
#include <mutex>
#include <condition_variable>
namespace DB
{
/// RWLock which allows upgrading a read lock to a write lock.
/// Read locks should be fast if there is no write lock.
///
/// Newly created write lock waits for all active read locks.
/// Newly created read lock waits for all write locks. Starvation is possible.
///
/// The mutex must live longer than the locks.
/// A read lock must live longer than its corresponding write lock.
///
/// For every write lock, a new internal state is created inside the mutex.
/// This state is not deallocated until the destruction of the mutex itself.
///
/// Usage example:
///
/// UpgradableMutex mutex;
/// {
/// UpgradableMutex::ReadLock read_lock(mutex);
/// ...
/// {
/// UpgradableMutex::WriteLock write_lock(read_lock);
/// ...
/// }
/// ...
/// }
class UpgradableMutex
{
private:
/// Implementation idea
///
/// ----------- (read scope)
/// ++num_readers
/// ** wait for active writer (in loop, starvation is possible here) **
///
/// =========== (write scope)
/// ** create new State **
/// ** wait for active writer (in loop, starvation is possible here) **
/// ** wait for all active readers **
///
/// ** notify all waiting readers for the current state **
/// =========== (end write scope)
///
/// --num_readers
/// ** notify current active writer **
/// ----------- (end read scope)
struct State
{
size_t num_waiting = 0;
bool is_done = false;
std::mutex mutex;
std::condition_variable read_condvar;
std::condition_variable write_condvar;
void wait() noexcept
{
std::unique_lock lock(mutex);
++num_waiting;
write_condvar.notify_one();
while (!is_done)
read_condvar.wait(lock);
}
void lock(std::atomic_size_t & num_readers_) noexcept
{
/// Note: num_readers is an atomic
/// which can change its value without the mutex being locked.
/// We maintain the invariant that after the num_readers value changes,
/// UpgradableMutex::write_state is checked, and in case of an active
/// write lock, its write condvar is always notified.
std::unique_lock lock(mutex);
++num_waiting;
while (num_waiting < num_readers_.load())
write_condvar.wait(lock);
}
void unlock() noexcept
{
{
std::unique_lock lock(mutex);
is_done = true;
}
read_condvar.notify_all();
}
};
std::atomic_size_t num_readers = 0;
std::list<State> states;
std::mutex states_mutex;
std::atomic<State *> write_state{nullptr};
void lock() noexcept
{
++num_readers;
while (auto * state = write_state.load())
state->wait();
}
void unlock() noexcept
{
--num_readers;
while (auto * state = write_state.load())
state->write_condvar.notify_one();
}
State * allocState()
{
std::lock_guard guard(states_mutex);
return &states.emplace_back();
}
void upgrade(State & state) noexcept
{
State * expected = nullptr;
/// Only change nullptr -> state is possible.
while (!write_state.compare_exchange_strong(expected, &state))
{
expected->wait();
expected = nullptr;
}
state.lock(num_readers);
}
void degrade(State & state) noexcept
{
State * my = write_state.exchange(nullptr);
if (&state != my)
std::terminate();
state.unlock();
}
public:
class ReadGuard
{
public:
explicit ReadGuard(UpgradableMutex & lock_) : lock(lock_) { lock.lock(); }
~ReadGuard() { lock.unlock(); }
UpgradableMutex & lock;
};
class WriteGuard
{
public:
explicit WriteGuard(ReadGuard & read_guard_) : read_guard(read_guard_)
{
state = read_guard.lock.allocState();
read_guard.lock.upgrade(*state);
}
~WriteGuard()
{
if (state)
read_guard.lock.degrade(*state);
}
private:
ReadGuard & read_guard;
State * state = nullptr;
};
};
}
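A hypothetical standalone exercise of the guards above: two readers run concurrently, and one of them upgrades its read lock to mutate shared state. The upgrade blocks until every other active reader has released:

#include <Processors/Executors/UpgradableLock.h>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
    DB::UpgradableMutex mutex;
    std::vector<int> shared;

    auto reader = [&](bool upgrade)
    {
        DB::UpgradableMutex::ReadGuard read_lock(mutex);
        if (upgrade)
        {
            // waits for all other active readers, then grants exclusive access
            DB::UpgradableMutex::WriteGuard write_lock(read_lock);
            shared.push_back(42);
        }
    };

    std::thread t1(reader, false);
    std::thread t2(reader, true);
    t1.join();
    t2.join();
    std::cout << shared.size() << '\n';  // prints 1
}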

View File

@@ -1,11 +1,11 @@
#pragma once
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/IProcessor.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
@@ -19,6 +19,9 @@ using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
class QueryPlan;
class PipelineExecutor;
using PipelineExecutorPtr = std::shared_ptr<PipelineExecutor>;
struct SubqueryForSet;
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;
@@ -122,7 +125,7 @@ public:
const Block & getHeader() const { return pipe.getHeader(); }
void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); }
void addInterpreterContext(std::shared_ptr<const Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addInterpreterContext(ContextPtr context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan);
void setLimits(const StreamLocalLimits & limits) { pipe.setLimits(limits); }

View File

@@ -28,6 +28,7 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sinks/SinkToStorage.h>
@@ -39,6 +40,7 @@
#include <Poco/StreamCopier.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <base/range.h>
#include <base/logger_useful.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>

View File

@@ -20,6 +20,7 @@
#include <regex>
#include <Common/setThreadName.h>
#include <Core/MySQL/Authentication.h>
#include <base/logger_useful.h>
#include <Common/config_version.h>

View File

@@ -19,6 +19,7 @@ limitations under the License. */
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <base/logger_useful.h>