Add stream enumeration to Processors.

This commit is contained in:
Nikolai Kochetov 2019-09-02 17:49:05 +03:00
parent bf0c0ed4f1
commit 0016d99f5e
5 changed files with 44 additions and 44 deletions

View File

@ -474,32 +474,13 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
/// Just travers graph and prepare any processor.
while (!finished)
{
auto pushed = task_queue.num_pushed.load();
bool found = false;
while (pushed > task_queue.num_popped.load())
{
/// Fast branch.
if (task_queue.pop(state))
{
found = true;
break;
}
}
if (found)
break;
std::unique_lock lock(task_queue_mutex);
auto popped = task_queue.num_popped.load();
// if (!task_queue.empty())
// {
// state = task_queue.front();
// task_queue.pop();
// break;
// }
if (!task_queue.empty())
{
state = task_queue.pop(thread_num);
break;
}
++num_waiting_threads;
@ -513,7 +494,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
task_queue_condvar.wait(lock, [&]()
{
return finished || popped < task_queue.num_pushed.load();
return finished || !task_queue.empty();
});
--num_waiting_threads;

View File

@ -120,26 +120,33 @@ private:
struct TaskQueue
{
bool pop(ExecutionState *& state)
{
if (stack.pop(state))
{
num_popped.fetch_add(1);
return true;
}
return false;
}
void push(ExecutionState * state)
{
stack.push(state);
num_pushed.fetch_add(1);
map.emplace(state->processor->getStream(), state);
}
lfs::LFStack<ExecutionState> stack;
std::atomic<UInt64> num_pushed {0};
std::atomic<UInt64> num_popped {0};
ExecutionState * pop(size_t stream)
{
auto it = map.find(stream);
if (it == map.end())
it = map.find(IProcessor::NO_STREAM);
if (it == map.end())
it = map.begin();
if (it == map.end())
return nullptr;
auto res = it->second;
map.erase(it);
return res;
}
bool empty() const { return map.empty(); }
std::unordered_multimap<size_t, ExecutionState *> map;
};
/// Queue with pointers to tasks. Each thread will concurrently read from it until finished flag is set.

View File

@ -228,10 +228,17 @@ public:
void setDescription(const std::string & description_) { processor_description = description_; }
const std::string & getDescription() const { return processor_description; }
/// Helpers for pipeline executor.
void setStream(size_t value) { stream = value; }
size_t getStream() const { return stream; }
constexpr static size_t NO_STREAM = std::numeric_limits<size_t>::max();
private:
std::atomic<bool> is_cancelled{false};
std::string processor_description;
size_t stream = NO_STREAM;
};

View File

@ -75,6 +75,7 @@ void QueryPipeline::init(Processors sources)
totals.emplace_back(&source->getOutputs().back());
}
source->setStream(streams.size());
streams.emplace_back(&source->getOutputs().front());
processors.emplace_back(std::move(source));
}
@ -115,7 +116,7 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
Block header;
auto add_transform = [&](OutputPort *& stream, StreamType stream_type)
auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num = IProcessor::NO_STREAM)
{
if (!stream)
return;
@ -148,14 +149,17 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
if (transform)
{
if (stream_type == StreamType::Main)
transform->setStream(stream_num);
connect(*stream, transform->getInputs().front());
stream = &transform->getOutputs().front();
processors.emplace_back(std::move(transform));
}
};
for (auto & stream : streams)
add_transform(stream, StreamType::Main);
for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num)
add_transform(streams[stream_num], StreamType::Main, stream_num);
add_transform(delayed_stream_port, StreamType::Main);
add_transform(totals_having_port, StreamType::Totals);

View File

@ -507,6 +507,7 @@ Processors createMergingAggregatedMemoryEfficientPipe(
for (size_t i = 0; i < num_merging_processors; ++i, ++in, ++out)
{
auto transform = std::make_shared<MergingAggregatedBucketTransform>(params);
transform->setStream(i);
connect(*out, transform->getInputPort());
connect(transform->getOutputPort(), *in);
processors.emplace_back(std::move(transform));