Fix explain for ISourceStep.

This commit is contained in:
Nikolai Kochetov 2020-09-25 16:19:26 +03:00
parent dea90009e3
commit 576ffadb17
11 changed files with 43 additions and 12 deletions

View File

@ -90,6 +90,8 @@ std::unique_ptr<QueryPlan> createLocalPlan(
auto converting = std::make_unique<ConvertingStep>(query_plan->getCurrentDataStream(), header, true); auto converting = std::make_unique<ConvertingStep>(query_plan->getCurrentDataStream(), header, true);
converting->setStepDescription("Convert block structure for query from local replica"); converting->setStepDescription("Convert block structure for query from local replica");
query_plan->addStep(std::move(converting)); query_plan->addStep(std::move(converting));
return query_plan;
} }
String formattedAST(const ASTPtr & ast) String formattedAST(const ASTPtr & ast)

View File

@ -39,7 +39,7 @@ public:
const String & query, const ASTPtr & query_ast, const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler, const Context & context, const ThrottlerPtr & throttler,
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
std::vector<QueryPlanPtr> & res, std::vector<QueryPlanPtr> & plans,
Pipes & remote_pipes, Pipes & remote_pipes,
Pipes & delayed_pipes) override; Pipes & delayed_pipes) override;

View File

@ -1107,9 +1107,9 @@ static StreamLocalLimits getLimitsForStorage(const Settings & settings, const Se
return limits; return limits;
} }
static void addEmptySource(QueryPlan & query_plan, const Block & header, SelectQueryInfo & query_info) void InterpreterSelectQuery::addEmptySourceToQueryPlan(QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info)
{ {
Pipe pipe(std::make_shared<NullSource>(header)); Pipe pipe(std::make_shared<NullSource>(source_header));
if (query_info.prewhere_info) if (query_info.prewhere_info)
{ {
@ -1511,7 +1511,7 @@ void InterpreterSelectQuery::executeFetchColumns(
{ {
auto header = metadata_snapshot->getSampleBlockForColumns( auto header = metadata_snapshot->getSampleBlockForColumns(
required_columns, storage->getVirtuals(), storage->getStorageID()); required_columns, storage->getVirtuals(), storage->getStorageID());
addEmptySource(query_plan, header, query_info); addEmptySourceToQueryPlan(query_plan, header, query_info);
} }
/// Extend lifetime of context, table lock, storage. Set limits and quota. /// Extend lifetime of context, table lock, storage. Set limits and quota.

View File

@ -94,6 +94,8 @@ public:
const SelectQueryInfo & getQueryInfo() const { return query_info; } const SelectQueryInfo & getQueryInfo() const { return query_info; }
static void addEmptySourceToQueryPlan(QueryPlan & query_plan, const Block & source_header, const SelectQueryInfo & query_info);
private: private:
InterpreterSelectQuery( InterpreterSelectQuery(
const ASTPtr & query_ptr_, const ASTPtr & query_ptr_,

View File

@ -14,7 +14,8 @@ QueryPipelinePtr ISourceStep::updatePipeline(QueryPipelines)
auto pipeline = std::make_unique<QueryPipeline>(); auto pipeline = std::make_unique<QueryPipeline>();
QueryPipelineProcessorsCollector collector(*pipeline, this); QueryPipelineProcessorsCollector collector(*pipeline, this);
initializePipeline(*pipeline); initializePipeline(*pipeline);
processors = collector.detachProcessors(); auto added_processors = collector.detachProcessors();
processors.insert(processors.end(), added_processors.begin(), added_processors.end());
return pipeline; return pipeline;
} }

View File

@ -16,7 +16,7 @@ public:
void describePipeline(FormatSettings & settings) const override; void describePipeline(FormatSettings & settings) const override;
private: protected:
/// We collect processors got after pipeline transformation. /// We collect processors got after pipeline transformation.
Processors processors; Processors processors;
}; };

View File

@ -5,7 +5,7 @@ namespace DB
{ {
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_) ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_)
: ISourceStep(DataStream{.header = pipe_.getHeader(), .has_single_port = true}) : ISourceStep(DataStream{.header = pipe_.getHeader()})
, pipe(std::move(pipe_)) , pipe(std::move(pipe_))
, context(std::move(context_)) , context(std::move(context_))
{ {
@ -13,6 +13,9 @@ ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Conte
void ReadFromPreparedSource::initializePipeline(QueryPipeline & pipeline) void ReadFromPreparedSource::initializePipeline(QueryPipeline & pipeline)
{ {
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe)); pipeline.init(std::move(pipe));
if (context) if (context)

View File

@ -23,8 +23,8 @@ private:
class ReadFromStorageStep : public ReadFromPreparedSource class ReadFromStorageStep : public ReadFromPreparedSource
{ {
public: public:
ReadFromStorageStep(Pipe pipe, String storage_name) ReadFromStorageStep(Pipe pipe_, String storage_name)
: ReadFromPreparedSource(std::move(pipe)) : ReadFromPreparedSource(std::move(pipe_))
{ {
setStepDescription(storage_name); setStepDescription(storage_name);
} }

View File

@ -12,6 +12,7 @@
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Interpreters/ExpressionActions.h> #include <Interpreters/ExpressionActions.h>
#include <Interpreters/InterpreterSelectQuery.h>
namespace DB namespace DB
@ -103,8 +104,16 @@ void IStorage::read(
unsigned num_streams) unsigned num_streams)
{ {
auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
auto read_step = std::make_unique<ReadFromStorageStep>(std::move(pipe), getName()); if (pipe.empty())
query_plan.addStep(std::move(read_step)); {
auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info);
}
else
{
auto read_step = std::make_unique<ReadFromStorageStep>(std::move(pipe), getName());
query_plan.addStep(std::move(read_step));
}
} }
Pipe IStorage::alterPartition( Pipe IStorage::alterPartition(

View File

@ -495,6 +495,20 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Con
} }
Pipe StorageDistributed::read( Pipe StorageDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline()));
}
void StorageDistributed::read(
QueryPlan & query_plan, QueryPlan & query_plan,
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,

View File

@ -77,7 +77,7 @@ public:
size_t max_block_size, size_t max_block_size,
unsigned num_streams) override; unsigned num_streams) override;
Pipe StorageDistributed::read( void read(
QueryPlan & query_plan, QueryPlan & query_plan,
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,