From 93370410fc53af3d54aa74b45e6c6fe6bbef7b1f Mon Sep 17 00:00:00 2001 From: Sariel <1059293451@qq.com> Date: Mon, 29 Apr 2024 19:44:44 +0800 Subject: [PATCH] add loopsource --- src/Processors/QueryPlan/ReadFromLoopStep.cpp | 109 +++++++++++++----- src/Storages/StorageLoop.cpp | 8 -- src/TableFunctions/TableFunctionLoop.cpp | 7 +- 3 files changed, 85 insertions(+), 39 deletions(-) diff --git a/src/Processors/QueryPlan/ReadFromLoopStep.cpp b/src/Processors/QueryPlan/ReadFromLoopStep.cpp index 10932db3f08..79ed10327cd 100644 --- a/src/Processors/QueryPlan/ReadFromLoopStep.cpp +++ b/src/Processors/QueryPlan/ReadFromLoopStep.cpp @@ -2,13 +2,85 @@ #include #include #include +#include #include #include #include +#include +#include +#include +#include namespace DB { +class LoopSource : public ISource +{ +public: + + LoopSource( + const Names & column_names_, + const SelectQueryInfo & query_info_, + const StorageSnapshotPtr & storage_snapshot_, + ContextPtr & context_, + QueryProcessingStage::Enum processed_stage_, + StoragePtr inner_storage_, + size_t max_block_size_, + size_t num_streams_) + : ISource(storage_snapshot_->getSampleBlockForColumns(column_names_)) + , column_names(column_names_) + , query_info(query_info_) + , storage_snapshot(storage_snapshot_) + , processed_stage(processed_stage_) + , context(context_) + , inner_storage(std::move(inner_storage_)) + , max_block_size(max_block_size_) + , num_streams(num_streams_) + { + } + + String getName() const override { return "Loop"; } + + Chunk generate() override + { + QueryPlan plan; + inner_storage->read( + plan, + column_names, + storage_snapshot, + query_info, + context, + processed_stage, + max_block_size, + num_streams); + auto builder = plan.buildQueryPipeline( + QueryPlanOptimizationSettings::fromContext(context), + BuildQueryPipelineSettings::fromContext(context)); + QueryPipeline query_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + PullingPipelineExecutor executor(query_pipeline); + + Chunk chunk; + while (executor.pull(chunk)) + { + if (chunk) + return chunk; + } + + return {}; + } + +private: + + const Names column_names; + SelectQueryInfo query_info; + const StorageSnapshotPtr storage_snapshot; + QueryProcessingStage::Enum processed_stage; + ContextPtr context; + StoragePtr inner_storage; + size_t max_block_size; + size_t num_streams; +}; + ReadFromLoopStep::ReadFromLoopStep( const Names & column_names_, const SelectQueryInfo & query_info_, @@ -34,36 +106,21 @@ ReadFromLoopStep::ReadFromLoopStep( Pipe ReadFromLoopStep::makePipe() { - Pipes res_pipe; - - for (size_t i = 0; i < 10; ++i) - { - QueryPlan plan; - inner_storage->read( - plan, - column_names, - storage_snapshot, - query_info, - context, - processed_stage, - max_block_size, - num_streams); - auto builder = plan.buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(context), - BuildQueryPipelineSettings::fromContext(context)); - - QueryPlanResourceHolder resources; - auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources); - - res_pipe.emplace_back(std::move(pipe)); - } - - return Pipe::unitePipes(std::move(res_pipe)); + return Pipe(std::make_shared( + column_names, query_info, storage_snapshot, context, processed_stage, inner_storage, max_block_size, num_streams)); } void ReadFromLoopStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { - pipeline.init(makePipe()); + auto pipe = makePipe(); + + if (pipe.empty()) + { + assert(output_stream != std::nullopt); + pipe = Pipe(std::make_shared(output_stream->header)); + } + + pipeline.init(std::move(pipe)); } } diff --git a/src/Storages/StorageLoop.cpp b/src/Storages/StorageLoop.cpp index 935ab8bc401..6a319fc9741 100644 --- a/src/Storages/StorageLoop.cpp +++ b/src/Storages/StorageLoop.cpp @@ -38,14 +38,6 @@ void StorageLoop::read( query_plan.addStep(std::make_unique( column_names, query_info, storage_snapshot, context, processed_stage, inner_storage, max_block_size, num_streams )); - /*inner_storage->read(query_plan, - column_names, - storage_snapshot, - query_info, - context, - processed_stage, - max_block_size, - num_streams);*/ } void registerStorageLoop(StorageFactory & factory) diff --git a/src/TableFunctions/TableFunctionLoop.cpp b/src/TableFunctions/TableFunctionLoop.cpp index bfe0711384d..1a0b2c3552d 100644 --- a/src/TableFunctions/TableFunctionLoop.cpp +++ b/src/TableFunctions/TableFunctionLoop.cpp @@ -93,12 +93,9 @@ void TableFunctionLoop::parseArguments(const ASTPtr & ast_function, ContextPtr c } } -ColumnsDescription TableFunctionLoop::getActualTableStructure(ContextPtr context, bool is_insert_query) const +ColumnsDescription TableFunctionLoop::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const { - auto inner_table_function = TableFunctionFactory::instance().get(inner_table_function_ast, context); - - return inner_table_function->getActualTableStructure(context, is_insert_query); - + return ColumnsDescription(); } StoragePtr TableFunctionLoop::executeImpl(