diff --git a/src/Processors/QueryPlan/AddingMissedStep.cpp b/src/Processors/QueryPlan/AddingMissedStep.cpp
new file mode 100644
index 00000000000..f2d06c033b1
--- /dev/null
+++ b/src/Processors/QueryPlan/AddingMissedStep.cpp
@@ -0,0 +1,45 @@
+#include <Processors/QueryPlan/AddingMissedStep.h>
+#include <Processors/Transforms/AddingMissedTransform.h>
+#include <Processors/QueryPipeline.h>
+#include <Interpreters/Context.h>
+
+namespace DB
+{
+
+static ITransformingStep::Traits getTraits()
+{
+    return ITransformingStep::Traits
+    {
+        {
+            .preserves_distinct_columns = true,
+            .returns_single_stream = false,
+            .preserves_number_of_streams = true,
+            .preserves_sorting = true,
+        },
+        {
+            .preserves_number_of_rows = true,
+        }
+    };
+}
+
+AddingMissedStep::AddingMissedStep(
+    const DataStream & input_stream_,
+    Block result_header_,
+    const ColumnDefaults & column_defaults_,
+    const Context & context_)
+    : ITransformingStep(input_stream_, result_header_, getTraits())
+    , column_defaults(column_defaults_)
+    , context(context_)
+{
+    updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
+}
+
+void AddingMissedStep::transformPipeline(QueryPipeline & pipeline)
+{
+    pipeline.addSimpleTransform([&](const Block & header)
+    {
+        return std::make_shared<AddingMissedTransform>(header, output_stream->header, column_defaults, context);
+    });
+}
+
+}
diff --git a/src/Processors/QueryPlan/AddingMissedStep.h b/src/Processors/QueryPlan/AddingMissedStep.h
new file mode 100644
index 00000000000..77075a410a5
--- /dev/null
+++ b/src/Processors/QueryPlan/AddingMissedStep.h
@@ -0,0 +1,28 @@
+#pragma once
+#include <Processors/QueryPlan/ITransformingStep.h>
+
+namespace DB
+{
+
+struct ColumnDefault;
+using ColumnDefaults = std::unordered_map<std::string, ColumnDefault>;
+
+/// Adds defaults for columns that are missing in the input block. See AddingMissedTransform.
+class AddingMissedStep : public ITransformingStep
+{
+public:
+    AddingMissedStep(const DataStream & input_stream_,
+                     Block result_header_,
+                     const ColumnDefaults & column_defaults_,
+                     const Context & context_);
+
+    String getName() const override { return "AddingMissed"; }
+
+    void transformPipeline(QueryPipeline & pipeline) override;
+
+private:
+    const ColumnDefaults column_defaults;
+    const Context & context;
+};
+
+}
diff --git a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp
index 2a03d1fd82f..5b05ad77d6c 100644
--- a/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp
+++ b/src/Processors/QueryPlan/SettingQuotaAndLimitsStep.cpp
@@ -59,8 +59,11 @@ void SettingQuotaAndLimitsStep::transformPipeline(QueryPipeline & pipeline)
     if (quota)
         pipeline.setQuota(quota);
 
-    pipeline.addInterpreterContext(std::move(context));
-    pipeline.addStorageHolder(std::move(storage));
+    if (context)
+        pipeline.addInterpreterContext(std::move(context));
+
+    if (storage)
+        pipeline.addStorageHolder(std::move(storage));
 }
 
 }
diff --git a/src/Processors/ya.make b/src/Processors/ya.make
index cd18ea3deb0..b5afc1ada3a 100644
--- a/src/Processors/ya.make
+++ b/src/Processors/ya.make
@@ -89,6 +89,7 @@ SRCS(
     printPipeline.cpp
     QueryPipeline.cpp
     QueryPlan/AddingDelayedSourceStep.cpp
+    QueryPlan/AddingMissedStep.cpp
    QueryPlan/AggregatingStep.cpp
    QueryPlan/ArrayJoinStep.cpp
    QueryPlan/ConvertingStep.cpp
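A note on the new step: the ITransformingStep traits block is how a step declares which data-stream properties it leaves intact (sort order, stream count, row count), so that later planning can keep relying on them; adding defaulted columns only widens each block, hence everything above is preserved. The SettingQuotaAndLimitsStep change just tolerates a null context or storage holder, since the step is now also constructed (in StorageBuffer below) with nullptr arguments. The following is a minimal self-contained sketch of the traits-plus-simple-transform pattern; Traits, Pipeline and addSimpleTransform here are illustrative stand-ins and not the ClickHouse API (compiles as C++20):

#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-ins, not ClickHouse types.
struct Traits
{
    bool preserves_sorting = false;           // step keeps each stream's sort order
    bool preserves_number_of_streams = false; // step neither merges nor splits streams
    bool preserves_number_of_rows = false;    // step neither filters nor duplicates rows
};

using Stream = std::vector<std::string>;
using Transform = std::function<std::string(const std::string &)>;

struct Pipeline
{
    std::vector<Stream> streams;

    // Analogue of addSimpleTransform: apply the same transform to every stream independently.
    void addSimpleTransform(const Transform & t)
    {
        for (auto & stream : streams)
            for (auto & row : stream)
                row = t(row);
    }
};

int main()
{
    // An AddingMissed-like step: widening each row with a default value
    // touches neither stream order nor row/stream counts.
    Traits traits{.preserves_sorting = true, .preserves_number_of_streams = true, .preserves_number_of_rows = true};

    Pipeline pipeline{{{"a", "b"}, {"c"}}};
    pipeline.addSimpleTransform([](const std::string & row) { return row + ",<default>"; });

    std::cout << pipeline.streams.size() << " streams, first row: " << pipeline.streams[0][0] << "\n";
    return traits.preserves_number_of_rows ? 0 : 1;
}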
diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp
index 14f188275e5..162463a4e33 100644
--- a/src/Storages/StorageBuffer.cpp
+++ b/src/Storages/StorageBuffer.cpp
@@ -4,7 +4,7 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
@@ -22,10 +22,13 @@
 #include
 #include
 #include
-#include
+#include
 #include
 #include
 #include
+#include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
+#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <Processors/QueryPlan/UnionStep.h>
 
 
 namespace ProfileEvents
@@ -147,6 +150,21 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
 
 
 Pipe StorageBuffer::read(
+    const Names & column_names,
+    const StorageMetadataPtr & metadata_snapshot,
+    const SelectQueryInfo & query_info,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t max_block_size,
+    const unsigned num_streams)
+{
+    QueryPlan plan;
+    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+    return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline()));
+}
+
+void StorageBuffer::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageMetadataPtr & metadata_snapshot,
     const SelectQueryInfo & query_info,
@@ -155,8 +173,6 @@ Pipe StorageBuffer::read(
     size_t max_block_size,
     unsigned num_streams)
 {
-    Pipe pipe_from_dst;
-
     if (destination_id)
     {
         auto destination = DatabaseCatalog::instance().getTable(destination_id, context);
@@ -182,8 +198,8 @@ Pipe StorageBuffer::read(
             query_info.input_order_info = query_info.order_optimizer->getInputOrder(destination, destination_metadata_snapshot);
 
         /// The destination table has the same structure of the requested columns and we can simply read blocks from there.
-        pipe_from_dst = destination->read(
-            column_names, destination_metadata_snapshot, query_info,
+        destination->read(
+            query_plan, column_names, destination_metadata_snapshot, query_info,
             context, processed_stage, max_block_size, num_streams);
     }
     else
@@ -217,25 +233,45 @@ Pipe StorageBuffer::read(
             }
             else
             {
-                pipe_from_dst = destination->read(
-                    columns_intersection, destination_metadata_snapshot, query_info,
-                    context, processed_stage, max_block_size, num_streams);
+                destination->read(
+                    query_plan, columns_intersection, destination_metadata_snapshot, query_info,
+                    context, processed_stage, max_block_size, num_streams);
 
-                pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
-                {
-                    return std::make_shared<AddingMissedTransform>(stream_header, header_after_adding_defaults,
-                        metadata_snapshot->getColumns().getDefaults(), context);
-                });
+                auto adding_missed = std::make_unique<AddingMissedStep>(
+                    query_plan.getCurrentDataStream(),
+                    header_after_adding_defaults,
+                    metadata_snapshot->getColumns().getDefaults(),
+                    context);
 
-                pipe_from_dst.addSimpleTransform([&](const Block & stream_header)
-                {
-                    return std::make_shared<ConvertingTransform>(
-                        stream_header, header, ConvertingTransform::MatchColumnsMode::Name);
-                });
+                adding_missed->setStepDescription("Add columns missing in destination table");
+                query_plan.addStep(std::move(adding_missed));
+
+                auto converting = std::make_unique<ConvertingStep>(
+                    query_plan.getCurrentDataStream(),
+                    header);
+
+                converting->setStepDescription("Convert destination table columns to Buffer table structure");
+                query_plan.addStep(std::move(converting));
             }
         }
 
-        pipe_from_dst.addTableLock(destination_lock);
+        if (query_plan.isInitialized())
+        {
+            StreamLocalLimits limits;
+            SizeLimits leaf_limits;
+
+            /// Add table lock for destination table.
+            auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
+                query_plan.getCurrentDataStream(),
+                destination,
+                std::move(destination_lock),
+                limits,
+                leaf_limits,
+                nullptr,
+                nullptr);
+
+            adding_limits_and_quota->setStepDescription("Lock destination table for Buffer");
+            query_plan.addStep(std::move(adding_limits_and_quota));
+        }
     }
 
     Pipe pipe_from_buffers;
@@ -248,49 +284,73 @@ Pipe StorageBuffer::read(
         pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers));
     }
 
-    /// Convert pipes from table to structure from buffer.
-    if (!pipe_from_buffers.empty() && !pipe_from_dst.empty()
-        && !blocksHaveEqualStructure(pipe_from_buffers.getHeader(), pipe_from_dst.getHeader()))
-    {
-        pipe_from_dst.addSimpleTransform([&](const Block & header)
-        {
-            return std::make_shared<ConvertingTransform>(
-                header,
-                pipe_from_buffers.getHeader(),
-                ConvertingTransform::MatchColumnsMode::Name);
-        });
-    }
+    if (pipe_from_buffers.empty())
+        return;
+
+    QueryPlan buffers_plan;
 
     /** If the sources from the table were processed before some non-initial stage of query execution,
       * then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
       */
     if (processed_stage > QueryProcessingStage::FetchColumns)
-        pipe_from_buffers = QueryPipeline::getPipe(
-            InterpreterSelectQuery(query_info.query, context, std::move(pipe_from_buffers),
-            SelectQueryOptions(processed_stage)).execute().pipeline);
-
-    if (query_info.prewhere_info)
     {
-        pipe_from_buffers.addSimpleTransform([&](const Block & header)
-        {
-            return std::make_shared<FilterTransform>(
-                header, query_info.prewhere_info->prewhere_actions,
-                query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
-        });
-
-        if (query_info.prewhere_info->alias_actions)
+        auto interpreter = InterpreterSelectQuery(
+            query_info.query, context, std::move(pipe_from_buffers),
+            SelectQueryOptions(processed_stage));
+        interpreter.buildQueryPlan(buffers_plan);
+    }
+    else
+    {
+        if (query_info.prewhere_info)
         {
             pipe_from_buffers.addSimpleTransform([&](const Block & header)
             {
-                return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
+                return std::make_shared<FilterTransform>(
+                    header, query_info.prewhere_info->prewhere_actions,
+                    query_info.prewhere_info->prewhere_column_name, query_info.prewhere_info->remove_prewhere_column);
             });
+
+            if (query_info.prewhere_info->alias_actions)
+            {
+                pipe_from_buffers.addSimpleTransform([&](const Block & header)
+                {
+                    return std::make_shared<ExpressionTransform>(header, query_info.prewhere_info->alias_actions);
+                });
+            }
         }
+
+        auto read_from_buffers = std::make_unique<ReadFromPreparedSource>(std::move(pipe_from_buffers));
+        read_from_buffers->setStepDescription("Read from buffers of Buffer table");
+        buffers_plan.addStep(std::move(read_from_buffers));
     }
 
-    Pipes pipes;
-    pipes.emplace_back(std::move(pipe_from_dst));
-    pipes.emplace_back(std::move(pipe_from_buffers));
-    return Pipe::unitePipes(std::move(pipes));
+    if (!query_plan.isInitialized())
+    {
+        query_plan = std::move(buffers_plan);
+        return;
+    }
+
+    auto result_header = buffers_plan.getCurrentDataStream().header;
+
+    /// Convert structure from table to structure from buffer.
+    if (!blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
+    {
+        auto converting = std::make_unique<ConvertingStep>(query_plan.getCurrentDataStream(), result_header);
+        query_plan.addStep(std::move(converting));
+    }
+
+    DataStreams input_streams;
+    input_streams.emplace_back(query_plan.getCurrentDataStream());
+    input_streams.emplace_back(buffers_plan.getCurrentDataStream());
+
+    std::vector<std::unique_ptr<QueryPlan>> plans;
+    plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
+    plans.emplace_back(std::make_unique<QueryPlan>(std::move(buffers_plan)));
+    query_plan = QueryPlan();
+
+    auto union_step = std::make_unique<UnionStep>(std::move(input_streams), result_header);
+    union_step->setStepDescription("Unite sources from Buffer table");
+    query_plan.unitePlans(std::move(union_step), std::move(plans));
 }
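The rewritten function now has three terminal shapes: no buffered rows, so the destination plan (if any) is used as-is; a destination plan that was never initialized, so the buffers plan becomes the whole plan; or both sides present, in which case the destination side gets a ConvertingStep when headers differ and the two plans are united under a UnionStep. A compact stand-alone model of just that merge decision follows; Plan, unite and the step names are illustrative stand-ins, not ClickHouse's API:

#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Illustrative stand-in for a query plan: a header plus a list of step names.
struct Plan
{
    std::string header;                 // analogue of the DataStream header
    std::vector<std::string> steps;
    bool initialized() const { return !steps.empty(); }
};

// Merge two optional sources the way StorageBuffer::read composes them:
// destination may be absent, buffers may be absent, or both may be present.
Plan unite(std::optional<Plan> destination, std::optional<Plan> buffers)
{
    if (!buffers)
        return destination ? *destination : Plan{};
    if (!destination || !destination->initialized())
        return *buffers;                // analogue of `query_plan = std::move(buffers_plan)`

    Plan result;
    result.header = buffers->header;    // union output uses the buffers-side header
    if (destination->header != result.header)
        destination->steps.push_back("Converting to " + result.header); // analogue of ConvertingStep
    result.steps = destination->steps;
    result.steps.insert(result.steps.end(), buffers->steps.begin(), buffers->steps.end());
    result.steps.push_back("Union");    // analogue of UnionStep over both inputs
    return result;
}

int main()
{
    Plan dst{"dst_header", {"ReadFromDestination"}};
    Plan buf{"buf_header", {"ReadFromBuffers"}};
    for (const auto & step : unite(dst, buf).steps)
        std::cout << step << "\n";      // ReadFromDestination, Converting..., ReadFromBuffers, Union
}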
diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h
index b18b574ec6c..406e6a51fdb 100644
--- a/src/Storages/StorageBuffer.h
+++ b/src/Storages/StorageBuffer.h
@@ -65,6 +65,16 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;
 
+    void read(
+        QueryPlan & query_plan,
+        const Names & column_names,
+        const StorageMetadataPtr & metadata_snapshot,
+        const SelectQueryInfo & query_info,
+        const Context & context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        unsigned num_streams) override;
+
     bool supportsParallelInsert() const override { return true; }
 
     BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp
index a2e3fae0951..69669c0b680 100644
--- a/src/Storages/StorageMaterializedView.cpp
+++ b/src/Storages/StorageMaterializedView.cpp
@@ -107,6 +107,21 @@ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(cons
 }
 
 Pipe StorageMaterializedView::read(
+    const Names & column_names,
+    const StorageMetadataPtr & metadata_snapshot,
+    const SelectQueryInfo & query_info,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t max_block_size,
+    const unsigned num_streams)
+{
+    QueryPlan plan;
+    read(plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+    return QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline()));
+}
+
+void StorageMaterializedView::read(
+    QueryPlan & query_plan,
     const Names & column_names,
     const StorageMetadataPtr & /*metadata_snapshot*/,
     const SelectQueryInfo & query_info,
@@ -122,10 +137,7 @@ Pipe StorageMaterializedView::read(
     if (query_info.order_optimizer)
         query_info.input_order_info = query_info.order_optimizer->getInputOrder(storage, metadata_snapshot);
 
-    Pipe pipe = storage->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
-    pipe.addTableLock(lock);
-
-    return pipe;
+    storage->read(query_plan, column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
 }
 
 BlockOutputStreamPtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
diff --git a/src/Storages/StorageMaterializedView.h b/src/Storages/StorageMaterializedView.h
index 1ee4246c7f1..b7e60649601 100644
--- a/src/Storages/StorageMaterializedView.h
+++ b/src/Storages/StorageMaterializedView.h
@@ -80,6 +80,16 @@ public:
         size_t max_block_size,
         unsigned num_streams) override;
 
+    void read(
+        QueryPlan & query_plan,
+        const Names & column_names,
+        const StorageMetadataPtr & metadata_snapshot,
+        const SelectQueryInfo & query_info,
+        const Context & context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        unsigned num_streams) override;
+
     Strings getDataPaths() const override;
 
 private:
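Both storages follow the same migration pattern: the old Pipe-returning read() is kept purely as a compatibility shim that fills a QueryPlan through the new overload and then builds and unwraps the pipeline, so planner-level machinery sees the storage's steps while legacy callers keep working. A generic sketch of that shim pattern, with hypothetical Plan/Pipe stand-ins rather than the real QueryPlan/QueryPipeline types:

#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-ins for QueryPlan and Pipe.
struct Plan { std::vector<std::string> steps; };
struct Pipe { std::string description; };

struct Storage
{
    // New interface: describe the read as plan steps (analogue of read(QueryPlan &, ...)).
    void read(Plan & plan) const
    {
        plan.steps.push_back("ReadFromStorage");
    }

    // Old interface kept for callers that still want a Pipe: delegate, then "build" it.
    Pipe read() const
    {
        Plan plan;
        read(plan);
        std::string desc;
        for (const auto & step : plan.steps)
            desc += step + ";";
        return Pipe{desc};  // analogue of QueryPipeline::getPipe(std::move(*plan.buildQueryPipeline()))
    }
};

int main()
{
    Storage storage;
    std::cout << storage.read().description << "\n";  // the pipe is derived from the plan
}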
diff --git a/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.sql b/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.sql
index e7174c5b56b..cb572cc542f 100644
--- a/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.sql
+++ b/tests/queries/0_stateless/00184_shard_distributed_group_by_no_merge.sql
@@ -10,9 +10,9 @@ SET max_threads=1;
 SET optimize_move_functions_out_of_any=0;
 
 SELECT 'LIMIT';
-SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1 SETTINGS distributed_group_by_no_merge=2;
+SELECT * FROM (SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1) ORDER BY shard_num SETTINGS distributed_group_by_no_merge=2;
 SELECT 'OFFSET';
-SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1, 1 SETTINGS distributed_group_by_no_merge=2;
+SELECT * FROM (SELECT any(_shard_num) shard_num, count(), uniq(dummy) FROM remote('127.0.0.{2,3}', system.one) LIMIT 1, 1) ORDER BY shard_num SETTINGS distributed_group_by_no_merge=2;
 SELECT 'ALIAS';
 SELECT dummy AS d FROM remote('127.0.0.{2,3}', system.one) ORDER BY d SETTINGS distributed_group_by_no_merge=2;
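The test change follows from the plan-level union: the two shards' results now arrive as independent streams whose interleaving is not fixed, so the expected output is only stable once the query is wrapped in a subquery and sorted by shard_num. The toy below reproduces that class of nondeterminism and the normalize-by-sorting fix; it is plain C++ threads, with no ClickHouse code involved:

#include <algorithm>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Two "shards" push their rows concurrently, as a union of independent
// streams does; the arrival order depends on scheduling, so a test that
// compares raw output must sort first (the SQL above adds ORDER BY shard_num
// for the same reason).
int main()
{
    std::vector<int> rows;
    std::mutex m;

    auto shard = [&](int id)
    {
        std::lock_guard<std::mutex> lock(m);
        rows.push_back(id);
    };

    std::thread t1(shard, 1), t2(shard, 2);
    t1.join();
    t2.join();

    std::sort(rows.begin(), rows.end());  // normalize before comparing
    for (int row : rows)
        std::cout << row << "\n";         // now stable: 1 then 2
}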