Merge pull request #11004 from ClickHouse/remove-experimental-use-processors-flag-2

Remove allow_processors flag from executeQuery()
This commit is contained in:
Nikolai Kochetov 2020-05-22 10:47:15 +03:00 committed by GitHub
commit f7456f8320
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 767 additions and 211 deletions

View File

@ -28,7 +28,7 @@
#include <Compression/CompressionFactory.h>
#include <common/logger_useful.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include "TCPHandler.h"
@ -566,7 +566,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
}
{
PullingPipelineExecutor executor(pipeline);
PullingAsyncPipelineExecutor executor(pipeline);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
Block block;

View File

@ -1,9 +1,31 @@
#include <DataStreams/BlockIO.h>
#include <Interpreters/ProcessList.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Provides the query result as a block input stream.
/// Hands back `in` when it is set; otherwise wraps an initialized pipeline into a
/// PipelineExecutingBlockInputStream (the pipeline is moved out of this BlockIO).
/// Throws LOGICAL_ERROR if `out` is set, or if there is neither `in` nor an initialized pipeline.
BlockInputStreamPtr BlockIO::getInputStream()
{
    if (out)
    {
        throw Exception(
            "Cannot get input stream from BlockIO because output stream is not empty",
            ErrorCodes::LOGICAL_ERROR);
    }

    if (!in)
    {
        if (!pipeline.initialized())
        {
            throw Exception(
                "Cannot get input stream from BlockIO because query pipeline was not initialized",
                ErrorCodes::LOGICAL_ERROR);
        }

        /// No ready-made stream: convert the pipeline into one.
        return std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
    }

    return in;
}
void BlockIO::reset()
{
/** process_list_entry should be destroyed after in, after out and after pipeline,

View File

@ -50,6 +50,9 @@ struct BlockIO
exception_callback();
}
/// Returns in or converts pipeline to stream. Throws if out is not empty.
BlockInputStreamPtr getInputStream();
private:
void reset();
};

View File

@ -131,10 +131,10 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
*/
if (is_local)
{
BlockIO res = executeQuery(load_all_query, context, true, QueryProcessingStage::Complete, false, false);
auto stream = executeQuery(load_all_query, context, true).getInputStream();
/// FIXME: res.in may implicitly use some objects owned by res, but they will be destroyed after return
res.in = std::make_shared<ConvertingBlockInputStream>(res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
stream = std::make_shared<ConvertingBlockInputStream>(stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return stream;
}
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
}
@ -144,9 +144,9 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
std::string load_update_query = getUpdateFieldAndDate();
if (is_local)
{
auto res = executeQuery(load_update_query, context, true, QueryProcessingStage::Complete, false, false);
res.in = std::make_shared<ConvertingBlockInputStream>(res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
auto stream = executeQuery(load_update_query, context, true).getInputStream();
stream = std::make_shared<ConvertingBlockInputStream>(stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return stream;
}
return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, sample_block, context);
}
@ -191,10 +191,10 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
{
auto res = executeQuery(query, context, true, QueryProcessingStage::Complete, false, false);
res.in = std::make_shared<ConvertingBlockInputStream>(
res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res.in;
auto res = executeQuery(query, context, true).getInputStream();
res = std::make_shared<ConvertingBlockInputStream>(
res, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res;
}
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
@ -206,8 +206,7 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
if (is_local)
{
Context query_context = context;
auto input_block = executeQuery(request, query_context, true,
QueryProcessingStage::Complete, false, false).in;
auto input_block = executeQuery(request, query_context, true).getInputStream();
return readInvalidateQuery(*input_block);
}
else

View File

@ -302,10 +302,11 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);
BlockIO block_io = executeQuery(select_query, context.getGlobalContext(), true, QueryProcessingStage::Complete, false, false);
Block res = block_io.in->read();
BlockIO block_io = executeQuery(select_query, context.getGlobalContext(), true);
auto stream = block_io.getInputStream();
Block res = stream->read();
if (res && block_io.in->read())
if (res && stream->read())
throw Exception("Expected one block from input stream", ErrorCodes::LOGICAL_ERROR);
return res;

View File

@ -197,8 +197,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool internal,
QueryProcessingStage::Enum stage,
bool has_query_tail,
ReadBuffer * istr,
bool allow_processors)
ReadBuffer * istr)
{
time_t current_time = time(nullptr);
@ -318,7 +317,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.resetInputCallbacks();
auto interpreter = InterpreterFactory::get(ast, context, stage);
bool use_processors = allow_processors && interpreter->canExecuteWithProcessors();
bool use_processors = interpreter->canExecuteWithProcessors();
std::shared_ptr<const EnabledQuota> quota;
if (!interpreter->ignoreQuota())
@ -581,13 +580,12 @@ BlockIO executeQuery(
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data,
bool allow_processors)
bool may_have_embedded_data)
{
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
internal, stage, !may_have_embedded_data, nullptr, allow_processors);
internal, stage, !may_have_embedded_data, nullptr);
if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
{
@ -602,6 +600,22 @@ BlockIO executeQuery(
return streams;
}
/// Compatibility overload that still accepts the allow_processors flag.
/// Runs the query through the overload without the flag; when processors are not allowed and
/// the result holds an initialized pipeline, that pipeline is converted into `in`.
BlockIO executeQuery(
    const String & query,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool may_have_embedded_data,
    bool allow_processors)
{
    BlockIO result = executeQuery(query, context, internal, stage, may_have_embedded_data);

    /// The pipeline is only inspected when the caller asked for the old stream interface.
    const bool must_convert_to_stream = !allow_processors && result.pipeline.initialized();
    if (must_convert_to_stream)
        result.in = result.getInputStream();

    return result;
}
void executeQuery(
ReadBuffer & istr,
@ -648,7 +662,7 @@ void executeQuery(
ASTPtr ast;
BlockIO streams;
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true);
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr);
auto & pipeline = streams.pipeline;

View File

@ -38,16 +38,6 @@ void executeQuery(
/// Correctly formatting the results (according to INTO OUTFILE and FORMAT sections)
/// must be done separately.
BlockIO executeQuery(
const String & query, /// Query text without INSERT data. The latter must be written to BlockIO::out.
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, /// To which stage the query must be executed.
bool may_have_embedded_data = false, /// If insert query may have embedded data
bool allow_processors = true /// If can use processors pipeline
);
QueryPipeline executeQueryWithProcessors(
const String & query, /// Query text without INSERT data. The latter must be written to BlockIO::out.
Context & context, /// DB, tables, data types, storage engines, functions, aggregate functions...
bool internal = false, /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
@ -55,4 +45,14 @@ QueryPipeline executeQueryWithProcessors(
bool may_have_embedded_data = false /// If insert query may have embedded data
);
/// Old interface with allow_processors flag. For compatibility.
BlockIO executeQuery(
const String & query,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data,
bool allow_processors /// If can use processors pipeline
);
}

View File

@ -0,0 +1,131 @@
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Moves the given pipeline into a heap-allocated QueryPipeline held by this stream.
PipelineExecutingBlockInputStream::PipelineExecutingBlockInputStream(QueryPipeline pipeline_)
    : pipeline(
        std::make_unique<QueryPipeline>(std::move(pipeline_)))
{
}

/// Defaulted here, in the translation unit that includes the headers of the pipeline
/// and of both executor types held through unique_ptr members.
PipelineExecutingBlockInputStream::~PipelineExecutingBlockInputStream() = default;
/// Structure of the produced blocks.
/// Asks whichever executor has been created; before any executor exists, asks the pipeline.
Block PipelineExecutingBlockInputStream::getHeader() const
{
    if (!executor && !async_executor)
        return pipeline->getHeader();

    /// The single-threaded executor takes precedence, as only one of the two is ever created.
    return executor ? executor->getHeader() : async_executor->getHeader();
}
/// Creates the executor that matches the pipeline's thread count and marks execution as started:
/// more than one thread -> PullingAsyncPipelineExecutor, otherwise PullingPipelineExecutor.
void PipelineExecutingBlockInputStream::createExecutor()
{
    const bool single_threaded = pipeline->getNumThreads() <= 1;

    if (single_threaded)
        executor = std::make_unique<PullingPipelineExecutor>(*pipeline);
    else
        async_executor = std::make_unique<PullingAsyncPipelineExecutor>(*pipeline);

    is_execution_started = true;
}

/// Prefix hook: creates the executor.
void PipelineExecutingBlockInputStream::readPrefixImpl()
{
    createExecutor();
}
/// Pulls the next non-empty block from the active executor.
/// Creates the executor lazily if execution has not been started yet.
/// When the executor reports that there is nothing more to pull, stores totals and extremes
/// and returns an empty block.
Block PipelineExecutingBlockInputStream::readImpl()
{
    if (!is_execution_started)
        createExecutor();

    Block result;
    for (bool has_more = true; has_more;)
    {
        has_more = executor ? executor->pull(result)
                            : async_executor->pull(result);

        /// A pull may succeed without producing data; keep pulling in that case.
        if (result)
            return result;
    }

    if (executor)
    {
        totals = executor->getTotalsBlock();
        extremes = executor->getExtremesBlock();
    }
    else
    {
        totals = async_executor->getTotalsBlock();
        extremes = async_executor->getExtremesBlock();
    }

    return {};
}
inline static void throwIfExecutionStarted(bool is_execution_started, const char * method)
{
if (is_execution_started)
throw Exception(String("Cannot call ") + method +
" for PipelineExecutingBlockInputStream because execution was started",
ErrorCodes::LOGICAL_ERROR);
}
/// Cancels the stream itself and, if execution was started, the active executor as well.
void PipelineExecutingBlockInputStream::cancel(bool kill)
{
    IBlockInputStream::cancel(kill);

    /// No executor has been created yet, so there is nothing else to cancel.
    if (!is_execution_started)
        return;

    if (executor)
        executor->cancel();
    else
        async_executor->cancel();
}
/// Forwards the progress callback to the pipeline.
/// Throws LOGICAL_ERROR if execution has already been started.
void PipelineExecutingBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
    throwIfExecutionStarted(is_execution_started, "setProgressCallback");

    pipeline->setProgressCallback(callback);
}

/// Registers the process list element both in the base stream and in the pipeline.
/// Throws LOGICAL_ERROR if execution has already been started.
void PipelineExecutingBlockInputStream::setProcessListElement(QueryStatus * elem)
{
    throwIfExecutionStarted(is_execution_started, "setProcessListElement");

    IBlockInputStream::setProcessListElement(elem);
    pipeline->setProcessListElement(elem);
}
/// Applies local limits through the base stream.
/// Throws LOGICAL_ERROR if execution has already been started or if the limits mode is LIMITS_TOTAL.
void PipelineExecutingBlockInputStream::setLimits(const IBlockInputStream::LocalLimits & limits_)
{
    throwIfExecutionStarted(is_execution_started, "setLimits");

    const bool is_total_mode = limits_.mode == LimitsMode::LIMITS_TOTAL;
    if (is_total_mode)
    {
        throw Exception(
            "Total limits are not supported by PipelineExecutingBlockInputStream",
            ErrorCodes::LOGICAL_ERROR);
    }

    /// Local limits may be checked by IBlockInputStream itself.
    IBlockInputStream::setLimits(limits_);
}
/// Not supported by this wrapper: always throws LOGICAL_ERROR.
void PipelineExecutingBlockInputStream::setQuota(const std::shared_ptr<const EnabledQuota> &)
{
    throw Exception(
        "Quota is not supported by PipelineExecutingBlockInputStream",
        ErrorCodes::LOGICAL_ERROR);
}

/// Not supported by this wrapper: always throws LOGICAL_ERROR.
void PipelineExecutingBlockInputStream::addTotalRowsApprox(size_t)
{
    throw Exception(
        "Progress is not supported by PipelineExecutingBlockInputStream",
        ErrorCodes::LOGICAL_ERROR);
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class QueryPipeline;
class PullingAsyncPipelineExecutor;
class PullingPipelineExecutor;
/// Implements IBlockInputStream on top of a QueryPipeline.
/// It's a temporary wrapper: the stream owns the pipeline and pulls blocks from it
/// through one of the two pulling executors.
class PipelineExecutingBlockInputStream : public IBlockInputStream
{
public:
/// Takes ownership of the pipeline.
explicit PipelineExecutingBlockInputStream(QueryPipeline pipeline_);
~PipelineExecutingBlockInputStream() override;
String getName() const override { return "PipelineExecuting"; }
/// Header of the active executor, or of the pipeline if no executor has been created yet.
Block getHeader() const override;
/// Cancels the stream and, if execution was started, the active executor.
void cancel(bool kill) override;
/// Implement IBlockInputStream methods via QueryPipeline.
/// The first three throw LOGICAL_ERROR when called after execution was started.
/// setLimits additionally rejects LIMITS_TOTAL; setQuota and addTotalRowsApprox always throw.
void setProgressCallback(const ProgressCallback & callback) final;
void setProcessListElement(QueryStatus * elem) final;
void setLimits(const LocalLimits & limits_) final;
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final;
void addTotalRowsApprox(size_t value) final;
protected:
/// Creates the executor.
void readPrefixImpl() override;
/// Creates the executor if needed and returns the next non-empty block, or an empty block at the end.
Block readImpl() override;
private:
std::unique_ptr<QueryPipeline> pipeline;
/// Only one of the executors is used.
std::unique_ptr<PullingPipelineExecutor> executor; /// for single thread.
std::unique_ptr<PullingAsyncPipelineExecutor> async_executor; /// for many threads.
/// Set once createExecutor() has run.
bool is_execution_started = false;
/// Chooses the executor by the pipeline's thread count.
void createExecutor();
};
}

View File

@ -491,6 +491,34 @@ void PipelineExecutor::execute(size_t num_threads)
throw;
}
finalizeExecution();
}
/// Runs one execution step in the calling thread.
/// Initializes execution for a single thread on first use.
/// Returns true while the pipeline is not finished. Once it is finished, rethrows the first
/// exception stored in any graph node (if there is one), finalizes execution and returns false.
bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
{
    if (finished)
        return false;

    if (!is_execution_initialized)
        initializeExecution(1);

    executeStepImpl(0, 1, yield_flag);

    const bool still_running = !finished;
    if (still_running)
        return true;

    /// Execution can be stopped because of exception. Check and rethrow if any.
    for (auto & node : graph)
    {
        auto & node_exception = node.execution_state->exception;
        if (node_exception)
            std::rethrow_exception(node_exception);
    }

    finalizeExecution();

    return false;
}
void PipelineExecutor::finalizeExecution()
{
if (process_list_element && process_list_element->isKilled())
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
@ -506,33 +534,39 @@ void PipelineExecutor::execute(size_t num_threads)
throw Exception("Pipeline stuck. Current state:\n" + dumpPipeline(), ErrorCodes::LOGICAL_ERROR);
}
void PipelineExecutor::wakeUpExecutor(size_t thread_num)
{
std::lock_guard guard(executor_contexts[thread_num]->mutex);
executor_contexts[thread_num]->wake_flag = true;
executor_contexts[thread_num]->condvar.notify_one();
}
void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads)
{
#ifndef NDEBUG
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
UInt64 wait_time_ns = 0;
executeStepImpl(thread_num, num_threads);
#ifndef NDEBUG
auto & context = executor_contexts[thread_num];
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Thread finished."
<< " Total time: " << (context->total_time_ns / 1e9) << " sec."
<< " Execution time: " << (context->execution_time_ns / 1e9) << " sec."
<< " Processing time: " << (context->processing_time_ns / 1e9) << " sec."
<< " Wait time: " << (context->wait_time_ns / 1e9) << " sec.");
#endif
}
void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, std::atomic_bool * yield_flag)
{
#ifndef NDEBUG
Stopwatch total_time_watch;
#endif
ExecutionState * state = nullptr;
auto & context = executor_contexts[thread_num];
auto & state = context->state;
bool yield = false;
auto prepare_processor = [&](UInt64 pid, Queue & queue)
{
if (!prepareProcessor(pid, thread_num, queue, std::unique_lock<std::mutex>(*graph[pid].status_mutex)))
finish();
};
auto wake_up_executor = [&](size_t executor)
{
std::lock_guard guard(executor_contexts[executor]->mutex);
executor_contexts[executor]->wake_flag = true;
executor_contexts[executor]->condvar.notify_one();
};
while (!finished)
while (!finished && !yield)
{
/// First, find any processor to execute.
/// Just traverse the graph and prepare any processor.
@ -555,7 +589,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
thread_to_wake = threads_queue.pop_any();
lock.unlock();
wake_up_executor(thread_to_wake);
wakeUpExecutor(thread_to_wake);
}
break;
@ -572,21 +606,21 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
}
{
std::unique_lock lock(executor_contexts[thread_num]->mutex);
std::unique_lock lock(context->mutex);
executor_contexts[thread_num]->condvar.wait(lock, [&]
context->condvar.wait(lock, [&]
{
return finished || executor_contexts[thread_num]->wake_flag;
return finished || context->wake_flag;
});
executor_contexts[thread_num]->wake_flag = false;
context->wake_flag = false;
}
}
if (finished)
break;
while (state)
while (state && !yield)
{
if (finished)
break;
@ -601,7 +635,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
state->job();
#ifndef NDEBUG
execution_time_ns += execution_time_watch.elapsed();
context->execution_time_ns += execution_time_watch.elapsed();
#endif
}
@ -623,8 +657,13 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
while (auto * task = expand_pipeline_task.load())
doExpandPipeline(task, true);
/// Execute again if can.
prepare_processor(state->processors_id, queue);
/// Prepare processor after execution.
{
auto lock = std::unique_lock<std::mutex>(*graph[state->processors_id].status_mutex);
if (!prepareProcessor(state->processors_id, thread_num, queue, std::move(lock)))
finish();
}
state = nullptr;
/// Take local task from queue if has one.
@ -656,7 +695,7 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
lock.unlock();
wake_up_executor(thread_to_wake);
wakeUpExecutor(thread_to_wake);
}
}
@ -666,27 +705,24 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
}
#ifndef NDEBUG
processing_time_ns += processing_time_watch.elapsed();
context->processing_time_ns += processing_time_watch.elapsed();
#endif
/// We have executed single processor. Check if we need to yield execution.
if (yield_flag && *yield_flag)
yield = true;
}
}
#ifndef NDEBUG
total_time_ns = total_time_watch.elapsed();
wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns;
LOG_TRACE(log, std::fixed << std::setprecision(3)
<< "Thread finished."
<< " Total time: " << (total_time_ns / 1e9) << " sec."
<< " Execution time: " << (execution_time_ns / 1e9) << " sec."
<< " Processing time: " << (processing_time_ns / 1e9) << " sec."
<< " Wait time: " << (wait_time_ns / 1e9) << " sec.");
context->total_time_ns += total_time_watch.elapsed();
context->wait_time_ns = context->total_time_ns - context->execution_time_ns - context->processing_time_ns;
#endif
}
void PipelineExecutor::executeImpl(size_t num_threads)
void PipelineExecutor::initializeExecution(size_t num_threads)
{
Stack stack;
is_execution_initialized = true;
threads_queue.init(num_threads);
task_queue.init(num_threads);
@ -699,25 +735,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
executor_contexts.emplace_back(std::make_unique<ExecutorContext>());
}
auto thread_group = CurrentThread::getGroup();
using ThreadsData = std::vector<ThreadFromGlobalPool>;
ThreadsData threads;
threads.reserve(num_threads);
bool finished_flag = false;
SCOPE_EXIT(
if (!finished_flag)
{
finish();
for (auto & thread : threads)
if (thread.joinable())
thread.join();
}
);
Stack stack;
addChildlessProcessorsToStack(stack);
{
@ -744,9 +762,32 @@ void PipelineExecutor::executeImpl(size_t num_threads)
}
}
}
}
void PipelineExecutor::executeImpl(size_t num_threads)
{
initializeExecution(num_threads);
using ThreadsData = std::vector<ThreadFromGlobalPool>;
ThreadsData threads;
threads.reserve(num_threads);
bool finished_flag = false;
SCOPE_EXIT(
if (!finished_flag)
{
finish();
for (auto & thread : threads)
if (thread.joinable())
thread.join();
}
);
if (num_threads > 1)
{
auto thread_group = CurrentThread::getGroup();
for (size_t i = 0; i < num_threads; ++i)
{

View File

@ -35,7 +35,10 @@ public:
/// In case of exception during execution throws any occurred.
void execute(size_t num_threads);
String getName() const { return "PipelineExecutor"; }
/// Execute single step. Step will be stopped when yield_flag is true.
/// Execution happens in a single thread.
/// Return true if execution should be continued.
bool executeStep(std::atomic_bool * yield_flag = nullptr);
const Processors & getProcessors() const { return processors; }
@ -203,6 +206,8 @@ private:
ThreadsQueue threads_queue;
std::mutex task_queue_mutex;
/// Flag that checks that initializeExecution was called.
bool is_execution_initialized = false;
std::atomic_bool cancelled;
std::atomic_bool finished;
@ -235,7 +240,17 @@ private:
std::mutex mutex;
bool wake_flag = false;
/// std::queue<ExecutionState *> pinned_tasks;
/// Currently processing state.
ExecutionState * state = nullptr;
#ifndef NDEBUG
/// Time for different processing stages.
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
UInt64 wait_time_ns = 0;
#endif
};
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
@ -267,7 +282,15 @@ private:
bool prepareProcessor(UInt64 pid, size_t thread_number, Queue & queue, std::unique_lock<std::mutex> node_lock);
bool doExpandPipeline(ExpandPipelineTask * task, bool processing);
/// Continue executor (in case there are tasks in queue).
void wakeUpExecutor(size_t thread_num);
void initializeExecution(size_t num_threads); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.
/// Methods connected to execution.
void executeImpl(size_t num_threads);
void executeStepImpl(size_t thread_num, size_t num_threads, std::atomic_bool * yield_flag = nullptr);
void executeSingleThread(size_t thread_num, size_t num_threads);
void finish();

View File

@ -0,0 +1,203 @@
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/LazyOutputFormat.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/QueryPipeline.h>
#include <Common/setThreadName.h>
#include <ext/scope_guard.h>
namespace DB
{
/// State shared between the pulling side and the background execution thread.
struct PullingAsyncPipelineExecutor::Data
{
    PipelineExecutorPtr executor;

    /// Exception caught in the background thread; valid while has_exception is set.
    std::exception_ptr exception;

    std::atomic_bool is_executed = false;
    std::atomic_bool has_exception = false;

    ThreadFromGlobalPool thread;

    /// Joins the thread if it is still joinable.
    ~Data()
    {
        if (!thread.joinable())
            return;

        thread.join();
    }

    /// Rethrows the stored exception once: the flag is cleared before throwing.
    void rethrowExceptionIfHas()
    {
        if (!has_exception)
            return;

        has_exception = false;
        std::rethrow_exception(std::move(exception));
    }
};
/// Creates a LazyOutputFormat with the pipeline's header and installs it as the pipeline output.
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_)
    : pipeline(pipeline_)
{
    lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
    pipeline.setOutput(lazy_format);
}

/// Cancels execution; anything thrown by cancel() is logged instead of leaving the destructor.
PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
{
    try
    {
        cancel();
    }
    catch (...)
    {
        tryLogCurrentException("PullingAsyncPipelineExecutor");
    }
}

/// Header of the main port of the output format.
const Block & PullingAsyncPipelineExecutor::getHeader() const
{
    const auto & main_port = lazy_format->getPort(IOutputFormat::PortKind::Main);
    return main_port.getHeader();
}
/// Body of the background thread: attaches to the caller's thread group (if any), runs the
/// pipeline executor with the given number of threads and stores any exception in `data`
/// instead of letting it leave the thread.
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
{
    const bool has_thread_group = thread_group != nullptr;

    if (has_thread_group)
        CurrentThread::attachTo(thread_group);

    SCOPE_EXIT(
        if (has_thread_group)
            CurrentThread::detachQueryIfNotDetached();
    );

    setThreadName("QueryPipelineEx");

    try
    {
        data.executor->execute(num_threads);
    }
    catch (...)
    {
        /// Store the exception before raising the flag that the pulling side checks.
        data.exception = std::current_exception();
        data.has_exception = true;
    }
}
/// Pulls the next chunk from the lazy output format.
/// The first call creates the executor and starts the background thread.
/// Returns false once the format is finished (after joining the thread and rethrowing its
/// exception, if any); otherwise stores the chunk obtained from the format and returns true.
bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
{
    if (!data)
    {
        data = std::make_unique<Data>();
        data->executor = pipeline.execute();

        auto thread_body = [&, thread_group = CurrentThread::getGroup()]()
        {
            threadFunction(*data, thread_group, pipeline.getNumThreads());
        };

        data->thread = ThreadFromGlobalPool(std::move(thread_body));
    }

    if (data->has_exception)
    {
        /// Finish lazy format in case of exception. Otherwise thread.join() may hang.
        lazy_format->finish();

        /// Clears the flag and rethrows the stored exception.
        data->rethrowExceptionIfHas();
    }

    if (lazy_format->isFinished())
    {
        data->is_executed = true;

        /// Wait for the thread and rethrow exception if any.
        cancel();

        return false;
    }

    chunk = lazy_format->getChunk(milliseconds);
    return true;
}
/// Block flavour of pull(): converts the pulled chunk into a block with the main port's header.
/// An empty chunk (timeout exceeded) clears the block and still returns true.
/// Bucket number and overflow flag are copied when the chunk carries AggregatedChunkInfo.
bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds)
{
    Chunk pulled;
    const bool has_more = pull(pulled, milliseconds);
    if (!has_more)
        return false;

    if (!pulled)
    {
        /// In case if timeout exceeded.
        block.clear();
        return true;
    }

    block = getHeader().cloneWithColumns(pulled.detachColumns());

    auto chunk_info = pulled.getChunkInfo();
    if (!chunk_info)
        return true;

    const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get());
    if (agg_info)
    {
        block.info.bucket_num = agg_info->bucket_num;
        block.info.is_overflows = agg_info->is_overflows;
    }

    return true;
}
/// Stops execution: cancels the executor if it has not been marked as executed, finishes the
/// lazy format, joins the background thread and rethrows the exception stored by it, if any.
void PullingAsyncPipelineExecutor::cancel()
{
    const bool has_data = data != nullptr;

    /// Cancel execution if it wasn't finished.
    if (has_data && !data->is_executed && data->executor)
        data->executor->cancel();

    /// Finish lazy format. Otherwise thread.join() may hang.
    if (!lazy_format->isFinished())
        lazy_format->finish();

    if (!has_data)
        return;

    /// Join thread here to wait for possible exception.
    if (data->thread.joinable())
        data->thread.join();

    /// Rethrow exception to not swallow it in destructor.
    data->rethrowExceptionIfHas();
}
/// Totals chunk as reported by the lazy output format.
Chunk PullingAsyncPipelineExecutor::getTotals()
{
    return lazy_format->getTotals();
}

/// Extremes chunk as reported by the lazy output format.
Chunk PullingAsyncPipelineExecutor::getExtremes()
{
    return lazy_format->getExtremes();
}

/// Totals as a block built from the totals port header; an empty block if there are no totals.
Block PullingAsyncPipelineExecutor::getTotalsBlock()
{
    if (auto totals = getTotals(); !totals.empty())
    {
        const auto & header = lazy_format->getPort(IOutputFormat::PortKind::Totals).getHeader();
        return header.cloneWithColumns(totals.detachColumns());
    }

    return {};
}

/// Extremes as a block built from the extremes port header; an empty block if there are no extremes.
Block PullingAsyncPipelineExecutor::getExtremesBlock()
{
    if (auto extremes = getExtremes(); !extremes.empty())
    {
        const auto & header = lazy_format->getPort(IOutputFormat::PortKind::Extremes).getHeader();
        return header.cloneWithColumns(extremes.detachColumns());
    }

    return {};
}

/// Profile info kept by the lazy output format.
BlockStreamProfileInfo & PullingAsyncPipelineExecutor::getProfileInfo()
{
    return lazy_format->getProfileInfo();
}
}

View File

@ -0,0 +1,58 @@
#pragma once
#include <memory>
namespace DB
{
class QueryPipeline;
class Block;
class Chunk;
class LazyOutputFormat;
struct BlockStreamProfileInfo;
/// Asynchronous pulling executor for QueryPipeline.
/// Always creates an extra thread. If the query is executed in a single thread, use PullingPipelineExecutor.
/// Typical usage is:
///
/// PullingAsyncPipelineExecutor executor(query_pipeline);
/// while (executor.pull(chunk, timeout))
/// ... process chunk ...
class PullingAsyncPipelineExecutor
{
public:
/// Installs a LazyOutputFormat as the output of the given pipeline. The pipeline is held by reference.
explicit PullingAsyncPipelineExecutor(QueryPipeline & pipeline_);
/// Cancels execution; exceptions from cancel() are logged, not propagated.
~PullingAsyncPipelineExecutor();
/// Get structure of returned block or chunk.
const Block & getHeader() const;
/// Methods return false if query is finished.
/// If milliseconds > 0, returns empty object and `true` after timeout exceeded. Otherwise method is blocking.
/// You can use any pull method.
bool pull(Chunk & chunk, uint64_t milliseconds = 0);
bool pull(Block & block, uint64_t milliseconds = 0);
/// Stop execution. It is not necessary, but helps to stop execution before executor is destroyed.
/// Joins the execution thread and rethrows the exception caught in it, if any.
void cancel();
/// Get totals and extremes. Returns empty chunk if doesn't have any.
Chunk getTotals();
Chunk getExtremes();
/// Get totals and extremes as blocks. Returns empty block if doesn't have any.
Block getTotalsBlock();
Block getExtremesBlock();
/// Get query profile info.
BlockStreamProfileInfo & getProfileInfo();
/// Internal executor data.
struct Data;
private:
QueryPipeline & pipeline;
/// Output format installed into the pipeline; chunks are pulled from it.
std::shared_ptr<LazyOutputFormat> lazy_format;
/// Created on the first pull() call.
std::unique_ptr<Data> data;
};
}

View File

@ -1,43 +1,15 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Formats/LazyOutputFormat.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Formats/PullingOutputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Common/setThreadName.h>
#include <ext/scope_guard.h>
#include <Processors/Transforms/AggregatingTransform.h>
namespace DB
{
struct PullingPipelineExecutor::Data
{
PipelineExecutorPtr executor;
std::exception_ptr exception;
std::atomic_bool is_executed = false;
std::atomic_bool has_exception = false;
ThreadFromGlobalPool thread;
~Data()
{
if (thread.joinable())
thread.join();
}
void rethrowExceptionIfHas()
{
if (has_exception)
{
has_exception = false;
std::rethrow_exception(std::move(exception));
}
}
};
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
pulling_format = std::make_shared<PullingOutputFormat>(pipeline.getHeader(), has_data_flag);
pipeline.setOutput(pulling_format);
}
PullingPipelineExecutor::~PullingPipelineExecutor()
@ -52,70 +24,28 @@ PullingPipelineExecutor::~PullingPipelineExecutor()
}
}
static void threadFunction(PullingPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
const Block & PullingPipelineExecutor::getHeader() const
{
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
setThreadName("QueryPipelineEx");
try
{
data.executor->execute(num_threads);
}
catch (...)
{
data.exception = std::current_exception();
data.has_exception = true;
}
return pulling_format->getPort(IOutputFormat::PortKind::Main).getHeader();
}
bool PullingPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
bool PullingPipelineExecutor::pull(Chunk & chunk)
{
if (!data)
{
data = std::make_unique<Data>();
data->executor = pipeline.execute();
if (!executor)
executor = pipeline.execute();
auto func = [&, thread_group = CurrentThread::getGroup()]()
{
threadFunction(*data, thread_group, pipeline.getNumThreads());
};
data->thread = ThreadFromGlobalPool(std::move(func));
}
if (data->has_exception)
{
/// Finish lazy format in case of exception. Otherwise thread.join() may hung.
lazy_format->finish();
data->has_exception = false;
std::rethrow_exception(std::move(data->exception));
}
if (lazy_format->isFinished())
{
data->is_executed = true;
/// Wait thread ant rethrow exception if any.
cancel();
if (!executor->executeStep(&has_data_flag))
return false;
}
chunk = lazy_format->getChunk(milliseconds);
chunk = pulling_format->getChunk();
return true;
}
bool PullingPipelineExecutor::pull(Block & block, uint64_t milliseconds)
bool PullingPipelineExecutor::pull(Block & block)
{
Chunk chunk;
if (!pull(chunk, milliseconds))
if (!pull(chunk))
return false;
if (!chunk)
@ -125,7 +55,7 @@ bool PullingPipelineExecutor::pull(Block & block, uint64_t milliseconds)
return true;
}
block = lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns());
block = pulling_format->getPort(IOutputFormat::PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns());
if (auto chunk_info = chunk.getChunkInfo())
{
@ -142,30 +72,22 @@ bool PullingPipelineExecutor::pull(Block & block, uint64_t milliseconds)
void PullingPipelineExecutor::cancel()
{
/// Cancel execution if it wasn't finished.
if (data && !data->is_executed && data->executor)
data->executor->cancel();
if (executor)
executor->cancel();
/// Finish lazy format. Otherwise thread.join() may hung.
if (!lazy_format->isFinished())
lazy_format->finish();
/// Join thread here to wait for possible exception.
if (data && data->thread.joinable())
data->thread.join();
/// Rethrow exception to not swallow it in destructor.
if (data)
data->rethrowExceptionIfHas();
/// Read all data and finish execution.
Chunk chunk;
while (pull(chunk));
}
Chunk PullingPipelineExecutor::getTotals()
{
return lazy_format->getTotals();
return pulling_format->getTotals();
}
Chunk PullingPipelineExecutor::getExtremes()
{
return lazy_format->getExtremes();
return pulling_format->getExtremes();
}
Block PullingPipelineExecutor::getTotalsBlock()
@ -175,7 +97,7 @@ Block PullingPipelineExecutor::getTotalsBlock()
if (totals.empty())
return {};
const auto & header = lazy_format->getPort(IOutputFormat::PortKind::Totals).getHeader();
const auto & header = pulling_format->getPort(IOutputFormat::PortKind::Totals).getHeader();
return header.cloneWithColumns(totals.detachColumns());
}
@ -186,13 +108,13 @@ Block PullingPipelineExecutor::getExtremesBlock()
if (extremes.empty())
return {};
const auto & header = lazy_format->getPort(IOutputFormat::PortKind::Extremes).getHeader();
const auto & header = pulling_format->getPort(IOutputFormat::PortKind::Extremes).getHeader();
return header.cloneWithColumns(extremes.detachColumns());
}
/// Returns a reference to the profile info object kept by the output format.
/// NOTE(review): this text looks like a rendered diff in which the replaced line
/// (`lazy_format`) and its replacement (`pulling_format`) both appear — confirm
/// against the real file which statement is live.
BlockStreamProfileInfo & PullingPipelineExecutor::getProfileInfo()
{
return lazy_format->getProfileInfo();
return pulling_format->getProfileInfo();
}
}

View File

@ -1,16 +1,20 @@
#pragma once
#include <memory>
#include <atomic>
namespace DB
{
class QueryPipeline;
class Block;
class Chunk;
class LazyOutputFormat;
class QueryPipeline;
class PipelineExecutor;
class PullingOutputFormat;
struct BlockStreamProfileInfo;
/// Pulling executor for QueryPipeline.
using PipelineExecutorPtr = std::shared_ptr<PipelineExecutor>;
/// Pulling executor for QueryPipeline. Always executes the pipeline in a single thread.
/// Typical usage is:
///
/// PullingPipelineExecutor executor(query_pipeline);
@ -22,11 +26,13 @@ public:
explicit PullingPipelineExecutor(QueryPipeline & pipeline_);
~PullingPipelineExecutor();
/// Get structure of returned block or chunk.
const Block & getHeader() const;
/// Methods return false if query is finished.
/// If milliseconds > 0, returns empty object and `true` after timeout exceeded.
/// You can use any pull method.
bool pull(Chunk & chunk, uint64_t milliseconds = 0);
bool pull(Block & block, uint64_t milliseconds = 0);
bool pull(Chunk & chunk);
bool pull(Block & block);
/// Stop execution. It is not necessary, but helps to stop execution before executor is destroyed.
void cancel();
@ -42,13 +48,11 @@ public:
/// Get query profile info.
BlockStreamProfileInfo & getProfileInfo();
/// Internal executor data.
struct Data;
private:
std::atomic_bool has_data_flag = false;
QueryPipeline & pipeline;
std::shared_ptr<LazyOutputFormat> lazy_format;
std::unique_ptr<Data> data;
std::shared_ptr<PullingOutputFormat> pulling_format;
PipelineExecutorPtr executor;
};
}

View File

@ -9,7 +9,7 @@ namespace DB
/// LazyOutputFormat is used to retrieve ready data from executing pipeline.
/// You can periodically call `getChunk` from separate thread.
/// Used in PullingPipelineExecutor.
/// Used in PullingAsyncPipelineExecutor.
class LazyOutputFormat : public IOutputFormat
{

View File

@ -0,0 +1,42 @@
#include <Processors/Formats/PullingOutputFormat.h>
#include <IO/WriteBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Definition of the static placeholder buffer that the constructor passes to the IOutputFormat base.
/// It is built over a null pointer with zero size; the class declaration marks it as not used.
WriteBuffer PullingOutputFormat::out(nullptr, 0);
/// Stores the incoming chunk in the single `data` slot and raises the shared flag.
/// Throws LOGICAL_ERROR if the previously stored chunk has not been taken yet.
/// Profile info is updated only for non-empty chunks.
void PullingOutputFormat::consume(Chunk chunk)
{
    /// Only one chunk may be buffered at a time.
    if (data)
    {
        throw Exception(
            "PullingOutputFormat cannot consume chunk because it already has data",
            ErrorCodes::LOGICAL_ERROR);
    }

    if (chunk)
    {
        info.update(chunk.getNumRows(), chunk.allocatedBytes());
    }

    data = std::move(chunk);

    /// Raise the flag only after the chunk has been stored.
    has_data_flag.store(true);
}
/// Hands the buffered chunk over to the caller and clears the shared flag.
/// The stored chunk is moved out, so the slot can accept the next consume().
Chunk PullingOutputFormat::getChunk()
{
    Chunk taken(std::move(data));
    has_data_flag.store(false);
    return taken;
}
/// Moves the stored totals chunk out to the caller.
Chunk PullingOutputFormat::getTotals()
{
    Chunk taken(std::move(totals));
    return taken;
}
/// Moves the stored extremes chunk out to the caller.
Chunk PullingOutputFormat::getExtremes()
{
    Chunk taken(std::move(extremes));
    return taken;
}
void PullingOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)
{
info.setRowsBeforeLimit(rows_before_limit);
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <Processors/Formats/IOutputFormat.h>
#include <DataStreams/BlockStreamProfileInfo.h>
namespace DB
{
/// Output format which is used in PullingPipelineExecutor.
/// It keeps the chunks handed to it (main data, totals, extremes) so that they can be
/// taken back with the get* methods, and reports through a caller-provided atomic flag
/// whether a data chunk is currently waiting.
class PullingOutputFormat : public IOutputFormat
{
public:
    /// header             - structure of the produced data, passed to the IOutputFormat base.
    /// consume_data_flag_ - flag that is bound by reference; it is declared outside of this class.
    explicit PullingOutputFormat(const Block & header, std::atomic_bool & consume_data_flag_)
        : IOutputFormat(header, out)
        , has_data_flag(consume_data_flag_)
    {
    }

    String getName() const override
    {
        return "PullingOutputFormat";
    }

    /// Accessors for the stored chunks; defined out of line.
    Chunk getChunk();
    Chunk getTotals();
    Chunk getExtremes();

    /// Gives access to the profile info kept by this format.
    BlockStreamProfileInfo & getProfileInfo()
    {
        return info;
    }

    void setRowsBeforeLimit(size_t rows_before_limit) override;

protected:
    /// Defined out of line.
    void consume(Chunk chunk) override;

    /// Replaces the stored totals chunk.
    void consumeTotals(Chunk chunk) override
    {
        totals = std::move(chunk);
    }

    /// Replaces the stored extremes chunk.
    void consumeExtremes(Chunk chunk) override
    {
        extremes = std::move(chunk);
    }

private:
    Chunk data;
    Chunk totals;
    Chunk extremes;

    /// Reference to the flag received in the constructor.
    std::atomic_bool & has_data_flag;

    BlockStreamProfileInfo info;

    /// Is not used.
    static WriteBuffer out;
};
}

View File

@ -10,7 +10,9 @@ SRCS(
Chunk.cpp
ConcatProcessor.cpp
DelayedPortsProcessor.cpp
Executors/PipelineExecutingBlockInputStream.cpp
Executors/PipelineExecutor.cpp
Executors/PullingAsyncPipelineExecutor.cpp
Executors/PullingPipelineExecutor.cpp
Executors/TreeExecutorBlockInputStream.cpp
ForkProcessor.cpp
@ -57,6 +59,7 @@ SRCS(
Formats/IRowOutputFormat.cpp
Formats/LazyOutputFormat.cpp
Formats/OutputStreamToOutputFormat.cpp
Formats/PullingOutputFormat.cpp
Formats/RowInputFormatWithDiagnosticInfo.cpp
IAccumulatingTransform.cpp
IInflatingTransform.cpp