mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
support read for windowview
This commit is contained in:
parent
5877d00388
commit
3dfede3165
@ -37,6 +37,11 @@
|
||||
#include <Processors/Transforms/WatermarkTransform.h>
|
||||
#include <Processors/Transforms/SquashingChunksTransform.h>
|
||||
#include <Processors/Transforms/MaterializingTransform.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
|
||||
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
|
||||
#include <Processors/Executors/PipelineExecutor.h>
|
||||
#include <Processors/Sinks/EmptySink.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
@ -965,6 +970,73 @@ void StorageWindowView::threadFuncFireEvent()
|
||||
}
|
||||
}
|
||||
|
||||
Pipe StorageWindowView::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr local_context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const size_t max_block_size,
|
||||
const unsigned num_streams)
|
||||
{
|
||||
QueryPlan plan;
|
||||
read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
|
||||
return plan.convertToPipe(
|
||||
QueryPlanOptimizationSettings::fromContext(local_context), BuildQueryPipelineSettings::fromContext(local_context));
|
||||
}
|
||||
|
||||
void StorageWindowView::read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr local_context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const size_t max_block_size,
|
||||
const unsigned num_streams)
|
||||
{
|
||||
auto storage = getTargetStorage();
|
||||
auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
|
||||
auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();
|
||||
auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot, local_context);
|
||||
|
||||
if (query_info.order_optimizer)
|
||||
query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);
|
||||
|
||||
storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
|
||||
|
||||
if (query_plan.isInitialized())
|
||||
{
|
||||
auto wv_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
|
||||
auto target_header = query_plan.getCurrentDataStream().header;
|
||||
|
||||
if (!blocksHaveEqualStructure(wv_header, target_header))
|
||||
{
|
||||
auto converting_actions = ActionsDAG::makeConvertingActions(
|
||||
target_header.getColumnsWithTypeAndName(), wv_header.getColumnsWithTypeAndName(), ActionsDAG::MatchColumnsMode::Name);
|
||||
auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);
|
||||
converting_step->setStepDescription("Convert Target table structure to WindowView structure");
|
||||
query_plan.addStep(std::move(converting_step));
|
||||
}
|
||||
|
||||
StreamLocalLimits limits;
|
||||
SizeLimits leaf_limits;
|
||||
|
||||
/// Add table lock for target table.
|
||||
auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
|
||||
query_plan.getCurrentDataStream(),
|
||||
storage,
|
||||
std::move(lock),
|
||||
limits,
|
||||
leaf_limits,
|
||||
nullptr,
|
||||
nullptr);
|
||||
|
||||
adding_limits_and_quota->setStepDescription("Lock target table for WindowView");
|
||||
query_plan.addStep(std::move(adding_limits_and_quota));
|
||||
}
|
||||
}
|
||||
|
||||
Pipe StorageWindowView::watch(
|
||||
const Names & /*column_names*/,
|
||||
const SelectQueryInfo & query_info,
|
||||
|
@ -137,6 +137,25 @@ public:
|
||||
void startup() override;
|
||||
void shutdown() override;
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
void read(
|
||||
QueryPlan & query_plan,
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
Pipe watch(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & query_info,
|
||||
|
Loading…
Reference in New Issue
Block a user