Atomic Ports.

This commit is contained in:
Nikolai Kochetov 2019-06-19 21:30:02 +03:00
parent ab781f0988
commit 67b7080707
8 changed files with 500 additions and 396 deletions

View File

@ -96,13 +96,6 @@ Columns Chunk::detachColumns()
return std::move(columns);
}
void Chunk::clear()
{
num_rows = 0;
columns.clear();
chunk_info.reset();
}
void Chunk::erase(size_t position)
{
if (columns.empty())

View File

@ -45,6 +45,20 @@ public:
Chunk clone() const;
void swap(Chunk & other)
{
columns.swap(other.columns);
chunk_info.swap(other.chunk_info);
std::swap(num_rows, other.num_rows);
}
void clear()
{
num_rows = 0;
columns.clear();
chunk_info.reset();
}
const Columns & getColumns() const { return columns; }
void setColumns(Columns columns_, UInt64 num_rows_);
void setColumns(MutableColumns columns_, UInt64 num_rows_);
@ -63,7 +77,6 @@ public:
bool empty() const { return hasNoRows() && hasNoColumns(); }
operator bool() const { return !empty(); }
void clear();
void erase(size_t position);
UInt64 bytes() const;

View File

@ -117,14 +117,14 @@ void PipelineExecutor::buildGraph()
addEdges(node);
}
void PipelineExecutor::addChildlessProcessorsToQueue()
void PipelineExecutor::addChildlessProcessorsToQueue(Stack & stack)
{
UInt64 num_processors = processors.size();
for (UInt64 proc = 0; proc < num_processors; ++proc)
{
if (graph[proc].directEdges.empty())
{
prepare_stack.push(proc);
stack.push(proc);
graph[proc].status = ExecStatus::Preparing;
}
}
@ -142,7 +142,6 @@ void PipelineExecutor::processFinishedExecutionQueue()
/// ++num_waited_tasks;
++state->num_executed_jobs;
state->need_update_stream = true;
if (graph[finished_job].execution_state->exception)
std::rethrow_exception(graph[finished_job].execution_state->exception);
@ -178,24 +177,24 @@ static void executeJob(IProcessor * processor)
}
}
bool PipelineExecutor::tryAssignJob(ExecutionState * state)
{
auto current_stream = state->current_stream;
for (auto & executor_context : executor_contexts)
{
if (executor_context->current_stream == current_stream)
{
ExecutionState * expected = nullptr;
if (executor_context->next_task_to_execute.compare_exchange_strong(expected, state))
{
++num_tasks_to_wait;
return true;
}
}
}
return false;
}
//bool PipelineExecutor::tryAssignJob(ExecutionState * state)
//{
// auto current_stream = state->current_stream;
// for (auto & executor_context : executor_contexts)
// {
// if (executor_context->current_stream == current_stream)
// {
// ExecutionState * expected = nullptr;
// if (executor_context->next_task_to_execute.compare_exchange_strong(expected, state))
// {
// ++num_tasks_to_wait;
// return true;
// }
// }
// }
//
// return false;
//}
void PipelineExecutor::addJob(ExecutionState * execution_state)
{
@ -210,9 +209,9 @@ void PipelineExecutor::addJob(ExecutionState * execution_state)
try
{
/// Stopwatch watch;
Stopwatch watch;
executeJob(execution_state->processor);
/// execution_state->execution_time_ns += watch.elapsed();
execution_state->execution_time_ns += watch.elapsed();
++execution_state->num_executed_jobs;
}
@ -256,7 +255,7 @@ void PipelineExecutor::addAsyncJob(UInt64 pid)
++num_tasks_to_wait;
}
void PipelineExecutor::expandPipeline(UInt64 pid)
void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
{
auto & cur_node = graph[pid];
auto new_processors = cur_node.processor->expandPipeline();
@ -281,60 +280,88 @@ void PipelineExecutor::expandPipeline(UInt64 pid)
if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New)
{
graph[node].status = ExecStatus::Preparing;
prepare_stack.push(node);
stack.push(node);
}
}
}
}
bool PipelineExecutor::addProcessorToPrepareQueueIfUpdated(Edge & edge, bool update_stream_number, UInt64 stream_number)
bool PipelineExecutor::addProcessorToPrepareQueueIfUpdated(Edge & edge, Stack & stack)
{
/// In this method we have ownership on edge, but node can be concurrently accessed.
auto & node = graph[edge.to];
ExecStatus status = node.status.load();
/// Don't add processor if nothing was read from port.
if (node.status != ExecStatus::New && edge.version == edge.prev_version)
if (status != ExecStatus::New && edge.version == edge.prev_version)
return false;
if (status == ExecStatus::Finished)
return false;
/// Signal that node need to be prepared.
node.need_to_be_prepared = true;
edge.prev_version = edge.version;
if (node.status == ExecStatus::Idle || node.status == ExecStatus::New)
{
prepare_stack.push(edge.to);
node.status = ExecStatus::Preparing;
/// Try to get ownership for node.
if (update_stream_number)
{
node.execution_state->current_stream = stream_number;
node.execution_state->need_update_stream = false;
}
return true;
}
/// Assume that current status is New or Idle. Otherwise, can't prepare node.
if (status != ExecStatus::New)
status = ExecStatus::Idle;
/// Statuses but New and Idle are not interesting because they own node.
/// Prepare will be called in owning thread before changing status.
while (!node.status.compare_exchange_weak(status, ExecStatus::Preparing))
if (!(status == ExecStatus::New || status == ExecStatus::Idle) || !node.need_to_be_prepared)
return false;
stack.push(edge.to);
return true;
}
void PipelineExecutor::prepareProcessor(UInt64 pid, bool async)
void PipelineExecutor::prepareProcessor(UInt64 pid, Stack & stack, bool async)
{
/// In this method we have ownership on node.
auto & node = graph[pid];
{
Stopwatch watch;
auto status = node.processor->prepare();
node.execution_state->preparation_time_ns += watch.elapsed();
/// Stopwatch watch;
/// Disable flag before prepare call. Otherwise, we can skip prepare request.
/// Prepare can be called more times than needed, but it's ok.
node.need_to_be_prepared = false;
auto status = node.processor->prepare();
/// node.execution_state->preparation_time_ns += watch.elapsed();
node.last_processor_status = status;
}
auto add_neighbours_to_prepare_queue = [&, this]
auto add_neighbours_to_prepare_queue = [&, this] ()
{
auto stream_number = node.execution_state->current_stream;
for (auto & edge : node.backEdges)
addProcessorToPrepareQueueIfUpdated(edge, false, stream_number);
addProcessorToPrepareQueueIfUpdated(edge, stack);
for (auto & edge : node.directEdges)
addProcessorToPrepareQueueIfUpdated(edge, true, stream_number);
addProcessorToPrepareQueueIfUpdated(edge, stack);
};
auto try_release_ownership = [&] ()
{
ExecStatus expected = ExecStatus::Idle;
node.status = ExecStatus::Idle;
if (node.need_to_be_prepared)
{
while (!node.status.compare_exchange_weak(expected, ExecStatus::Preparing))
if (!(expected == ExecStatus::Idle) || !node.need_to_be_prepared)
return;
stack.push(pid);
}
};
switch (node.last_processor_status)
@ -342,13 +369,15 @@ void PipelineExecutor::prepareProcessor(UInt64 pid, bool async)
case IProcessor::Status::NeedData:
{
add_neighbours_to_prepare_queue();
node.status = ExecStatus::Idle;
try_release_ownership();
break;
}
case IProcessor::Status::PortFull:
{
add_neighbours_to_prepare_queue();
node.status = ExecStatus::Idle;
try_release_ownership();
break;
}
case IProcessor::Status::Finished:
@ -365,9 +394,11 @@ void PipelineExecutor::prepareProcessor(UInt64 pid, bool async)
}
case IProcessor::Status::Async:
{
node.status = ExecStatus::Executing;
addAsyncJob(pid);
break;
throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
// node.status = ExecStatus::Executing;
// addAsyncJob(pid);
// break;
}
case IProcessor::Status::Wait:
{
@ -377,61 +408,88 @@ void PipelineExecutor::prepareProcessor(UInt64 pid, bool async)
}
case IProcessor::Status::ExpandPipeline:
{
expandPipeline(pid);
/// Add node to queue again.
prepare_stack.push(pid);
/// node ref is not valid now.
graph[pid].status = ExecStatus::Preparing;
ExecutionState * desired = node.execution_state.get();
ExecutionState * expected = nullptr;
while (!node_to_expand.compare_exchange_strong(expected, desired))
{
expected = nullptr;
doExpandPipeline(stack);
}
doExpandPipeline(stack);
node.need_to_be_prepared = true;
try_release_ownership();
break;
}
}
}
void PipelineExecutor::assignJobs()
void PipelineExecutor::doExpandPipeline(Stack & stack)
{
for (auto * state : execution_states_queue)
{
if (!tryAssignJob(state))
{
while (!task_queue.push(state))
sleep(0);
std::unique_lock lock(mutex_to_expand_pipeline);
++num_waiting_threads_to_expand_pipeline;
task_condvar.notify_one();
++num_tasks_to_wait;
}
}
condvar_to_expand_pipeline.wait(lock, [&]()
{
return num_waiting_threads_to_expand_pipeline == num_preparing_threads || node_to_expand == nullptr;
});
execution_states_queue.clear();
--num_waiting_threads_to_expand_pipeline;
if (node_to_expand)
{
expandPipeline(stack, node_to_expand.load()->processors_id);
node_to_expand = nullptr;
lock.unlock();
condvar_to_expand_pipeline.notify_all();
}
}
void PipelineExecutor::processPrepareQueue()
{
while (!prepare_stack.empty())
{
UInt64 proc = prepare_stack.top();
prepare_stack.pop();
//void PipelineExecutor::assignJobs()
//{
// for (auto * state : execution_states_queue)
// {
// if (!tryAssignJob(state))
// {
// while (!task_queue.push(state))
// sleep(0);
//
// task_condvar.notify_one();
// ++num_tasks_to_wait;
// }
// }
//
// execution_states_queue.clear();
//}
prepareProcessor(proc, false);
}
assignJobs();
}
void PipelineExecutor::processAsyncQueue()
{
UInt64 num_processors = processors.size();
for (UInt64 node = 0; node < num_processors; ++node)
if (graph[node].status == ExecStatus::Async)
prepareProcessor(node, true);
assignJobs();
}
//void PipelineExecutor::processPrepareQueue()
//{
// while (!prepare_stack.empty())
// {
// UInt64 proc = prepare_stack.top();
// prepare_stack.pop();
//
// prepareProcessor(proc, false);
// }
//
// assignJobs();
//}
//
//void PipelineExecutor::processAsyncQueue()
//{
// UInt64 num_processors = processors.size();
// for (UInt64 node = 0; node < num_processors; ++node)
// if (graph[node].status == ExecStatus::Async)
// prepareProcessor(node, true);
//
// assignJobs();
//}
void PipelineExecutor::execute(size_t num_threads)
{
addChildlessProcessorsToQueue();
try
{
/// Wait for all tasks to finish in case of exception.
@ -477,9 +535,28 @@ void PipelineExecutor::executeSingleThread(size_t num_threads)
ExecutionState * state = nullptr;
auto finish_execution = [&]()
{
finished = true;
finish_condvar.notify_one();
main_executor_condvar.notify_all();
};
auto prepare_processor = [&](UInt64 pid, Stack & stack)
{
try
{
prepareProcessor(pid, stack, false);
}
catch (...)
{
graph[pid].execution_state->exception = std::current_exception();
finish_execution();
}
};
while (!finished)
{
++num_waiting_threads;
/// First, find any processor to execute.
/// Just travers graph and prepare any processor.
@ -490,7 +567,6 @@ void PipelineExecutor::executeSingleThread(size_t num_threads)
{
if (task_queue.pop(state))
{
--num_waiting_threads;
++num_waited_tasks;
break;
}
@ -501,87 +577,21 @@ void PipelineExecutor::executeSingleThread(size_t num_threads)
if (state)
break;
if (num_waiting_threads == num_threads)
{
finished = true;
finish_condvar.notify_one();
}
std::unique_lock lock(main_executor_mutex);
// if (!state)
// {
// Stopwatch processing_time_watch;
//
// {
// std::unique_lock lock(main_executor_mutex);
//
// while (!state)
// {
// if (finished)
// break;
//
// while (!prepare_stack.empty())
// {
// UInt64 proc = prepare_stack.top();
// prepare_stack.pop();
//
// prepareProcessor(proc, false);
//
// if (graph[proc].status == ExecStatus::Executing)
// {
// auto cur_state = graph[proc].execution_state.get();
//
// if (!state)
// {
// ++num_tasks_to_wait;
// while (!task_queue.push(cur_state));
// }
// else
// state = cur_state;
// }
// }
//
// if (!state)
// {
// while (num_waited_tasks < num_tasks_to_wait)
// {
// if (task_queue.pop(state))
// {
// ++num_waited_tasks;
// break;
// }
// }
// }
//
// if (num_waited_tasks < num_tasks_to_wait)
// main_executor_condvar.notify_all();
//
// if (state)
// break;
//
// if (num_waiting_threads.fetch_add(1) + 1 == num_threads && num_waited_tasks == num_tasks_to_wait)
// {
// finished = true;
// main_executor_condvar.notify_all();
// finish_condvar.notify_one();
// break;
// }
//
// main_executor_condvar.wait(lock, [&]() { return finished || !prepare_stack.empty() || num_waited_tasks < num_tasks_to_wait; });
//
// num_waiting_threads.fetch_sub(1);
// }
// }
//
// processing_time_ns += processing_time_watch.elapsed();
// }
++num_waiting_threads;
if (num_waiting_threads == num_threads)
finish_execution();
main_executor_condvar.wait(lock, [&]() { return finished || num_waited_tasks < num_tasks_to_wait; });
--num_waiting_threads;
}
if (finished)
break;
/// In case if somebody is sleeping and prepare_queue is not empty.
/// main_executor_condvar.notify_one();
while (state)
{
if (finished)
@ -596,7 +606,7 @@ void PipelineExecutor::executeSingleThread(size_t num_threads)
}
if (state->exception)
finished = true;
finish_execution();
if (finished)
break;
@ -605,25 +615,30 @@ void PipelineExecutor::executeSingleThread(size_t num_threads)
/// Try to execute neighbour processor.
{
bool expected = false;
while (!main_executor_flag.compare_exchange_strong(expected, true))
expected = false;
/// std::unique_lock lock(main_executor_mutex);
prepareProcessor(state->processors_id, false);
Stack stack;
++num_preparing_threads;
if (node_to_expand)
doExpandPipeline(stack);
prepare_processor(state->processors_id, stack);
/// Execute again if can.
if (graph[state->processors_id].status != ExecStatus::Executing)
state = nullptr;
/// Process all neighbours. Children will be on the top of stack, then parents.
while (!prepare_stack.empty())
while (!stack.empty() && !finished)
{
auto current_processor = prepare_stack.top();
prepare_stack.pop();
while (!stack.empty() && !finished)
{
auto current_processor = stack.top();
stack.pop();
prepareProcessor(current_processor, false);
prepare_processor(current_processor, stack);
if (graph[current_processor].status == ExecStatus::Executing)
{
@ -632,6 +647,7 @@ void PipelineExecutor::executeSingleThread(size_t num_threads)
if (state)
{
++num_tasks_to_wait;
main_executor_condvar.notify_one();
while (!task_queue.push(cur_state));
}
else
@ -639,7 +655,10 @@ void PipelineExecutor::executeSingleThread(size_t num_threads)
}
}
main_executor_flag = false;
if (node_to_expand)
doExpandPipeline(stack);
}
--num_preparing_threads;
}
/// Let another thread to continue.
@ -666,12 +685,16 @@ void PipelineExecutor::executeImpl(size_t num_threads)
task_queue.reserve_unsafe(8192);
finished_execution_queue.reserve_unsafe(num_threads);
while (!prepare_stack.empty())
{
UInt64 proc = prepare_stack.top();
prepare_stack.pop();
Stack stack;
prepareProcessor(proc, false);
addChildlessProcessorsToQueue(stack);
while (!stack.empty())
{
UInt64 proc = stack.top();
stack.pop();
prepareProcessor(proc, stack, false);
if (graph[proc].status == ExecStatus::Executing)
{
@ -681,18 +704,22 @@ void PipelineExecutor::executeImpl(size_t num_threads)
}
}
/// background_executor_flag = false;
num_preparing_threads = 0;
node_to_expand = nullptr;
threads.reserve(num_threads);
executor_contexts.reserve(num_threads);
/// executor_contexts.reserve(num_threads);
auto thread_group = CurrentThread::getGroup();
for (size_t i = 0; i < num_threads; ++i)
{
executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
auto * executor_context = executor_contexts.back().get();
executor_context->executor_number = i;
executor_context->next_task_to_execute = nullptr;
// executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
// auto * executor_context = executor_contexts.back().get();
//
// executor_context->executor_number = i;
// executor_context->next_task_to_execute = nullptr;
threads.emplace_back([this, thread_group, num_threads]
{

View File

@ -48,9 +48,6 @@ private:
size_t num_executed_jobs = 0;
UInt64 execution_time_ns = 0;
UInt64 preparation_time_ns = 0;
UInt64 current_stream = 0;
bool need_update_stream = true;
};
struct Node
@ -59,16 +56,24 @@ private:
Edges directEdges;
Edges backEdges;
ExecStatus status = ExecStatus::New;
std::atomic<ExecStatus> status;
std::atomic_bool need_to_be_prepared;
IProcessor::Status last_processor_status = IProcessor::Status::NeedData;
std::unique_ptr<ExecutionState> execution_state;
Node(IProcessor * processor_, UInt64 processor_id) : processor(processor_)
Node(IProcessor * processor_, UInt64 processor_id)
: processor(processor_), status(ExecStatus::New), need_to_be_prepared(false)
{
execution_state = std::make_unique<ExecutionState>();
execution_state->processor = processor;
execution_state->processors_id = processor_id;
}
Node(Node && other) noexcept
: processor(other.processor), status(ExecStatus::New)
, need_to_be_prepared(false), execution_state(std::move(other.execution_state))
{
}
};
using Nodes = std::vector<Node>;
@ -101,25 +106,32 @@ private:
Poco::Logger * log = &Poco::Logger::get("PipelineExecutor");
struct ExecutorContext
{
size_t executor_number;
std::atomic<ExecutionState *> next_task_to_execute;
std::atomic<UInt64> current_stream;
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
UInt64 next_stream = 0;
std::vector<ExecutionState *> execution_states_queue;
// struct ExecutorContext
// {
// size_t executor_number;
// std::atomic<ExecutionState *> next_task_to_execute;
// std::atomic<UInt64> current_stream;
// };
//
// std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
// UInt64 next_stream = 0;
//
// std::vector<ExecutionState *> execution_states_queue;
std::mutex main_executor_mutex;
std::atomic_bool main_executor_flag;
/// std::atomic_bool background_executor_flag;
std::condition_variable main_executor_condvar;
std::atomic<size_t> num_waiting_threads;
std::condition_variable finish_condvar;
std::atomic<size_t> num_preparing_threads;
std::atomic<ExecutionState *> node_to_expand;
std::mutex mutex_to_expand_pipeline;
std::condition_variable condvar_to_expand_pipeline;
size_t num_waiting_threads_to_expand_pipeline = 0;
public:
explicit PipelineExecutor(Processors processors);
void execute(size_t num_threads);
@ -137,22 +149,23 @@ private:
bool addEdges(UInt64 node);
void buildGraph();
void expandPipeline(UInt64 pid);
void expandPipeline(Stack & stack, UInt64 pid);
void doExpandPipeline(Stack & stack);
/// Pipeline execution related methods.
void addChildlessProcessorsToQueue();
void addChildlessProcessorsToQueue(Stack & stack);
void processFinishedExecutionQueue();
void processFinishedExecutionQueueSafe();
bool addProcessorToPrepareQueueIfUpdated(Edge & edge, bool update_stream_number, UInt64 stream_number);
void processPrepareQueue();
void processAsyncQueue();
bool addProcessorToPrepareQueueIfUpdated(Edge & edge, Stack & stack);
// void processPrepareQueue();
// void processAsyncQueue();
void addJob(ExecutionState * execution_state);
void addAsyncJob(UInt64 pid);
bool tryAssignJob(ExecutionState * state);
void assignJobs();
// bool tryAssignJob(ExecutionState * state);
// void assignJobs();
void prepareProcessor(size_t pid, bool async);
void prepareProcessor(size_t pid, Stack & stack, bool async);
void executeImpl(size_t num_threads);
void executeSingleThread(size_t num_threads);

View File

@ -219,10 +219,10 @@ public:
/// Debug output.
void dump() const;
std::string description;
std::string processor_description;
void setDescription(const std::string & description_) { description = description_; }
const std::string & getDescription() const { return description; }
void setDescription(const std::string & description_) { processor_description = description_; }
const std::string & getDescription() const { return processor_description; }
};

View File

@ -60,7 +60,7 @@ ISimpleTransform::Status ISimpleTransform::prepare()
current_data = input.pullData();
has_input = true;
if (current_data.second)
if (current_data.exception)
{
/// Skip transform in case of exception.
has_input = false;
@ -80,16 +80,16 @@ ISimpleTransform::Status ISimpleTransform::prepare()
void ISimpleTransform::work()
{
if (current_data.second)
if (current_data.exception)
return;
try
{
transform(current_data.first);
transform(current_data.chunk);
}
catch (DB::Exception &)
{
current_data.second = std::current_exception();
current_data.exception = std::current_exception();
transformed = true;
has_input = false;
return;
@ -97,12 +97,12 @@ void ISimpleTransform::work()
has_input = false;
if (!skip_empty_chunks || current_data.first)
if (!skip_empty_chunks || current_data.chunk)
transformed = true;
if (transformed && !current_data.first)
if (transformed && !current_data.chunk)
/// Support invariant that chunks must have the same number of columns as header.
current_data.first = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
current_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
}
}

View File

@ -45,8 +45,8 @@ void ISource::work()
{
try
{
current_chunk.first = generate();
if (!current_chunk.first)
current_chunk.chunk = generate();
if (!current_chunk.chunk)
finished = true;
else
has_input = true;

View File

@ -3,6 +3,7 @@
#include <memory>
#include <vector>
#include <variant>
#include <cstdint>
#include <Core/Block.h>
#include <Processors/Chunk.h>
@ -26,126 +27,153 @@ protected:
class State
{
public:
using Data = std::pair<Chunk, std::exception_ptr>;
State() = default;
void ALWAYS_INLINE pushData(Data data_)
struct Data
{
if (finished)
throw Exception("Cannot push block to finished port.", ErrorCodes::LOGICAL_ERROR);
if (!needed)
throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (has_data)
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
data = std::move(data_);
has_data = true;
}
void ALWAYS_INLINE push(Chunk chunk)
{
pushData({std::move(chunk), {}});
}
void ALWAYS_INLINE push(std::exception_ptr exception)
{
pushData({Chunk(), std::move(exception)});
}
auto ALWAYS_INLINE pullData()
{
if (!needed)
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (!has_data)
throw Exception("Cannot pull block from port which has no data.", ErrorCodes::LOGICAL_ERROR);
has_data = false;
return std::move(data);
}
Chunk ALWAYS_INLINE pull()
{
auto cur_data = pullData();
if (cur_data.second)
std::rethrow_exception(std::move(cur_data.second));
return std::move(cur_data.first);
}
bool ALWAYS_INLINE hasData() const
{
// TODO: check for output port only.
// if (finished)
// throw Exception("Finished port can't has data.", ErrorCodes::LOGICAL_ERROR);
if (!needed)
throw Exception("Cannot check if not needed port has data.", ErrorCodes::LOGICAL_ERROR);
return has_data;
}
/// Only for output port.
/// If port still has data, it will be finished after pulling.
void ALWAYS_INLINE finish()
{
finished = true;
}
/// Only for input port. Removes data if has.
void ALWAYS_INLINE close()
{
finished = true;
has_data = false;
data.first.clear();
}
/// Only empty ports are finished.
bool ALWAYS_INLINE isFinished() const { return finished && !has_data; }
bool ALWAYS_INLINE isSetFinished() const { return finished; }
void ALWAYS_INLINE setNeeded()
{
if (isFinished())
throw Exception("Can't set port needed if it is finished.", ErrorCodes::LOGICAL_ERROR);
// if (has_data)
// throw Exception("Can't set port needed if it has data.", ErrorCodes::LOGICAL_ERROR);
needed = true;
}
void ALWAYS_INLINE setNotNeeded()
{
// if (finished)
// throw Exception("Can't set port not needed if it is finished.", ErrorCodes::LOGICAL_ERROR);
needed = false;
}
/// Only for output port.
bool ALWAYS_INLINE isNeeded() const { return needed && !finished; }
/// Note: std::variant can be used. But move constructor for it can't be inlined.
Chunk chunk;
std::exception_ptr exception;
};
private:
Data data;
/// Use special flag to check if block has data. This allows to send empty blocks between processors.
bool has_data = false;
static std::uintptr_t getUInt(Data * data) { return reinterpret_cast<std::uintptr_t>(data); }
static Data * getPtr(std::uintptr_t data) { return reinterpret_cast<Data *>(data); }
public:
/// Flags for Port state.
/// Will store them in least pointer bits.
/// Port was set finished or closed.
static constexpr std::uintptr_t IS_FINISHED = 1;
/// Block is not needed right now, but may be will be needed later.
/// This allows to pause calculations if we are not sure that we need more data.
bool needed = false;
/// Port was set finished or closed.
bool finished = false;
static constexpr std::uintptr_t IS_NEEDED = 2;
/// Check if port has data.
static constexpr std::uintptr_t HAS_DATA = 4;
static constexpr std::uintptr_t FLAGS_MASK = IS_FINISHED | IS_NEEDED | HAS_DATA;
static constexpr std::uintptr_t PTR_MASK = ~FLAGS_MASK;
/// Tiny smart ptr class for Data. Takes into account that ptr can have flags in least bits.
class DataPtr
{
public:
DataPtr() : data(new Data())
{
if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
}
/// Pointer can store flags in case of exception in swap.
~DataPtr() { delete getPtr(getUInt(data) & PTR_MASK); }
DataPtr(DataPtr const &) : data(new Data()) {}
DataPtr& operator=(DataPtr const &) = delete;
Data * operator->() const { return data; }
Data & operator*() const { return *data; }
Data * get() const { return data; }
explicit operator bool() const { return data; }
Data * release()
{
Data * result = nullptr;
std::swap(result, data);
return result;
}
uintptr_t ALWAYS_INLINE swap(std::atomic<Data *> & value, std::uintptr_t flags, std::uintptr_t mask)
{
Data * expected = nullptr;
Data * desired = getPtr(flags | getUInt(data));
while (!value.compare_exchange_weak(expected, desired))
desired = getPtr((getUInt(expected) & FLAGS_MASK & (~mask)) | flags | getUInt(data));
/// It's not very safe. In case of exception after exchange and before assigment we will get leak.
/// Don't know how to make it better.
data = getPtr(getUInt(expected) & PTR_MASK);
return getUInt(expected) & FLAGS_MASK;
}
private:
Data * data = nullptr;
};
/// Not finished, not needed, has not data.
State() : data(new Data())
{
if (unlikely((getUInt(data) & FLAGS_MASK) != 0))
throw Exception("Not alignment memory for Port.", ErrorCodes::LOGICAL_ERROR);
}
~State()
{
Data * desired = nullptr;
Data * expected = nullptr;
while (!data.compare_exchange_weak(expected, desired));
expected = getPtr(getUInt(expected) & PTR_MASK);
delete expected;
}
void ALWAYS_INLINE push(DataPtr & data_, std::uintptr_t & flags)
{
flags = data_.swap(data, HAS_DATA, HAS_DATA);
/// It's possible to push data into finished port. Will just ignore it.
/// if (flags & IS_FINISHED)
/// throw Exception("Cannot push block to finished port.", ErrorCodes::LOGICAL_ERROR);
/// It's possible to push data into port which is not needed now.
/// if ((flags & IS_NEEDED) == 0)
/// throw Exception("Cannot push block to port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (unlikely(flags & HAS_DATA))
throw Exception("Cannot push block to port which already has data.", ErrorCodes::LOGICAL_ERROR);
}
void ALWAYS_INLINE pull(DataPtr & data_, std::uintptr_t & flags)
{
flags = data_.swap(data, 0, HAS_DATA);
/// It's ok to check because this flag can be changed only by pulling thread.
if (unlikely((flags & IS_NEEDED) == 0))
throw Exception("Cannot pull block from port which is not needed.", ErrorCodes::LOGICAL_ERROR);
if (unlikely((flags & HAS_DATA) == 0))
throw Exception("Cannot pull block from port which has no data.", ErrorCodes::LOGICAL_ERROR);
}
std::uintptr_t ALWAYS_INLINE setFlags(std::uintptr_t flags, std::uintptr_t mask)
{
Data * expected = nullptr;
Data * desired = getPtr(flags);
while (!data.compare_exchange_weak(expected, desired))
desired = getPtr((getUInt(expected) & FLAGS_MASK & (~mask)) | flags | (getUInt(expected) & PTR_MASK));
return getUInt(expected) & FLAGS_MASK;
}
std::uintptr_t ALWAYS_INLINE getFlags() const
{
return getUInt(data.load()) & FLAGS_MASK;
}
private:
std::atomic<Data *> data;
};
Block header;
std::shared_ptr<State> state;
/// This object is only used for data exchange between port and shared state.
State::DataPtr data;
IProcessor * processor = nullptr;
public:
@ -159,14 +187,14 @@ public:
void ALWAYS_INLINE assumeConnected() const
{
if (!isConnected())
if (unlikely(!isConnected()))
throw Exception("Port is not connected", ErrorCodes::LOGICAL_ERROR);
}
bool ALWAYS_INLINE hasData() const
{
assumeConnected();
return state->hasData();
return state->getFlags() & State::HAS_DATA;
}
IProcessor & getProcessor()
@ -199,58 +227,87 @@ private:
/// If version was set, it will be increased on each pull.
UInt64 * version = nullptr;
mutable bool is_finished = false;
public:
using Port::Port;
void setVersion(UInt64 * value) { version = value; }
Chunk ALWAYS_INLINE pull()
{
if (version)
++(*version);
assumeConnected();
return state->pull();
}
Data ALWAYS_INLINE pullData()
{
if (version)
++(*version);
assumeConnected();
return state->pullData();
std::uintptr_t flags = 0;
state->pull(data, flags);
is_finished = flags & State::IS_FINISHED;
if (unlikely(!data->exception && data->chunk.getNumColumns() != header.columns()))
{
auto & chunk = data->chunk;
String msg = "Invalid number of columns in chunk pulled from OutputPort. Expected "
+ std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n';
msg += "Header: " + header.dumpStructure() + '\n';
msg += "Chunk: " + chunk.dumpStructure() + '\n';
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
}
return std::move(*data);
}
Chunk ALWAYS_INLINE pull()
{
auto data_ = pullData();
if (data_.exception)
std::rethrow_exception(data_.exception);
return std::move(data_.chunk);
}
bool ALWAYS_INLINE isFinished() const
{
assumeConnected();
return state->isFinished();
if (is_finished)
return true;
auto flags = state->getFlags();
is_finished = (flags & State::IS_FINISHED) && ((flags & State::HAS_DATA) == 0);
return is_finished;
}
void ALWAYS_INLINE setNeeded()
{
assumeConnected();
if (!state->isNeeded() && version)
if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0 && version)
++(*version);
state->setNeeded();
}
void ALWAYS_INLINE setNotNeeded()
{
assumeConnected();
state->setNotNeeded();
state->setFlags(0, State::IS_NEEDED);
}
void ALWAYS_INLINE close()
{
if (version && !isFinished())
assumeConnected();
if ((state->setFlags(State::IS_FINISHED, State::IS_FINISHED) & State::IS_FINISHED) == 0 && version)
++(*version);
assumeConnected();
state->close();
is_finished = true;
}
OutputPort & getOutputPort()
@ -289,12 +346,7 @@ public:
void ALWAYS_INLINE push(Chunk chunk)
{
if (version)
++(*version);
assumeConnected();
if (chunk.getNumColumns() != header.columns())
if (unlikely(chunk.getNumColumns() != header.columns()))
{
String msg = "Invalid number of columns in chunk pushed to OutputPort. Expected "
+ std::to_string(header.columns()) + ", found " + std::to_string(chunk.getNumColumns()) + '\n';
@ -305,48 +357,54 @@ public:
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
}
state->push(std::move(chunk));
pushData({.chunk = std::move(chunk), .exception = {}});
}
void ALWAYS_INLINE push(std::exception_ptr exception)
{
pushData({.chunk = {}, .exception = std::move(exception)});
}
void ALWAYS_INLINE pushData(Data data_)
{
if (version)
++(*version);
assumeConnected();
state->push(std::move(exception));
}
void ALWAYS_INLINE pushData(Data data)
{
if (data.second)
push(std::move(data.second));
else
push(std::move(data.first));
std::uintptr_t flags = 0;
*data = std::move(data_);
state->push(data, flags);
}
void ALWAYS_INLINE finish()
{
if (version && !isFinished())
++(*version);
assumeConnected();
state->finish();
auto flags = state->setFlags(State::IS_FINISHED, State::IS_FINISHED);
if (version && (flags & State::IS_FINISHED) == 0)
++(*version);
}
bool ALWAYS_INLINE isNeeded() const
{
assumeConnected();
return state->isNeeded();
return state->getFlags() & State::IS_NEEDED;
}
bool ALWAYS_INLINE isFinished() const
{
assumeConnected();
return state->isSetFinished();
return state->getFlags() & State::IS_FINISHED;
}
bool ALWAYS_INLINE canPush() const { return isNeeded() && !hasData(); }
bool ALWAYS_INLINE canPush() const
{
assumeConnected();
auto flags = state->getFlags();
return (flags & State::IS_NEEDED) && ((flags & State::HAS_DATA) == 0);
}
InputPort & getInputPort()
{