add metadata_snapshot to getQueryProcessingStage

This commit is contained in:
Amos Bird 2021-04-22 21:32:17 +08:00
parent 264cff6415
commit cd6414639e
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
17 changed files with 65 additions and 21 deletions

View File

@ -432,6 +432,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
// ugly but works
metadata_snapshot->selected_projection = query_info.aggregate_projection;
// TODO In expression analyzer we need to check if storage mayBenefitFromIndexForIn. In case
// we use some projection to execute the query, we need to provide what projection we are
// going to use here.
// It analyzes aggregation
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, metadata_snapshot,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),

View File

@ -39,6 +39,7 @@ protected:
private:
Block projection;
// Hold projection's parent parts duration execution
MergeTreeData::DataPartsVector parent_parts;
};

View File

@ -235,7 +235,8 @@ public:
* QueryProcessingStage::Enum required for Distributed over Distributed,
* since it cannot return Complete for intermediate queries never.
*/
virtual QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const
virtual QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const
{
return QueryProcessingStage::FetchColumns;
}

View File

@ -33,7 +33,11 @@ public:
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override { return to_stage; }
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override
{
return to_stage;
}
Pipe read(
const Names & /*column_names*/,

View File

@ -113,6 +113,8 @@ struct InputOrderInfo
bool operator !=(const InputOrderInfo & other) const { return !(*this == other); }
};
class IMergeTreeDataPart;
/** Query along with some additional data,
* that can be used during query processing
* inside storage engines.
@ -144,10 +146,19 @@ struct SelectQueryInfo
ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; }
/// If not null, it means we choose an aggregate projection to execute current query.
const ProjectionDescription * aggregate_projection{};
ProjectionKeyActions key_actions;
Names projection_names;
Block projection_block;
/// Store to-be-scanned data parts if some aggregate projection is used
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
DataPartsVector projection_parts;
DataPartsVector parent_parts;
DataPartsVector normal_parts;
};
}

View File

@ -178,7 +178,11 @@ private:
};
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
SelectQueryInfo & query_info) const
{
if (destination_id)
{
@ -187,7 +191,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(ContextPtr loc
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
return destination->getQueryProcessingStage(local_context, to_stage, query_info);
return destination->getQueryProcessingStage(local_context, to_stage, destination->getInMemoryMetadataPtr(), query_info);
}
return QueryProcessingStage::FetchColumns;

View File

@ -58,7 +58,8 @@ public:
std::string getName() const override { return "Buffer"; }
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,

View File

@ -459,10 +459,12 @@ StorageDistributed::StorageDistributed(
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & query_info) const
{
const auto & settings = local_context->getSettingsRef();
auto metadata_snapshot = getInMemoryMetadataPtr();
ClusterPtr cluster = getCluster();
query_info.cluster = cluster;

View File

@ -55,7 +55,8 @@ public:
bool isRemote() const override { return true; }
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,

View File

@ -119,9 +119,12 @@ StorageMaterializedView::StorageMaterializedView(
}
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
SelectQueryInfo & query_info) const
{
return getTargetTable()->getQueryProcessingStage(local_context, to_stage, query_info);
return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getInMemoryMetadataPtr(), query_info);
}
Pipe StorageMaterializedView::read(

View File

@ -66,7 +66,8 @@ public:
void shutdown() override;
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;

View File

@ -179,8 +179,11 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, Cont
}
QueryProcessingStage::Enum
StorageMerge::getQueryProcessingStage(ContextPtr local_context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & query_info) const
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
SelectQueryInfo & query_info) const
{
/// In case of JOIN the first stage (which includes JOIN)
/// should be done on the initiator always.
@ -204,7 +207,9 @@ StorageMerge::getQueryProcessingStage(ContextPtr local_context, QueryProcessingS
if (table && table.get() != this)
{
++selected_table_size;
stage_in_source_tables = std::max(stage_in_source_tables, table->getQueryProcessingStage(local_context, to_stage, query_info));
stage_in_source_tables = std::max(
stage_in_source_tables,
table->getQueryProcessingStage(local_context, to_stage, table->getInMemoryMetadataPtr(), query_info));
}
iterator->next();
@ -352,7 +357,8 @@ Pipe StorageMerge::createSources(
return pipe;
}
auto storage_stage = storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, modified_query_info);
auto storage_stage
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, false, metadata_snapshot, modified_query_info);
if (processed_stage <= storage_stage)
{
/// If there are only virtual columns in query, you must request at least one other column.

View File

@ -27,7 +27,8 @@ public:
bool supportsIndexForIn() const override { return true; }
bool supportsSubcolumns() const override { return true; }
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,

View File

@ -32,9 +32,12 @@ public:
NamesAndTypesList getVirtuals() const override { return getNested()->getVirtuals(); }
QueryProcessingStage::Enum getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, SelectQueryInfo & ast) const override
ContextPtr context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
SelectQueryInfo & info) const override
{
return getNested()->getQueryProcessingStage(context, to_stage, ast);
return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getInMemoryMetadataPtr(), info);
}
BlockInputStreams watch(

View File

@ -140,7 +140,7 @@ Pipe StorageS3Cluster::read(
}
QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, SelectQueryInfo &) const
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageMetadataPtr &, SelectQueryInfo &) const
{
/// Initiator executes query on remote node.
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)

View File

@ -36,7 +36,8 @@ public:
Pipe read(const Names &, const StorageMetadataPtr &, SelectQueryInfo &,
ContextPtr, QueryProcessingStage::Enum, size_t /*max_block_size*/, unsigned /*num_streams*/) override;
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, SelectQueryInfo &) const override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
NamesAndTypesList getVirtuals() const override;

View File

@ -118,7 +118,7 @@ std::string readData(DB::StoragePtr & table, const DB::ContextPtr context)
SelectQueryInfo query_info;
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(
context, QueryProcessingStage::Complete, query_info);
context, QueryProcessingStage::Complete, metadata_snapshot, query_info);
QueryPipeline pipeline;
pipeline.init(table->read(column_names, metadata_snapshot, query_info, context, stage, 8192, 1));