Refactor Pipes part 1.

This commit is contained in:
Nikolai Kochetov 2020-07-31 19:54:54 +03:00
parent 39530f837e
commit 2ae94f4570
4 changed files with 234 additions and 12 deletions

View File

@ -29,23 +29,33 @@ Pipes narrowPipes(Pipes pipes, size_t width)
if (size <= width) if (size <= width)
return pipes; return pipes;
std::vector<Pipes> partitions(width); std::vector<std::vector<OutputPort *>> partitions(width);
auto distribution = getDistribution(size, width); auto distribution = getDistribution(size, width);
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].emplace_back(std::move(pipes[i])); partitions[distribution[i]].emplace_back(pipes.getOutputPort(i));
Pipes res; Processors concats;
res.reserve(width); concats.reserve(width);
for (size_t i = 0; i < width; ++i) for (size_t i = 0; i < width; ++i)
{ {
auto processor = std::make_shared<ConcatProcessor>(partitions[i].at(0).getHeader(), partitions[i].size()); auto concat = std::make_shared<ConcatProcessor>(partitions[i].at(0)->getHeader(), partitions[i].size());
res.emplace_back(std::move(partitions[i]), std::move(processor)); size_t next_port = 0;
for (auto & port : concat->getInputs())
{
connect(*partitions[i][next_port], port);
++next_port;
}
concats.emplace_back(std::move(concat));
} }
return res; auto processors = Pipes::detachProcessors(std::move(pipes));
processors.insert(processors.end(), concats.begin(), concats.end());
return Pipes(std::move(processors));
} }
} }

View File

@ -54,6 +54,149 @@ static void checkSource(const IProcessor & source)
toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR); toString(source.getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
} }
Pipes::Pipes(ProcessorPtr source)
{
checkSource(*source);
output_ports.push_back(&source->getOutputs().front());
header = output_ports.front()->getHeader();
processors.emplace_back(std::move(source));
max_parallel_streams = 1;
}
Pipes::Pipes(Processors processors_) : processors(std::move(processors_))
{
/// Create hash table with processors.
std::unordered_set<const IProcessor *> set;
for (const auto & processor : processors)
set.emplace(processor.get());
for (auto & processor : processors)
{
for (const auto & port : processor->getInputs())
{
if (!port.isConnected())
throw Exception("Cannot create Pipes because processor " + processor->getName() +
" has not connected input port", ErrorCodes::LOGICAL_ERROR);
const auto * connected_processor = &port.getOutputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Cannot create Pipes because processor " + processor->getName() +
" has input port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
for (auto & port : processor->getOutputs())
{
if (!port.isConnected())
{
output_ports.push_back(&port);
continue;
}
const auto * connected_processor = &port.getInputPort().getProcessor();
if (set.count(connected_processor) == 0)
throw Exception("Cannot create Pipes because processor " + processor->getName() +
" has output port which is connected with unknown processor " +
connected_processor->getName(), ErrorCodes::LOGICAL_ERROR);
}
}
if (output_ports.empty())
throw Exception("Cannot create Pipes because processors don't have any not-connected output ports",
ErrorCodes::LOGICAL_ERROR);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
max_parallel_streams = output_ports.size();
}
void Pipes::addTransform(ProcessorPtr transform)
{
auto & inputs = transform->getInputs();
if (inputs.size() != output_ports.size())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
"Processor has " + std::to_string(inputs.size()) + " input ports, "
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
size_t next_output = 0;
for (auto & input : inputs)
{
connect(*output_ports[next_output], input);
++next_output;
}
auto & outputs = transform->getOutputs();
if (outputs.empty())
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
ErrorCodes::LOGICAL_ERROR);
output_ports.clear();
output_ports.reserve(outputs.size());
for (auto & output : outputs)
output_ports.emplace_back(&output);
header = output_ports.front()->getHeader();
for (size_t i = 1; i < output_ports.size(); ++i)
assertBlocksHaveEqualStructure(header, output_ports[i]->getHeader(), "Pipes");
if (totals_port)
assertBlocksHaveEqualStructure(header, totals_port->getHeader(), "Pipes");
if (extremes_port)
assertBlocksHaveEqualStructure(header, extremes_port->getHeader(), "Pipes");
}
void Pipes::addSimpleTransform(const ProcessorGetter & getter)
{
Block new_header;
auto add_transform = [&](OutputPort *& port, StreamType stream_type)
{
if (!port)
return;
auto transform = getter(port->getHeader(), stream_type);
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (transform->getOutputs().size() != 1)
throw Exception("Processor for query pipeline transform should have single output, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
const auto & out_header = transform ? transform->getOutputs().front().getHeader()
: port->getHeader();
if (new_header)
assertBlocksHaveEqualStructure(new_header, out_header, "QueryPipeline");
else
new_header = out_header;
if (transform)
{
connect(*port, transform->getInputs().front());
port = &transform->getOutputs().front();
processors.emplace_back(std::move(transform));
}
};
for (auto & port : output_ports)
add_transform(port, StreamType::Main);
add_transform(totals_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
header = std::move(new_header);
}
Pipe::Pipe(ProcessorPtr source) Pipe::Pipe(ProcessorPtr source)
{ {

View File

@ -6,11 +6,84 @@ namespace DB
{ {
class Pipe; class Pipe;
using Pipes = std::vector<Pipe>;
class IStorage; class IStorage;
using StoragePtr = std::shared_ptr<IStorage>; using StoragePtr = std::shared_ptr<IStorage>;
/// Pipes is a set of processors which represents the part of pipeline.
/// Pipes contains a list of output ports, with specified port for totals and specified port for extremes.
/// All output ports have same header.
/// All other ports are connected, all connections are inside processors set.
class Pipes
{
public:
/// Create from source. Source must have no input ports and single output.
explicit Pipes(ProcessorPtr source);
/// Create from processors. Use all not-connected output ports as output_ports. Check invariants.
explicit Pipes(Processors processors_);
Pipes(const Pipes & other) = delete;
Pipes(Pipes && other) = default;
Pipes & operator=(const Pipes & other) = delete;
Pipes & operator=(Pipes && other) = default;
const Block & getHeader() const { return header; }
bool empty() const { return output_ports.empty(); }
size_t size() const { return output_ports.size(); }
OutputPort * getOutputPort(size_t pos) const { return output_ports[pos]; }
OutputPort * getTotalsPort() const { return totals_port; }
OutputPort * getExtremesPort() const { return extremes_port; }
/// Add processor to list, add it output ports to output_ports.
/// Processor shouldn't have input ports, output ports shouldn't be connected.
/// Output headers should have same structure and be compatible with current header (if not empty()).
/// void addSource(ProcessorPtr source);
/// Add processor to list. It should have size() input ports with compatible header.
/// Output ports should have same headers.
/// If totals or extremes are not empty, transform shouldn't change header.
void addTransform(ProcessorPtr transform);
enum class StreamType
{
Main = 0, /// Stream for query data. There may be several streams of this type.
Totals, /// Stream for totals. No more then one.
Extremes, /// Stream for extremes. No more then one.
};
using ProcessorGetter = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
/// Add transform with single input and single output for each port.
void addSimpleTransform(const ProcessorGetter & port);
/// Destroy pipes and get processors.
static Processors detachProcessors(Pipes pipes) { return std::move(pipes.processors); }
private:
Processors processors;
/// Header is common for all output below.
Block header;
/// Output ports. Totals and extremes are allowed to be empty.
std::vector<OutputPort *> output_ports;
OutputPort * totals_port = nullptr;
OutputPort * extremes_port = nullptr;
/// It is the max number of processors which can be executed in parallel for each step. See QueryPipeline::Streams.
/// Usually, it's the same as the number of output ports.
size_t max_parallel_streams = 0;
std::vector<TableLockHolder> table_locks;
/// Some processors may implicitly use Context or temporary Storage created by Interpreter.
/// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
};
/// Pipe is a set of processors which represents the part of pipeline with single output. /// Pipe is a set of processors which represents the part of pipeline with single output.
/// All processors in pipe are connected. All ports are connected except the output one. /// All processors in pipe are connected. All ports are connected except the output one.
class Pipe class Pipe

View File

@ -151,9 +151,6 @@ public:
/// Will read from this stream after all data was read from other streams. /// Will read from this stream after all data was read from other streams.
void addDelayedStream(ProcessorPtr source); void addDelayedStream(ProcessorPtr source);
/// Check if resize transform was used. (In that case another distinct transform will be added).
bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
/// Changes the number of input ports if needed. Adds ResizeTransform. /// Changes the number of input ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false); void resize(size_t num_streams, bool force = false, bool strict = false);
@ -167,7 +164,6 @@ public:
size_t getNumStreams() const { return streams.size(); } size_t getNumStreams() const { return streams.size(); }
bool hasMoreThanOneStream() const { return getNumStreams() > 1; }
bool hasTotals() const { return totals_having_port != nullptr; } bool hasTotals() const { return totals_having_port != nullptr; }
const Block & getHeader() const { return current_header; } const Block & getHeader() const { return current_header; }