ClickHouse/dbms/src/Processors/Executors/PipelineExecutor.cpp

#include <Processors/Executors/PipelineExecutor.h>
#include <unordered_map>
#include <queue>
#include <IO/WriteBufferFromString.h>
#include <Processors/printPipeline.h>
#include <Common/EventCounter.h>
#include <ext/scope_guard.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadStatus.h>
#include <Common/Stopwatch.h>
#include <common/logger_useful.h>

#include <boost/lockfree/queue.hpp>

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int QUOTA_EXPIRED;
    extern const int QUERY_WAS_CANCELLED;
    extern const int LOGICAL_ERROR;
}

static bool checkCanAddAdditionalInfoToException(const DB::Exception & exception)
{
    /// Don't add additional info to limits and quota exceptions, and in case of kill query (to pass tests).
    return exception.code() != ErrorCodes::TOO_MANY_ROWS_OR_BYTES
        && exception.code() != ErrorCodes::QUOTA_EXPIRED
        && exception.code() != ErrorCodes::QUERY_WAS_CANCELLED;
}
PipelineExecutor::PipelineExecutor(Processors processors)
    : processors(std::move(processors)), cancelled(false), finished(false)
{
    buildGraph();
}
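
/// Fill directEdges (to consumers of this node's outputs) and backEdges (to producers of its inputs)
/// for every port that does not have an edge yet, and attach the edge's version counter to the port.
/// Returns true if at least one new edge was added. Used by buildGraph() and again by expandPipeline().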
bool PipelineExecutor::addEdges(UInt64 node)
{
    auto throwUnknownProcessor = [](const IProcessor * proc, const IProcessor * parent, bool from_input_port)
    {
        String msg = "Processor " + proc->getName() + " was found as " + (from_input_port ? "input" : "output")
                     + " for processor " + parent->getName() + ", but not found in list of processors.";

        throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
    };

    const IProcessor * cur = graph[node].processor;

    auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges, bool from_input_port)
    {
        auto it = processors_map.find(to_proc);
        if (it == processors_map.end())
            throwUnknownProcessor(to_proc, cur, from_input_port);

        UInt64 proc_num = it->second;
        Edge * edge_ptr = nullptr;

        for (auto & edge : edges)
            if (edge.to == proc_num)
                edge_ptr = &edge;

        if (!edge_ptr)
        {
            edge_ptr = &edges.emplace_back();
            edge_ptr->to = proc_num;
        }

        from_port.setVersion(&edge_ptr->version);
    };

    bool was_edge_added = false;

    auto & inputs = processors[node]->getInputs();
    auto from_input = graph[node].backEdges.size();

    if (from_input < inputs.size())
    {
        was_edge_added = true;

        for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it)
        {
            const IProcessor * proc = &it->getOutputPort().getProcessor();
            add_edge(*it, proc, graph[node].backEdges, true);
        }
    }

    auto & outputs = processors[node]->getOutputs();
    auto from_output = graph[node].directEdges.size();

    if (from_output < outputs.size())
    {
        was_edge_added = true;

        for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it)
        {
            const IProcessor * proc = &it->getInputPort().getProcessor();
            add_edge(*it, proc, graph[node].directEdges, false);
        }
    }

    return was_edge_added;
}
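
/// Create one graph node per processor, fill processors_map (processor pointer -> node index)
/// and connect the nodes with edges according to their ports.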
void PipelineExecutor::buildGraph()
{
    UInt64 num_processors = processors.size();
    graph.resize(num_processors);

    for (UInt64 node = 0; node < num_processors; ++node)
    {
        IProcessor * proc = processors[node].get();
        processors_map[proc] = node;
        graph[node].processor = proc;
    }

    for (UInt64 node = 0; node < num_processors; ++node)
        addEdges(node);
}
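
/// Processors without direct (output) edges are the pipeline sinks; they are the starting points
/// for preparation, and the rest of the graph is reached from them through back edges.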
void PipelineExecutor::addChildlessProcessorsToQueue()
{
    UInt64 num_processors = processors.size();
    for (UInt64 proc = 0; proc < num_processors; ++proc)
    {
        if (graph[proc].directEdges.empty())
        {
            prepare_queue.push(proc);
            graph[proc].status = ExecStatus::Preparing;
        }
    }
}
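
/// Drain the lock-free finished_execution_queue (this scheduling thread is the only consumer):
/// account for the finished job, rethrow an exception saved by it (if any) and put the processor
/// back into the prepare queue.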
void PipelineExecutor::processFinishedExecutionQueue()
{
    UInt64 finished_job = graph.size();

    while (!finished_execution_queue.empty())
    {
        /// Should be successful as single consumer is used.
        while (!finished_execution_queue.pop(finished_job));

        ++num_waited_tasks;
        ++graph[finished_job].execution_state->num_executed_jobs;

        if (graph[finished_job].execution_state->exception)
            std::rethrow_exception(graph[finished_job].execution_state->exception);

        graph[finished_job].status = ExecStatus::Preparing;
        prepare_queue.push(finished_job);
    }
}
void PipelineExecutor::processFinishedExecutionQueueSafe()
{
//    if (pool)
//    {
//        /// std::lock_guard lock(finished_execution_mutex);
//        processFinishedExecutionQueue(queue);
//    }
//    else
        processFinishedExecutionQueue();
}
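
/// Schedule the processor on the other side of the edge for preparation, but only if the port
/// version has changed since the last check (or the node is new) and the node is not busy.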
bool PipelineExecutor::addProcessorToPrepareQueueIfUpdated(Edge & edge)
{
    auto & node = graph[edge.to];

    /// Don't add processor if nothing was read from port.
    if (node.status != ExecStatus::New && edge.version == edge.prev_version)
        return false;

    edge.prev_version = edge.version;

    if (node.status == ExecStatus::Idle || node.status == ExecStatus::New)
    {
        prepare_queue.push(edge.to);
        node.status = ExecStatus::Preparing;
        return true;
    }

    return false;
}
static void executeJob(IProcessor * processor)
{
    try
    {
        processor->work();
    }
    catch (Exception & exception)
    {
        if (checkCanAddAdditionalInfoToException(exception))
            exception.addMessage("While executing " + processor->getName() + " ("
                                 + toString(reinterpret_cast<std::uintptr_t>(processor)) + ") ");
        throw;
    }
}
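
/// In multi-threaded mode, wrap the processor's work() into a job that measures execution time,
/// stores a thrown exception in its ExecutionState and reports completion through
/// finished_execution_queue; the job is pushed into task_queue for the worker threads.
/// In single-threaded mode the job is executed right here, in the main thread.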
void PipelineExecutor::addJob(UInt64 pid)
{
    if (!threads.empty())
    {
        if (!graph[pid].execution_state)
            graph[pid].execution_state = std::make_unique<ExecutionState>();

        auto job = [this, pid, processor = graph[pid].processor, execution_state = graph[pid].execution_state.get()]()
        {
            SCOPE_EXIT(
                while (!finished_execution_queue.push(pid));
                event_counter.notify()
            );

            try
            {
                Stopwatch watch;
                executeJob(processor);
                execution_state->execution_time_ns += watch.elapsed();
            }
            catch (...)
            {
                /// Note: It's important to save exception before pushing pid to finished_execution_queue
                execution_state->exception = std::current_exception();
            }
        };

        graph[pid].execution_state->job = std::move(job);

        while (!task_queue.push(graph[pid].execution_state.get()))
            sleep(0);

        task_condvar.notify_one();
        ++num_tasks_to_wait;
    }
    else
    {
        /// Execute task in main thread.
        executeJob(graph[pid].processor);
        while (!finished_execution_queue.push(pid));
    }
}
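
/// Ask the processor to schedule its own asynchronous work; it is expected to notify event_counter
/// when it has something to do, after which processAsyncQueue() will prepare it again.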
void PipelineExecutor::addAsyncJob(UInt64 pid)
{
    graph[pid].processor->schedule(event_counter);
    graph[pid].status = ExecStatus::Async;
    ++num_tasks_to_wait;
}
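
/// Called when a processor returns Status::ExpandPipeline: register the processors it creates
/// in the graph and processors_map, then re-run addEdges() over all nodes so that newly connected
/// ports get edges; idle nodes that gained edges are queued for preparation.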
void PipelineExecutor::expandPipeline(UInt64 pid)
{
    auto & cur_node = graph[pid];
    auto new_processors = cur_node.processor->expandPipeline();

    for (const auto & processor : new_processors)
    {
        if (processors_map.count(processor.get()))
            throw Exception("Processor " + processor->getName() + " was already added to pipeline.",
                            ErrorCodes::LOGICAL_ERROR);

        processors_map[processor.get()] = graph.size();
        graph.emplace_back();
        graph.back().processor = processor.get();
    }

    processors.insert(processors.end(), new_processors.begin(), new_processors.end());
    UInt64 num_processors = processors.size();

    for (UInt64 node = 0; node < num_processors; ++node)
    {
        if (addEdges(node))
        {
            if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New)
            {
                graph[node].status = ExecStatus::Preparing;
                prepare_queue.push(node);
            }
        }
    }
}
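
/// Call prepare() on the processor and react to the returned status: queue updated neighbours,
/// schedule a synchronous or asynchronous job, mark the node finished, or expand the pipeline.
/// `async` is true only when called from processAsyncQueue().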
void PipelineExecutor::prepareProcessor(UInt64 pid, bool async)
{
    auto & node = graph[pid];
    auto status = node.processor->prepare();
    node.last_processor_status = status;

    auto add_neighbours_to_prepare_queue = [&, this]
    {
        for (auto & edge : node.directEdges)
            addProcessorToPrepareQueueIfUpdated(edge);

        for (auto & edge : node.backEdges)
            addProcessorToPrepareQueueIfUpdated(edge);
    };

    switch (status)
    {
        case IProcessor::Status::NeedData:
        {
            add_neighbours_to_prepare_queue();
            node.status = ExecStatus::Idle;
            break;
        }
        case IProcessor::Status::PortFull:
        {
            add_neighbours_to_prepare_queue();
            node.status = ExecStatus::Idle;
            break;
        }
        case IProcessor::Status::Finished:
        {
            add_neighbours_to_prepare_queue();
            node.status = ExecStatus::Finished;
            break;
        }
        case IProcessor::Status::Ready:
        {
            node.status = ExecStatus::Executing;
            addJob(pid);
            break;
        }
        case IProcessor::Status::Async:
        {
            node.status = ExecStatus::Executing;
            addAsyncJob(pid);
            break;
        }
        case IProcessor::Status::Wait:
        {
            if (!async)
                throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
            break;
        }
        case IProcessor::Status::ExpandPipeline:
        {
            expandPipeline(pid);

            /// Add node to queue again.
            prepare_queue.push(pid);

            /// node ref is not valid now.
            graph[pid].status = ExecStatus::Preparing;
            break;
        }
    }
}
void PipelineExecutor::processPrepareQueue()
{
    while (!prepare_queue.empty())
    {
        UInt64 proc = prepare_queue.front();
        prepare_queue.pop();

        prepareProcessor(proc, false);
    }
}
void PipelineExecutor::processAsyncQueue()
{
    UInt64 num_processors = processors.size();
    for (UInt64 node = 0; node < num_processors; ++node)
        if (graph[node].status == ExecStatus::Async)
            prepareProcessor(node, true);
}
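
/// Entry point. Seeds the prepare queue with the pipeline sinks, runs the scheduling loop
/// (see executeImpl) and, on exit, stops and joins the worker threads. Throws "Pipeline stuck"
/// if some processor never reached the Finished state.
///
/// A minimal usage sketch (illustrative only: `source` and `sink` stand for any already
/// connected IProcessor instances, they are not defined in this file):
///
///     Processors processors;
///     processors.emplace_back(std::move(source));
///     processors.emplace_back(std::move(sink));
///
///     PipelineExecutor executor(std::move(processors));
///     executor.execute(4);    /// run with 4 threads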
void PipelineExecutor::execute(size_t num_threads)
{
    addChildlessProcessorsToQueue();

    try
    {
        /// Wait for all tasks to finish in case of exception.
        SCOPE_EXIT(
            finished = true;
            task_condvar.notify_all();

            for (auto & thread : threads)
                thread.join();
        );

        executeImpl(num_threads);
    }
    catch (Exception & exception)
    {
        if (checkCanAddAdditionalInfoToException(exception))
            exception.addMessage("\nCurrent state:\n" + dumpPipeline());

        throw;
    }

    if (cancelled)
        return;

    bool all_processors_finished = true;
    for (auto & node : graph)
        if (node.status != ExecStatus::Finished)
            all_processors_finished = false;

    if (!all_processors_finished)
        throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
}
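
/// The scheduling loop. For num_threads > 1, spawns worker threads that spin on task_queue and
/// execute jobs; the calling thread then repeatedly drains finished jobs, prepares processors and
/// handles async ones, waiting on event_counter while there is outstanding work but nothing to prepare.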
void PipelineExecutor::executeImpl(size_t num_threads)
{
    /// No need to make task_queue longer than num_threads.
    /// Therefore, finished_execution_queue can't be longer than num_threads too.
    task_queue.reserve_unsafe(num_threads);
    finished_execution_queue.reserve_unsafe(num_threads);

    if (num_threads > 1)
    {
        /// For single thread execution, jobs are executed in the main thread.
        threads.reserve(num_threads);

        auto thread_group = CurrentThread::getGroup();

        for (size_t i = 0; i < num_threads; ++i)
        {
            threads.emplace_back([this, thread_group, thread_number = i]
            {
                ThreadStatus thread_status;

                if (thread_group)
                    CurrentThread::attachTo(thread_group);

                SCOPE_EXIT(
                    if (thread_group)
                        CurrentThread::detachQueryIfNotDetached();
                );

                UInt64 total_time_ns = 0;
                UInt64 execution_time_ns = 0;
                UInt64 wait_time_ns = 0;

                Stopwatch total_time_watch;
                ExecutionState * state = nullptr;

                while (!finished)
                {
                    while (task_queue.pop(state))
                    {
                        Stopwatch execution_time_watch;
                        state->job();
                        execution_time_ns += execution_time_watch.elapsed();
                    }

                    /// Note: we don't wait in the thread like in an ordinary thread pool.
                    /// Probably, it's better to add waiting on a condition variable,
                    /// but for now we avoid it to escape extra locking.
                    /// std::unique_lock lock(task_mutex);
                    /// task_condvar.wait(lock);
                }

                total_time_ns = total_time_watch.elapsed();
                wait_time_ns = total_time_ns - execution_time_ns;

                LOG_TRACE(log, "Thread " << thread_number << " finished."
                    << " Total time: " << (total_time_ns / 1e9) << " sec."
                    << " Execution time: " << (execution_time_ns / 1e9) << " sec."
                    << " Wait time: " << (wait_time_ns / 1e9) << " sec.");
            });
        }
    }

    while (!cancelled)
    {
        processFinishedExecutionQueueSafe();
        processPrepareQueue();
        processAsyncQueue();

        if (prepare_queue.empty())
        {
            /// For single-thread executor.
            if (num_threads == 1)
            {
                if (!finished_execution_queue.empty())
                    continue;
                else
                    break;
            }

            if (num_tasks_to_wait > num_waited_tasks)
            {
                /// Try to wait for anything.
                event_counter.wait();
                ++num_waited_tasks;
            }
            else
            {
                /// Here prepare_queue is empty and we have nobody to wait for. Exiting.
                break;
            }
        }
    }
}
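
/// Render the current pipeline into a string for logs and exception messages: each processor is
/// annotated with its number of executed jobs and execution time, and printed together with the
/// status returned by its last prepare() call.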
String PipelineExecutor::dumpPipeline() const
{
    for (auto & node : graph)
    {
        if (node.execution_state)
            node.processor->setDescription(
                "(" + std::to_string(node.execution_state->num_executed_jobs) + ", "
                + std::to_string(node.execution_state->execution_time_ns / 1e9) + " sec.)");
    }

    std::vector<IProcessor::Status> statuses;
    std::vector<IProcessor *> proc_list;
    statuses.reserve(graph.size());
    proc_list.reserve(graph.size());

    for (auto & proc : graph)
    {
        proc_list.emplace_back(proc.processor);
        statuses.emplace_back(proc.last_processor_status);
    }

    WriteBufferFromOwnString out;
    printPipeline(processors, statuses, out);
    out.finish();

    return out.str();
}

}