ClickHouse/dbms/src/Processors/Port.h

372 lines
8.7 KiB
C++
Raw Normal View History

#pragma once
#include <memory>
#include <vector>
#include <variant>
#include <Core/Block.h>
2019-02-19 18:41:18 +00:00
#include <Processors/Chunk.h>
#include <Common/Exception.h>
namespace DB
{
class InputPort;
class OutputPort;
class IProcessor;
class Port
{
friend void connect(OutputPort &, InputPort &);
friend class IProcessor;
protected:
/// Shared state of two connected ports.
2019-02-07 18:51:53 +00:00
class State
{
2019-02-07 18:51:53 +00:00
public:
2019-04-29 18:43:50 +00:00
using Data = std::variant<Chunk, std::exception_ptr>;
2019-02-07 18:51:53 +00:00
State() = default;
2019-04-29 18:43:50 +00:00
void pushData(Data data_)
2019-02-07 18:51:53 +00:00
{
if (finished)
throw Exception("Cannot push block to finished port.", ErrorCodes::LOGICAL_ERROR);
if (!needed)
throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (has_data)
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
data = std::move(data_);
2019-02-07 18:51:53 +00:00
has_data = true;
}
void push(Chunk chunk)
{
pushData(std::move(chunk));
}
void push(std::exception_ptr exception)
{
pushData(std::move(exception));
}
auto pullData()
2019-02-07 18:51:53 +00:00
{
if (!needed)
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (!has_data)
throw Exception("Cannot pull block from port which has no data.", ErrorCodes::LOGICAL_ERROR);
has_data = false;
2019-02-07 18:51:53 +00:00
return std::move(data);
}
Chunk pull()
{
auto cur_data = pullData();
if (std::holds_alternative<std::exception_ptr>(cur_data))
std::rethrow_exception(std::move(std::get<std::exception_ptr>(cur_data)));
return std::move(std::get<Chunk>(cur_data));
}
2019-02-07 18:51:53 +00:00
bool hasData() const
{
// TODO: check for output port only.
// if (finished)
// throw Exception("Finished port can't has data.", ErrorCodes::LOGICAL_ERROR);
2019-02-07 18:51:53 +00:00
if (!needed)
throw Exception("Cannot check if not needed port has data.", ErrorCodes::LOGICAL_ERROR);
return has_data;
}
/// Only for output port.
/// If port still has data, it will be finished after pulling.
void finish()
{
finished = true;
}
/// Only for input port. Removes data if has.
void close()
{
finished = true;
has_data = false;
if (std::holds_alternative<Chunk>(data))
std::get<Chunk>(data).clear();
2019-02-07 18:51:53 +00:00
}
/// Only empty ports are finished.
bool isFinished() const { return finished && !has_data; }
bool isSetFinished() const { return finished; }
void setNeeded()
{
if (isFinished())
2019-02-07 18:51:53 +00:00
throw Exception("Can't set port needed if it is finished.", ErrorCodes::LOGICAL_ERROR);
// if (has_data)
// throw Exception("Can't set port needed if it has data.", ErrorCodes::LOGICAL_ERROR);
needed = true;
}
void setNotNeeded()
{
// if (finished)
// throw Exception("Can't set port not needed if it is finished.", ErrorCodes::LOGICAL_ERROR);
needed = false;
}
/// Only for output port.
bool isNeeded() const { return needed && !finished; }
private:
2019-04-29 18:43:50 +00:00
Data data;
2019-02-07 18:51:53 +00:00
/// Use special flag to check if block has data. This allows to send empty blocks between processors.
bool has_data = false;
/// Block is not needed right now, but may be will be needed later.
/// This allows to pause calculations if we are not sure that we need more data.
bool needed = false;
2019-02-07 18:51:53 +00:00
/// Port was set finished or closed.
bool finished = false;
};
Block header;
std::shared_ptr<State> state;
IProcessor * processor = nullptr;
public:
2019-04-29 18:43:50 +00:00
using Data = State::Data;
2019-02-07 18:51:53 +00:00
Port(Block header) : header(std::move(header)) {}
Port(Block header, IProcessor * processor) : header(std::move(header)), processor(processor) {}
const Block & getHeader() const { return header; }
bool isConnected() const { return state != nullptr; }
void assumeConnected() const
{
if (!isConnected())
2019-02-05 13:01:40 +00:00
throw Exception("Port is not connected", ErrorCodes::LOGICAL_ERROR);
}
bool hasData() const
{
assumeConnected();
2019-02-07 18:51:53 +00:00
return state->hasData();
}
IProcessor & getProcessor()
{
if (!processor)
2019-02-05 13:01:40 +00:00
throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR);
return *processor;
}
const IProcessor & getProcessor() const
{
if (!processor)
2019-02-05 13:01:40 +00:00
throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR);
return *processor;
}
};
2019-02-07 18:51:53 +00:00
/// Invariants:
/// * If you close port, it isFinished().
/// * If port isFinished(), you can do nothing with it.
/// * If port is not needed, you can only setNeeded() or close() it.
/// * You can pull only if port hasData().
class InputPort : public Port
{
friend void connect(OutputPort &, InputPort &);
private:
OutputPort * output_port = nullptr;
2019-02-08 16:10:57 +00:00
/// If version was set, it will be increased on each pull.
UInt64 * version = nullptr;
public:
using Port::Port;
2019-02-08 16:10:57 +00:00
void setVersion(UInt64 * value) { version = value; }
Chunk pull()
{
2019-02-08 16:10:57 +00:00
if (version)
++(*version);
2019-02-07 18:51:53 +00:00
assumeConnected();
return state->pull();
}
2019-04-29 18:43:50 +00:00
Data pullData()
{
if (version)
++(*version);
assumeConnected();
return state->pullData();
}
bool isFinished() const
{
assumeConnected();
2019-02-07 18:51:53 +00:00
return state->isFinished();
}
void setNeeded()
{
2019-02-08 16:10:57 +00:00
if (version)
++(*version);
assumeConnected();
2019-02-07 18:51:53 +00:00
state->setNeeded();
}
void setNotNeeded()
{
assumeConnected();
2019-02-07 18:51:53 +00:00
state->setNotNeeded();
}
void close()
{
2019-02-08 16:10:57 +00:00
if (version && !isFinished())
++(*version);
2019-02-07 18:51:53 +00:00
assumeConnected();
state->close();
}
OutputPort & getOutputPort()
{
assumeConnected();
return *output_port;
}
const OutputPort & getOutputPort() const
{
assumeConnected();
return *output_port;
}
};
2019-02-07 18:51:53 +00:00
/// Invariants:
/// * If you finish port, it isFinished().
/// * If port isFinished(), you can do nothing with it.
/// * If port not isNeeded(), you can only finish() it.
/// * You can hush only if port doesn't hasData().
class OutputPort : public Port
{
friend void connect(OutputPort &, InputPort &);
private:
InputPort * input_port = nullptr;
2019-02-08 16:10:57 +00:00
/// If version was set, it will be increased on each push.
UInt64 * version = nullptr;
public:
using Port::Port;
2019-02-08 16:10:57 +00:00
void setVersion(UInt64 * value) { version = value; }
void push(Chunk chunk)
{
2019-02-08 16:10:57 +00:00
if (version)
++(*version);
2019-02-07 18:51:53 +00:00
assumeConnected();
if (chunk.getNumColumns() != header.columns())
{
2019-04-08 13:29:24 +00:00
String msg = "Invalid number of columns in chunk pushed to OutputPort. Expected "
+ std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n';
msg += "Header: " + header.dumpStructure() + '\n';
msg += "Chunk: " + chunk.dumpStructure() + '\n';
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
}
state->push(std::move(chunk));
2019-02-07 18:51:53 +00:00
}
void push(std::exception_ptr exception)
{
if (version)
++(*version);
assumeConnected();
state->push(std::move(exception));
}
2019-04-29 18:43:50 +00:00
void pushData(Data data)
{
if (std::holds_alternative<std::exception_ptr>(data))
push(std::get<std::exception_ptr>(std::move(data)));
else
push(std::get<Chunk>(std::move(data)));
}
2019-02-07 18:51:53 +00:00
void finish()
{
2019-02-08 16:10:57 +00:00
if (version && !isFinished())
++(*version);
2019-02-07 18:51:53 +00:00
assumeConnected();
state->finish();
}
2019-02-07 18:51:53 +00:00
bool isNeeded() const
{
assumeConnected();
return state->isNeeded();
}
2019-02-07 18:51:53 +00:00
bool isFinished() const
{
assumeConnected();
2019-02-07 18:51:53 +00:00
return state->isSetFinished();
}
2019-02-07 18:51:53 +00:00
bool canPush() const { return isNeeded() && !hasData(); }
InputPort & getInputPort()
{
assumeConnected();
return *input_port;
}
const InputPort & getInputPort() const
{
assumeConnected();
return *input_port;
}
};
using InputPorts = std::list<InputPort>;
using OutputPorts = std::list<OutputPort>;
void connect(OutputPort & output, InputPort & input);
}