Removed allow_processors flag from executeQuery().

Nikolai Kochetov 2020-05-18 16:55:07 +03:00
parent cdb742dd4b
commit 0e48cb1f80
10 changed files with 198 additions and 23 deletions

View File

@@ -1,9 +1,31 @@
 #include <DataStreams/BlockIO.h>
 #include <Interpreters/ProcessList.h>
+#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
 
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+BlockInputStreamPtr BlockIO::getInputStream()
+{
+    if (out)
+        throw Exception("Cannot get input stream from BlockIO because output stream is not empty",
+                        ErrorCodes::LOGICAL_ERROR);
+
+    if (in)
+        return in;
+
+    if (pipeline.initialized())
+        return std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
+
+    throw Exception("Cannot get input stream from BlockIO because query pipeline was not initialized",
+                    ErrorCodes::LOGICAL_ERROR);
+}
+
 void BlockIO::reset()
 {
     /** process_list_entry should be destroyed after in, after out and after pipeline,

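For reference, a minimal caller-side sketch of the new helper (query text, context, and `process` are placeholders, not part of the diff; readPrefix() matters because the pipeline-backed stream only creates its executor there):

    BlockIO io = executeQuery("SELECT 1", context, /* internal = */ true);
    BlockInputStreamPtr stream = io.getInputStream();  // returns `in` as-is, or wraps `pipeline`
    stream->readPrefix();                              // pipeline wrapper creates its executor here
    while (Block block = stream->read())
        process(block);                                // `process` is a placeholder consumer
    stream->readSuffix();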
View File

@@ -50,6 +50,9 @@ struct BlockIO
         exception_callback();
     }
 
+    /// Returns in or converts pipeline to stream. Throws if out is not empty.
+    BlockInputStreamPtr getInputStream();
+
 private:
     void reset();
 };

View File

@@ -131,10 +131,10 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
     */
     if (is_local)
     {
-        BlockIO res = executeQuery(load_all_query, context, true, QueryProcessingStage::Complete, false, false);
+        auto stream = executeQuery(load_all_query, context, true).getInputStream();
         /// FIXME res.in may implicitly use some objects owned by res, but they will be destructed after return
-        res.in = std::make_shared<ConvertingBlockInputStream>(res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
-        return res.in;
+        stream = std::make_shared<ConvertingBlockInputStream>(stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
+        return stream;
     }
     return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
 }
@@ -144,9 +144,9 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
     std::string load_update_query = getUpdateFieldAndDate();
     if (is_local)
     {
-        auto res = executeQuery(load_update_query, context, true, QueryProcessingStage::Complete, false, false);
-        res.in = std::make_shared<ConvertingBlockInputStream>(res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
-        return res.in;
+        auto stream = executeQuery(load_update_query, context, true).getInputStream();
+        stream = std::make_shared<ConvertingBlockInputStream>(stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
+        return stream;
     }
     return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, sample_block, context);
 }
@@ -191,10 +191,10 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
 {
     if (is_local)
     {
-        auto res = executeQuery(query, context, true, QueryProcessingStage::Complete, false, false);
-        res.in = std::make_shared<ConvertingBlockInputStream>(
-            res.in, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
-        return res.in;
+        auto res = executeQuery(query, context, true).getInputStream();
+        res = std::make_shared<ConvertingBlockInputStream>(
+            res, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
+        return res;
     }
     return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
@@ -206,8 +206,7 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
     if (is_local)
     {
         Context query_context = context;
-        auto input_block = executeQuery(request, query_context, true,
-            QueryProcessingStage::Complete, false, false).in;
+        auto input_block = executeQuery(request, query_context, true).getInputStream();
         return readInvalidateQuery(*input_block);
     }
     else

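A note on the lifetime FIXME above: for pipeline-backed queries, the stream returned by getInputStream() owns the QueryPipeline outright (it is moved into PipelineExecutingBlockInputStream, see the new file below), so the temporary BlockIO can safely die at the end of the full expression. A sketch of the ownership hand-off, under that assumption:

    auto stream = executeQuery(load_all_query, context, true).getInputStream();
    // The temporary BlockIO is destroyed here; `stream` keeps the pipeline
    // alive through the unique_ptr inside PipelineExecutingBlockInputStream.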
View File

@@ -302,8 +302,9 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
     if (where_expression)
         select_query += " WHERE " + queryToString(where_expression);
 
-    BlockIO block_io = executeQuery(select_query, context.getGlobalContext(), true, QueryProcessingStage::Complete, false, false);
-    Block res = block_io.in->read();
+    BlockIO block_io = executeQuery(select_query, context.getGlobalContext(), true);
+    auto stream = block_io.getInputStream();
+    Block res = stream->read();
 
-    if (res && block_io.in->read())
+    if (res && stream->read())
         throw Exception("Expected one block from input stream", ErrorCodes::LOGICAL_ERROR);

View File

@@ -196,8 +196,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
     bool internal,
     QueryProcessingStage::Enum stage,
    bool has_query_tail,
-    ReadBuffer * istr,
-    bool allow_processors)
+    ReadBuffer * istr)
 {
     time_t current_time = time(nullptr);
@@ -317,7 +316,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
     context.resetInputCallbacks();
 
     auto interpreter = InterpreterFactory::get(ast, context, stage);
-    bool use_processors = allow_processors && interpreter->canExecuteWithProcessors();
+    bool use_processors = interpreter->canExecuteWithProcessors();
 
     std::shared_ptr<const EnabledQuota> quota;
     if (!interpreter->ignoreQuota())
@@ -580,13 +579,12 @@ BlockIO executeQuery(
     Context & context,
     bool internal,
     QueryProcessingStage::Enum stage,
-    bool may_have_embedded_data,
-    bool allow_processors)
+    bool may_have_embedded_data)
 {
     ASTPtr ast;
     BlockIO streams;
     std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
-        internal, stage, !may_have_embedded_data, nullptr, allow_processors);
+        internal, stage, !may_have_embedded_data, nullptr);
 
     if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()))
     {
@@ -647,7 +645,7 @@ void executeQuery(
     ASTPtr ast;
     BlockIO streams;
-    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true);
+    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr);
 
     auto & pipeline = streams.pipeline;

View File

@@ -42,8 +42,7 @@ BlockIO executeQuery(
     Context & context,                  /// DB, tables, data types, storage engines, functions, aggregate functions...
     bool internal = false,              /// If true, this query is caused by another query and thus needn't be registered in the ProcessList.
     QueryProcessingStage::Enum stage = QueryProcessingStage::Complete,  /// To which stage the query must be executed.
-    bool may_have_embedded_data = false,  /// If insert query may have embedded data
-    bool allow_processors = true          /// If can use processors pipeline
+    bool may_have_embedded_data = false   /// If insert query may have embedded data
 );

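With allow_processors gone, the remaining defaults cover the usual call shapes; a hypothetical sketch (query strings and context are placeholders):

    BlockIO io1 = executeQuery(query, context);        // external query, run to the Complete stage
    BlockIO io2 = executeQuery(query, context, true);  // internal: not registered in the ProcessList
    BlockIO io3 = executeQuery(insert_query, context, false, QueryProcessingStage::Complete,
                               /* may_have_embedded_data = */ true);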
View File

@@ -0,0 +1,106 @@
+#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
+#include <Processors/Executors/PullingPipelineExecutor.h>
+#include <Processors/QueryPipeline.h>
+#include <Processors/Transforms/LimitsCheckingTransform.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+PipelineExecutingBlockInputStream::PipelineExecutingBlockInputStream(QueryPipeline pipeline_)
+    : pipeline(std::make_unique<QueryPipeline>(std::move(pipeline_)))
+{
+}
+
+PipelineExecutingBlockInputStream::~PipelineExecutingBlockInputStream() = default;
+
+Block PipelineExecutingBlockInputStream::getHeader() const
+{
+    return executor ? executor->getHeader()
+                    : pipeline->getHeader();
+}
+
+void PipelineExecutingBlockInputStream::readPrefixImpl()
+{
+    executor = std::make_unique<PullingPipelineExecutor>(*pipeline);
+}
+
+Block PipelineExecutingBlockInputStream::readImpl()
+{
+    Block block;
+    while (executor->pull(block))
+    {
+        if (block)
+            return block;
+    }
+
+    return {};
+}
+
+inline static void throwIfExecutionStarted(bool is_execution_started, const char * method)
+{
+    if (is_execution_started)
+        throw Exception(String("Cannot call ") + method +
+                        " for PipelineExecutingBlockInputStream because execution was started",
+                        ErrorCodes::LOGICAL_ERROR);
+}
+
+inline static void throwIfExecutionNotStarted(bool is_execution_started, const char * method)
+{
+    if (!is_execution_started)
+        throw Exception(String("Cannot call ") + method +
+                        " for PipelineExecutingBlockInputStream because execution was not started",
+                        ErrorCodes::LOGICAL_ERROR);
+}
+
+void PipelineExecutingBlockInputStream::cancel(bool kill)
+{
+    throwIfExecutionNotStarted(executor != nullptr, "cancel");
+    IBlockInputStream::cancel(kill);
+    executor->cancel();
+}
+
+void PipelineExecutingBlockInputStream::setProgressCallback(const ProgressCallback & callback)
+{
+    throwIfExecutionStarted(executor != nullptr, "setProgressCallback");
+    pipeline->setProgressCallback(callback);
+}
+
+void PipelineExecutingBlockInputStream::setProcessListElement(QueryStatus * elem)
+{
+    throwIfExecutionStarted(executor != nullptr, "setProcessListElement");
+    IBlockInputStream::setProcessListElement(elem);
+    pipeline->setProcessListElement(elem);
+}
+
+void PipelineExecutingBlockInputStream::setLimits(const IBlockInputStream::LocalLimits & limits_)
+{
+    throwIfExecutionStarted(executor != nullptr, "setLimits");
+
+    if (limits_.mode == LimitsMode::LIMITS_TOTAL)
+        throw Exception("Total limits are not supported by PipelineExecutingBlockInputStream",
+                        ErrorCodes::LOGICAL_ERROR);
+
+    /// Local limits may be checked by IBlockInputStream itself.
+    IBlockInputStream::setLimits(limits_);
+}
+
+void PipelineExecutingBlockInputStream::setQuota(const std::shared_ptr<const EnabledQuota> &)
+{
+    throw Exception("Quota is not supported by PipelineExecutingBlockInputStream",
+                    ErrorCodes::LOGICAL_ERROR);
+}
+
+void PipelineExecutingBlockInputStream::addTotalRowsApprox(size_t)
+{
+    throw Exception("Progress is not supported by PipelineExecutingBlockInputStream",
+                    ErrorCodes::LOGICAL_ERROR);
+}
+
+}

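Two details of the wrapper are worth calling out: readImpl() loops because pull() can return true with an empty block while the query is still running, and quota/totals-progress are rejected outright, while setLimits() accepts only non-LIMITS_TOTAL modes. A hypothetical direct use (executeWithProcessors() as the pipeline source, `callback`, and `process` are assumptions, not from this diff):

    QueryPipeline pipeline = interpreter->executeWithProcessors();  // assumed pipeline source
    BlockInputStreamPtr stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
    stream->setProgressCallback(callback);  // allowed only before readPrefix() starts execution
    stream->readPrefix();
    while (Block block = stream->read())
        process(block);                     // placeholder consumer
    stream->readSuffix();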
View File

@@ -0,0 +1,39 @@
+#pragma once
+
+#include <DataStreams/IBlockInputStream.h>
+
+namespace DB
+{
+
+class QueryPipeline;
+class PullingPipelineExecutor;
+
+/// Implement IBlockInputStream from QueryPipeline.
+/// It's a temporary wrapper.
+class PipelineExecutingBlockInputStream : public IBlockInputStream
+{
+public:
+    explicit PipelineExecutingBlockInputStream(QueryPipeline pipeline_);
+    ~PipelineExecutingBlockInputStream();
+
+    String getName() const override { return "PipelineExecuting"; }
+    Block getHeader() const override;
+    void cancel(bool kill) override;
+
+    /// Implement IBlockInputStream methods via QueryPipeline.
+    void setProgressCallback(const ProgressCallback & callback) final;
+    void setProcessListElement(QueryStatus * elem) final;
+    void setLimits(const LocalLimits & limits_) final;
+    void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final;
+    void addTotalRowsApprox(size_t value) final;
+
+protected:
+    void readPrefixImpl() override;
+    Block readImpl() override;
+
+private:
+    std::unique_ptr<PullingPipelineExecutor> executor;
+    std::unique_ptr<QueryPipeline> pipeline;
+};
+
+}

View File

@@ -52,6 +52,11 @@ PullingPipelineExecutor::~PullingPipelineExecutor()
     }
 }
 
+const Block & PullingPipelineExecutor::getHeader() const
+{
+    return lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader();
+}
+
 static void threadFunction(PullingPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
 {
     if (thread_group)

View File

@@ -22,6 +22,9 @@ public:
     explicit PullingPipelineExecutor(QueryPipeline & pipeline_);
     ~PullingPipelineExecutor();
 
+    /// Get structure of returned block or chunk.
+    const Block & getHeader() const;
+
     /// Methods return false if the query is finished.
     /// If milliseconds > 0, returns an empty object and `true` once the timeout is exceeded.
     /// You can use any pull method.
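
The new getHeader() complements the pull loop: callers can learn the block structure before the first pull(), which is exactly what PipelineExecutingBlockInputStream::getHeader() relies on. A minimal sketch (`pipeline` and `process` are placeholders):

    PullingPipelineExecutor executor(pipeline);   // pipeline: an initialized QueryPipeline
    Block header = executor.getHeader();          // structure of the blocks to come
    Block block;
    while (executor.pull(block))                  // false once the query is finished
        if (block)
            process(block);                       // placeholder consumer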