Merge pull request #49326 from k-morozov/refactoring/pipe-into-step

move pipe compute into initializePipeline
This commit is contained in:
Nikolai Kochetov 2023-05-03 12:43:21 +02:00 committed by GitHub
commit 6388be2062
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 57 deletions

View File

@ -6,12 +6,14 @@
#include <Interpreters/getColumnFromBlock.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/StorageMemory.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ISource.h>
#include <Processors/Sources/NullSource.h>
namespace DB
{
@ -93,29 +95,39 @@ private:
InitializerFunc initializer_func;
};
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(Pipe pipe_) :
SourceStepWithFilter(DataStream{.header = pipe_.getHeader()}),
pipe(std::move(pipe_))
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
const size_t num_streams_,
const bool delay_read_for_global_sub_queries_) :
SourceStepWithFilter(DataStream{.header=storage_snapshot_->getSampleBlockForColumns(columns_to_read_)}),
columns_to_read(columns_to_read_),
storage_snapshot(storage_snapshot_),
num_streams(num_streams_),
delay_read_for_global_sub_queries(delay_read_for_global_sub_queries_)
{
}
void ReadFromMemoryStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
// use move - make sure that the call will only be made once.
auto pipe = makePipe();
if (pipe.empty())
{
assert(output_stream != std::nullopt);
pipe = Pipe(std::make_shared<NullSource>(output_stream->header));
}
pipeline.init(std::move(pipe));
}
Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
const bool delay_read_for_global_sub_queries_)
Pipe ReadFromMemoryStorageStep::makePipe()
{
storage_snapshot_->check(columns_to_read_);
storage_snapshot->check(columns_to_read);
const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot_->data);
const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot->data);
auto current_data = snapshot_data.blocks;
if (delay_read_for_global_sub_queries_)
if (delay_read_for_global_sub_queries)
{
/// Note: for global subquery we use single source.
/// Mainly, the reason is that at this point table is empty,
@ -126,8 +138,8 @@ Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.
return Pipe(std::make_shared<MemorySource>(
columns_to_read_,
storage_snapshot_,
columns_to_read,
storage_snapshot,
nullptr /* data */,
nullptr /* parallel execution index */,
[current_data](std::shared_ptr<const Blocks> & data_to_initialize)
@ -138,16 +150,16 @@ Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
size_t size = current_data->size();
if (num_streams_ > size)
num_streams_ = size;
if (num_streams > size)
num_streams = size;
Pipes pipes;
auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);
for (size_t stream = 0; stream < num_streams_; ++stream)
for (size_t stream = 0; stream < num_streams; ++stream)
{
pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read_, storage_snapshot_, current_data, parallel_execution_index));
pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read, storage_snapshot, current_data, parallel_execution_index));
}
return Pipe::unitePipes(std::move(pipes));
}

View File

@ -5,6 +5,7 @@
#include <Interpreters/TreeRewriter.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -14,7 +15,10 @@ class QueryPipelineBuilder;
class ReadFromMemoryStorageStep final : public SourceStepWithFilter
{
public:
explicit ReadFromMemoryStorageStep(Pipe pipe_);
ReadFromMemoryStorageStep(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
ReadFromMemoryStorageStep() = delete;
ReadFromMemoryStorageStep(const ReadFromMemoryStorageStep &) = delete;
@ -27,14 +31,15 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
static Pipe makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
private:
static constexpr auto name = "ReadFromMemoryStorage";
Pipe pipe;
Names columns_to_read;
StorageSnapshotPtr storage_snapshot;
size_t num_streams;
bool delay_read_for_global_sub_queries;
Pipe makePipe();
};
}

View File

@ -144,7 +144,8 @@ StorageSnapshotPtr StorageMemory::getStorageSnapshot(const StorageMetadataPtr &
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns, std::move(snapshot_data));
}
Pipe StorageMemory::read(
void StorageMemory::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
@ -153,29 +154,7 @@ Pipe StorageMemory::read(
size_t /*max_block_size*/,
size_t num_streams)
{
return ReadFromMemoryStorageStep::makePipe(column_names, storage_snapshot, num_streams, delay_read_for_global_subqueries);
}
void StorageMemory::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
// @TODO it looks like IStorage::readFromPipe. different only step's type.
auto pipe = read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
if (pipe.empty())
{
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
return;
}
auto read_step = std::make_unique<ReadFromMemoryStorageStep>(std::move(pipe));
query_plan.addStep(std::move(read_step));
query_plan.addStep(std::make_unique<ReadFromMemoryStorageStep>(column_names, storage_snapshot, num_streams, delay_read_for_global_subqueries));
}

View File

@ -45,15 +45,6 @@ public:
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
void read(
QueryPlan & query_plan,
const Names & column_names,