ClickHouse/dbms/src/Processors/Executors/PipelineExecutor.cpp

#include <Processors/Executors/PipelineExecutor.h>
#include <unordered_map>
#include <queue>
#include <IO/WriteBufferFromString.h>
#include <Processors/printPipeline.h>
#include <Common/EventCounter.h>
#include <ext/scope_guard.h>
#include <Common/CurrentThread.h>

#include <boost/lockfree/queue.hpp>
#include <Common/Stopwatch.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int QUOTA_EXPIRED;
    extern const int QUERY_WAS_CANCELLED;
    extern const int LOGICAL_ERROR;
}

static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception)
{
    /// Don't add additional info to limits and quota exceptions, or when the query was killed (to pass tests).
    return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES
        && exception.code() != ErrorCodes::QUOTA_EXPIRED
        && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}

PipelineExecutor::PipelineExecutor(Processors & processors_)
    : processors(processors_)
    , cancelled(false)
    , finished(false)
    , num_processing_executors(0)
    , expand_pipeline_task(nullptr)
{
    buildGraph();
}
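
/// Fill the direct and back edges of graph[node] from the input and output ports of its processor.
/// Each port is given a pointer to the corresponding edge's version counter, so that port activity can later be
/// detected in tryAddProcessorToStackIfUpdated(). Returns true if at least one new edge was added.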
bool PipelineExecutor::addEdges(UInt64 node)
{
    auto throwUnknownProcessor = [](const IProcessor * proc, const IProcessor * parent, bool from_input_port)
    {
        String msg = "Processor " + proc->getName() + " was found as " + (from_input_port ? "input" : "output")
                     + " for processor " + parent->getName() + ", but not found in list of processors.";

        throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
    };

    const IProcessor * cur = graph[node].processor;

    auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges)
    {
        auto it = processors_map.find(to_proc);
        if (it == processors_map.end())
            throwUnknownProcessor(to_proc, cur, true);

        UInt64 proc_num = it->second;
        Edge * edge_ptr = nullptr;

        for (auto & edge : edges)
            if (edge.to == proc_num)
                edge_ptr = &edge;

        if (!edge_ptr)
        {
            edge_ptr = &edges.emplace_back();
            edge_ptr->to = proc_num;
        }

        from_port.setVersion(&edge_ptr->version);
    };

    bool was_edge_added = false;

    auto & inputs = processors[node]->getInputs();
    auto from_input = graph[node].backEdges.size();

    if (from_input < inputs.size())
    {
        was_edge_added = true;

        for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it)
        {
            const IProcessor * proc = &it->getOutputPort().getProcessor();
            add_edge(*it, proc, graph[node].backEdges);
        }
    }

    auto & outputs = processors[node]->getOutputs();
    auto from_output = graph[node].directEdges.size();

    if (from_output < outputs.size())
    {
        was_edge_added = true;

        for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it)
        {
            const IProcessor * proc = &it->getInputPort().getProcessor();
            add_edge(*it, proc, graph[node].directEdges);
        }
    }

    return was_edge_added;
}
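
/// Build the execution graph: one node per processor (processors_map maps processor pointer -> node index),
/// then connect the nodes with edges derived from their ports.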
void PipelineExecutor::buildGraph()
{
    UInt64 num_processors = processors.size();

    graph.reserve(num_processors);
    for (UInt64 node = 0; node < num_processors; ++node)
    {
        IProcessor * proc = processors[node].get();
        processors_map[proc] = node;
        graph.emplace_back(proc, node);
    }

    for (UInt64 node = 0; node < num_processors; ++node)
        addEdges(node);
}
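
/// Push every processor that has no direct (output) edges onto the stack: these are the sinks of the pipeline.
/// Preparation starts from them, and their status is set to Preparing.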
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
{
    UInt64 num_processors = processors.size();
    for (UInt64 proc = 0; proc < num_processors; ++proc)
    {
        if (graph[proc].directEdges.empty())
        {
            stack.push(proc);
            graph[proc].status = ExecStatus::Preparing;
        }
    }
}
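
/// Run IProcessor::work() for one processor, annotating any DB::Exception with the processor's name and address
/// so that errors can be traced back to the failing step of the pipeline.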
static void executeJob(IProcessor * processor)
{
    try
    {
        processor->work();
    }
    catch (Exception & exception)
    {
        if (checkCanAddAdditionalInfoToException(exception))
            exception.addMessage("While executing " + processor->getName() + " ("
                                 + toString(reinterpret_cast<std::uintptr_t>(processor)) + ") ");
        throw;
    }
}
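
/// Wrap the processor's work into a job lambda stored in its ExecutionState. The lambda captures any thrown
/// exception into execution_state->exception instead of letting it escape the worker thread.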
void PipelineExecutor::addJob(ExecutionState * execution_state)
{
    auto job = [execution_state]()
    {
        try
        {
            // Stopwatch watch;
            executeJob(execution_state->processor);
            // execution_state->execution_time_ns += watch.elapsed();

            ++execution_state->num_executed_jobs;
        }
        catch (...)
        {
            execution_state->exception = std::current_exception();
        }
    };

    execution_state->job = std::move(job);
}
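
/// Expand the pipeline at node `pid`: ask its processor for new processors, register them in the graph and in
/// `processors` (under processors_mutex), then re-run addEdges() for every node. Nodes that gained new edges and
/// are currently New or Idle are marked Preparing and pushed onto the stack.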
void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
{
    auto & cur_node = graph[pid];
    auto new_processors = cur_node.processor->expandPipeline();

    for (const auto & processor : new_processors)
    {
        if (processors_map.count(processor.get()))
            throw Exception("Processor " + processor->getName() + " was already added to pipeline.",
                            ErrorCodes::LOGICAL_ERROR);

        processors_map[processor.get()] = graph.size();
        graph.emplace_back(processor.get(), graph.size());
    }

    {
        std::lock_guard guard(processors_mutex);
        processors.insert(processors.end(), new_processors.begin(), new_processors.end());
    }

    UInt64 num_processors = processors.size();
    for (UInt64 node = 0; node < num_processors; ++node)
    {
        if (addEdges(node))
        {
            if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New)
            {
                graph[node].status = ExecStatus::Preparing;
                stack.push(node);
            }
        }
    }
}
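
/// If the edge was updated (its version changed) or the node is still New, try to take ownership of the node by
/// switching its status to Preparing with a CAS loop; on success, push it onto the stack. Only New and Idle nodes
/// can be claimed here; nodes in other states are already owned by some thread, which will call prepare() itself.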
bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stack)
{
    /// In this method we have ownership of the edge, but the node can be accessed concurrently.
    auto & node = graph[edge.to];

    ExecStatus status = node.status.load();

    /// Don't add the processor if nothing was read from the port.
    if (status != ExecStatus::New && edge.version == edge.prev_version)
        return false;

    if (status == ExecStatus::Finished)
        return false;

    /// Signal that the node needs to be prepared.
    node.need_to_be_prepared = true;
    edge.prev_version = edge.version;

    /// Try to get ownership of the node.
    /// Assume that the current status is New or Idle. Otherwise, the node can't be prepared here.
    if (status != ExecStatus::New)
        status = ExecStatus::Idle;

    /// Statuses other than New and Idle are not interesting, because the node is already owned by another thread.
    /// Prepare will be called in the owning thread before the status is changed.
    while (!node.status.compare_exchange_weak(status, ExecStatus::Preparing))
        if (!(status == ExecStatus::New || status == ExecStatus::Idle) || !node.need_to_be_prepared)
            return false;

    stack.push(edge.to);
    return true;
}
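
/// Call prepare() for the processor that this thread currently owns and react to the returned status:
/// schedule neighbours for preparation, mark the node Executing or Finished, or coordinate a pipeline expansion.
/// Returns true if the processor is Ready and its job should be executed now.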
bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async)
{
    /// In this method we have ownership of the node.
    auto & node = graph[pid];

    {
        /// Stopwatch watch;

        /// Disable the flag before the prepare call. Otherwise, we could miss a prepare request.
        /// Prepare may be called more times than needed, but that's ok.
        node.need_to_be_prepared = false;

        auto status = node.processor->prepare();

        /// node.execution_state->preparation_time_ns += watch.elapsed();
        node.last_processor_status = status;
    }

    auto add_neighbours_to_prepare_queue = [&] ()
    {
        for (auto & edge : node.backEdges)
            tryAddProcessorToStackIfUpdated(edge, parents);

        for (auto & edge : node.directEdges)
            tryAddProcessorToStackIfUpdated(edge, children);
    };

    auto try_release_ownership = [&] ()
    {
        /// This function can be called after expandPipeline(), where the node reference from the outer scope is no longer valid.
        auto & node_ = graph[pid];

        ExecStatus expected = ExecStatus::Idle;
        node_.status = ExecStatus::Idle;

        if (node_.need_to_be_prepared)
        {
            while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing))
                if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared)
                    return;

            children.push(pid);
        }
    };

    switch (node.last_processor_status)
    {
        case IProcessor::Status::NeedData:
        case IProcessor::Status::PortFull:
        {
            add_neighbours_to_prepare_queue();
            try_release_ownership();
            break;
        }
        case IProcessor::Status::Finished:
        {
            add_neighbours_to_prepare_queue();
            node.status = ExecStatus::Finished;
            break;
        }
        case IProcessor::Status::Ready:
        {
            node.status = ExecStatus::Executing;
            return true;
        }
        case IProcessor::Status::Async:
        {
            throw Exception("Async is temporarily not supported.", ErrorCodes::LOGICAL_ERROR);

            // node.status = ExecStatus::Executing;
            // addAsyncJob(pid);
            // break;
        }
        case IProcessor::Status::Wait:
        {
            if (!async)
                throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
            break;
        }
        case IProcessor::Status::ExpandPipeline:
        {
            executor_contexts[thread_number]->task_list.emplace_back(
                node.execution_state.get(),
                &parents);

            ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
            ExpandPipelineTask * expected = nullptr;

            while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
            {
                doExpandPipeline(expected, true);
                expected = nullptr;
            }

            doExpandPipeline(desired, true);

            /// `node` is no longer valid after the pipeline was expanded.
            graph[pid].need_to_be_prepared = true;
            try_release_ownership();

            break;
        }
    }

    return false;
}
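
/// Synchronization point for pipeline expansion. Every processing thread that observes a pending ExpandPipelineTask
/// enters here. The condition variable waits until all currently processing executors have arrived for this task,
/// or until the task has already been handled by another thread. One thread then performs expandPipeline(),
/// resets expand_pipeline_task and wakes the others.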
void PipelineExecutor::doExpandPipeline(ExpandPipelineTask * task, bool processing)
{
    std::unique_lock lock(task->mutex);

    if (processing)
        ++task->num_waiting_processing_threads;

    task->condvar.wait(lock, [&]()
    {
        return task->num_waiting_processing_threads >= num_processing_executors || expand_pipeline_task != task;
    });

    /// After condvar.wait(), `task` may point to garbage. It can be touched only if it is still the current expand_pipeline_task.
    if (expand_pipeline_task == task)
    {
        expandPipeline(*task->stack, task->node_to_expand->processors_id);

        expand_pipeline_task = nullptr;

        lock.unlock();
        task->condvar.notify_all();
    }
}

void PipelineExecutor::cancel()
{
    cancelled = true;
    finish();

    std::lock_guard guard(processors_mutex);
    for (auto & processor : processors)
        processor->cancel();
}
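
/// Stop the execution: set the `finished` flag under task_queue_mutex and wake every executor thread so that it
/// can observe the flag and leave its loop.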
void PipelineExecutor::finish()
{
    {
        std::lock_guard lock(task_queue_mutex);
        finished = true;
    }

    for (auto & context : executor_contexts)
    {
        {
            std::lock_guard lock(context->mutex);
            context->wake_flag = true;
        }

        context->condvar.notify_one();
    }
}
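
/// Entry point. Runs executeImpl() with the requested number of threads, rethrows the first stored exception,
/// if any, and finally (unless the execution was cancelled) checks that every processor reached the Finished state;
/// otherwise the pipeline is considered stuck.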
void PipelineExecutor::execute(size_t num_threads)
{
    try
    {
        executeImpl(num_threads);

        /// Execution can be stopped because of an exception. Check and rethrow if any.
        for (auto & node : graph)
            if (node.execution_state->exception)
                std::rethrow_exception(node.execution_state->exception);
    }
    catch (Exception & exception)
    {
        if (checkCanAddAdditionalInfoToException(exception))
            exception.addMessage("\nCurrent state:\n" + dumpPipeline());

        throw;
    }

    if (cancelled)
        return;

    bool all_processors_finished = true;
    for (auto & node : graph)
        if (node.status != ExecStatus::Finished)
            all_processors_finished = false;

    if (!all_processors_finished)
        throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
}
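
/// Body of one executor thread. The thread repeatedly takes an ExecutionState from the shared task_queue (going to
/// sleep on its own condition variable when the queue is empty, and calling finish() when all threads are waiting),
/// executes the job, then prepares the processor and its neighbours. Newly ready processors are either executed by
/// this thread directly or pushed back to task_queue, waking one sleeping thread if there is extra work.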
void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads)
{
    UInt64 total_time_ns = 0;
    UInt64 execution_time_ns = 0;
    UInt64 processing_time_ns = 0;
    UInt64 wait_time_ns = 0;

    Stopwatch total_time_watch;
    ExecutionState * state = nullptr;

    auto prepare_processor = [&](UInt64 pid, Stack & children, Stack & parents)
    {
        try
        {
            return prepareProcessor(pid, children, parents, thread_num, false);
        }
        catch (...)
        {
            graph[pid].execution_state->exception = std::current_exception();
            finish();
        }

        return false;
    };

    using Queue = std::queue<ExecutionState *>;

    auto prepare_all_processors = [&](Queue & queue, Stack & stack, Stack & children, Stack & parents)
    {
        while (!stack.empty() && !finished)
        {
            auto current_processor = stack.top();
            stack.pop();

            if (prepare_processor(current_processor, children, parents))
                queue.push(graph[current_processor].execution_state.get());
        }
    };

    while (!finished)
    {
        bool first = true;

        /// First, find any processor to execute.
        /// Take it from the task queue; if the queue is empty, register this thread as waiting and sleep until it is woken.
        while (!finished)
        {
            {
                std::unique_lock lock(task_queue_mutex);

                /// Not the first iteration: this thread was counted as waiting before it went to sleep.
                if (!first)
                    --num_waiting_threads;

                first = false;

                if (!task_queue.empty())
                {
                    state = task_queue.front();
                    task_queue.pop();

                    /// If there is still work left, wake one of the sleeping threads to pick it up.
                    if (!task_queue.empty() && !threads_queue.empty())
                    {
                        auto thread_to_wake = threads_queue.top();
                        threads_queue.pop();

                        lock.unlock();

                        std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);
                        executor_contexts[thread_to_wake]->wake_flag = true;
                        executor_contexts[thread_to_wake]->condvar.notify_one();
                    }

                    break;
                }

                ++num_waiting_threads;

                /// If every thread is waiting and the queue is empty, there is no more work: stop the execution.
                if (num_waiting_threads == num_threads)
                {
                    lock.unlock();
                    finish();
                    break;
                }

                threads_queue.push(thread_num);
            }

            {
                std::unique_lock lock(executor_contexts[thread_num]->mutex);

                executor_contexts[thread_num]->condvar.wait(lock, [&]
                {
                    return finished || executor_contexts[thread_num]->wake_flag;
                });

                executor_contexts[thread_num]->wake_flag = false;
            }
        }

        if (finished)
            break;

        while (state)
        {
            if (finished)
                break;

            addJob(state);

            {
                // Stopwatch execution_time_watch;
                state->job();
                // execution_time_ns += execution_time_watch.elapsed();
            }

            if (state->exception)
                finish();

            if (finished)
                break;

            // Stopwatch processing_time_watch;

            /// Try to execute a neighbour processor.
            {
                Stack children;
                Stack parents;
                Queue queue;

                ++num_processing_executors;
                while (auto task = expand_pipeline_task.load())
                    doExpandPipeline(task, true);

                /// Execute the same processor again if we can.
                if (!prepare_processor(state->processors_id, children, parents))
                    state = nullptr;

                /// Process all neighbours. Children will be on the top of the stack, then parents.
                prepare_all_processors(queue, children, children, parents);

                /// If this processor did not become Ready again, continue with one of the prepared children.
                if (!state && !queue.empty())
                {
                    state = queue.front();
                    queue.pop();
                }

                prepare_all_processors(queue, parents, parents, parents);

                /// Push the remaining ready processors to the shared queue and wake one sleeping thread, if any.
                if (!queue.empty())
                {
                    std::unique_lock lock(task_queue_mutex);

                    while (!queue.empty() && !finished)
                    {
                        task_queue.push(queue.front());
                        queue.pop();
                    }

                    if (!threads_queue.empty())
                    {
                        auto thread_to_wake = threads_queue.top();
                        threads_queue.pop();

                        lock.unlock();

                        std::lock_guard guard(executor_contexts[thread_to_wake]->mutex);
                        executor_contexts[thread_to_wake]->wake_flag = true;
                        executor_contexts[thread_to_wake]->condvar.notify_one();
                    }
                }

                --num_processing_executors;
                while (auto task = expand_pipeline_task.load())
                    doExpandPipeline(task, false);
            }

            // processing_time_ns += processing_time_watch.elapsed();
        }
    }

    total_time_ns = total_time_watch.elapsed();
    wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns;

    LOG_TRACE(log, "Thread finished."
        << " Total time: " << (total_time_ns / 1e9) << " sec."
        << " Execution time: " << (execution_time_ns / 1e9) << " sec."
        << " Processing time: " << (processing_time_ns / 1e9) << " sec."
        << " Wait time: " << (wait_time_ns / 1e9) << " sec.");
}
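
/// Set up the execution: create one ExecutorContext per thread, prepare all childless processors to fill the initial
/// task_queue, then spawn num_threads worker threads running executeSingleThread() and join them. A SCOPE_EXIT guard
/// calls finish() and joins the threads if this function exits early.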
void PipelineExecutor::executeImpl(size_t num_threads)
{
    Stack stack;

    executor_contexts.reserve(num_threads);
    for (size_t i = 0; i < num_threads; ++i)
        executor_contexts.emplace_back(std::make_unique<ExecutorContext>());

    auto thread_group = CurrentThread::getGroup();

    using ThreadsData = std::vector<ThreadFromGlobalPool>;
    ThreadsData threads;
    threads.reserve(num_threads);

    bool finished_flag = false;

    SCOPE_EXIT(
        if (!finished_flag)
        {
            finish();

            for (auto & thread : threads)
                if (thread.joinable())
                    thread.join();
        }
    );

    addChildlessProcessorsToStack(stack);

    {
        std::lock_guard lock(task_queue_mutex);

        while (!stack.empty())
        {
            UInt64 proc = stack.top();
            stack.pop();

            if (prepareProcessor(proc, stack, stack, 0, false))
            {
                auto cur_state = graph[proc].execution_state.get();
                task_queue.push(cur_state);
            }
        }
    }

    for (size_t i = 0; i < num_threads; ++i)
    {
        threads.emplace_back([this, thread_group, thread_num = i, num_threads]
        {
            /// ThreadStatus thread_status;

            if (thread_group)
                CurrentThread::attachTo(thread_group);

            SCOPE_EXIT(
                if (thread_group)
                    CurrentThread::detachQueryIfNotDetached();
            );

            executeSingleThread(thread_num, num_threads);
        });
    }

    for (auto & thread : threads)
        if (thread.joinable())
            thread.join();

    finished_flag = true;
}
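
/// Render the current pipeline as a string (via printPipeline), attaching per-processor execution statistics and the
/// last prepare() status to each node. Used for error messages and for the "Pipeline stuck" diagnostic.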
String PipelineExecutor::dumpPipeline() const
{
    for (auto & node : graph)
    {
        if (node.execution_state)
            node.processor->setDescription(
                "(" + std::to_string(node.execution_state->num_executed_jobs) + " jobs, execution time: "
                + std::to_string(node.execution_state->execution_time_ns / 1e9) + " sec., preparation time: "
                + std::to_string(node.execution_state->preparation_time_ns / 1e9) + " sec.)");
    }

    std::vector<IProcessor::Status> statuses;
    std::vector<IProcessor *> proc_list;
    statuses.reserve(graph.size());
    proc_list.reserve(graph.size());

    for (auto & proc : graph)
    {
        proc_list.emplace_back(proc.processor);
        statuses.emplace_back(proc.last_processor_status);
    }

    WriteBufferFromOwnString out;
    printPipeline(processors, statuses, out);
    out.finish();

    return out.str();
}
}