Fix data race in query finish/cancel

This commit is contained in:
Alexey Milovidov 2022-10-17 02:10:36 +02:00
parent 191158f93b
commit c6b2ee47df
13 changed files with 104 additions and 99 deletions

View File

@ -10,17 +10,17 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
ExecutingGraph::ExecutingGraph(Processors & processors_, bool profile_processors_) ExecutingGraph::ExecutingGraph(std::shared_ptr<Processors> processors_, bool profile_processors_)
: processors(processors_) : processors(std::move(processors_))
, profile_processors(profile_processors_) , profile_processors(profile_processors_)
{ {
uint64_t num_processors = processors.size(); uint64_t num_processors = processors->size();
nodes.reserve(num_processors); nodes.reserve(num_processors);
/// Create nodes. /// Create nodes.
for (uint64_t node = 0; node < num_processors; ++node) for (uint64_t node = 0; node < num_processors; ++node)
{ {
IProcessor * proc = processors[node].get(); IProcessor * proc = processors->at(node).get();
processors_map[proc] = node; processors_map[proc] = node;
nodes.emplace_back(std::make_unique<Node>(proc, node)); nodes.emplace_back(std::make_unique<Node>(proc, node));
} }
@ -109,10 +109,10 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
{ {
std::lock_guard guard(processors_mutex); std::lock_guard guard(processors_mutex);
processors.insert(processors.end(), new_processors.begin(), new_processors.end()); processors->insert(processors->end(), new_processors.begin(), new_processors.end());
} }
uint64_t num_processors = processors.size(); uint64_t num_processors = processors->size();
std::vector<uint64_t> back_edges_sizes(num_processors, 0); std::vector<uint64_t> back_edges_sizes(num_processors, 0);
std::vector<uint64_t> direct_edge_sizes(num_processors, 0); std::vector<uint64_t> direct_edge_sizes(num_processors, 0);
@ -126,7 +126,7 @@ bool ExecutingGraph::expandPipeline(std::stack<uint64_t> & stack, uint64_t pid)
while (nodes.size() < num_processors) while (nodes.size() < num_processors)
{ {
auto * processor = processors[nodes.size()].get(); auto * processor = processors->at(nodes.size()).get();
if (processors_map.contains(processor)) if (processors_map.contains(processor))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Processor {} was already added to pipeline", processor->getName()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Processor {} was already added to pipeline", processor->getName());
@ -386,7 +386,7 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
void ExecutingGraph::cancel() void ExecutingGraph::cancel()
{ {
std::lock_guard guard(processors_mutex); std::lock_guard guard(processors_mutex);
for (auto & processor : processors) for (auto & processor : *processors)
processor->cancel(); processor->cancel();
} }

View File

@ -6,6 +6,7 @@
#include <queue> #include <queue>
#include <stack> #include <stack>
namespace DB namespace DB
{ {
@ -123,9 +124,9 @@ public:
using ProcessorsMap = std::unordered_map<const IProcessor *, uint64_t>; using ProcessorsMap = std::unordered_map<const IProcessor *, uint64_t>;
ProcessorsMap processors_map; ProcessorsMap processors_map;
explicit ExecutingGraph(Processors & processors_, bool profile_processors_); explicit ExecutingGraph(std::shared_ptr<Processors> processors_, bool profile_processors_);
const Processors & getProcessors() const { return processors; } const Processors & getProcessors() const { return *processors; }
/// Traverse graph the first time to update all the childless nodes. /// Traverse graph the first time to update all the childless nodes.
void initializeExecution(Queue & queue); void initializeExecution(Queue & queue);
@ -149,7 +150,7 @@ private:
/// All new nodes and nodes with updated ports are pushed into stack. /// All new nodes and nodes with updated ports are pushed into stack.
bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid); bool expandPipeline(std::stack<uint64_t> & stack, uint64_t pid);
Processors & processors; std::shared_ptr<Processors> processors;
std::mutex processors_mutex; std::mutex processors_mutex;
UpgradableMutex nodes_mutex; UpgradableMutex nodes_mutex;

View File

@ -24,7 +24,7 @@ namespace ErrorCodes
} }
PipelineExecutor::PipelineExecutor(Processors & processors, QueryStatus * elem) PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatus * elem)
: process_list_element(elem) : process_list_element(elem)
{ {
if (process_list_element) if (process_list_element)
@ -41,7 +41,7 @@ PipelineExecutor::PipelineExecutor(Processors & processors, QueryStatus * elem)
/// If exception was thrown while pipeline initialization, it means that query pipeline was not build correctly. /// If exception was thrown while pipeline initialization, it means that query pipeline was not build correctly.
/// It is logical error, and we need more information about pipeline. /// It is logical error, and we need more information about pipeline.
WriteBufferFromOwnString buf; WriteBufferFromOwnString buf;
printPipeline(processors, buf); printPipeline(*processors, buf);
buf.finalize(); buf.finalize();
exception.addMessage("Query pipeline:\n" + buf.str()); exception.addMessage("Query pipeline:\n" + buf.str());

View File

@ -10,6 +10,7 @@
#include <queue> #include <queue>
#include <mutex> #include <mutex>
namespace DB namespace DB
{ {
@ -30,7 +31,7 @@ public:
/// During pipeline execution new processors can appear. They will be added to existing set. /// During pipeline execution new processors can appear. They will be added to existing set.
/// ///
/// Explicit graph representation is built in constructor. Throws if graph is not correct. /// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors, QueryStatus * elem); explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatus * elem);
~PipelineExecutor(); ~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once. /// Execute pipeline in multiple threads. Must be called once.

View File

@ -129,7 +129,7 @@ PushingAsyncPipelineExecutor::PushingAsyncPipelineExecutor(QueryPipeline & pipel
pushing_source = std::make_shared<PushingAsyncSource>(pipeline.input->getHeader()); pushing_source = std::make_shared<PushingAsyncSource>(pipeline.input->getHeader());
connect(pushing_source->getPort(), *pipeline.input); connect(pushing_source->getPort(), *pipeline.input);
pipeline.processors.emplace_back(pushing_source); pipeline.processors->emplace_back(pushing_source);
} }
PushingAsyncPipelineExecutor::~PushingAsyncPipelineExecutor() PushingAsyncPipelineExecutor::~PushingAsyncPipelineExecutor()

View File

@ -58,7 +58,7 @@ PushingPipelineExecutor::PushingPipelineExecutor(QueryPipeline & pipeline_) : pi
pushing_source = std::make_shared<PushingSource>(pipeline.input->getHeader(), input_wait_flag); pushing_source = std::make_shared<PushingSource>(pipeline.input->getHeader(), input_wait_flag);
connect(pushing_source->getPort(), *pipeline.input); connect(pushing_source->getPort(), *pipeline.input);
pipeline.processors.emplace_back(pushing_source); pipeline.processors->emplace_back(pushing_source);
} }
PushingPipelineExecutor::~PushingPipelineExecutor() PushingPipelineExecutor::~PushingPipelineExecutor()

View File

@ -23,9 +23,9 @@ TEST(Processors, PortsConnected)
connect(source->getPort(), sink->getPort()); connect(source->getPort(), sink->getPort());
Processors processors; auto processors = std::make_shared<Processors>();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
processors.emplace_back(std::move(sink)); processors->emplace_back(std::move(sink));
QueryStatus * element = nullptr; QueryStatus * element = nullptr;
PipelineExecutor executor(processors, element); PipelineExecutor executor(processors, element);
@ -46,9 +46,9 @@ TEST(Processors, PortsNotConnected)
/// connect(source->getPort(), sink->getPort()); /// connect(source->getPort(), sink->getPort());
Processors processors; auto processors = std::make_shared<Processors>();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
processors.emplace_back(std::move(sink)); processors->emplace_back(std::move(sink));
#ifndef ABORT_ON_LOGICAL_ERROR #ifndef ABORT_ON_LOGICAL_ERROR
try try

View File

@ -34,9 +34,8 @@ struct BlockIO
void onFinish() void onFinish()
{ {
if (finish_callback) if (finish_callback)
{
finish_callback(pipeline); finish_callback(pipeline);
}
pipeline.reset(); pipeline.reset();
} }

View File

@ -155,7 +155,7 @@ Pipe::Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, Output
totals_port = totals; totals_port = totals;
extremes_port = extremes; extremes_port = extremes;
output_ports.push_back(output); output_ports.push_back(output);
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
max_parallel_streams = 1; max_parallel_streams = 1;
} }
@ -168,18 +168,18 @@ Pipe::Pipe(ProcessorPtr source)
output_ports.push_back(&source->getOutputs().front()); output_ports.push_back(&source->getOutputs().front());
header = output_ports.front()->getHeader(); header = output_ports.front()->getHeader();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
max_parallel_streams = 1; max_parallel_streams = 1;
} }
Pipe::Pipe(Processors processors_) : processors(std::move(processors_)) Pipe::Pipe(std::shared_ptr<Processors> processors_) : processors(std::move(processors_))
{ {
/// Create hash table with processors. /// Create hash table with processors.
std::unordered_set<const IProcessor *> set; std::unordered_set<const IProcessor *> set;
for (const auto & processor : processors) for (const auto & processor : *processors)
set.emplace(processor.get()); set.emplace(processor.get());
for (auto & processor : processors) for (auto & processor : *processors)
{ {
for (const auto & port : processor->getInputs()) for (const auto & port : processor->getInputs())
{ {
@ -225,7 +225,7 @@ Pipe::Pipe(Processors processors_) : processors(std::move(processors_))
max_parallel_streams = output_ports.size(); max_parallel_streams = output_ports.size();
if (collected_processors) if (collected_processors)
for (const auto & processor : processors) for (const auto & processor : *processors)
collected_processors->emplace_back(processor); collected_processors->emplace_back(processor);
} }
@ -311,7 +311,7 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow
if (!allow_empty_header || pipe.header) if (!allow_empty_header || pipe.header)
assertCompatibleHeader(pipe.header, res.header, "Pipe::unitePipes"); assertCompatibleHeader(pipe.header, res.header, "Pipe::unitePipes");
res.processors.insert(res.processors.end(), pipe.processors.begin(), pipe.processors.end()); res.processors->insert(res.processors->end(), pipe.processors->begin(), pipe.processors->end());
res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end()); res.output_ports.insert(res.output_ports.end(), pipe.output_ports.begin(), pipe.output_ports.end());
res.max_parallel_streams += pipe.max_parallel_streams; res.max_parallel_streams += pipe.max_parallel_streams;
@ -323,15 +323,15 @@ Pipe Pipe::unitePipes(Pipes pipes, Processors * collected_processors, bool allow
extremes.emplace_back(pipe.extremes_port); extremes.emplace_back(pipe.extremes_port);
} }
size_t num_processors = res.processors.size(); size_t num_processors = res.processors->size();
res.totals_port = uniteTotals(totals, res.header, res.processors); res.totals_port = uniteTotals(totals, res.header, *res.processors);
res.extremes_port = uniteExtremes(extremes, res.header, res.processors); res.extremes_port = uniteExtremes(extremes, res.header, *res.processors);
if (res.collected_processors) if (res.collected_processors)
{ {
for (; num_processors < res.processors.size(); ++num_processors) for (; num_processors < res.processors->size(); ++num_processors)
res.collected_processors->emplace_back(res.processors[num_processors]); res.collected_processors->emplace_back(res.processors->at(num_processors));
} }
return res; return res;
@ -351,7 +351,7 @@ void Pipe::addSource(ProcessorPtr source)
collected_processors->emplace_back(source); collected_processors->emplace_back(source);
output_ports.push_back(&source->getOutputs().front()); output_ports.push_back(&source->getOutputs().front());
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size()); max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
} }
@ -373,7 +373,7 @@ void Pipe::addTotalsSource(ProcessorPtr source)
collected_processors->emplace_back(source); collected_processors->emplace_back(source);
totals_port = &source->getOutputs().front(); totals_port = &source->getOutputs().front();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
} }
void Pipe::addExtremesSource(ProcessorPtr source) void Pipe::addExtremesSource(ProcessorPtr source)
@ -393,7 +393,7 @@ void Pipe::addExtremesSource(ProcessorPtr source)
collected_processors->emplace_back(source); collected_processors->emplace_back(source);
extremes_port = &source->getOutputs().front(); extremes_port = &source->getOutputs().front();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
} }
static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors) static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors)
@ -413,12 +413,12 @@ static void dropPort(OutputPort *& port, Processors & processors, Processors * c
void Pipe::dropTotals() void Pipe::dropTotals()
{ {
dropPort(totals_port, processors, collected_processors); dropPort(totals_port, *processors, collected_processors);
} }
void Pipe::dropExtremes() void Pipe::dropExtremes()
{ {
dropPort(extremes_port, processors, collected_processors); dropPort(extremes_port, *processors, collected_processors);
} }
void Pipe::addTransform(ProcessorPtr transform) void Pipe::addTransform(ProcessorPtr transform)
@ -504,7 +504,7 @@ void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort
if (collected_processors) if (collected_processors)
collected_processors->emplace_back(transform); collected_processors->emplace_back(transform);
processors.emplace_back(std::move(transform)); processors->emplace_back(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size()); max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
} }
@ -595,7 +595,7 @@ void Pipe::addTransform(ProcessorPtr transform, InputPort * totals, InputPort *
if (collected_processors) if (collected_processors)
collected_processors->emplace_back(transform); collected_processors->emplace_back(transform);
processors.emplace_back(std::move(transform)); processors->emplace_back(std::move(transform));
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size()); max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
} }
@ -647,7 +647,7 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
if (collected_processors) if (collected_processors)
collected_processors->emplace_back(transform); collected_processors->emplace_back(transform);
processors.emplace_back(std::move(transform)); processors->emplace_back(std::move(transform));
} }
}; };
@ -698,7 +698,7 @@ void Pipe::addChains(std::vector<Chain> chains)
if (collected_processors) if (collected_processors)
collected_processors->emplace_back(transform); collected_processors->emplace_back(transform);
processors.emplace_back(std::move(transform)); processors->emplace_back(std::move(transform));
} }
} }
@ -757,7 +757,7 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
transform = std::make_shared<NullSink>(stream->getHeader()); transform = std::make_shared<NullSink>(stream->getHeader());
connect(*stream, transform->getInputs().front()); connect(*stream, transform->getInputs().front());
processors.emplace_back(std::move(transform)); processors->emplace_back(std::move(transform));
}; };
for (auto & port : output_ports) for (auto & port : output_ports)
@ -858,7 +858,7 @@ void Pipe::transform(const Transformer & transformer, bool check_ports)
collected_processors->emplace_back(processor); collected_processors->emplace_back(processor);
} }
processors.insert(processors.end(), new_processors.begin(), new_processors.end()); processors->insert(processors->end(), new_processors.begin(), new_processors.end());
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size()); max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
} }

View File

@ -5,6 +5,7 @@
#include <QueryPipeline/Chain.h> #include <QueryPipeline/Chain.h>
#include <QueryPipeline/SizeLimits.h> #include <QueryPipeline/SizeLimits.h>
namespace DB namespace DB
{ {
@ -33,7 +34,7 @@ public:
/// Create from source with specified totals end extremes (may be nullptr). Ports should be owned by source. /// Create from source with specified totals end extremes (may be nullptr). Ports should be owned by source.
explicit Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes); explicit Pipe(ProcessorPtr source, OutputPort * output, OutputPort * totals, OutputPort * extremes);
/// Create from processors. Use all not-connected output ports as output_ports. Check invariants. /// Create from processors. Use all not-connected output ports as output_ports. Check invariants.
explicit Pipe(Processors processors_); explicit Pipe(std::shared_ptr<Processors> processors_);
Pipe(const Pipe & other) = delete; Pipe(const Pipe & other) = delete;
Pipe(Pipe && other) = default; Pipe(Pipe && other) = default;
@ -41,7 +42,7 @@ public:
Pipe & operator=(Pipe && other) = default; Pipe & operator=(Pipe && other) = default;
const Block & getHeader() const { return header; } const Block & getHeader() const { return header; }
bool empty() const { return processors.empty(); } bool empty() const { return processors->empty(); }
size_t numOutputPorts() const { return output_ports.size(); } size_t numOutputPorts() const { return output_ports.size(); }
size_t maxParallelStreams() const { return max_parallel_streams; } size_t maxParallelStreams() const { return max_parallel_streams; }
OutputPort * getOutputPort(size_t pos) const { return output_ports[pos]; } OutputPort * getOutputPort(size_t pos) const { return output_ports[pos]; }
@ -96,15 +97,15 @@ public:
/// Unite several pipes together. They should have same header. /// Unite several pipes together. They should have same header.
static Pipe unitePipes(Pipes pipes); static Pipe unitePipes(Pipes pipes);
/// Get processors from Pipe. Use it with cautious, it is easy to loss totals and extremes ports. /// Get processors from Pipe. Use it with caution, it is easy to lose totals and extremes ports.
static Processors detachProcessors(Pipe pipe) { return std::move(pipe.processors); } static Processors detachProcessors(Pipe pipe) { return *std::move(pipe.processors); }
/// Get processors from Pipe without destroying pipe (used for EXPLAIN to keep QueryPlan). /// Get processors from Pipe without destroying pipe (used for EXPLAIN to keep QueryPlan).
const Processors & getProcessors() const { return processors; } const Processors & getProcessors() const { return *processors; }
private: private:
/// Header is common for all output below. /// Header is common for all output below.
Block header; Block header;
Processors processors; std::shared_ptr<Processors> processors;
/// Output ports. Totals and extremes are allowed to be empty. /// Output ports. Totals and extremes are allowed to be empty.
OutputPortRawPtrs output_ports; OutputPortRawPtrs output_ports;

View File

@ -21,6 +21,7 @@
#include <Processors/Transforms/ExpressionTransform.h> #include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h> #include <Processors/QueryPlan/ReadFromPreparedSource.h>
namespace DB namespace DB
{ {
@ -210,16 +211,16 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
QueryPipeline::QueryPipeline( QueryPipeline::QueryPipeline(
QueryPlanResourceHolder resources_, QueryPlanResourceHolder resources_,
Processors processors_) std::shared_ptr<Processors> processors_)
: resources(std::move(resources_)) : resources(std::move(resources_))
, processors(std::move(processors_)) , processors(std::move(processors_))
{ {
checkCompleted(processors); checkCompleted(*processors);
} }
QueryPipeline::QueryPipeline( QueryPipeline::QueryPipeline(
QueryPlanResourceHolder resources_, QueryPlanResourceHolder resources_,
Processors processors_, std::shared_ptr<Processors> processors_,
InputPort * input_) InputPort * input_)
: resources(std::move(resources_)) : resources(std::move(resources_))
, processors(std::move(processors_)) , processors(std::move(processors_))
@ -231,7 +232,7 @@ QueryPipeline::QueryPipeline(
"Cannot create pushing QueryPipeline because its input port is connected or null"); "Cannot create pushing QueryPipeline because its input port is connected or null");
bool found_input = false; bool found_input = false;
for (const auto & processor : processors) for (const auto & processor : *processors)
{ {
for (const auto & in : processor->getInputs()) for (const auto & in : processor->getInputs())
{ {
@ -255,7 +256,7 @@ QueryPipeline::QueryPipeline(std::shared_ptr<ISource> source) : QueryPipeline(Pi
QueryPipeline::QueryPipeline( QueryPipeline::QueryPipeline(
QueryPlanResourceHolder resources_, QueryPlanResourceHolder resources_,
Processors processors_, std::shared_ptr<Processors> processors_,
OutputPort * output_, OutputPort * output_,
OutputPort * totals_, OutputPort * totals_,
OutputPort * extremes_) OutputPort * extremes_)
@ -265,7 +266,7 @@ QueryPipeline::QueryPipeline(
, totals(totals_) , totals(totals_)
, extremes(extremes_) , extremes(extremes_)
{ {
checkPulling(processors, output, totals, extremes); checkPulling(*processors, output, totals, extremes);
} }
QueryPipeline::QueryPipeline(Pipe pipe) QueryPipeline::QueryPipeline(Pipe pipe)
@ -278,12 +279,12 @@ QueryPipeline::QueryPipeline(Pipe pipe)
extremes = pipe.getExtremesPort(); extremes = pipe.getExtremesPort();
processors = std::move(pipe.processors); processors = std::move(pipe.processors);
checkPulling(processors, output, totals, extremes); checkPulling(*processors, output, totals, extremes);
} }
else else
{ {
processors = std::move(pipe.processors); processors = std::move(pipe.processors);
checkCompleted(processors); checkCompleted(*processors);
} }
} }
@ -292,13 +293,13 @@ QueryPipeline::QueryPipeline(Chain chain)
, input(&chain.getInputPort()) , input(&chain.getInputPort())
, num_threads(chain.getNumThreads()) , num_threads(chain.getNumThreads())
{ {
processors.reserve(chain.getProcessors().size() + 1); processors->reserve(chain.getProcessors().size() + 1);
for (auto processor : chain.getProcessors()) for (auto processor : chain.getProcessors())
processors.emplace_back(std::move(processor)); processors->emplace_back(std::move(processor));
auto sink = std::make_shared<EmptySink>(chain.getOutputPort().getHeader()); auto sink = std::make_shared<EmptySink>(chain.getOutputPort().getHeader());
connect(chain.getOutputPort(), sink->getPort()); connect(chain.getOutputPort(), sink->getPort());
processors.emplace_back(std::move(sink)); processors->emplace_back(std::move(sink));
input = &chain.getInputPort(); input = &chain.getInputPort();
} }
@ -313,14 +314,14 @@ QueryPipeline::QueryPipeline(std::shared_ptr<IOutputFormat> format)
{ {
auto source = std::make_shared<NullSource>(format_totals.getHeader()); auto source = std::make_shared<NullSource>(format_totals.getHeader());
totals = &source->getPort(); totals = &source->getPort();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
} }
if (!extremes) if (!extremes)
{ {
auto source = std::make_shared<NullSource>(format_extremes.getHeader()); auto source = std::make_shared<NullSource>(format_extremes.getHeader());
extremes = &source->getPort(); extremes = &source->getPort();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
} }
connect(*totals, format_totals); connect(*totals, format_totals);
@ -332,7 +333,7 @@ QueryPipeline::QueryPipeline(std::shared_ptr<IOutputFormat> format)
output_format = format.get(); output_format = format.get();
processors.emplace_back(std::move(format)); processors->emplace_back(std::move(format));
} }
static void drop(OutputPort *& port, Processors & processors) static void drop(OutputPort *& port, Processors & processors)
@ -354,11 +355,11 @@ void QueryPipeline::complete(std::shared_ptr<ISink> sink)
if (!pulling()) if (!pulling())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with sink"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline must be pulling to be completed with sink");
drop(totals, processors); drop(totals, *processors);
drop(extremes, processors); drop(extremes, *processors);
connect(*output, sink->getPort()); connect(*output, sink->getPort());
processors.emplace_back(std::move(sink)); processors->emplace_back(std::move(sink));
output = nullptr; output = nullptr;
} }
@ -369,17 +370,17 @@ void QueryPipeline::complete(Chain chain)
resources = chain.detachResources(); resources = chain.detachResources();
drop(totals, processors); drop(totals, *processors);
drop(extremes, processors); drop(extremes, *processors);
processors.reserve(processors.size() + chain.getProcessors().size() + 1); processors->reserve(processors->size() + chain.getProcessors().size() + 1);
for (auto processor : chain.getProcessors()) for (auto processor : chain.getProcessors())
processors.emplace_back(std::move(processor)); processors->emplace_back(std::move(processor));
auto sink = std::make_shared<EmptySink>(chain.getOutputPort().getHeader()); auto sink = std::make_shared<EmptySink>(chain.getOutputPort().getHeader());
connect(*output, chain.getInputPort()); connect(*output, chain.getInputPort());
connect(chain.getOutputPort(), sink->getPort()); connect(chain.getOutputPort(), sink->getPort());
processors.emplace_back(std::move(sink)); processors->emplace_back(std::move(sink));
output = nullptr; output = nullptr;
} }
@ -400,7 +401,7 @@ void QueryPipeline::complete(Pipe pipe)
input = nullptr; input = nullptr;
auto pipe_processors = Pipe::detachProcessors(std::move(pipe)); auto pipe_processors = Pipe::detachProcessors(std::move(pipe));
processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end()); processors->insert(processors->end(), pipe_processors.begin(), pipe_processors.end());
} }
static void addMaterializing(OutputPort *& output, Processors & processors) static void addMaterializing(OutputPort *& output, Processors & processors)
@ -421,9 +422,9 @@ void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
if (format->expectMaterializedColumns()) if (format->expectMaterializedColumns())
{ {
addMaterializing(output, processors); addMaterializing(output, *processors);
addMaterializing(totals, processors); addMaterializing(totals, *processors);
addMaterializing(extremes, processors); addMaterializing(extremes, *processors);
} }
auto & format_main = format->getPort(IOutputFormat::PortKind::Main); auto & format_main = format->getPort(IOutputFormat::PortKind::Main);
@ -434,14 +435,14 @@ void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
{ {
auto source = std::make_shared<NullSource>(format_totals.getHeader()); auto source = std::make_shared<NullSource>(format_totals.getHeader());
totals = &source->getPort(); totals = &source->getPort();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
} }
if (!extremes) if (!extremes)
{ {
auto source = std::make_shared<NullSource>(format_extremes.getHeader()); auto source = std::make_shared<NullSource>(format_extremes.getHeader());
extremes = &source->getPort(); extremes = &source->getPort();
processors.emplace_back(std::move(source)); processors->emplace_back(std::move(source));
} }
connect(*output, format_main); connect(*output, format_main);
@ -455,7 +456,7 @@ void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
initRowsBeforeLimit(format.get()); initRowsBeforeLimit(format.get());
output_format = format.get(); output_format = format.get();
processors.emplace_back(std::move(format)); processors->emplace_back(std::move(format));
} }
Block QueryPipeline::getHeader() const Block QueryPipeline::getHeader() const
@ -504,7 +505,7 @@ void QueryPipeline::setLimitsAndQuota(const StreamLocalLimits & limits, std::sha
transform->setQuota(quota_); transform->setQuota(quota_);
connect(*output, transform->getInputPort()); connect(*output, transform->getInputPort());
output = &transform->getOutputPort(); output = &transform->getOutputPort();
processors.emplace_back(std::move(transform)); processors->emplace_back(std::move(transform));
} }
@ -529,7 +530,7 @@ void QueryPipeline::addCompletedPipeline(QueryPipeline other)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add not completed pipeline"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add not completed pipeline");
resources = std::move(other.resources); resources = std::move(other.resources);
processors.insert(processors.end(), other.processors.begin(), other.processors.end()); processors->insert(processors->end(), other.processors->begin(), other.processors->end());
} }
void QueryPipeline::reset() void QueryPipeline::reset()
@ -560,9 +561,9 @@ void QueryPipeline::convertStructureTo(const ColumnsWithTypeAndName & columns)
ActionsDAG::MatchColumnsMode::Position); ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(std::move(converting)); auto actions = std::make_shared<ExpressionActions>(std::move(converting));
addExpression(output, actions, processors); addExpression(output, actions, *processors);
addExpression(totals, actions, processors); addExpression(totals, actions, *processors);
addExpression(extremes, actions, processors); addExpression(extremes, actions, *processors);
} }
std::unique_ptr<ReadProgressCallback> QueryPipeline::getReadProgressCallback() const std::unique_ptr<ReadProgressCallback> QueryPipeline::getReadProgressCallback() const

View File

@ -4,6 +4,7 @@
#include <QueryPipeline/StreamLocalLimits.h> #include <QueryPipeline/StreamLocalLimits.h>
#include <functional> #include <functional>
namespace DB namespace DB
{ {
@ -34,6 +35,7 @@ class ReadProgressCallback;
struct ColumnWithTypeAndName; struct ColumnWithTypeAndName;
using ColumnsWithTypeAndName = std::vector<ColumnWithTypeAndName>; using ColumnsWithTypeAndName = std::vector<ColumnWithTypeAndName>;
class QueryPipeline class QueryPipeline
{ {
public: public:
@ -58,23 +60,23 @@ public:
/// completed /// completed
QueryPipeline( QueryPipeline(
QueryPlanResourceHolder resources_, QueryPlanResourceHolder resources_,
Processors processors_); std::shared_ptr<Processors> processors_);
/// pushing /// pushing
QueryPipeline( QueryPipeline(
QueryPlanResourceHolder resources_, QueryPlanResourceHolder resources_,
Processors processors_, std::shared_ptr<Processors> processors_,
InputPort * input_); InputPort * input_);
/// pulling /// pulling
QueryPipeline( QueryPipeline(
QueryPlanResourceHolder resources_, QueryPlanResourceHolder resources_,
Processors processors_, std::shared_ptr<Processors> processors_,
OutputPort * output_, OutputPort * output_,
OutputPort * totals_ = nullptr, OutputPort * totals_ = nullptr,
OutputPort * extremes_ = nullptr); OutputPort * extremes_ = nullptr);
bool initialized() const { return !processors.empty(); } bool initialized() const { return !processors->empty(); }
/// When initialized, exactly one of the following is true. /// When initialized, exactly one of the following is true.
/// Use PullingPipelineExecutor or PullingAsyncPipelineExecutor. /// Use PullingPipelineExecutor or PullingAsyncPipelineExecutor.
bool pulling() const { return output != nullptr; } bool pulling() const { return output != nullptr; }
@ -119,7 +121,7 @@ public:
/// Add processors and resources from other pipeline. Other pipeline should be completed. /// Add processors and resources from other pipeline. Other pipeline should be completed.
void addCompletedPipeline(QueryPipeline other); void addCompletedPipeline(QueryPipeline other);
const Processors & getProcessors() const { return processors; } const Processors & getProcessors() const { return *processors; }
/// For pulling pipeline, convert structure to expected. /// For pulling pipeline, convert structure to expected.
/// Trash, need to remove later. /// Trash, need to remove later.
@ -134,7 +136,7 @@ private:
std::shared_ptr<const EnabledQuota> quota; std::shared_ptr<const EnabledQuota> quota;
bool update_profile_events = true; bool update_profile_events = true;
Processors processors; std::shared_ptr<Processors> processors;
InputPort * input = nullptr; InputPort * input = nullptr;

View File

@ -327,9 +327,9 @@ QueryPipelineBuilderPtr QueryPipelineBuilder::mergePipelines(
collected_processors->emplace_back(transform); collected_processors->emplace_back(transform);
left->pipe.output_ports.front() = &transform->getOutputs().front(); left->pipe.output_ports.front() = &transform->getOutputs().front();
left->pipe.processors.emplace_back(transform); left->pipe.processors->emplace_back(transform);
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end()); left->pipe.processors->insert(left->pipe.processors->end(), right->pipe.processors->begin(), right->pipe.processors->end());
left->pipe.header = left->pipe.output_ports.front()->getHeader(); left->pipe.header = left->pipe.output_ports.front()->getHeader();
left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams); left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams);
return left; return left;
@ -383,7 +383,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
/// Collect the NEW processors for the right pipeline. /// Collect the NEW processors for the right pipeline.
QueryPipelineProcessorsCollector collector(*right); QueryPipelineProcessorsCollector collector(*right);
/// Remember the last step of the right pipeline. /// Remember the last step of the right pipeline.
ExpressionStep* step = typeid_cast<ExpressionStep*>(right->pipe.processors.back()->getQueryPlanStep()); ExpressionStep* step = typeid_cast<ExpressionStep*>(right->pipe.processors->back()->getQueryPlanStep());
if (!step) if (!step)
{ {
throw Exception(ErrorCodes::LOGICAL_ERROR, "The top step of the right pipeline should be ExpressionStep"); throw Exception(ErrorCodes::LOGICAL_ERROR, "The top step of the right pipeline should be ExpressionStep");
@ -467,7 +467,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
if (collected_processors) if (collected_processors)
collected_processors->emplace_back(joining); collected_processors->emplace_back(joining);
left->pipe.processors.emplace_back(std::move(joining)); left->pipe.processors->emplace_back(std::move(joining));
} }
if (left->hasTotals()) if (left->hasTotals())
@ -482,14 +482,14 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
if (collected_processors) if (collected_processors)
collected_processors->emplace_back(joining); collected_processors->emplace_back(joining);
left->pipe.processors.emplace_back(std::move(joining)); left->pipe.processors->emplace_back(std::move(joining));
} }
/// Move the collected processors to the last step in the right pipeline. /// Move the collected processors to the last step in the right pipeline.
Processors processors = collector.detachProcessors(); Processors processors = collector.detachProcessors();
step->appendExtraProcessors(processors); step->appendExtraProcessors(processors);
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end()); left->pipe.processors->insert(left->pipe.processors->end(), right->pipe.processors->begin(), right->pipe.processors->end());
left->resources = std::move(right->resources); left->resources = std::move(right->resources);
left->pipe.header = left->pipe.output_ports.front()->getHeader(); left->pipe.header = left->pipe.output_ports.front()->getHeader();
left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams); left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams);