mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #7988 from ClickHouse/processors-4.4
Better updated ports check in processors
This commit is contained in:
commit
86ff01d3aa
@ -7,7 +7,6 @@
|
||||
#include <ext/scope_guard.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
|
||||
#include <boost/lockfree/queue.hpp>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Processors/ISource.h>
|
||||
#include <Common/setThreadName.h>
|
||||
@ -52,26 +51,25 @@ bool PipelineExecutor::addEdges(UInt64 node)
|
||||
|
||||
const IProcessor * cur = graph[node].processor;
|
||||
|
||||
auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges)
|
||||
auto add_edge = [&](auto & from_port, const IProcessor * to_proc, Edges & edges,
|
||||
bool is_backward, UInt64 input_port_number, UInt64 output_port_number,
|
||||
std::vector<void *> * update_list)
|
||||
{
|
||||
auto it = processors_map.find(to_proc);
|
||||
if (it == processors_map.end())
|
||||
throwUnknownProcessor(to_proc, cur, true);
|
||||
|
||||
UInt64 proc_num = it->second;
|
||||
Edge * edge_ptr = nullptr;
|
||||
|
||||
for (auto & edge : edges)
|
||||
if (edge.to == proc_num)
|
||||
edge_ptr = &edge;
|
||||
|
||||
if (!edge_ptr)
|
||||
{
|
||||
edge_ptr = &edges.emplace_back();
|
||||
edge_ptr->to = proc_num;
|
||||
if (edge.to == proc_num)
|
||||
throw Exception("Multiple edges are not allowed for the same processors.", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
from_port.setVersion(&edge_ptr->version);
|
||||
auto & edge = edges.emplace_back(proc_num, is_backward, input_port_number, output_port_number, update_list);
|
||||
|
||||
from_port.setUpdateInfo(&edge.update_info);
|
||||
};
|
||||
|
||||
bool was_edge_added = false;
|
||||
@ -83,10 +81,11 @@ bool PipelineExecutor::addEdges(UInt64 node)
|
||||
{
|
||||
was_edge_added = true;
|
||||
|
||||
for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it)
|
||||
for (auto it = std::next(inputs.begin(), from_input); it != inputs.end(); ++it, ++from_input)
|
||||
{
|
||||
const IProcessor * proc = &it->getOutputPort().getProcessor();
|
||||
add_edge(*it, proc, graph[node].backEdges);
|
||||
auto output_port_number = proc->getOutputPortNumber(&it->getOutputPort());
|
||||
add_edge(*it, proc, graph[node].backEdges, true, from_input, output_port_number, &graph[node].post_updated_input_ports);
|
||||
}
|
||||
}
|
||||
|
||||
@ -97,10 +96,11 @@ bool PipelineExecutor::addEdges(UInt64 node)
|
||||
{
|
||||
was_edge_added = true;
|
||||
|
||||
for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it)
|
||||
for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output)
|
||||
{
|
||||
const IProcessor * proc = &it->getInputPort().getProcessor();
|
||||
add_edge(*it, proc, graph[node].directEdges);
|
||||
auto input_port_number = proc->getInputPortNumber(&it->getInputPort());
|
||||
add_edge(*it, proc, graph[node].directEdges, false, input_port_number, from_output, &graph[node].post_updated_output_ports);
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,6 +131,7 @@ void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
|
||||
if (graph[proc].directEdges.empty())
|
||||
{
|
||||
stack.push(proc);
|
||||
/// do not lock mutex, as this function is executedin single thread
|
||||
graph[proc].status = ExecStatus::Preparing;
|
||||
}
|
||||
}
|
||||
@ -195,9 +196,20 @@ void PipelineExecutor::expandPipeline(Stack & stack, UInt64 pid)
|
||||
UInt64 num_processors = processors.size();
|
||||
for (UInt64 node = 0; node < num_processors; ++node)
|
||||
{
|
||||
size_t num_direct_edges = graph[node].directEdges.size();
|
||||
size_t num_back_edges = graph[node].backEdges.size();
|
||||
|
||||
if (addEdges(node))
|
||||
{
|
||||
if (graph[node].status == ExecStatus::Idle || graph[node].status == ExecStatus::New)
|
||||
std::lock_guard guard(graph[node].status_mutex);
|
||||
|
||||
for (; num_back_edges < graph[node].backEdges.size(); ++num_back_edges)
|
||||
graph[node].updated_input_ports.emplace_back(num_back_edges);
|
||||
|
||||
for (; num_direct_edges < graph[node].directEdges.size(); ++num_direct_edges)
|
||||
graph[node].updated_output_ports.emplace_back(num_direct_edges);
|
||||
|
||||
if (graph[node].status == ExecStatus::Idle)
|
||||
{
|
||||
graph[node].status = ExecStatus::Preparing;
|
||||
stack.push(node);
|
||||
@ -212,34 +224,26 @@ bool PipelineExecutor::tryAddProcessorToStackIfUpdated(Edge & edge, Stack & stac
|
||||
|
||||
auto & node = graph[edge.to];
|
||||
|
||||
ExecStatus status = node.status.load();
|
||||
std::lock_guard guard(node.status_mutex);
|
||||
|
||||
/// Don't add processor if nothing was read from port.
|
||||
if (status != ExecStatus::New && edge.version == edge.prev_version)
|
||||
return false;
|
||||
ExecStatus status = node.status;
|
||||
|
||||
if (status == ExecStatus::Finished)
|
||||
return false;
|
||||
|
||||
/// Signal that node need to be prepared.
|
||||
node.need_to_be_prepared = true;
|
||||
edge.prev_version = edge.version;
|
||||
if (edge.backward)
|
||||
node.updated_output_ports.push_back(edge.output_port_number);
|
||||
else
|
||||
node.updated_input_ports.push_back(edge.input_port_number);
|
||||
|
||||
/// Try to get ownership for node.
|
||||
|
||||
/// Assume that current status is New or Idle. Otherwise, can't prepare node.
|
||||
if (status != ExecStatus::New)
|
||||
status = ExecStatus::Idle;
|
||||
|
||||
/// Statuses but New and Idle are not interesting because they own node.
|
||||
/// Prepare will be called in owning thread before changing status.
|
||||
while (!node.status.compare_exchange_weak(status, ExecStatus::Preparing))
|
||||
if (!(status == ExecStatus::New || status == ExecStatus::Idle) || !node.need_to_be_prepared)
|
||||
return false;
|
||||
|
||||
stack.push(edge.to);
|
||||
return true;
|
||||
if (status == ExecStatus::Idle)
|
||||
{
|
||||
node.status = ExecStatus::Preparing;
|
||||
stack.push(edge.to);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & parents, size_t thread_number, bool async)
|
||||
@ -247,105 +251,117 @@ bool PipelineExecutor::prepareProcessor(UInt64 pid, Stack & children, Stack & pa
|
||||
/// In this method we have ownership on node.
|
||||
auto & node = graph[pid];
|
||||
|
||||
bool need_traverse = false;
|
||||
bool need_expand_pipeline = false;
|
||||
|
||||
std::vector<Edge *> updated_back_edges;
|
||||
std::vector<Edge *> updated_direct_edges;
|
||||
|
||||
{
|
||||
/// Stopwatch watch;
|
||||
|
||||
/// Disable flag before prepare call. Otherwise, we can skip prepare request.
|
||||
/// Prepare can be called more times than needed, but it's ok.
|
||||
node.need_to_be_prepared = false;
|
||||
std::lock_guard guard(node.status_mutex);
|
||||
|
||||
auto status = node.processor->prepare();
|
||||
auto status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
|
||||
node.updated_input_ports.clear();
|
||||
node.updated_output_ports.clear();
|
||||
|
||||
/// node.execution_state->preparation_time_ns += watch.elapsed();
|
||||
node.last_processor_status = status;
|
||||
}
|
||||
|
||||
auto add_neighbours_to_prepare_queue = [&] ()
|
||||
{
|
||||
for (auto & edge : node.backEdges)
|
||||
tryAddProcessorToStackIfUpdated(edge, parents);
|
||||
|
||||
for (auto & edge : node.directEdges)
|
||||
tryAddProcessorToStackIfUpdated(edge, children);
|
||||
};
|
||||
|
||||
auto try_release_ownership = [&] ()
|
||||
{
|
||||
/// This function can be called after expand pipeline, where node from outer scope is not longer valid.
|
||||
auto & node_ = graph[pid];
|
||||
ExecStatus expected = ExecStatus::Idle;
|
||||
node_.status = ExecStatus::Idle;
|
||||
|
||||
if (node_.need_to_be_prepared)
|
||||
switch (node.last_processor_status)
|
||||
{
|
||||
while (!node_.status.compare_exchange_weak(expected, ExecStatus::Preparing))
|
||||
if (!(expected == ExecStatus::Idle) || !node_.need_to_be_prepared)
|
||||
return;
|
||||
|
||||
children.push(pid);
|
||||
}
|
||||
};
|
||||
|
||||
switch (node.last_processor_status)
|
||||
{
|
||||
case IProcessor::Status::NeedData:
|
||||
case IProcessor::Status::PortFull:
|
||||
{
|
||||
add_neighbours_to_prepare_queue();
|
||||
try_release_ownership();
|
||||
|
||||
break;
|
||||
}
|
||||
case IProcessor::Status::Finished:
|
||||
{
|
||||
add_neighbours_to_prepare_queue();
|
||||
node.status = ExecStatus::Finished;
|
||||
break;
|
||||
}
|
||||
case IProcessor::Status::Ready:
|
||||
{
|
||||
node.status = ExecStatus::Executing;
|
||||
return true;
|
||||
}
|
||||
case IProcessor::Status::Async:
|
||||
{
|
||||
throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
|
||||
case IProcessor::Status::NeedData:
|
||||
case IProcessor::Status::PortFull:
|
||||
{
|
||||
need_traverse = true;
|
||||
node.status = ExecStatus::Idle;
|
||||
break;
|
||||
}
|
||||
case IProcessor::Status::Finished:
|
||||
{
|
||||
need_traverse = true;
|
||||
node.status = ExecStatus::Finished;
|
||||
break;
|
||||
}
|
||||
case IProcessor::Status::Ready:
|
||||
{
|
||||
node.status = ExecStatus::Executing;
|
||||
return true;
|
||||
}
|
||||
case IProcessor::Status::Async:
|
||||
{
|
||||
throw Exception("Async is temporary not supported.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
// node.status = ExecStatus::Executing;
|
||||
// addAsyncJob(pid);
|
||||
// break;
|
||||
}
|
||||
case IProcessor::Status::Wait:
|
||||
{
|
||||
if (!async)
|
||||
throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
|
||||
break;
|
||||
}
|
||||
case IProcessor::Status::ExpandPipeline:
|
||||
{
|
||||
executor_contexts[thread_number]->task_list.emplace_back(
|
||||
node.execution_state.get(),
|
||||
&parents
|
||||
);
|
||||
|
||||
ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
|
||||
ExpandPipelineTask * expected = nullptr;
|
||||
|
||||
while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
|
||||
}
|
||||
case IProcessor::Status::Wait:
|
||||
{
|
||||
doExpandPipeline(expected, true);
|
||||
expected = nullptr;
|
||||
if (!async)
|
||||
throw Exception("Processor returned status Wait before Async.", ErrorCodes::LOGICAL_ERROR);
|
||||
break;
|
||||
}
|
||||
case IProcessor::Status::ExpandPipeline:
|
||||
{
|
||||
need_expand_pipeline = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (need_traverse)
|
||||
{
|
||||
for (auto & edge_id : node.post_updated_input_ports)
|
||||
{
|
||||
auto edge = static_cast<Edge *>(edge_id);
|
||||
updated_back_edges.emplace_back(edge);
|
||||
edge->update_info.trigger();
|
||||
}
|
||||
|
||||
doExpandPipeline(desired, true);
|
||||
for (auto & edge_id : node.post_updated_output_ports)
|
||||
{
|
||||
auto edge = static_cast<Edge *>(edge_id);
|
||||
updated_direct_edges.emplace_back(edge);
|
||||
edge->update_info.trigger();
|
||||
}
|
||||
|
||||
/// node is not longer valid after pipeline was expanded
|
||||
graph[pid].need_to_be_prepared = true;
|
||||
try_release_ownership();
|
||||
break;
|
||||
node.post_updated_input_ports.clear();
|
||||
node.post_updated_output_ports.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if (need_traverse)
|
||||
{
|
||||
for (auto & edge : updated_back_edges)
|
||||
tryAddProcessorToStackIfUpdated(*edge, parents);
|
||||
|
||||
for (auto & edge : updated_direct_edges)
|
||||
tryAddProcessorToStackIfUpdated(*edge, children);
|
||||
}
|
||||
|
||||
if (need_expand_pipeline)
|
||||
{
|
||||
executor_contexts[thread_number]->task_list.emplace_back(
|
||||
node.execution_state.get(),
|
||||
&parents
|
||||
);
|
||||
|
||||
ExpandPipelineTask * desired = &executor_contexts[thread_number]->task_list.back();
|
||||
ExpandPipelineTask * expected = nullptr;
|
||||
|
||||
while (!expand_pipeline_task.compare_exchange_strong(expected, desired))
|
||||
{
|
||||
doExpandPipeline(expected, true);
|
||||
expected = nullptr;
|
||||
}
|
||||
|
||||
doExpandPipeline(desired, true);
|
||||
|
||||
/// Add itself back to be prepared again.
|
||||
children.push(pid);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -427,7 +443,7 @@ void PipelineExecutor::execute(size_t num_threads)
|
||||
|
||||
bool all_processors_finished = true;
|
||||
for (auto & node : graph)
|
||||
if (node.status != ExecStatus::Finished)
|
||||
if (node.status != ExecStatus::Finished) /// Single thread, do not hold mutex
|
||||
all_processors_finished = false;
|
||||
|
||||
if (!all_processors_finished)
|
||||
|
@ -43,12 +43,23 @@ private:
|
||||
|
||||
struct Edge
|
||||
{
|
||||
Edge(UInt64 to_, bool backward_,
|
||||
UInt64 input_port_number_, UInt64 output_port_number_, std::vector<void *> * update_list)
|
||||
: to(to_), backward(backward_)
|
||||
, input_port_number(input_port_number_), output_port_number(output_port_number_)
|
||||
{
|
||||
update_info.update_list = update_list;
|
||||
update_info.id = this;
|
||||
}
|
||||
|
||||
UInt64 to = std::numeric_limits<UInt64>::max();
|
||||
bool backward;
|
||||
UInt64 input_port_number;
|
||||
UInt64 output_port_number;
|
||||
|
||||
/// Edge version is increased when port's state is changed (e.g. when data is pushed). See Port.h for details.
|
||||
/// To compare version with prev_version we can decide if neighbour processor need to be prepared.
|
||||
UInt64 version = 0;
|
||||
UInt64 prev_version = 0;
|
||||
Port::UpdateInfo update_info;
|
||||
};
|
||||
|
||||
/// Use std::list because new ports can be added to processor during execution.
|
||||
@ -58,7 +69,6 @@ private:
|
||||
/// Can be owning or not. Owning means that executor who set this status can change node's data and nobody else can.
|
||||
enum class ExecStatus
|
||||
{
|
||||
New, /// prepare wasn't called yet. Initial state. Non-owning.
|
||||
Idle, /// prepare returned NeedData or PortFull. Non-owning.
|
||||
Preparing, /// some executor is preparing processor, or processor is in task_queue. Owning.
|
||||
Executing, /// prepare returned Ready and task is executing. Owning.
|
||||
@ -87,17 +97,22 @@ private:
|
||||
Edges directEdges;
|
||||
Edges backEdges;
|
||||
|
||||
std::atomic<ExecStatus> status;
|
||||
/// This flag can be set by any executor.
|
||||
/// When enabled, any executor can try to atomically set Preparing state to status.
|
||||
std::atomic_bool need_to_be_prepared;
|
||||
ExecStatus status;
|
||||
std::mutex status_mutex;
|
||||
|
||||
std::vector<void *> post_updated_input_ports;
|
||||
std::vector<void *> post_updated_output_ports;
|
||||
|
||||
/// Last state for profiling.
|
||||
IProcessor::Status last_processor_status = IProcessor::Status::NeedData;
|
||||
|
||||
std::unique_ptr<ExecutionState> execution_state;
|
||||
|
||||
IProcessor::PortNumbers updated_input_ports;
|
||||
IProcessor::PortNumbers updated_output_ports;
|
||||
|
||||
Node(IProcessor * processor_, UInt64 processor_id)
|
||||
: processor(processor_), status(ExecStatus::New), need_to_be_prepared(false)
|
||||
: processor(processor_), status(ExecStatus::Idle)
|
||||
{
|
||||
execution_state = std::make_unique<ExecutionState>();
|
||||
execution_state->processor = processor;
|
||||
@ -105,8 +120,8 @@ private:
|
||||
}
|
||||
|
||||
Node(Node && other) noexcept
|
||||
: processor(other.processor), status(other.status.load())
|
||||
, need_to_be_prepared(other.need_to_be_prepared.load()), execution_state(std::move(other.execution_state))
|
||||
: processor(other.processor), status(other.status)
|
||||
, execution_state(std::move(other.execution_state))
|
||||
{
|
||||
}
|
||||
};
|
||||
|
@ -171,7 +171,15 @@ public:
|
||||
* - method 'prepare' cannot be executed in parallel even for different objects,
|
||||
* if they are connected (including indirectly) to each other by their ports;
|
||||
*/
|
||||
virtual Status prepare() = 0;
|
||||
virtual Status prepare()
|
||||
{
|
||||
throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
using PortNumbers = std::vector<UInt64>;
|
||||
|
||||
/// Optimization for prepare in case we know ports were updated.
|
||||
virtual Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) { return prepare(); }
|
||||
|
||||
/** You may call this method if 'prepare' returned Ready.
|
||||
* This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
|
||||
@ -183,11 +191,6 @@ public:
|
||||
throw Exception("Method 'work' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
virtual void work(size_t /*thread_num*/)
|
||||
{
|
||||
work();
|
||||
}
|
||||
|
||||
/** You may call this method if 'prepare' returned Async.
|
||||
* This method cannot access any ports. It should use only data that was prepared by 'prepare' method.
|
||||
*
|
||||
@ -226,6 +229,34 @@ public:
|
||||
auto & getInputs() { return inputs; }
|
||||
auto & getOutputs() { return outputs; }
|
||||
|
||||
UInt64 getInputPortNumber(const InputPort * input_port) const
|
||||
{
|
||||
UInt64 number = 0;
|
||||
for (auto & port : inputs)
|
||||
{
|
||||
if (&port == input_port)
|
||||
return number;
|
||||
|
||||
++number;
|
||||
}
|
||||
|
||||
throw Exception("Can't find input port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
UInt64 getOutputPortNumber(const OutputPort * output_port) const
|
||||
{
|
||||
UInt64 number = 0;
|
||||
for (auto & port : outputs)
|
||||
{
|
||||
if (&port == output_port)
|
||||
return number;
|
||||
|
||||
++number;
|
||||
}
|
||||
|
||||
throw Exception("Can't find output port for " + getName() + " processor", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
const auto & getInputs() const { return inputs; }
|
||||
const auto & getOutputs() const { return outputs; }
|
||||
|
||||
|
@ -28,6 +28,25 @@ class Port
|
||||
friend void connect(OutputPort &, InputPort &);
|
||||
friend class IProcessor;
|
||||
|
||||
public:
|
||||
struct UpdateInfo
|
||||
{
|
||||
std::vector<void *> * update_list = nullptr;
|
||||
void * id = nullptr;
|
||||
UInt64 version = 0;
|
||||
UInt64 prev_version = 0;
|
||||
|
||||
void inline ALWAYS_INLINE update()
|
||||
{
|
||||
if (version == prev_version && update_list)
|
||||
update_list->push_back(id);
|
||||
|
||||
++version;
|
||||
}
|
||||
|
||||
void inline ALWAYS_INLINE trigger() { prev_version = version; }
|
||||
};
|
||||
|
||||
protected:
|
||||
/// Shared state of two connected ports.
|
||||
class State
|
||||
@ -182,12 +201,17 @@ protected:
|
||||
|
||||
IProcessor * processor = nullptr;
|
||||
|
||||
/// If update_info was set, will call update() for it in case port's state have changed.
|
||||
UpdateInfo * update_info = nullptr;
|
||||
|
||||
public:
|
||||
using Data = State::Data;
|
||||
|
||||
Port(Block header_) : header(std::move(header_)) {}
|
||||
Port(Block header_, IProcessor * processor_) : header(std::move(header_)), processor(processor_) {}
|
||||
|
||||
void setUpdateInfo(UpdateInfo * info) { update_info = info; }
|
||||
|
||||
const Block & getHeader() const { return header; }
|
||||
bool ALWAYS_INLINE isConnected() const { return state != nullptr; }
|
||||
|
||||
@ -216,6 +240,13 @@ public:
|
||||
throw Exception("Port does not belong to Processor", ErrorCodes::LOGICAL_ERROR);
|
||||
return *processor;
|
||||
}
|
||||
|
||||
protected:
|
||||
void inline ALWAYS_INLINE updateVersion()
|
||||
{
|
||||
if (likely(update_info))
|
||||
update_info->update();
|
||||
}
|
||||
};
|
||||
|
||||
/// Invariants:
|
||||
@ -230,20 +261,14 @@ class InputPort : public Port
|
||||
private:
|
||||
OutputPort * output_port = nullptr;
|
||||
|
||||
/// If version was set, it will be increased on each pull.
|
||||
UInt64 * version = nullptr;
|
||||
|
||||
mutable bool is_finished = false;
|
||||
|
||||
public:
|
||||
using Port::Port;
|
||||
|
||||
void setVersion(UInt64 * value) { version = value; }
|
||||
|
||||
Data ALWAYS_INLINE pullData()
|
||||
{
|
||||
if (version)
|
||||
++(*version);
|
||||
updateVersion();
|
||||
|
||||
assumeConnected();
|
||||
|
||||
@ -296,8 +321,8 @@ public:
|
||||
{
|
||||
assumeConnected();
|
||||
|
||||
if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0 && version)
|
||||
++(*version);
|
||||
if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0)
|
||||
updateVersion();
|
||||
}
|
||||
|
||||
void ALWAYS_INLINE setNotNeeded()
|
||||
@ -310,8 +335,8 @@ public:
|
||||
{
|
||||
assumeConnected();
|
||||
|
||||
if ((state->setFlags(State::IS_FINISHED, State::IS_FINISHED) & State::IS_FINISHED) == 0 && version)
|
||||
++(*version);
|
||||
if ((state->setFlags(State::IS_FINISHED, State::IS_FINISHED) & State::IS_FINISHED) == 0)
|
||||
updateVersion();
|
||||
|
||||
is_finished = true;
|
||||
}
|
||||
@ -353,14 +378,9 @@ class OutputPort : public Port
|
||||
private:
|
||||
InputPort * input_port = nullptr;
|
||||
|
||||
/// If version was set, it will be increased on each push.
|
||||
UInt64 * version = nullptr;
|
||||
|
||||
public:
|
||||
using Port::Port;
|
||||
|
||||
void setVersion(UInt64 * value) { version = value; }
|
||||
|
||||
void ALWAYS_INLINE push(Chunk chunk)
|
||||
{
|
||||
pushData({.chunk = std::move(chunk), .exception = {}});
|
||||
@ -385,8 +405,7 @@ public:
|
||||
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
if (version)
|
||||
++(*version);
|
||||
updateVersion();
|
||||
|
||||
assumeConnected();
|
||||
|
||||
@ -401,8 +420,8 @@ public:
|
||||
|
||||
auto flags = state->setFlags(State::IS_FINISHED, State::IS_FINISHED);
|
||||
|
||||
if (version && (flags & State::IS_FINISHED) == 0)
|
||||
++(*version);
|
||||
if ((flags & State::IS_FINISHED) == 0)
|
||||
updateVersion();
|
||||
}
|
||||
|
||||
bool ALWAYS_INLINE isNeeded() const
|
||||
|
@ -153,5 +153,109 @@ ResizeProcessor::Status ResizeProcessor::prepare()
|
||||
return get_status_if_no_inputs();
|
||||
}
|
||||
|
||||
IProcessor::Status ResizeProcessor::prepare(const PortNumbers & updated_inputs, const PortNumbers & updated_outputs)
|
||||
{
|
||||
if (!initialized)
|
||||
{
|
||||
initialized = true;
|
||||
|
||||
for (auto & input : inputs)
|
||||
{
|
||||
input.setNeeded();
|
||||
input_ports.push_back({.port = &input, .status = InputStatus::NotActive});
|
||||
}
|
||||
|
||||
for (auto & output : outputs)
|
||||
output_ports.push_back({.port = &output, .status = OutputStatus::NotActive});
|
||||
}
|
||||
|
||||
for (auto & output_number : updated_outputs)
|
||||
{
|
||||
auto & output = output_ports[output_number];
|
||||
if (output.port->isFinished())
|
||||
{
|
||||
if (output.status != OutputStatus::Finished)
|
||||
{
|
||||
++num_finished_outputs;
|
||||
output.status = OutputStatus::Finished;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (output.port->canPush())
|
||||
{
|
||||
if (output.status != OutputStatus::NeedData)
|
||||
{
|
||||
output.status = OutputStatus::NeedData;
|
||||
waiting_outputs.push(output_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (num_finished_outputs == outputs.size())
|
||||
{
|
||||
for (auto & input : inputs)
|
||||
input.close();
|
||||
|
||||
return Status::Finished;
|
||||
}
|
||||
|
||||
for (auto & input_number : updated_inputs)
|
||||
{
|
||||
auto & input = input_ports[input_number];
|
||||
if (input.port->isFinished())
|
||||
{
|
||||
if (input.status != InputStatus::Finished)
|
||||
{
|
||||
input.status = InputStatus::Finished;
|
||||
++num_finished_inputs;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (input.port->hasData())
|
||||
{
|
||||
if (input.status != InputStatus::HasData)
|
||||
{
|
||||
input.status = InputStatus::HasData;
|
||||
inputs_with_data.push(input_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (!waiting_outputs.empty() && !inputs_with_data.empty())
|
||||
{
|
||||
auto & waiting_output = output_ports[waiting_outputs.front()];
|
||||
waiting_outputs.pop();
|
||||
|
||||
auto & input_with_data = input_ports[inputs_with_data.front()];
|
||||
inputs_with_data.pop();
|
||||
|
||||
waiting_output.port->pushData(input_with_data.port->pullData());
|
||||
input_with_data.status = InputStatus::NotActive;
|
||||
waiting_output.status = OutputStatus::NotActive;
|
||||
|
||||
if (input_with_data.port->isFinished())
|
||||
{
|
||||
input_with_data.status = InputStatus::Finished;
|
||||
++num_finished_inputs;
|
||||
}
|
||||
}
|
||||
|
||||
if (num_finished_inputs == inputs.size())
|
||||
{
|
||||
for (auto & output : outputs)
|
||||
output.finish();
|
||||
|
||||
return Status::Finished;
|
||||
}
|
||||
|
||||
if (!waiting_outputs.empty())
|
||||
return Status::NeedData;
|
||||
|
||||
return Status::PortFull;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Processors/IProcessor.h>
|
||||
#include <queue>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -31,10 +32,46 @@ public:
|
||||
String getName() const override { return "Resize"; }
|
||||
|
||||
Status prepare() override;
|
||||
Status prepare(const PortNumbers &, const PortNumbers &) override;
|
||||
|
||||
private:
|
||||
InputPorts::iterator current_input;
|
||||
OutputPorts::iterator current_output;
|
||||
|
||||
size_t num_finished_inputs = 0;
|
||||
size_t num_finished_outputs = 0;
|
||||
std::queue<UInt64> waiting_outputs;
|
||||
std::queue<UInt64> inputs_with_data;
|
||||
bool initialized = false;
|
||||
|
||||
enum class OutputStatus
|
||||
{
|
||||
NotActive,
|
||||
NeedData,
|
||||
Finished,
|
||||
};
|
||||
|
||||
enum class InputStatus
|
||||
{
|
||||
NotActive,
|
||||
HasData,
|
||||
Finished,
|
||||
};
|
||||
|
||||
struct InputPortWithStatus
|
||||
{
|
||||
InputPort * port;
|
||||
InputStatus status;
|
||||
};
|
||||
|
||||
struct OutputPortWithStatus
|
||||
{
|
||||
OutputPort * port;
|
||||
OutputStatus status;
|
||||
};
|
||||
|
||||
std::vector<InputPortWithStatus> input_ports;
|
||||
std::vector<OutputPortWithStatus> output_ports;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -65,14 +65,23 @@ FilterTransform::FilterTransform(
|
||||
IProcessor::Status FilterTransform::prepare()
|
||||
{
|
||||
if (constant_filter_description.always_false
|
||||
|| expression->checkColumnIsAlwaysFalse(filter_column_name))
|
||||
/// Optimization for `WHERE column in (empty set)`.
|
||||
/// The result will not change after set was created, so we can skip this check.
|
||||
/// It is implemented in prepare() stop pipeline before reading from input port.
|
||||
|| (!are_prepared_sets_initialized && expression->checkColumnIsAlwaysFalse(filter_column_name)))
|
||||
{
|
||||
input.close();
|
||||
output.finish();
|
||||
return Status::Finished;
|
||||
}
|
||||
|
||||
return ISimpleTransform::prepare();
|
||||
auto status = ISimpleTransform::prepare();
|
||||
|
||||
/// Until prepared sets are initialized, output port will be unneeded, and prepare will return PortFull.
|
||||
if (status != IProcessor::Status::PortFull)
|
||||
are_prepared_sets_initialized = true;
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
|
@ -36,6 +36,8 @@ private:
|
||||
/// Header after expression, but before removing filter column.
|
||||
Block transformed_header;
|
||||
|
||||
bool are_prepared_sets_initialized = false;
|
||||
|
||||
void removeFilterIfNeed(Chunk & chunk);
|
||||
};
|
||||
|
||||
|
@ -59,8 +59,11 @@ protected:
|
||||
auto num_rows = chunk.getNumRows();
|
||||
columns = chunk.mutateColumns();
|
||||
if (limit_rows && num_rows > limit_rows)
|
||||
{
|
||||
num_rows = limit_rows;
|
||||
for (auto & column : columns)
|
||||
column = (*column->cut(0, limit_rows)->convertToFullColumnIfConst()).mutate();
|
||||
column = (*column->cut(0, num_rows)->convertToFullColumnIfConst()).mutate();
|
||||
}
|
||||
|
||||
total_merged_rows += num_rows;
|
||||
merged_rows = num_rows;
|
||||
|
Loading…
Reference in New Issue
Block a user