2018-05-23 20:19:33 +00:00
|
|
|
#pragma once
|
|
|
|
|
|
|
|
#include <list>
#include <memory>
#include <vector>

#include <Core/Block.h>
#include <Processors/Chunk.h>
#include <Common/Exception.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
class InputPort;
|
|
|
|
class OutputPort;
|
|
|
|
class IProcessor;
|
|
|
|
|
|
|
|
|
|
|
|
class Port
|
|
|
|
{
|
2018-06-04 18:31:46 +00:00
|
|
|
friend void connect(OutputPort &, InputPort &);
|
2018-05-23 20:19:33 +00:00
|
|
|
friend class IProcessor;
|
|
|
|
|
2018-06-04 18:31:46 +00:00
|
|
|
protected:
|
2018-05-23 20:19:33 +00:00
|
|
|
/// Shared state of two connected ports.
|
2019-02-07 18:51:53 +00:00
|
|
|
class State
|
2018-05-23 20:19:33 +00:00
|
|
|
{
|
2019-02-07 18:51:53 +00:00
|
|
|
public:
|
|
|
|
State() = default;
|
|
|
|
|
2019-02-18 16:36:07 +00:00
|
|
|
void push(Chunk chunk)
|
2019-02-07 18:51:53 +00:00
|
|
|
{
|
|
|
|
if (finished)
|
|
|
|
throw Exception("Cannot push block to finished port.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
if (!needed)
|
|
|
|
throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
if (has_data)
|
|
|
|
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2019-02-18 16:36:07 +00:00
|
|
|
data = std::move(chunk);
|
2019-02-07 18:51:53 +00:00
|
|
|
has_data = true;
|
|
|
|
}
|
|
|
|
|
2019-02-18 16:36:07 +00:00
|
|
|
Chunk pull()
|
2019-02-07 18:51:53 +00:00
|
|
|
{
|
|
|
|
if (!needed)
|
|
|
|
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
if (!has_data)
|
|
|
|
throw Exception("Cannot pull block from port which has no data.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
has_data = false;
|
|
|
|
return std::move(data);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool hasData() const
|
|
|
|
{
|
2019-02-27 12:51:27 +00:00
|
|
|
// TODO: check for output port only.
|
|
|
|
// if (finished)
|
|
|
|
// throw Exception("Finished port can't has data.", ErrorCodes::LOGICAL_ERROR);
|
2019-02-07 18:51:53 +00:00
|
|
|
|
|
|
|
if (!needed)
|
|
|
|
throw Exception("Cannot check if not needed port has data.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
return has_data;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Only for output port.
|
|
|
|
/// If port still has data, it will be finished after pulling.
|
|
|
|
void finish()
|
|
|
|
{
|
|
|
|
finished = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Only for input port. Removes data if has.
|
|
|
|
void close()
|
|
|
|
{
|
|
|
|
finished = true;
|
|
|
|
has_data = false;
|
|
|
|
data.clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Only empty ports are finished.
|
|
|
|
bool isFinished() const { return finished && !has_data; }
|
|
|
|
bool isSetFinished() const { return finished; }
|
|
|
|
|
|
|
|
void setNeeded()
|
|
|
|
{
|
2019-02-27 12:51:27 +00:00
|
|
|
if (isFinished())
|
2019-02-07 18:51:53 +00:00
|
|
|
throw Exception("Can't set port needed if it is finished.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
// if (has_data)
|
|
|
|
// throw Exception("Can't set port needed if it has data.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
needed = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void setNotNeeded()
|
|
|
|
{
|
|
|
|
// if (finished)
|
|
|
|
// throw Exception("Can't set port not needed if it is finished.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
needed = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Only for output port.
|
|
|
|
bool isNeeded() const { return needed && !finished; }
|
|
|
|
|
|
|
|
private:
|
2019-02-18 16:36:07 +00:00
|
|
|
Chunk data;
|
2019-02-07 18:51:53 +00:00
|
|
|
/// Use special flag to check if block has data. This allows to send empty blocks between processors.
|
|
|
|
bool has_data = false;
|
|
|
|
/// Block is not needed right now, but may be will be needed later.
|
|
|
|
/// This allows to pause calculations if we are not sure that we need more data.
|
2018-05-23 20:19:33 +00:00
|
|
|
bool needed = false;
|
2019-02-07 18:51:53 +00:00
|
|
|
/// Port was set finished or closed.
|
2018-05-23 20:19:33 +00:00
|
|
|
bool finished = false;
|
|
|
|
};
|
|
|
|
|
|
|
|
Block header;
|
|
|
|
std::shared_ptr<State> state;
|
|
|
|
|
|
|
|
IProcessor * processor = nullptr;
|
|
|
|
|
|
|
|
public:
|
2019-02-07 18:51:53 +00:00
|
|
|
Port(Block header) : header(std::move(header)) {}
|
2019-02-27 12:51:27 +00:00
|
|
|
Port(Block header, IProcessor * processor) : header(std::move(header)), processor(processor) {}
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2018-05-24 01:02:16 +00:00
|
|
|
const Block & getHeader() const { return header; }
|
2018-05-23 20:19:33 +00:00
|
|
|
bool isConnected() const { return state != nullptr; }
|
|
|
|
|
|
|
|
void assumeConnected() const
|
|
|
|
{
|
|
|
|
if (!isConnected())
|
2019-02-05 13:01:40 +00:00
|
|
|
throw Exception("Port is not connected", ErrorCodes::LOGICAL_ERROR);
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool hasData() const
|
|
|
|
{
|
|
|
|
assumeConnected();
|
2019-02-07 18:51:53 +00:00
|
|
|
return state->hasData();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
IProcessor & getProcessor()
|
|
|
|
{
|
|
|
|
if (!processor)
|
2019-02-05 13:01:40 +00:00
|
|
|
throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR);
|
2018-05-23 20:19:33 +00:00
|
|
|
return *processor;
|
|
|
|
}
|
2018-05-24 02:39:22 +00:00
|
|
|
|
|
|
|
const IProcessor & getProcessor() const
|
|
|
|
{
|
|
|
|
if (!processor)
|
2019-02-05 13:01:40 +00:00
|
|
|
throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR);
|
2018-05-24 02:39:22 +00:00
|
|
|
return *processor;
|
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
};
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
/// Invariants:
|
|
|
|
/// * If you close port, it isFinished().
|
|
|
|
/// * If port isFinished(), you can do nothing with it.
|
|
|
|
/// * If port is not needed, you can only setNeeded() or close() it.
|
|
|
|
/// * You can pull only if port hasData().
|
2018-05-23 20:19:33 +00:00
|
|
|
class InputPort : public Port
|
|
|
|
{
|
|
|
|
friend void connect(OutputPort &, InputPort &);
|
|
|
|
|
|
|
|
private:
|
|
|
|
OutputPort * output_port = nullptr;
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
/// If version was set, it will be increased on each pull.
|
|
|
|
UInt64 * version = nullptr;
|
|
|
|
|
2018-05-23 20:19:33 +00:00
|
|
|
public:
|
|
|
|
using Port::Port;
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
void setVersion(UInt64 * value) { version = value; }
|
|
|
|
|
2019-02-18 16:36:07 +00:00
|
|
|
Chunk pull()
|
2018-05-23 20:19:33 +00:00
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
if (version)
|
|
|
|
++(*version);
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
assumeConnected();
|
|
|
|
return state->pull();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool isFinished() const
|
|
|
|
{
|
|
|
|
assumeConnected();
|
2019-02-07 18:51:53 +00:00
|
|
|
return state->isFinished();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void setNeeded()
|
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
if (version)
|
|
|
|
++(*version);
|
|
|
|
|
2018-05-23 20:19:33 +00:00
|
|
|
assumeConnected();
|
2019-02-07 18:51:53 +00:00
|
|
|
state->setNeeded();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void setNotNeeded()
|
|
|
|
{
|
|
|
|
assumeConnected();
|
2019-02-07 18:51:53 +00:00
|
|
|
state->setNotNeeded();
|
|
|
|
}
|
|
|
|
|
|
|
|
void close()
|
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
if (version && !isFinished())
|
|
|
|
++(*version);
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
assumeConnected();
|
|
|
|
state->close();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
OutputPort & getOutputPort()
|
|
|
|
{
|
|
|
|
assumeConnected();
|
|
|
|
return *output_port;
|
|
|
|
}
|
2018-05-24 02:39:22 +00:00
|
|
|
|
|
|
|
const OutputPort & getOutputPort() const
|
|
|
|
{
|
|
|
|
assumeConnected();
|
|
|
|
return *output_port;
|
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
/// Invariants:
|
|
|
|
/// * If you finish port, it isFinished().
|
|
|
|
/// * If port isFinished(), you can do nothing with it.
|
|
|
|
/// * If port not isNeeded(), you can only finish() it.
|
|
|
|
/// * You can push only if port has no data (hasData() returns false).
|
2018-05-23 20:19:33 +00:00
|
|
|
class OutputPort : public Port
|
|
|
|
{
|
|
|
|
friend void connect(OutputPort &, InputPort &);
|
|
|
|
|
|
|
|
private:
|
|
|
|
InputPort * input_port = nullptr;
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
/// If version was set, it will be increased on each push.
|
|
|
|
UInt64 * version = nullptr;
|
|
|
|
|
2018-05-23 20:19:33 +00:00
|
|
|
public:
|
|
|
|
using Port::Port;
|
|
|
|
|
2019-02-08 16:10:57 +00:00
|
|
|
void setVersion(UInt64 * value) { version = value; }
|
|
|
|
|
2019-02-18 16:36:07 +00:00
|
|
|
void push(Chunk chunk)
|
2018-05-23 20:19:33 +00:00
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
if (version)
|
|
|
|
++(*version);
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
assumeConnected();
|
2019-04-08 10:37:09 +00:00
|
|
|
|
|
|
|
if (chunk.getNumColumns() != header.columns())
|
|
|
|
{
|
|
|
|
String msg = "Invalid number of columns in chunk pushed to OutputPort. Expected"
|
|
|
|
+ std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n';
|
|
|
|
|
|
|
|
msg += "Header: " + header.dumpStructure() + '\n';
|
|
|
|
msg += "Chunk: " + chunk.dumpStructure() + '\n';
|
|
|
|
|
|
|
|
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
2019-02-18 16:36:07 +00:00
|
|
|
state->push(std::move(chunk));
|
2019-02-07 18:51:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void finish()
|
|
|
|
{
|
2019-02-08 16:10:57 +00:00
|
|
|
if (version && !isFinished())
|
|
|
|
++(*version);
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
assumeConnected();
|
|
|
|
state->finish();
|
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
bool isNeeded() const
|
|
|
|
{
|
|
|
|
assumeConnected();
|
|
|
|
return state->isNeeded();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
bool isFinished() const
|
2018-05-23 20:19:33 +00:00
|
|
|
{
|
|
|
|
assumeConnected();
|
2019-02-07 18:51:53 +00:00
|
|
|
return state->isSetFinished();
|
2018-05-23 20:19:33 +00:00
|
|
|
}
|
|
|
|
|
2019-02-07 18:51:53 +00:00
|
|
|
bool canPush() const { return isNeeded() && !hasData(); }
|
|
|
|
|
2018-05-23 20:19:33 +00:00
|
|
|
InputPort & getInputPort()
|
|
|
|
{
|
|
|
|
assumeConnected();
|
|
|
|
return *input_port;
|
|
|
|
}
|
2018-05-24 02:39:22 +00:00
|
|
|
|
|
|
|
const InputPort & getInputPort() const
|
|
|
|
{
|
|
|
|
assumeConnected();
|
|
|
|
return *input_port;
|
|
|
|
}
|
2018-05-23 20:19:33 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
2019-02-27 11:24:14 +00:00
|
|
|
/// Collection of input ports, stored in a std::list.
/// NOTE(review): presumably a list is used so that port addresses stay stable while ports
/// reference each other through raw pointers — confirm against the callers.
using InputPorts = std::list<InputPort>;
|
|
|
|
/// Collection of output ports, stored in a std::list.
/// NOTE(review): presumably a list is used so that port addresses stay stable while ports
/// reference each other through raw pointers — confirm against the callers.
using OutputPorts = std::list<OutputPort>;
|
2018-05-23 20:19:33 +00:00
|
|
|
|
|
|
|
|
|
|
|
/// Connects an output port with an input port. Declaration only; the definition is not visible here.
/// This function is declared a friend of Port, InputPort and OutputPort, so it may access their non-public members.
/// NOTE(review): presumably it creates the shared State and sets the peer pointers of both ports — confirm against the definition.
void connect(OutputPort & output, InputPort & input);
|
|
|
|
|
|
|
|
}
|