Merge pull request #10932 from ClickHouse/pulling-executor

Add PullingPipelineExecutor.
This commit is contained in:
Nikolai Kochetov 2020-05-16 23:09:18 +03:00 committed by GitHub
commit caab379602
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 285 additions and 359 deletions

View File

@ -28,7 +28,7 @@
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Processors/Formats/LazyOutputFormat.h> #include <Processors/Executors/PullingPipelineExecutor.h>
#include "TCPHandler.h" #include "TCPHandler.h"
@ -553,68 +553,23 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
/// Send header-block, to allow client to prepare output format for data to send. /// Send header-block, to allow client to prepare output format for data to send.
{ {
auto & header = pipeline.getHeader(); const auto & header = pipeline.getHeader();
if (header) if (header)
sendData(header); sendData(header);
} }
auto lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
{ {
auto thread_group = CurrentThread::getGroup(); PullingPipelineExecutor executor(pipeline);
ThreadPool pool(1); CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
auto executor = pipeline.execute();
std::atomic_bool exception = false;
pool.scheduleOrThrowOnError([&]() Block block;
{ while (executor.pull(block, query_context->getSettingsRef().interactive_delay / 1000))
/// ThreadStatus thread_status;
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
setThreadName("QueryPipelineEx");
try
{
executor->execute(pipeline.getNumThreads());
}
catch (...)
{
exception = true;
throw;
}
});
/// Wait in case of exception happened outside of pool.
SCOPE_EXIT(
lazy_format->finish();
try
{
pool.wait();
}
catch (...)
{
/// If exception was thrown during pipeline execution, skip it while processing other exception.
tryLogCurrentException(log);
}
);
while (!lazy_format->isFinished() && !exception)
{ {
if (isQueryCancelled()) if (isQueryCancelled())
{ {
/// A packet was received requesting to stop execution of the request. /// A packet was received requesting to stop execution of the request.
executor->cancel(); executor.cancel();
break; break;
} }
@ -627,17 +582,13 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
sendLogs(); sendLogs();
if (auto block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)) if (block)
{ {
if (!state.io.null_format) if (!state.io.null_format)
sendData(block); sendData(block);
} }
} }
/// Finish lazy_format before waiting. Otherwise some thread may write into it, and waiting will lock.
lazy_format->finish();
pool.wait();
/** If data has run out, we will send the profiling data and total values to /** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use * the last zero block to be able to use
* this information in the suffix output of stream. * this information in the suffix output of stream.
@ -647,9 +598,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
*/ */
if (!isQueryCancelled()) if (!isQueryCancelled())
{ {
sendTotals(lazy_format->getTotals()); sendTotals(executor.getTotalsBlock());
sendExtremes(lazy_format->getExtremes()); sendExtremes(executor.getExtremesBlock());
sendProfileInfo(lazy_format->getProfileInfo()); sendProfileInfo(executor.getProfileInfo());
sendProgress(); sendProgress();
sendLogs(); sendLogs();
} }

View File

@ -62,10 +62,15 @@ bool BlockStreamProfileInfo::hasAppliedLimit() const
void BlockStreamProfileInfo::update(Block & block) void BlockStreamProfileInfo::update(Block & block)
{
update(block.rows(), block.bytes());
}
void BlockStreamProfileInfo::update(size_t num_rows, size_t num_bytes)
{ {
++blocks; ++blocks;
rows += block.rows(); rows += num_rows;
bytes += block.bytes(); bytes += num_bytes;
} }

View File

@ -40,6 +40,7 @@ struct BlockStreamProfileInfo
bool hasAppliedLimit() const; bool hasAppliedLimit() const;
void update(Block & block); void update(Block & block);
void update(size_t num_rows, size_t num_bytes);
/// Binary serialization and deserialization of main fields. /// Binary serialization and deserialization of main fields.
/// Writes only main fields i.e. fields that required by internal transmission protocol. /// Writes only main fields i.e. fields that required by internal transmission protocol.

View File

@ -1,105 +0,0 @@
#include <Common/EventCounter.h>
#include <Common/ThreadPool.h>
#include <Processors/Executors/ParallelPipelineExecutor.h>
#include <Processors/Executors/traverse.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
//
//ParallelPipelineExecutor::ParallelPipelineExecutor(const std::vector<ProcessorPtr> & processors, ThreadPool & pool)
// : processors(processors), pool(pool)
//{
//}
//
//
//ParallelPipelineExecutor::Status ParallelPipelineExecutor::prepare()
//{
// current_processor = nullptr;
//
// bool has_someone_to_wait = false;
//
// for (auto & element : processors)
// {
// traverse(*element,
// [&] (IProcessor & processor)
// {
// {
// std::lock_guard lock(mutex);
// if (active_processors.count(&processor))
// {
// has_someone_to_wait = true;
// return Status::Wait;
// }
// }
//
// Status status = processor.prepare();
//
// if (status == Status::Wait)
// has_someone_to_wait = true;
//
// if (status == Status::Ready || status == Status::Async)
// {
// current_processor = &processor;
// current_status = status;
// }
//
// return status;
// });
//
// if (current_processor)
// break;
// }
//
// if (current_processor)
// return Status::Async;
//
// if (has_someone_to_wait)
// return Status::Wait;
//
// for (auto & element : processors)
// {
// if (element->prepare() == Status::NeedData)
// throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR);
// if (element->prepare() == Status::PortFull)
// throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR);
// }
//
// return Status::Finished;
//}
//
//
//void ParallelPipelineExecutor::schedule(EventCounter & watch)
//{
// if (!current_processor)
// throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR);
//
// if (current_status == Status::Async)
// {
// current_processor->schedule(watch);
// }
// else
// {
// {
// std::lock_guard lock(mutex);
// active_processors.insert(current_processor);
// }
//
// pool.scheduleOrThrowOnError([processor = current_processor, &watch, this]
// {
// processor->work();
// {
// std::lock_guard lock(mutex);
// active_processors.erase(processor);
// }
// watch.notify();
// });
// }
//}
}

View File

@ -1,41 +0,0 @@
#pragma once
#include <vector>
#include <set>
#include <mutex>
#include <Processors/IProcessor.h>
template <typename>
class ThreadPoolImpl;
class ThreadFromGlobalPool;
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
namespace DB
{
/** Wraps pipeline in a single processor.
* This processor has no inputs and outputs and just executes the pipeline,
* performing all synchronous work within a threadpool.
*/
//class ParallelPipelineExecutor : public IProcessor
//{
//private:
// Processors processors;
// ThreadPool & pool;
//
// std::set<IProcessor *> active_processors;
// std::mutex mutex;
//
// IProcessor * current_processor = nullptr;
// Status current_status;
//
//public:
// ParallelPipelineExecutor(const Processors & processors, ThreadPool & pool);
//
// String getName() const override { return "ParallelPipelineExecutor"; }
//
// Status prepare() override;
// void schedule(EventCounter & watch) override;
//};
}

View File

@ -0,0 +1,198 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/LazyOutputFormat.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/QueryPipeline.h>
#include <Common/setThreadName.h>
#include <ext/scope_guard.h>
namespace DB
{
struct PullingPipelineExecutor::Data
{
PipelineExecutorPtr executor;
std::exception_ptr exception;
std::atomic_bool is_executed = false;
std::atomic_bool has_exception = false;
ThreadFromGlobalPool thread;
~Data()
{
if (thread.joinable())
thread.join();
}
void rethrowExceptionIfHas()
{
if (has_exception)
{
has_exception = false;
std::rethrow_exception(std::move(exception));
}
}
};
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
}
PullingPipelineExecutor::~PullingPipelineExecutor()
{
try
{
cancel();
}
catch (...)
{
tryLogCurrentException("PullingPipelineExecutor");
}
}
static void threadFunction(PullingPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("QueryPipelineEx");
try
{
data.executor->execute(num_threads);
}
catch (...)
{
data.exception = std::current_exception();
data.has_exception = true;
}
}
bool PullingPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
{
if (!data)
{
data = std::make_unique<Data>();
data->executor = pipeline.execute();
auto func = [&, thread_group = CurrentThread::getGroup()]()
{
threadFunction(*data, thread_group, pipeline.getNumThreads());
};
data->thread = ThreadFromGlobalPool(std::move(func));
}
if (data->has_exception)
{
/// Finish lazy format in case of exception. Otherwise thread.join() may hung.
lazy_format->finish();
data->has_exception = false;
std::rethrow_exception(std::move(data->exception));
}
if (lazy_format->isFinished())
{
data->is_executed = true;
/// Wait thread ant rethrow exception if any.
cancel();
return false;
}
chunk = lazy_format->getChunk(milliseconds);
return true;
}
bool PullingPipelineExecutor::pull(Block & block, uint64_t milliseconds)
{
Chunk chunk;
if (!pull(chunk, milliseconds))
return false;
if (!chunk)
{
/// In case if timeout exceeded.
block.clear();
return true;
}
block = lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns());
if (auto chunk_info = chunk.getChunkInfo())
{
if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get()))
{
block.info.bucket_num = agg_info->bucket_num;
block.info.is_overflows = agg_info->is_overflows;
}
}
return true;
}
void PullingPipelineExecutor::cancel()
{
/// Cancel execution if it wasn't finished.
if (data && !data->is_executed && data->executor)
data->executor->cancel();
/// Finish lazy format. Otherwise thread.join() may hung.
if (!lazy_format->isFinished())
lazy_format->finish();
/// Join thread here to wait for possible exception.
if (data && data->thread.joinable())
data->thread.join();
/// Rethrow exception to not swallow it in destructor.
if (data)
data->rethrowExceptionIfHas();
}
Chunk PullingPipelineExecutor::getTotals()
{
return lazy_format->getTotals();
}
Chunk PullingPipelineExecutor::getExtremes()
{
return lazy_format->getExtremes();
}
Block PullingPipelineExecutor::getTotalsBlock()
{
auto totals = getTotals();
if (totals.empty())
return {};
const auto & header = lazy_format->getPort(IOutputFormat::PortKind::Totals).getHeader();
return header.cloneWithColumns(totals.detachColumns());
}
Block PullingPipelineExecutor::getExtremesBlock()
{
auto extremes = getExtremes();
if (extremes.empty())
return {};
const auto & header = lazy_format->getPort(IOutputFormat::PortKind::Extremes).getHeader();
return header.cloneWithColumns(extremes.detachColumns());
}
BlockStreamProfileInfo & PullingPipelineExecutor::getProfileInfo()
{
return lazy_format->getProfileInfo();
}
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <memory>
namespace DB
{
class QueryPipeline;
class Block;
class Chunk;
class LazyOutputFormat;
struct BlockStreamProfileInfo;
/// Pulling executor for QueryPipeline.
/// Typical usage is:
///
/// PullingPipelineExecutor executor(query_pipeline);
/// while (executor.pull(chunk))
/// ... process chunk ...
class PullingPipelineExecutor
{
public:
explicit PullingPipelineExecutor(QueryPipeline & pipeline_);
~PullingPipelineExecutor();
/// Methods return false if query is finished.
/// If milliseconds > 0, returns empty object and `true` after timeout exceeded.
/// You can use any pull method.
bool pull(Chunk & chunk, uint64_t milliseconds = 0);
bool pull(Block & block, uint64_t milliseconds = 0);
/// Stop execution. It is not necessary, but helps to stop execution before executor is destroyed.
void cancel();
/// Get totals and extremes. Returns empty chunk if doesn't have any.
Chunk getTotals();
Chunk getExtremes();
/// Get totals and extremes. Returns empty chunk if doesn't have any.
Block getTotalsBlock();
Block getExtremesBlock();
/// Get query profile info.
BlockStreamProfileInfo & getProfileInfo();
/// Internal executor data.
struct Data;
private:
QueryPipeline & pipeline;
std::shared_ptr<LazyOutputFormat> lazy_format;
std::unique_ptr<Data> data;
};
}

View File

@ -1,83 +0,0 @@
#include <Processors/Executors/SequentialPipelineExecutor.h>
#include <Processors/Executors/traverse.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
//SequentialPipelineExecutor::SequentialPipelineExecutor(const Processors & processors)
// : processors(processors)
//{
//}
//
//
//SequentialPipelineExecutor::Status SequentialPipelineExecutor::prepare()
//{
// current_processor = nullptr;
//
// bool has_someone_to_wait = false;
// Status found_status = Status::Finished;
//
// for (auto & element : processors)
// {
// traverse(*element,
// [&] (IProcessor & processor)
// {
// Status status = processor.prepare();
//
// if (status == Status::Wait)
// has_someone_to_wait = true;
//
// if (status == Status::Ready || status == Status::Async)
// {
// current_processor = &processor;
// found_status = status;
// }
//
// return status;
// });
//
// if (current_processor)
// break;
// }
//
// if (current_processor)
// return found_status;
// if (has_someone_to_wait)
// return Status::Wait;
//
// for (auto & element : processors)
// {
// if (element->prepare() == Status::NeedData)
// throw Exception("Pipeline stuck: " + element->getName() + " processor needs input data but no one is going to generate it", ErrorCodes::LOGICAL_ERROR);
// if (element->prepare() == Status::PortFull)
// throw Exception("Pipeline stuck: " + element->getName() + " processor has data in output port but no one is going to consume it", ErrorCodes::LOGICAL_ERROR);
// }
//
// return Status::Finished;
//}
//
//
//void SequentialPipelineExecutor::work()
//{
// if (!current_processor)
// throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR);
//
// current_processor->work();
//}
//
//
//void SequentialPipelineExecutor::schedule(EventCounter & watch)
//{
// if (!current_processor)
// throw Exception("Bad pipeline", ErrorCodes::LOGICAL_ERROR);
//
// current_processor->schedule(watch);
//}
}

View File

@ -1,31 +0,0 @@
#pragma once
#include <vector>
#include <Processors/IProcessor.h>
namespace DB
{
/** Wraps pipeline in a single processor.
* This processor has no inputs and outputs and just executes the pipeline,
* performing all synchronous work from the current thread.
*/
//class SequentialPipelineExecutor : public IProcessor
//{
//private:
// Processors processors;
// IProcessor * current_processor = nullptr;
//
//public:
// SequentialPipelineExecutor(const Processors & processors);
//
// String getName() const override { return "SequentialPipelineExecutor"; }
//
// Status prepare() override;
// void work() override;
// void schedule(EventCounter & watch) override;
//};
}

View File

@ -7,7 +7,7 @@ namespace DB
WriteBuffer LazyOutputFormat::out(nullptr, 0); WriteBuffer LazyOutputFormat::out(nullptr, 0);
Block LazyOutputFormat::getBlock(UInt64 milliseconds) Chunk LazyOutputFormat::getChunk(UInt64 milliseconds)
{ {
if (finished_processing) if (finished_processing)
{ {
@ -19,38 +19,20 @@ Block LazyOutputFormat::getBlock(UInt64 milliseconds)
if (!queue.tryPop(chunk, milliseconds)) if (!queue.tryPop(chunk, milliseconds))
return {}; return {};
if (!chunk) if (chunk)
return {}; info.update(chunk.getNumRows(), chunk.allocatedBytes());
auto block = getPort(PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns()); return chunk;
info.update(block);
if (auto chunk_info = chunk.getChunkInfo())
{
if (const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get()))
{
block.info.bucket_num = agg_info->bucket_num;
block.info.is_overflows = agg_info->is_overflows;
}
}
return block;
} }
Block LazyOutputFormat::getTotals() Chunk LazyOutputFormat::getTotals()
{ {
if (!totals) return std::move(totals);
return {};
return getPort(PortKind::Totals).getHeader().cloneWithColumns(totals.detachColumns());
} }
Block LazyOutputFormat::getExtremes() Chunk LazyOutputFormat::getExtremes()
{ {
if (!extremes) return std::move(extremes);
return {};
return getPort(PortKind::Extremes).getHeader().cloneWithColumns(extremes.detachColumns());
} }
void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit) void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)

View File

@ -8,8 +8,8 @@ namespace DB
{ {
/// LazyOutputFormat is used to retrieve ready data from executing pipeline. /// LazyOutputFormat is used to retrieve ready data from executing pipeline.
/// You can periodically call `getBlock` from separate thread. /// You can periodically call `getChunk` from separate thread.
/// Used in TCPHandler. /// Used in PullingPipelineExecutor.
class LazyOutputFormat : public IOutputFormat class LazyOutputFormat : public IOutputFormat
{ {
@ -19,9 +19,9 @@ public:
String getName() const override { return "LazyOutputFormat"; } String getName() const override { return "LazyOutputFormat"; }
Block getBlock(UInt64 milliseconds = 0); Chunk getChunk(UInt64 milliseconds = 0);
Block getTotals(); Chunk getTotals();
Block getExtremes(); Chunk getExtremes();
bool isFinished() { return finished_processing && queue.size() == 0; } bool isFinished() { return finished_processing && queue.size() == 0; }

View File

@ -9,8 +9,6 @@
#include <Processors/ForkProcessor.h> #include <Processors/ForkProcessor.h>
#include <Processors/LimitTransform.h> #include <Processors/LimitTransform.h>
#include <Processors/QueueBuffer.h> #include <Processors/QueueBuffer.h>
#include <Processors/Executors/SequentialPipelineExecutor.h>
#include <Processors/Executors/ParallelPipelineExecutor.h>
#include <Processors/printPipeline.h> #include <Processors/printPipeline.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>

View File

@ -9,8 +9,6 @@
#include <Processors/ForkProcessor.h> #include <Processors/ForkProcessor.h>
#include <Processors/LimitTransform.h> #include <Processors/LimitTransform.h>
#include <Processors/QueueBuffer.h> #include <Processors/QueueBuffer.h>
#include <Processors/Executors/SequentialPipelineExecutor.h>
#include <Processors/Executors/ParallelPipelineExecutor.h>
#include <Processors/printPipeline.h> #include <Processors/printPipeline.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>

View File

@ -10,9 +10,8 @@ SRCS(
Chunk.cpp Chunk.cpp
ConcatProcessor.cpp ConcatProcessor.cpp
DelayedPortsProcessor.cpp DelayedPortsProcessor.cpp
Executors/ParallelPipelineExecutor.cpp
Executors/PipelineExecutor.cpp Executors/PipelineExecutor.cpp
Executors/SequentialPipelineExecutor.cpp Executors/PullingPipelineExecutor.cpp
Executors/TreeExecutorBlockInputStream.cpp Executors/TreeExecutorBlockInputStream.cpp
ForkProcessor.cpp ForkProcessor.cpp
Formats/IInputFormat.cpp Formats/IInputFormat.cpp