Processors: experimental [#CLICKHOUSE-2948]

This commit is contained in:
Alexey Milovidov 2018-05-21 05:07:15 +03:00
parent c8b5a70247
commit c179f0703f
2 changed files with 190 additions and 122 deletions

View File

@ -9,6 +9,8 @@
#include <Poco/Event.h>
#include <Core/Block.h>
#include <iostream>
/** Processor is an element of query execution pipeline.
* It has zero or more input ports and zero or more output ports.
@ -180,7 +182,7 @@ private:
public:
using Port::Port;
void push(Block && block)
void push(Block block)
{
if (hasData())
throw Exception("Port already has data");
@ -266,7 +268,7 @@ protected:
public:
IProcessor() {}
IProcessor(InputPorts && inputs_, OutputPorts && outputs_)
IProcessor(InputPorts inputs_, OutputPorts outputs_)
: inputs(std::move(inputs_)), outputs(std::move(outputs_))
{
for (auto & port : inputs)
@ -299,19 +301,44 @@ public:
/// You may call 'schedule' method and processor will initiate some background work.
Async,
/// Processor is doing some work in background and you have to wait.
Wait
/// Processor is doing some work in background.
/// You may wait for the next event or do something else, and then you should call 'prepare' again.
Wait,
/// Call 'prepare' again.
Again,
};
/** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
*
* It may access input and output ports,
* indicate the need for work by another processor by returning NeedData or PortFull,
* or indicate the absence of work by returning Finished or Unneeded,
* it may pull data from input ports and push data to output ports.
*
* The method is not thread-safe and must be called from only one thread at a time,
* even for different connected processors.
*
* Instead of all long work (CPU calculations or waiting) it should just prepare all required data and return Ready or Async.
*/
virtual Status prepare() = 0;
/// You may call this method if 'status' returned Ready.
/** You may call this method if 'prepare' returned Ready.
* This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
*
* Method 'work' can be executed in parallel for different processors.
*/
virtual void work()
{
throw Exception("Method 'work' is not implemented for " + getName() + " processor");
}
/// You may call this method if 'status' returned Async.
/** You may call this method if 'prepare' returned Async.
* This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
*
* This method should return instantly and fire an event when the asynchronous job is done.
* While the job is not done, method 'prepare' will return Wait and the user may block and wait for the next event before checking again.
*/
virtual void schedule(EventCounter & /*watch*/)
{
throw Exception("Method 'schedule' is not implemented for " + getName() + " processor");
@ -321,6 +348,19 @@ public:
auto & getInputs() { return inputs; }
auto & getOutputs() { return outputs; }
/// Debugging aid: writes the processor name and the state flags of every port to stderr.
/// One line per input port: hasData, isNeeded, isFinished.
/// One line per output port: hasData, isNeeded.
void dump() const
{
    auto & err = std::cerr;

    err << getName() << "\n";

    err << "inputs:\n";
    for (const auto & in : inputs)
    {
        err << "\t" << in.hasData() << " " << in.isNeeded() << " " << in.isFinished() << "\n";
    }

    err << "outputs:\n";
    for (const auto & out : outputs)
    {
        err << "\t" << out.hasData() << " " << out.isNeeded() << "\n";
    }
}
};
using ProcessorPtr = std::shared_ptr<IProcessor>;
@ -334,13 +374,22 @@ class SequentialPipelineExecutor : IProcessor
{
private:
std::list<ProcessorPtr> processors;
IProcessor * current_processor = nullptr;
/// Look for first Ready or Async processor by depth-first search in needed input ports.
/// NOTE: Pipeline must not have cycles.
template <typename Visit, typename Finish>
void traverse(IProcessor & processor, Visit && visit, Finish && finish)
{
Status status = processor.prepare();
Status status;
do
{
status = processor.prepare();
} while (status == Status::Again);
// processor.dump();
// std::cerr << "status: " << static_cast<int>(status) << "\n\n";
visit(processor, status);
if (status == Status::Ready || status == Status::Async)
@ -350,6 +399,11 @@ private:
for (auto & input : processor.getInputs())
if (input.isNeeded())
traverse(input.getOutputPort().getProcessor(), std::forward<Visit>(visit), std::forward<Finish>(finish));
if (status == Status::PortFull)
for (auto & output : processor.getOutputs())
if (output.hasData())
traverse(output.getInputPort().getProcessor(), std::forward<Visit>(visit), std::forward<Finish>(finish));
}
public:
@ -362,10 +416,13 @@ public:
Status prepare() override
{
current_processor = nullptr;
bool has_someone_to_wait = false;
bool found = false;
Status found_status = Status::Finished;
// std::cerr << "\n\n-----------------------------\n\n";
for (auto & element : processors)
{
traverse(*element,
@ -374,17 +431,17 @@ public:
if (status == Status::Wait)
has_someone_to_wait = true;
},
[&] (IProcessor &, Status status)
[&] (IProcessor & processor, Status status)
{
found = true;
current_processor = &processor;
found_status = status;
});
if (found)
if (current_processor)
break;
}
if (found)
if (current_processor)
return found_status;
if (has_someone_to_wait)
return Status::Wait;
@ -402,52 +459,19 @@ public:
void work() override
{
bool found = false;
for (auto & element : processors)
{
traverse(*element,
[] (IProcessor &, Status) {},
[&found] (IProcessor & processor, Status status)
{
if (status == Status::Ready)
{
found = true;
//std::cerr << processor.getName() << " will work\n";
processor.work();
}
});
if (found)
break;
}
if (!found)
if (!current_processor)
throw Exception("Bad pipeline");
// std::cerr << current_processor->getName() << " will work\n";
current_processor->work();
}
void schedule(EventCounter & watch) override
{
bool found = false;
for (auto & element : processors)
{
traverse(*element,
[] (IProcessor &, Status) {},
[&found, &watch] (IProcessor & processor, Status status)
{
if (status == Status::Async)
{
found = true;
//std::cerr << processor.getName() << " will schedule\n";
processor.schedule(watch);
}
});
if (found)
break;
}
if (!found)
if (!current_processor)
throw Exception("Bad pipeline");
current_processor->schedule(watch);
}
};
@ -457,11 +481,12 @@ class ISource : public IProcessor
protected:
OutputPort & output;
bool finished = false;
Block current_block;
virtual Block generate() = 0;
public:
ISource(Block && header)
ISource(Block header)
: IProcessor({}, {std::move(header)}), output(outputs.front())
{
}
@ -477,14 +502,16 @@ public:
if (!output.isNeeded())
return Status::Unneeded;
if (current_block)
output.push(std::move(current_block));
return Status::Ready;
}
void work() override
{
if (Block block = generate())
output.push(std::move(block));
else
current_block = generate();
if (!current_block)
finished = true;
}
@ -496,20 +523,27 @@ class ISink : public IProcessor
{
protected:
InputPort & input;
Block current_block;
virtual void consume(Block && block) = 0;
virtual void consume(Block block) = 0;
public:
ISink(Block && header)
ISink(Block header)
: IProcessor({std::move(header)}, {}), input(inputs.front())
{
}
Status prepare() override
{
if (input.hasData())
if (current_block)
return Status::Ready;
if (input.hasData())
{
current_block = input.pull();
return Status::Ready;
}
if (input.isFinished())
return Status::Finished;
@ -519,7 +553,10 @@ public:
void work() override
{
consume(input.pull());
// std::cerr << "Working\n";
consume(std::move(current_block));
// std::cerr << current_block.columns() << "\n";
}
InputPort & getPort() { return input; }
@ -535,10 +572,13 @@ protected:
InputPort & input;
OutputPort & output;
Block current_block;
bool transformed = false;
virtual void transform(Block & block) = 0;
public:
ITransform(Block && input_header, Block && output_header)
ITransform(Block input_header, Block output_header)
: IProcessor({std::move(input_header)}, {std::move(output_header)}),
input(inputs.front()), output(outputs.front())
{
@ -546,14 +586,25 @@ public:
Status prepare() override
{
if (output.hasData())
return Status::PortFull;
if (!output.isNeeded())
return Status::Unneeded;
if (current_block)
{
if (!transformed)
return Status::Ready;
else if (output.hasData())
return Status::PortFull;
else
output.push(std::move(current_block));
}
if (input.hasData())
{
current_block = input.pull();
transformed = false;
return Status::Ready;
}
if (input.isFinished())
return Status::Finished;
@ -564,9 +615,8 @@ public:
void work() override
{
Block data = input.pull();
transform(data);
output.push(std::move(data));
transform(current_block);
transformed = true;
}
InputPort & getInputPort() { return input; }
@ -621,17 +671,14 @@ public:
}
}
if (all_inputs_finished)
return Status::Finished;
if (all_inputs_have_no_data)
return Status::NeedData;
{
if (all_inputs_finished)
return Status::Finished;
else
return Status::NeedData;
}
return Status::Ready;
}
void work() override
{
for (auto & input : inputs)
{
if (input.hasData())
@ -647,6 +694,8 @@ public:
break;
}
}
return Status::Again;
}
};
@ -665,11 +714,13 @@ private:
size_t limit;
size_t offset;
size_t pos = 0; /// how many rows were read, including the last read block
size_t rows_read = 0; /// including the last read block
bool always_read_till_end;
Block current_block;
public:
LimitTransform(Block && header, size_t limit, size_t offset, bool always_read_till_end = false)
LimitTransform(Block header, size_t limit, size_t offset, bool always_read_till_end = false)
: IProcessor({std::move(header)}, {std::move(header)}),
input(inputs.front()), output(outputs.front()),
limit(limit), offset(offset), always_read_till_end(always_read_till_end)
@ -680,7 +731,15 @@ public:
Status prepare() override
{
if (pos >= offset + limit)
if (current_block)
{
if (output.hasData())
return Status::PortFull;
output.push(std::move(current_block));
}
if (rows_read >= offset + limit)
{
output.setFinished();
if (!always_read_till_end)
@ -690,53 +749,62 @@ public:
}
}
if (output.hasData())
return Status::PortFull;
if (!output.isNeeded())
return Status::Unneeded;
input.setNeeded();
return input.hasData()
? Status::Ready
: Status::NeedData;
if (!input.hasData())
return Status::NeedData;
current_block = input.pull();
/// Skip block (for 'always_read_till_end' case)
if (rows_read >= offset + limit)
{
current_block.clear();
return Status::NeedData;
}
size_t rows = current_block.rows();
rows_read += rows;
if (rows_read <= offset)
{
current_block.clear();
return Status::NeedData;
}
/// return the whole block
if (rows_read >= offset + rows && rows_read <= offset + limit)
{
if (output.hasData())
return Status::PortFull;
output.push(std::move(current_block));
return Status::NeedData;
}
return Status::Ready;
}
void work() override
{
Block block = input.pull();
if (pos >= offset + limit)
return;
size_t rows = block.rows();
pos += rows;
if (pos <= offset)
return;
/// return the whole block
if (pos >= offset + rows && pos <= offset + limit)
{
output.push(std::move(block));
return;
}
size_t rows = current_block.rows();
size_t columns = current_block.columns();
/// return a piece of the block
size_t start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(rows));
size_t length = std::min(
static_cast<Int64>(limit), std::min(
static_cast<Int64>(pos) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(rows)));
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
block.getByPosition(i).column = block.getByPosition(i).column->cut(start, length);
output.push(std::move(block));
current_block.getByPosition(i).column = current_block.getByPosition(i).column->cut(start, length);
}
InputPort & getInputPort() { return input; }

View File

@ -62,12 +62,8 @@ public:
if (output.hasData())
return Status::PortFull;
return Status::Ready;
}
void work() override
{
output.push(std::move(current_block));
return Status::Again;
}
void schedule(EventCounter & watch) override
@ -115,7 +111,7 @@ public:
private:
WriteBufferFromFileDescriptor out{STDOUT_FILENO};
void consume(Block && block) override
void consume(Block block) override
{
size_t rows = block.rows();
size_t columns = block.columns();
@ -139,21 +135,25 @@ private:
int main(int, char **)
try
{
auto source1 = std::make_shared<SleepyNumbersSource>(0, 100000);
auto source0 = std::make_shared<NumbersSource>();
auto header = source0->getPort().getHeader();
auto limit0 = std::make_shared<LimitTransform>(Block(header), 10, 0);
auto source1 = std::make_shared<SleepyNumbersSource>(100, 100000);
auto source2 = std::make_shared<SleepyNumbersSource>(1000, 200000);
auto header = source1->getPort().getHeader();
auto resize = std::make_shared<ResizeProcessor>(InputPorts{Block(header), Block(header)}, OutputPorts{Block(header)});
auto resize = std::make_shared<ResizeProcessor>(InputPorts{Block(header), Block(header), Block(header)}, OutputPorts{Block(header)});
auto limit = std::make_shared<LimitTransform>(Block(header), 100, 0);
auto sink = std::make_shared<PrintSink>();
connect(source1->getPort(), resize->getInputs()[0]);
connect(source2->getPort(), resize->getInputs()[1]);
connect(source0->getPort(), limit0->getInputPort());
connect(limit0->getOutputPort(), resize->getInputs()[0]);
connect(source1->getPort(), resize->getInputs()[1]);
connect(source2->getPort(), resize->getInputs()[2]);
connect(resize->getOutputs()[0], limit->getInputPort());
connect(limit->getOutputPort(), sink->getPort());
SequentialPipelineExecutor executor({source1, source2, resize, limit, sink});
SequentialPipelineExecutor executor({sink});
EventCounter watch;
while (true)