Pass query to the IStorage::getQueryProcessingStage()

Will be used to return correct stage with optimize_skip_unused_shards.
This commit is contained in:
Azat Khuzhin 2020-03-22 20:43:01 +03:00
parent df19d6aac1
commit 66ccbf5d11
11 changed files with 19 additions and 16 deletions

View File

@ -508,7 +508,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl(bool try_move_to_prewhere)
}
if (storage && !options.only_analyze)
from_stage = storage->getQueryProcessingStage(*context);
from_stage = storage->getQueryProcessingStage(*context, query_ptr);
/// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
bool first_stage = from_stage < QueryProcessingStage::WithMergeableState

View File

@ -220,8 +220,11 @@ public:
/** Returns stage to which query is going to be processed in read() function.
* (Normally, the function only reads the columns from the list, but in other cases,
* for example, the request can be partially processed on a remote server.)
*
* SelectQueryInfo is required since the stage can depends on the query
* (see Distributed() engine and optimize_skip_unused_shards).
*/
virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr & = nullptr) const { return QueryProcessingStage::FetchColumns; }
/** Watch live changes to the table.
* Accepts a list of columns to read, as well as a description of the query,

View File

@ -26,7 +26,7 @@ public:
return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
}
std::string getName() const override { return "Blocks"; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr & = nullptr) const override { return to_stage; }
Pipes read(
const Names & /*column_names*/,

View File

@ -135,7 +135,7 @@ private:
};
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
if (destination_id)
{
@ -144,7 +144,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
return destination->getQueryProcessingStage(context);
return destination->getQueryProcessingStage(context, query_ptr);
}
return QueryProcessingStage::FetchColumns;

View File

@ -54,7 +54,7 @@ public:
std::string getName() const override { return "Buffer"; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr & = nullptr) const override;
Pipes read(
const Names & column_names,

View File

@ -369,7 +369,7 @@ static QueryProcessingStage::Enum getQueryProcessingStageImpl(const Context & co
: QueryProcessingStage::WithMergeableState;
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context, const ASTPtr &) const
{
auto cluster = getCluster();
return getQueryProcessingStageImpl(context, cluster);

View File

@ -66,7 +66,7 @@ public:
bool isRemote() const override { return true; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr & = nullptr) const override;
Pipes read(
const Names & column_names,

View File

@ -171,9 +171,9 @@ StorageInMemoryMetadata StorageMaterializedView::getInMemoryMetadata() const
return result;
}
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
return getTargetTable()->getQueryProcessingStage(context);
return getTargetTable()->getQueryProcessingStage(context, query_ptr);
}
Pipes StorageMaterializedView::read(

View File

@ -59,7 +59,7 @@ public:
void checkTableCanBeDropped() const override;
void checkPartitionCanBeDropped(const ASTPtr & partition) override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, const ASTPtr & = nullptr) const override;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;

View File

@ -136,7 +136,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, cons
}
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context, const ASTPtr & query_ptr) const
{
auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
@ -150,7 +150,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
if (table.get() != this)
{
++selected_table_size;
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context));
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(context, query_ptr));
}
iterator->next();
@ -287,7 +287,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
return pipes;
}
if (processed_stage <= storage->getQueryProcessingStage(*modified_context))
if (processed_stage <= storage->getQueryProcessingStage(*modified_context, query_info.query))
{
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
@ -295,7 +295,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
}
else if (processed_stage > storage->getQueryProcessingStage(*modified_context))
else if (processed_stage > storage->getQueryProcessingStage(*modified_context, query_info.query))
{
modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);

View File

@ -31,7 +31,7 @@ public:
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override;
QueryProcessingStage::Enum getQueryProcessingStage(const Context &, const ASTPtr & = nullptr) const override;
Pipes read(
const Names & column_names,