Support reading with QueryPlan for StorageView.

This commit is contained in:
Nikolai Kochetov 2020-09-17 16:22:24 +03:00
parent 193b572a05
commit 0bf4e8e6e9
7 changed files with 164 additions and 1 deletions

View File

@ -99,6 +99,8 @@ public:
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }
void setLimits(const StreamLocalLimits & limits) { pipe.setLimits(limits); }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota) { pipe.setQuota(quota); };
/// For compatibility with IBlockInputStream.
void setProgressCallback(const ProgressCallback & callback);

View File

@ -4,7 +4,7 @@
namespace DB
{
/// Convert one block structure to another. See ConvertingTransform.
/// Materialize constants. See MaterializingTransform.
class MaterializingStep : public ITransformingStep
{
public:

View File

@ -0,0 +1,53 @@
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}
SettingQuotaAndLimitsStep::SettingQuotaAndLimitsStep(
const DataStream & input_stream_,
StoragePtr storage_,
TableLockHolder table_lock_,
StreamLocalLimits limits_,
std::shared_ptr<const EnabledQuota> quota_,
std::shared_ptr<Context> context_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, storage(std::move(storage_))
, table_lock(std::move(table_lock_))
, limits(std::move(limits_))
, quota(std::move(quota_))
, context(std::move(context_))
{
}
void SettingQuotaAndLimitsStep::transformPipeline(QueryPipeline & pipeline)
{
/// Table lock is stored inside pipeline here.
pipeline.addTableLock(table_lock);
pipeline.setLimits(limits);
if (quota)
pipeline.setQuota(quota);
pipeline.addInterpreterContext(std::move(context));
pipeline.addStorageHolder(std::move(storage));
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Storages/TableLockHolder.h>
#include <DataStreams/StreamLocalLimits.h>
namespace DB
{
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
class EnabledQuota;
/// Add limits, quota, table_lock and other stuff to pipeline.
/// Doesn't change DataStream.
class SettingQuotaAndLimitsStep : public ITransformingStep
{
public:
SettingQuotaAndLimitsStep(
const DataStream & input_stream_,
StoragePtr storage_,
TableLockHolder table_lock_,
StreamLocalLimits limits_,
std::shared_ptr<const EnabledQuota> quota_,
std::shared_ptr<Context> context_);
String getName() const override { return "SettingQuotaAndLimits"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
StoragePtr storage;
TableLockHolder table_lock;
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota;
std::shared_ptr<Context> context;
};
}

View File

@ -116,6 +116,7 @@ SRCS(
QueryPlan/ReadFromStorageStep.cpp
QueryPlan/ReadNothingStep.cpp
QueryPlan/RollupStep.cpp
QueryPlan/SettingQuotaAndLimitsStep.cpp
QueryPlan/TotalsHavingStep.cpp
QueryPlan/UnionStep.cpp
ResizeProcessor.cpp

View File

@ -16,6 +16,9 @@
#include <Processors/Pipe.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/QueryPlan/MaterializingStep.h>
#include <Processors/QueryPlan/ConvertingStep.h>
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
namespace DB
{
@ -89,6 +92,55 @@ Pipe StorageView::read(
return QueryPipeline::getPipe(std::move(pipeline));
}
void StorageView::read(
QueryPlan & query_plan,
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
std::shared_ptr<const EnabledQuota> quota,
const Names & column_names,
const SelectQueryInfo & query_info,
std::shared_ptr<Context> context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
ASTPtr current_inner_query = metadata_snapshot->getSelectQuery().inner_query;
if (query_info.view_query)
{
if (!query_info.view_query->as<ASTSelectWithUnionQuery>())
throw Exception("Unexpected optimized VIEW query", ErrorCodes::LOGICAL_ERROR);
current_inner_query = query_info.view_query->clone();
}
InterpreterSelectWithUnionQuery interpreter(current_inner_query, *context, {}, column_names);
interpreter.buildQueryPlan(query_plan);
/// It's expected that the columns read from storage are not constant.
/// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
auto materializing = std::make_unique<MaterializingStep>(query_plan.getCurrentDataStream());
materializing->setStepDescription("Materialize constants after VIEW subquery");
query_plan.addStep(std::move(materializing));
/// And also convert to expected structure.
auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), header);
converting->setStepDescription("Convert VIEW subquery result to VIEW table structure");
query_plan.addStep(std::move(converting));
/// Extend lifetime of context, table lock, storage. Set limits and quota.
auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
query_plan.getCurrentDataStream(),
shared_from_this(),
std::move(table_lock),
limits,
std::move(quota),
std::move(context));
adding_limits_and_quota->setStepDescription("Set limits and quota for VIEW subquery");
query_plan.addStep(std::move(adding_limits_and_quota));
}
static ASTTableExpression * getFirstTableExpression(ASTSelectQuery & select_query)
{
auto * select_element = select_query.tables()->children[0]->as<ASTTablesInSelectQueryElement>();

View File

@ -30,6 +30,19 @@ public:
size_t max_block_size,
unsigned num_streams) override;
void read(
QueryPlan & query_plan,
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
std::shared_ptr<const EnabledQuota> quota,
const Names & column_names,
const SelectQueryInfo & query_info,
std::shared_ptr<Context> context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
void replaceWithSubquery(ASTSelectQuery & select_query, ASTPtr & view_name, const StorageMetadataPtr & metadata_snapshot) const
{
replaceWithSubquery(select_query, metadata_snapshot->getSelectQuery().inner_query->clone(), view_name);