add loopsource

This commit is contained in:
Sariel 2024-04-29 19:44:44 +08:00
parent f0ca96fddf
commit 93370410fc
3 changed files with 85 additions and 39 deletions

View File

@ -2,13 +2,85 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/IStorage.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <Processors/ISource.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Common/FieldVisitors.h>
namespace DB
{
class LoopSource : public ISource
{
public:
LoopSource(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr & context_,
QueryProcessingStage::Enum processed_stage_,
StoragePtr inner_storage_,
size_t max_block_size_,
size_t num_streams_)
: ISource(storage_snapshot_->getSampleBlockForColumns(column_names_))
, column_names(column_names_)
, query_info(query_info_)
, storage_snapshot(storage_snapshot_)
, processed_stage(processed_stage_)
, context(context_)
, inner_storage(std::move(inner_storage_))
, max_block_size(max_block_size_)
, num_streams(num_streams_)
{
}
String getName() const override { return "Loop"; }
Chunk generate() override
{
QueryPlan plan;
inner_storage->read(
plan,
column_names,
storage_snapshot,
query_info,
context,
processed_stage,
max_block_size,
num_streams);
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
QueryPipeline query_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
PullingPipelineExecutor executor(query_pipeline);
Chunk chunk;
while (executor.pull(chunk))
{
if (chunk)
return chunk;
}
return {};
}
private:
const Names column_names;
SelectQueryInfo query_info;
const StorageSnapshotPtr storage_snapshot;
QueryProcessingStage::Enum processed_stage;
ContextPtr context;
StoragePtr inner_storage;
size_t max_block_size;
size_t num_streams;
};
ReadFromLoopStep::ReadFromLoopStep(
const Names & column_names_,
const SelectQueryInfo & query_info_,
@ -34,36 +106,21 @@ ReadFromLoopStep::ReadFromLoopStep(
Pipe ReadFromLoopStep::makePipe()
{
Pipes res_pipe;
for (size_t i = 0; i < 10; ++i)
{
QueryPlan plan;
inner_storage->read(
plan,
column_names,
storage_snapshot,
query_info,
context,
processed_stage,
max_block_size,
num_streams);
auto builder = plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context));
QueryPlanResourceHolder resources;
auto pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
res_pipe.emplace_back(std::move(pipe));
}
return Pipe::unitePipes(std::move(res_pipe));
return Pipe(std::make_shared<LoopSource>(
column_names, query_info, storage_snapshot, context, processed_stage, inner_storage, max_block_size, num_streams));
}
void ReadFromLoopStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
pipeline.init(makePipe());
auto pipe = makePipe();
if (pipe.empty())
{
assert(output_stream != std::nullopt);
pipe = Pipe(std::make_shared<NullSource>(output_stream->header));
}
pipeline.init(std::move(pipe));
}
}

View File

@ -38,14 +38,6 @@ void StorageLoop::read(
query_plan.addStep(std::make_unique<ReadFromLoopStep>(
column_names, query_info, storage_snapshot, context, processed_stage, inner_storage, max_block_size, num_streams
));
/*inner_storage->read(query_plan,
column_names,
storage_snapshot,
query_info,
context,
processed_stage,
max_block_size,
num_streams);*/
}
void registerStorageLoop(StorageFactory & factory)

View File

@ -93,12 +93,9 @@ void TableFunctionLoop::parseArguments(const ASTPtr & ast_function, ContextPtr c
}
}
ColumnsDescription TableFunctionLoop::getActualTableStructure(ContextPtr context, bool is_insert_query) const
ColumnsDescription TableFunctionLoop::getActualTableStructure(ContextPtr /*context*/, bool /*is_insert_query*/) const
{
auto inner_table_function = TableFunctionFactory::instance().get(inner_table_function_ast, context);
return inner_table_function->getActualTableStructure(context, is_insert_query);
return ColumnsDescription();
}
StoragePtr TableFunctionLoop::executeImpl(