diff --git a/src/Interpreters/IInterpreter.h b/src/Interpreters/IInterpreter.h index 2425d015e56..63f36f1b18a 100644 --- a/src/Interpreters/IInterpreter.h +++ b/src/Interpreters/IInterpreter.h @@ -22,10 +22,6 @@ public: */ virtual BlockIO execute() = 0; - virtual QueryPipeline executeWithProcessors() { throw Exception("executeWithProcessors not implemented", ErrorCodes::NOT_IMPLEMENTED); } - - virtual bool canExecuteWithProcessors() const { return false; } - virtual bool ignoreQuota() const { return false; } virtual bool ignoreLimits() const { return false; } diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 4f717eda706..3af128ffb00 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1,39 +1,15 @@ #include -#include -#include -#include -#include -#include -#include -#include #include #include -#include -#include -#include -#include -#include -#include -#include #include #include -#include -#include -#include -#include -#include #include -#include -#include -#include - #include #include #include #include #include #include -#include #include #include @@ -59,7 +35,6 @@ #include #include -#include #include #include @@ -68,7 +43,6 @@ #include #include #include -#include #include #include #include @@ -98,7 +72,7 @@ #include #include #include -#include +#include namespace DB @@ -465,38 +439,13 @@ Block InterpreterSelectQuery::getSampleBlock() BlockIO InterpreterSelectQuery::execute() { - Pipeline pipeline; BlockIO res; - executeImpl(pipeline, input, std::move(input_pipe), res.pipeline); - executeUnion(pipeline, getSampleBlock()); - - res.in = pipeline.firstStream(); + executeImpl(res.pipeline, input, std::move(input_pipe)); res.pipeline.addInterpreterContext(context); res.pipeline.addStorageHolder(storage); return res; } -BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline) -{ - ///FIXME pipeline must be alive until query is finished - Pipeline pipeline; - executeImpl(pipeline, input, std::move(input_pipe), parent_pipeline); - unifyStreams(pipeline, getSampleBlock()); - parent_pipeline.addInterpreterContext(context); - parent_pipeline.addStorageHolder(storage); - return pipeline.streams; -} - -QueryPipeline InterpreterSelectQuery::executeWithProcessors() -{ - QueryPipeline query_pipeline; - executeImpl(query_pipeline, input, std::move(input_pipe), query_pipeline); - query_pipeline.addInterpreterContext(context); - query_pipeline.addStorageHolder(storage); - return query_pipeline; -} - - Block InterpreterSelectQuery::getSampleBlockImpl() { if (storage && !options.only_analyze) @@ -687,9 +636,7 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c return 0; } - -template -void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional prepared_pipe, QueryPipeline & save_context_and_storage) +void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional prepared_pipe) { /** Streams of data. When the query is executed in parallel, we have several data streams. * If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then @@ -701,8 +648,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS * then perform the remaining operations with one resulting stream. 
*/ - constexpr bool pipeline_with_processors = std::is_same::value; - /// Now we will compose block streams that perform the necessary actions. auto & query = getSelectQuery(); const Settings & settings = context->getSettingsRef(); @@ -712,40 +657,27 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (options.only_analyze) { - if constexpr (pipeline_with_processors) - pipeline.init(Pipe(std::make_shared(source_header))); - else - pipeline.streams.emplace_back(std::make_shared(source_header)); + pipeline.init(Pipe(std::make_shared(source_header))); if (expressions.prewhere_info) { - if constexpr (pipeline_with_processors) - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared( - header, - expressions.prewhere_info->prewhere_actions, - expressions.prewhere_info->prewhere_column_name, - expressions.prewhere_info->remove_prewhere_column); - }); - else - pipeline.streams.back() = std::make_shared( - pipeline.streams.back(), expressions.prewhere_info->prewhere_actions, - expressions.prewhere_info->prewhere_column_name, expressions.prewhere_info->remove_prewhere_column); + pipeline.addSimpleTransform([&](const Block & header) + { + return std::make_shared( + header, + expressions.prewhere_info->prewhere_actions, + expressions.prewhere_info->prewhere_column_name, + expressions.prewhere_info->remove_prewhere_column); + }); // To remove additional columns in dry run // For example, sample column which can be removed in this stage if (expressions.prewhere_info->remove_columns_actions) { - if constexpr (pipeline_with_processors) + pipeline.addSimpleTransform([&](const Block & header) { - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, expressions.prewhere_info->remove_columns_actions); - }); - } - else - pipeline.streams.back() = std::make_shared(pipeline.streams.back(), expressions.prewhere_info->remove_columns_actions); + return std::make_shared(header, expressions.prewhere_info->remove_columns_actions); + }); } } } @@ -753,17 +685,11 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS { if (prepared_input) { - if constexpr (pipeline_with_processors) - pipeline.init(Pipe(std::make_shared(prepared_input))); - else - pipeline.streams.push_back(prepared_input); + pipeline.init(Pipe(std::make_shared(prepared_input))); } else if (prepared_pipe) { - if constexpr (pipeline_with_processors) - pipeline.init(std::move(*prepared_pipe)); - else - pipeline.streams.push_back(std::make_shared(std::move(*prepared_pipe))); + pipeline.init(std::move(*prepared_pipe)); } if (from_stage == QueryProcessingStage::WithMergeableState && @@ -774,7 +700,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE); /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. 
*/ - executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere, save_context_and_storage); + executeFetchColumns(from_stage, pipeline, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere); LOG_TRACE(log, "{} -> {}", QueryProcessingStage::toString(from_stage), QueryProcessingStage::toString(options.to_stage)); } @@ -817,12 +743,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS } if (query.limitLength()) - { - if constexpr (pipeline_with_processors) - executePreLimit(pipeline, true); - else - executePreLimit(pipeline); - } + executePreLimit(pipeline, true); } }; @@ -840,31 +761,17 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS { if (expressions.hasFilter()) { - if constexpr (pipeline_with_processors) + pipeline.addSimpleTransform([&](const Block & block, QueryPipeline::StreamType stream_type) -> ProcessorPtr { - pipeline.addSimpleTransform([&](const Block & block, QueryPipeline::StreamType stream_type) -> ProcessorPtr - { - bool on_totals = stream_type == QueryPipeline::StreamType::Totals; + bool on_totals = stream_type == QueryPipeline::StreamType::Totals; - return std::make_shared( - block, - expressions.filter_info->actions, - expressions.filter_info->column_name, - expressions.filter_info->do_remove_column, - on_totals); - }); - } - else - { - pipeline.transform([&](auto & stream) - { - stream = std::make_shared( - stream, - expressions.filter_info->actions, - expressions.filter_info->column_name, - expressions.filter_info->do_remove_column); - }); - } + return std::make_shared( + block, + expressions.filter_info->actions, + expressions.filter_info->column_name, + expressions.filter_info->do_remove_column, + on_totals); + }); } if (expressions.hasJoin()) @@ -872,59 +779,43 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS Block join_result_sample; JoinPtr join = expressions.before_join->getTableJoinAlgo(); - if constexpr (pipeline_with_processors) + join_result_sample = ExpressionBlockInputStream( + std::make_shared(pipeline.getHeader()), expressions.before_join).getHeader(); + + /// In case joined subquery has totals, and we don't, add default chunk to totals. + bool default_totals = false; + if (!pipeline.hasTotals()) { - join_result_sample = ExpressionBlockInputStream( - std::make_shared(pipeline.getHeader()), expressions.before_join).getHeader(); - - /// In case joined subquery has totals, and we don't, add default chunk to totals. - bool default_totals = false; - if (!pipeline.hasTotals()) - { - pipeline.addDefaultTotals(); - default_totals = true; - } - - bool inflating_join = false; - if (join) - { - inflating_join = true; - if (auto * hash_join = typeid_cast(join.get())) - inflating_join = isCross(hash_join->getKind()); - } - - pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType type) - { - bool on_totals = type == QueryPipeline::StreamType::Totals; - std::shared_ptr ret; - if (inflating_join) - ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); - else - ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); - - return ret; - }); + pipeline.addDefaultTotals(); + default_totals = true; } - else + + bool inflating_join = false; + if (join) { - /// Applies to all sources except stream_with_non_joined_data. 
- for (auto & stream : pipeline.streams) - stream = std::make_shared(stream, expressions.before_join); - - join_result_sample = pipeline.firstStream()->getHeader(); + inflating_join = true; + if (auto * hash_join = typeid_cast(join.get())) + inflating_join = isCross(hash_join->getKind()); } + pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType type) + { + bool on_totals = type == QueryPipeline::StreamType::Totals; + std::shared_ptr ret; + if (inflating_join) + ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); + else + ret = std::make_shared(header, expressions.before_join, on_totals, default_totals); + + return ret; + }); + if (join) { if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size)) { - if constexpr (pipeline_with_processors) - { - auto source = std::make_shared(std::move(stream)); - pipeline.addDelayedStream(source); - } - else - pipeline.stream_with_non_joined_data = std::move(stream); + auto source = std::make_shared(std::move(stream)); + pipeline.addDelayedStream(source); } } } @@ -1009,28 +900,14 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.hasLimitBy() && !settings.extremes) { - if constexpr (pipeline_with_processors) - executePreLimit(pipeline, false); - else - executePreLimit(pipeline); - + executePreLimit(pipeline, false); has_prelimit = true; } - bool need_merge_streams = need_second_distinct_pass || query.limitBy() - || (!pipeline_with_processors && query.limitLength()); /// Don't merge streams for pre-limit more. - - if constexpr (!pipeline_with_processors) - if (pipeline.hasDelayedStream()) - need_merge_streams = true; + bool need_merge_streams = need_second_distinct_pass || query.limitBy(); if (need_merge_streams) - { - if constexpr (pipeline_with_processors) - pipeline.resize(1); - else - executeUnion(pipeline, {}); - } + pipeline.resize(1); /** If there was more than one stream, * then DISTINCT needs to be performed once again after merging all streams. @@ -1054,7 +931,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS */ executeExtremes(pipeline); - if (!(pipeline_with_processors && has_prelimit)) /// Limit is no longer needed if there is prelimit. + if (!has_prelimit) /// Limit is no longer needed if there is prelimit. 
executeLimit(pipeline); executeOffset(pipeline); @@ -1065,14 +942,10 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS executeSubqueriesInSetsAndJoins(pipeline, subqueries_for_sets); } -template void InterpreterSelectQuery::executeFetchColumns( - QueryProcessingStage::Enum processing_stage, TPipeline & pipeline, - const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere, - QueryPipeline & save_context_and_storage) + QueryProcessingStage::Enum processing_stage, QueryPipeline & pipeline, + const PrewhereInfoPtr & prewhere_info, const Names & columns_to_remove_after_prewhere) { - constexpr bool pipeline_with_processors = std::is_same::value; - auto & query = getSelectQuery(); const Settings & settings = context->getSettingsRef(); @@ -1123,13 +996,13 @@ void InterpreterSelectQuery::executeFetchColumns( argument_types[j] = header.getByName(desc->argument_names[j]).type; Block block_with_count{ - {std::move(column), std::make_shared(func, argument_types, desc->parameters), desc->column_name}}; + {nullptr, std::make_shared(func, argument_types, desc->parameters), desc->column_name}}; - auto istream = std::make_shared(block_with_count); - if constexpr (pipeline_with_processors) - pipeline.init(Pipe(std::make_shared(istream))); - else - pipeline.streams.emplace_back(istream); + Chunk chunk(Columns(), column->size()); + chunk.addColumn(std::move(column)); + + auto source = std::make_shared(std::move(block_with_count), std::move(chunk)); + pipeline.init(Pipe(std::move(source))); from_stage = QueryProcessingStage::WithMergeableState; analysis_result.first_stage = false; return; @@ -1266,7 +1139,7 @@ void InterpreterSelectQuery::executeFetchColumns( /// Remove columns which will be added by prewhere. required_columns.erase(std::remove_if(required_columns.begin(), required_columns.end(), [&](const String & name) { - return !!required_columns_after_prewhere_set.count(name); + return required_columns_after_prewhere_set.count(name) != 0; }), required_columns.end()); if (prewhere_info) @@ -1377,11 +1250,7 @@ void InterpreterSelectQuery::executeFetchColumns( interpreter_subquery->ignoreWithTotals(); } - if constexpr (pipeline_with_processors) - /// Just use pipeline from subquery. 
- pipeline = interpreter_subquery->executeWithProcessors(); - else - pipeline.streams = interpreter_subquery->executeWithMultipleStreams(save_context_and_storage); + pipeline = interpreter_subquery->execute().pipeline; } else if (storage) { @@ -1411,48 +1280,9 @@ void InterpreterSelectQuery::executeFetchColumns( query_info.input_sorting_info = query_info.order_by_optimizer->getInputOrder(storage); } + Pipes pipes = storage->read(required_columns, query_info, *context, processing_stage, max_block_size, max_streams); - BlockInputStreams streams; - Pipes pipes; - - if (pipeline_with_processors) - pipes = storage->read(required_columns, query_info, *context, processing_stage, max_block_size, max_streams); - else - streams = storage->readStreams(required_columns, query_info, *context, processing_stage, max_block_size, max_streams); - - if (streams.empty() && !pipeline_with_processors) - { - streams = {std::make_shared(storage->getSampleBlockForColumns(required_columns))}; - - if (query_info.prewhere_info) - { - if (query_info.prewhere_info->alias_actions) - { - streams.back() = std::make_shared( - streams.back(), - query_info.prewhere_info->alias_actions); - } - - streams.back() = std::make_shared( - streams.back(), - prewhere_info->prewhere_actions, - prewhere_info->prewhere_column_name, - prewhere_info->remove_prewhere_column); - - // To remove additional columns - // In some cases, we did not read any marks so that the pipeline.streams is empty - // Thus, some columns in prewhere are not removed as expected - // This leads to mismatched header in distributed table - if (query_info.prewhere_info->remove_columns_actions) - { - streams.back() = std::make_shared(streams.back(), query_info.prewhere_info->remove_columns_actions); - } - } - } - - /// Copy-paste from prev if. - /// Code is temporarily copy-pasted while moving to new pipeline. - if (pipes.empty() && pipeline_with_processors) + if (pipes.empty()) { Pipe pipe(std::make_shared(storage->getSampleBlockForColumns(required_columns))); @@ -1468,6 +1298,10 @@ void InterpreterSelectQuery::executeFetchColumns( prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column)); + // To remove additional columns + // In some cases, we did not read any marks so that the pipeline.streams is empty + // Thus, some columns in prewhere are not removed as expected + // This leads to mismatched header in distributed table if (query_info.prewhere_info->remove_columns_actions) pipe.addSimpleTransform(std::make_shared(pipe.getHeader(), query_info.prewhere_info->remove_columns_actions)); } @@ -1475,14 +1309,8 @@ void InterpreterSelectQuery::executeFetchColumns( pipes.emplace_back(std::move(pipe)); } - for (auto & stream : streams) - stream->addTableLock(table_lock); - - if constexpr (pipeline_with_processors) - { - /// Table lock is stored inside pipeline here. - pipeline.addTableLock(table_lock); - } + /// Table lock is stored inside pipeline here. + pipeline.addTableLock(table_lock); /// Set the limits and quota for reading data, the speed and time of the query. 
{ @@ -1510,16 +1338,6 @@ void InterpreterSelectQuery::executeFetchColumns( auto quota = context->getQuota(); - for (auto & stream : streams) - { - if (!options.ignore_limits) - stream->setLimits(limits); - - if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete)) - stream->setQuota(quota); - } - - /// Copy-paste for (auto & pipe : pipes) { if (!options.ignore_limits) @@ -1530,47 +1348,13 @@ void InterpreterSelectQuery::executeFetchColumns( } } - if constexpr (pipeline_with_processors) - { - if (streams.size() == 1 || pipes.size() == 1) - pipeline.setMaxThreads(1); + if (pipes.size() == 1) + pipeline.setMaxThreads(1); - /// Unify streams. They must have same headers. - if (streams.size() > 1) - { - /// Unify streams in case they have different headers. - auto first_header = streams.at(0)->getHeader(); + for (auto & pipe : pipes) + pipe.enableQuota(); - if (first_header.columns() > 1 && first_header.has("_dummy")) - first_header.erase("_dummy"); - - for (auto & stream : streams) - { - auto header = stream->getHeader(); - auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name; - if (!blocksHaveEqualStructure(first_header, header)) - stream = std::make_shared(stream, first_header, mode); - } - } - - for (auto & stream : streams) - { - bool force_add_agg_info = processing_stage == QueryProcessingStage::WithMergeableState; - auto source = std::make_shared(stream, force_add_agg_info); - - if (processing_stage == QueryProcessingStage::Complete) - source->addTotalsPort(); - - pipes.emplace_back(std::move(source)); - } - - for (auto & pipe : pipes) - pipe.enableQuota(); - - pipeline.init(std::move(pipes)); - } - else - pipeline.streams = std::move(streams); + pipeline.init(std::move(pipes)); } else throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR); @@ -1578,32 +1362,14 @@ void InterpreterSelectQuery::executeFetchColumns( /// Aliases in table declaration. 
if (processing_stage == QueryProcessingStage::FetchColumns && alias_actions) { - if constexpr (pipeline_with_processors) + pipeline.addSimpleTransform([&](const Block & header) { - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, alias_actions); - }); - } - else - { - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, alias_actions); - }); - } + return std::make_shared(header, alias_actions); + }); } } -void InterpreterSelectQuery::executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter) -{ - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, expression, getSelectQuery().where()->getColumnName(), remove_filter); - }); -} - void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter) { pipeline.addSimpleTransform([&](const Block & block, QueryPipeline::StreamType stream_type) @@ -1613,69 +1379,6 @@ void InterpreterSelectQuery::executeWhere(QueryPipeline & pipeline, const Expres }); } -void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final) -{ - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, expression); - }); - - Block header = pipeline.firstStream()->getHeader(); - ColumnNumbers keys; - for (const auto & key : query_analyzer->aggregationKeys()) - keys.push_back(header.getPositionByName(key.name)); - - AggregateDescriptions aggregates = query_analyzer->aggregates(); - for (auto & descr : aggregates) - if (descr.arguments.empty()) - for (const auto & name : descr.argument_names) - descr.arguments.push_back(header.getPositionByName(name)); - - const Settings & settings = context->getSettingsRef(); - - /** Two-level aggregation is useful in two cases: - * 1. Parallel aggregation is done, and the results should be merged in parallel. - * 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way. - */ - bool allow_to_use_two_level_group_by = pipeline.streams.size() > 1 || settings.max_bytes_before_external_group_by != 0; - - Aggregator::Params params(header, keys, aggregates, - overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode, - allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), - allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), - settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, - context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data); - - /// If there are several sources, then we perform parallel aggregation - if (pipeline.streams.size() > 1) - { - pipeline.firstStream() = std::make_shared( - pipeline.streams, pipeline.stream_with_non_joined_data, params, final, - max_streams, - settings.aggregation_memory_efficient_merge_threads - ? 
static_cast(settings.aggregation_memory_efficient_merge_threads) - : static_cast(settings.max_threads)); - - pipeline.stream_with_non_joined_data = nullptr; - pipeline.streams.resize(1); - } - else - { - BlockInputStreams inputs; - if (!pipeline.streams.empty()) - inputs.push_back(pipeline.firstStream()); - else - pipeline.streams.resize(1); - - if (pipeline.stream_with_non_joined_data) - inputs.push_back(pipeline.stream_with_non_joined_data); - - pipeline.firstStream() = std::make_shared(std::make_shared(inputs), params, final); - - pipeline.stream_with_non_joined_data = nullptr; - } -} - void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final) { @@ -1749,53 +1452,6 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const } -void InterpreterSelectQuery::executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final) -{ - Block header = pipeline.firstStream()->getHeader(); - - ColumnNumbers keys; - for (const auto & key : query_analyzer->aggregationKeys()) - keys.push_back(header.getPositionByName(key.name)); - - /** There are two modes of distributed aggregation. - * - * 1. In different threads read from the remote servers blocks. - * Save all the blocks in the RAM. Merge blocks. - * If the aggregation is two-level - parallelize to the number of buckets. - * - * 2. In one thread, read blocks from different servers in order. - * RAM stores only one block from each server. - * If the aggregation is a two-level aggregation, we consistently merge the blocks of each next level. - * - * The second option consumes less memory (up to 256 times less) - * in the case of two-level aggregation, which is used for large results after GROUP BY, - * but it can work more slowly. - */ - - const Settings & settings = context->getSettingsRef(); - - Aggregator::Params params(header, keys, query_analyzer->aggregates(), overflow_row, settings.max_threads); - - if (!settings.distributed_aggregation_memory_efficient) - { - /// We union several sources into one, parallelizing the work. - executeUnion(pipeline, {}); - - /// Now merge the aggregated blocks - pipeline.firstStream() = std::make_shared(pipeline.firstStream(), params, final, settings.max_threads); - } - else - { - pipeline.firstStream() = std::make_shared(pipeline.streams, params, final, - max_streams, - settings.aggregation_memory_efficient_merge_threads - ? 
static_cast(settings.aggregation_memory_efficient_merge_threads) - : static_cast(settings.max_threads)); - - pipeline.streams.resize(1); - } -} - void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bool overflow_row, bool final) { Block header_before_merge = pipeline.getHeader(); @@ -1856,14 +1512,6 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPipeline & pipeline, bo } -void InterpreterSelectQuery::executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression) -{ - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, expression, getSelectQuery().having()->getColumnName()); - }); -} - void InterpreterSelectQuery::executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression) { pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr @@ -1876,22 +1524,6 @@ void InterpreterSelectQuery::executeHaving(QueryPipeline & pipeline, const Expre } -void InterpreterSelectQuery::executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final) -{ - executeUnion(pipeline, {}); - - const Settings & settings = context->getSettingsRef(); - - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), - overflow_row, - expression, - has_having ? getSelectQuery().having()->getColumnName() : "", - settings.totals_mode, - settings.totals_auto_threshold, - final); -} - void InterpreterSelectQuery::executeTotalsAndHaving(QueryPipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final) { const Settings & settings = context->getSettingsRef(); @@ -1905,31 +1537,6 @@ void InterpreterSelectQuery::executeTotalsAndHaving(QueryPipeline & pipeline, bo } -void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificator modificator) -{ - executeUnion(pipeline, {}); - - Block header = pipeline.firstStream()->getHeader(); - - ColumnNumbers keys; - - for (const auto & key : query_analyzer->aggregationKeys()) - keys.push_back(header.getPositionByName(key.name)); - - const Settings & settings = context->getSettingsRef(); - - Aggregator::Params params(header, keys, query_analyzer->aggregates(), - false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, - SettingUInt64(0), SettingUInt64(0), - settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, - context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data); - - if (modificator == Modificator::ROLLUP) - pipeline.firstStream() = std::make_shared(pipeline.firstStream(), params); - else - pipeline.firstStream() = std::make_shared(pipeline.firstStream(), params); -} - void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator) { pipeline.resize(1); @@ -1964,14 +1571,6 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif } -void InterpreterSelectQuery::executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression) -{ - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, expression); - }); -} - void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression) { pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr @@ -1980,68 +1579,6 @@ void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const E }); } -void 
InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoPtr input_sorting_info) -{ - auto & query = getSelectQuery(); - SortDescription output_order_descr = getSortDescription(query, *context); - const Settings & settings = context->getSettingsRef(); - UInt64 limit = getLimitForSorting(query, *context); - - if (input_sorting_info) - { - /* Case of sorting with optimization using sorting key. - * We have several threads, each of them reads batch of parts in direct - * or reverse order of sorting key using one input stream per part - * and then merge them into one sorted stream. - * At this stage we merge per-thread streams into one. - * If the input is sorted by some prefix of the sorting key required for output, - * we have to finish sorting after the merge. - */ - - bool need_finish_sorting = (input_sorting_info->order_key_prefix_descr.size() < output_order_descr.size()); - - UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit); - executeMergeSorted(pipeline, input_sorting_info->order_key_prefix_descr, limit_for_merging); - - if (need_finish_sorting) - { - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, output_order_descr, limit); - }); - - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), input_sorting_info->order_key_prefix_descr, - output_order_descr, settings.max_block_size, limit); - } - } - else - { - pipeline.transform([&](auto & stream) - { - auto sorting_stream = std::make_shared(stream, output_order_descr, limit); - - /// Limits on sorting - IBlockInputStream::LocalLimits limits; - limits.mode = IBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); - sorting_stream->setLimits(limits); - - auto merging_stream = std::make_shared( - sorting_stream, output_order_descr, settings.max_block_size, limit, - settings.max_bytes_before_remerge_sort, - settings.max_bytes_before_external_sort / pipeline.streams.size(), - context->getTemporaryVolume(), - settings.temporary_files_codec, - settings.min_free_disk_space_for_temporary_data); - - stream = merging_stream; - }); - - /// If there are several streams, we merge them into one - executeMergeSorted(pipeline, output_order_descr, limit); - } -} void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSortingInfoPtr input_sorting_info) { @@ -2138,41 +1675,6 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting } -void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline) -{ - auto & query = getSelectQuery(); - SortDescription order_descr = getSortDescription(query, *context); - UInt64 limit = getLimitForSorting(query, *context); - - /// If there are several streams, then we merge them into one - if (pipeline.hasMoreThanOneStream()) - { - unifyStreams(pipeline, pipeline.firstStream()->getHeader()); - executeMergeSorted(pipeline, order_descr, limit); - } -} - - -void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit) -{ - if (pipeline.hasMoreThanOneStream()) - { - const Settings & settings = context->getSettingsRef(); - - /** MergingSortedBlockInputStream reads the sources sequentially. - * To make the data on the remote servers prepared in parallel, we wrap it in AsynchronousBlockInputStream. 
- */ - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream); - }); - - pipeline.firstStream() = std::make_shared( - pipeline.streams, sort_description, settings.max_block_size, limit); - pipeline.streams.resize(1); - } -} - void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline) { auto & query = getSelectQuery(); @@ -2202,14 +1704,6 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline, const } -void InterpreterSelectQuery::executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression) -{ - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, expression); - }); -} - void InterpreterSelectQuery::executeProjection(QueryPipeline & pipeline, const ExpressionActionsPtr & expression) { pipeline.addSimpleTransform([&](const Block & header) -> ProcessorPtr @@ -2219,28 +1713,6 @@ void InterpreterSelectQuery::executeProjection(QueryPipeline & pipeline, const E } -void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_order, Names columns) -{ - auto & query = getSelectQuery(); - if (query.distinct) - { - const Settings & settings = context->getSettingsRef(); - - auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context); - UInt64 limit_for_distinct = 0; - - /// If after this stage of DISTINCT ORDER BY is not executed, then you can get no more than limit_length + limit_offset of different rows. - if ((!query.orderBy() || !before_order) && !query.limit_with_ties) - limit_for_distinct = limit_length + limit_offset; - - pipeline.transform([&](auto & stream) - { - SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode); - stream = std::make_shared(stream, limits, limit_for_distinct, columns); - }); - } -} - void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool before_order, Names columns) { auto & query = getSelectQuery(); @@ -2268,51 +1740,6 @@ void InterpreterSelectQuery::executeDistinct(QueryPipeline & pipeline, bool befo } -void InterpreterSelectQuery::executeUnion(Pipeline & pipeline, Block header) -{ - /// If there are still several streams, then we combine them into one - if (pipeline.hasMoreThanOneStream()) - { - if (!header) - header = pipeline.firstStream()->getHeader(); - - unifyStreams(pipeline, std::move(header)); - - pipeline.firstStream() = std::make_shared(pipeline.streams, pipeline.stream_with_non_joined_data, max_streams); - pipeline.stream_with_non_joined_data = nullptr; - pipeline.streams.resize(1); - pipeline.union_stream = true; - } - else if (pipeline.stream_with_non_joined_data) - { - pipeline.streams.push_back(pipeline.stream_with_non_joined_data); - pipeline.stream_with_non_joined_data = nullptr; - } -} - - -/// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined. 
-void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline) -{ - auto & query = getSelectQuery(); - /// If there is LIMIT - if (query.limitLength()) - { - auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, *context); - SortDescription sort_descr; - if (query.limit_with_ties) - { - if (!query.orderBy()) - throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR); - sort_descr = getSortDescription(query, *context); - } - pipeline.transform([&, limit = limit_length + limit_offset](auto & stream) - { - stream = std::make_shared(stream, limit, 0, false, false, query.limit_with_ties, sort_descr); - }); - } -} - /// Preliminary LIMIT - is used in every source, if there are several sources, before they are combined. void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_not_skip_offset) { @@ -2334,24 +1761,6 @@ void InterpreterSelectQuery::executePreLimit(QueryPipeline & pipeline, bool do_n } -void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline) -{ - auto & query = getSelectQuery(); - if (!query.limitByLength() || !query.limitBy()) - return; - - Names columns; - for (const auto & elem : query.limitBy()->children) - columns.emplace_back(elem->getColumnName()); - UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT"); - UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0); - - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, length, offset, columns); - }); -} - void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline) { auto & query = getSelectQuery(); @@ -2400,73 +1809,6 @@ namespace } } -void InterpreterSelectQuery::executeLimit(Pipeline & pipeline) -{ - auto & query = getSelectQuery(); - /// If there is LIMIT - if (query.limitLength()) - { - /** Rare case: - * if there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels, - * then when using LIMIT, you should read the data to the end, rather than cancel the query earlier, - * because if you cancel the query, we will not get `totals` data from the remote server. - * - * Another case: - * if there is WITH TOTALS and there is no ORDER BY, then read the data to the end, - * otherwise TOTALS is counted according to incomplete data. 
- */ - bool always_read_till_end = false; - - if (query.group_by_with_totals && !query.orderBy()) - always_read_till_end = true; - - if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query)) - always_read_till_end = true; - - SortDescription order_descr; - if (query.limit_with_ties) - { - if (!query.orderBy()) - throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR); - order_descr = getSortDescription(query, *context); - } - - UInt64 limit_length; - UInt64 limit_offset; - std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context); - - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, limit_length, limit_offset, always_read_till_end, false, query.limit_with_ties, order_descr); - }); - } -} - -void InterpreterSelectQuery::executeOffset(Pipeline & /* pipeline */) {} - -void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline) -{ - auto & query = getSelectQuery(); - if (query.orderBy()) - { - SortDescription order_descr = getSortDescription(query, *context); - SortDescription fill_descr; - for (auto & desc : order_descr) - { - if (desc.with_fill) - fill_descr.push_back(desc); - } - - if (fill_descr.empty()) - return; - - pipeline.transform([&](auto & stream) - { - stream = std::make_shared(stream, fill_descr); - }); - } -} - void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline) { auto & query = getSelectQuery(); @@ -2557,18 +1899,6 @@ void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline) } } - -void InterpreterSelectQuery::executeExtremes(Pipeline & pipeline) -{ - if (!context->getSettingsRef().extremes) - return; - - pipeline.transform([&](auto & stream) - { - stream->enableExtremes(); - }); -} - void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline) { if (!context->getSettingsRef().extremes) @@ -2577,23 +1907,6 @@ void InterpreterSelectQuery::executeExtremes(QueryPipeline & pipeline) pipeline.addExtremesTransform(); } - -void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const SubqueriesForSets & subqueries_for_sets) -{ - /// Merge streams to one. Use MergeSorting if data was read in sorted order, Union otherwise. - if (query_info.input_sorting_info) - { - if (pipeline.stream_with_non_joined_data) - throw Exception("Using read in order optimization, but has stream with non-joined data in pipeline", ErrorCodes::LOGICAL_ERROR); - executeMergeSorted(pipeline, query_info.input_sorting_info->order_key_prefix_descr, 0); - } - else - executeUnion(pipeline, {}); - - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), subqueries_for_sets, *context); -} - void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pipeline, const SubqueriesForSets & subqueries_for_sets) { if (query_info.input_sorting_info) @@ -2610,25 +1923,6 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pip } -void InterpreterSelectQuery::unifyStreams(Pipeline & pipeline, Block header) -{ - /// Unify streams in case they have different headers. - - /// TODO: remove previous addition of _dummy column. 
-    if (header.columns() > 1 && header.has("_dummy"))
-        header.erase("_dummy");
-
-    for (auto & stream : pipeline.streams)
-    {
-        auto stream_header = stream->getHeader();
-        auto mode = ConvertingBlockInputStream::MatchColumnsMode::Name;
-
-        if (!blocksHaveEqualStructure(header, stream_header))
-            stream = std::make_shared<ConvertingBlockInputStream>(stream, header, mode);
-    }
-}
-
-
 void InterpreterSelectQuery::ignoreWithTotals()
 {
     getSelectQuery().group_by_with_totals = false;
diff --git a/src/Interpreters/InterpreterSelectQuery.h b/src/Interpreters/InterpreterSelectQuery.h
index b97ff65e988..ca7fb4c72ba 100644
--- a/src/Interpreters/InterpreterSelectQuery.h
+++ b/src/Interpreters/InterpreterSelectQuery.h
@@ -77,12 +77,6 @@ public:
     /// Execute a query. Get the stream of blocks to read.
     BlockIO execute() override;

-    /// Execute the query and return multiple streams for parallel processing.
-    BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline);
-
-    QueryPipeline executeWithProcessors() override;
-    bool canExecuteWithProcessors() const override { return true; }
-
     bool ignoreLimits() const override { return options.ignore_limits; }
     bool ignoreQuota() const override { return options.ignore_quota; }

@@ -108,89 +102,15 @@ private:
     Block getSampleBlockImpl();

-    struct Pipeline
-    {
-        /** Streams of data.
-          * The source data streams are produced in the executeFetchColumns function.
-          * Then they are converted (wrapped in other streams) using the `execute*` functions,
-          * to get the whole pipeline running the query.
-          */
-        BlockInputStreams streams;
-
-        /** When executing FULL or RIGHT JOIN, there will be a data stream from which you can read "not joined" rows.
-          * It has a special meaning, since reading from it should be done after reading from the main streams.
-          * It is appended to the main streams in UnionBlockInputStream or ParallelAggregatingBlockInputStream.
-          */
-        BlockInputStreamPtr stream_with_non_joined_data;
-        bool union_stream = false;
-
-        /// Cache value of InterpreterSelectQuery::max_streams
-        size_t max_threads = 1;
-
-        BlockInputStreamPtr & firstStream() { return streams.at(0); }
-
-        template <typename Transform>
-        void transform(Transform && transformation)
-        {
-            for (auto & stream : streams)
-                transformation(stream);
-
-            if (stream_with_non_joined_data)
-                transformation(stream_with_non_joined_data);
-        }
-
-        bool hasMoreThanOneStream() const
-        {
-            return streams.size() + (stream_with_non_joined_data ? 1 : 0) > 1;
-        }
-
-        /// Resulting stream is a mix of other streams' data. Distinct and/or order guarantees are broken.
-        bool hasMixedStreams() const
-        {
-            return hasMoreThanOneStream() || union_stream;
-        }
-
-        bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
-        bool initialized() const { return !streams.empty(); }
-
-        /// Compatibility with QueryPipeline (Processors)
-        void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
-        size_t getNumThreads() const { return max_threads; }
-    };
-
-    template <typename TPipeline>
-    void executeImpl(TPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe, QueryPipeline & save_context_and_storage);
+    void executeImpl(QueryPipeline & pipeline, const BlockInputStreamPtr & prepared_input, std::optional<Pipe> prepared_pipe);

     /// Different stages of query execution.
-    /// dry_run - don't read from table, use empty header block instead.
- void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run); - - template - void executeFetchColumns(QueryProcessingStage::Enum processing_stage, TPipeline & pipeline, + void executeFetchColumns( + QueryProcessingStage::Enum processing_stage, + QueryPipeline & pipeline, const PrewhereInfoPtr & prewhere_info, - const Names & columns_to_remove_after_prewhere, - QueryPipeline & save_context_and_storage); - - void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); - void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); - void executeMergeAggregated(Pipeline & pipeline, bool overflow_row, bool final); - void executeTotalsAndHaving(Pipeline & pipeline, bool has_having, const ExpressionActionsPtr & expression, bool overflow_row, bool final); - void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression); - static void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression); - void executeOrder(Pipeline & pipeline, InputSortingInfoPtr sorting_info); - void executeWithFill(Pipeline & pipeline); - void executeMergeSorted(Pipeline & pipeline); - void executePreLimit(Pipeline & pipeline); - void executeUnion(Pipeline & pipeline, Block header); - void executeLimitBy(Pipeline & pipeline); - void executeLimit(Pipeline & pipeline); - void executeOffset(Pipeline & pipeline); - static void executeProjection(Pipeline & pipeline, const ExpressionActionsPtr & expression); - void executeDistinct(Pipeline & pipeline, bool before_order, Names columns); - void executeExtremes(Pipeline & pipeline); - void executeSubqueriesInSetsAndJoins(Pipeline & pipeline, const std::unordered_map & subqueries_for_sets); - void executeMergeSorted(Pipeline & pipeline, const SortDescription & sort_description, UInt64 limit); + const Names & columns_to_remove_after_prewhere); void executeWhere(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter); void executeAggregation(QueryPipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final); @@ -213,17 +133,12 @@ private: String generateFilterActions(ExpressionActionsPtr & actions, const ASTPtr & row_policy_filter, const Names & prerequisite_columns = {}) const; - /// Add ConvertingBlockInputStream to specified header. - static void unifyStreams(Pipeline & pipeline, Block header); - enum class Modificator { ROLLUP = 0, CUBE = 1 }; - void executeRollupOrCube(Pipeline & pipeline, Modificator modificator); - void executeRollupOrCube(QueryPipeline & pipeline, Modificator modificator); /** If there is a SETTINGS section in the SELECT query, then apply settings from it. 
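With executeWithProcessors() and canExecuteWithProcessors() removed from the interpreters and from IInterpreter itself, execute() becomes the single entry point, and the processor pipeline always arrives inside the returned BlockIO. A rough sketch of the caller side, using only names that appear in the executeQuery.cpp hunk below (query setup, quota and limit handling are elided):

    // Sketch of the caller after this patch; `ast`, `context` and `stage`
    // are assumed to be prepared as in executeQueryImpl().
    auto interpreter = InterpreterFactory::get(ast, context, stage);

    BlockIO res = interpreter->execute();       // the only execution path now
    QueryPipeline & pipeline = res.pipeline;    // pipeline is owned by BlockIO

    // The capability check canExecuteWithProcessors() no longer exists;
    // whether a processor pipeline was produced is observable directly:
    bool use_processors = pipeline.initialized();

Because the pipeline now carries the interpreter context and storage holders (addInterpreterContext / addStorageHolder in the execute() hunk above), it can safely outlive the interpreter that built it.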
diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp index 7fe124b31e6..7b86616555a 100644 --- a/src/Interpreters/InterpreterSelectWithUnionQuery.cpp +++ b/src/Interpreters/InterpreterSelectWithUnionQuery.cpp @@ -3,15 +3,9 @@ #include #include #include -#include -#include -#include -#include #include -#include #include #include -#include #include #include @@ -180,69 +174,10 @@ Block InterpreterSelectWithUnionQuery::getSampleBlock( } -BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(QueryPipeline & parent_pipeline) -{ - BlockInputStreams nested_streams; - - for (auto & interpreter : nested_interpreters) - { - BlockInputStreams streams = interpreter->executeWithMultipleStreams(parent_pipeline); - nested_streams.insert(nested_streams.end(), streams.begin(), streams.end()); - } - - /// Unify data structure. - if (nested_interpreters.size() > 1) - { - for (auto & stream : nested_streams) - stream = std::make_shared(stream, result_header,ConvertingBlockInputStream::MatchColumnsMode::Position); - parent_pipeline.addInterpreterContext(context); - } - - /// Update max_streams due to: - /// - max_distributed_connections for Distributed() engine - /// - max_streams_to_max_threads_ratio - /// - /// XXX: res.pipeline.getMaxThreads() cannot be used since it is capped to - /// number of streams, which is empty for non-Processors case. - max_streams = (*std::min_element(nested_interpreters.begin(), nested_interpreters.end(), [](const auto &a, const auto &b) - { - return a->getMaxStreams() < b->getMaxStreams(); - }))->getMaxStreams(); - - return nested_streams; -} - - BlockIO InterpreterSelectWithUnionQuery::execute() { BlockIO res; - BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline); - BlockInputStreamPtr result_stream; - - if (nested_streams.empty()) - { - result_stream = std::make_shared(getSampleBlock()); - } - else if (nested_streams.size() == 1) - { - result_stream = nested_streams.front(); - nested_streams.clear(); - } - else - { - result_stream = std::make_shared(nested_streams, nullptr, max_streams); - nested_streams.clear(); - } - - res.in = result_stream; - res.pipeline.addInterpreterContext(context); - return res; -} - - -QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors() -{ - QueryPipeline main_pipeline; + QueryPipeline & main_pipeline = res.pipeline; std::vector pipelines; bool has_main_pipeline = false; @@ -254,12 +189,12 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors() if (!has_main_pipeline) { has_main_pipeline = true; - main_pipeline = interpreter->executeWithProcessors(); + main_pipeline = interpreter->execute().pipeline; headers.emplace_back(main_pipeline.getHeader()); } else { - pipelines.emplace_back(interpreter->executeWithProcessors()); + pipelines.emplace_back(interpreter->execute().pipeline); headers.emplace_back(pipelines.back().getHeader()); } } @@ -280,7 +215,7 @@ QueryPipeline InterpreterSelectWithUnionQuery::executeWithProcessors() main_pipeline.addInterpreterContext(context); - return main_pipeline; + return res; } diff --git a/src/Interpreters/InterpreterSelectWithUnionQuery.h b/src/Interpreters/InterpreterSelectWithUnionQuery.h index c7a8e09578b..3b5fe533a84 100644 --- a/src/Interpreters/InterpreterSelectWithUnionQuery.h +++ b/src/Interpreters/InterpreterSelectWithUnionQuery.h @@ -29,12 +29,6 @@ public: BlockIO execute() override; - /// Execute the query without union of streams. 
- BlockInputStreams executeWithMultipleStreams(QueryPipeline & parent_pipeline); - - QueryPipeline executeWithProcessors() override; - bool canExecuteWithProcessors() const override { return true; } - bool ignoreLimits() const override { return options.ignore_limits; } bool ignoreQuota() const override { return options.ignore_quota; } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 2cc6730b90d..8fc799d0b48 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -280,7 +280,6 @@ static std::tuple executeQueryImpl( /// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion. String query(begin, query_end); BlockIO res; - QueryPipeline & pipeline = res.pipeline; String query_for_logging; @@ -338,7 +337,6 @@ static std::tuple executeQueryImpl( context.resetInputCallbacks(); auto interpreter = InterpreterFactory::get(ast, context, stage); - bool use_processors = interpreter->canExecuteWithProcessors(); std::shared_ptr quota; if (!interpreter->ignoreQuota()) @@ -358,10 +356,9 @@ static std::tuple executeQueryImpl( limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); } - if (use_processors) - pipeline = interpreter->executeWithProcessors(); - else - res = interpreter->execute(); + res = interpreter->execute(); + QueryPipeline & pipeline = res.pipeline; + bool use_processors = pipeline.initialized(); if (res.pipeline.initialized()) use_processors = true; diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 6564fee827e..f9666ed0548 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -397,25 +397,6 @@ void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Settin } } -BlockInputStreams IStorage::readStreams( - const Names & column_names, - const SelectQueryInfo & query_info, - const Context & context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - unsigned num_streams) -{ - ForceTreeShapedPipeline enable_tree_shape(query_info); - auto pipes = read(column_names, query_info, context, processed_stage, max_block_size, num_streams); - - BlockInputStreams res; - res.reserve(pipes.size()); - - for (auto & pipe : pipes) - res.emplace_back(std::make_shared(std::move(pipe))); - - return res; -} StorageID IStorage::getStorageID() const { diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 655ced0d8cf..76a5d72262a 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -303,16 +303,6 @@ public: throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - /** The same as read, but returns BlockInputStreams. - */ - BlockInputStreams readStreams( - const Names & /*column_names*/, - const SelectQueryInfo & /*query_info*/, - const Context & /*context*/, - QueryProcessingStage::Enum /*processed_stage*/, - size_t /*max_block_size*/, - unsigned /*num_streams*/); - /** Writes the data to a table. * Receives a description of the query, which can contain information about the data write method. * Returns an object by which you can write data sequentially. diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 3d7e26d3817..84cf3a32aa1 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -80,28 +80,6 @@ struct SelectQueryInfo /// Prepared sets are used for indices by storage engine. 
     /// Example: x IN (1, 2, 3)
     PreparedSets sets;
-
-    /// Temporary flag is needed to support old pipeline with input streams.
-    /// If enabled, then pipeline returned by storage must be a tree.
-    /// Processors from the tree can't return ExpandPipeline status.
-    mutable bool force_tree_shaped_pipeline = false;
-};
-
-/// RAII class to enable force_tree_shaped_pipeline for SelectQueryInfo.
-/// Looks awful, but I hope it's temporary.
-struct ForceTreeShapedPipeline
-{
-    explicit ForceTreeShapedPipeline(const SelectQueryInfo & info_) : info(info_)
-    {
-        force_tree_shaped_pipeline = info.force_tree_shaped_pipeline;
-        info.force_tree_shaped_pipeline = true;
-    }
-
-    ~ForceTreeShapedPipeline() { info.force_tree_shaped_pipeline = force_tree_shaped_pipeline; }
-
-private:
-    bool force_tree_shaped_pipeline;
-    const SelectQueryInfo & info;
 };

 }
diff --git a/src/Storages/StorageView.cpp b/src/Storages/StorageView.cpp
index 636c7f9d64d..06c0bcacb4c 100644
--- a/src/Storages/StorageView.cpp
+++ b/src/Storages/StorageView.cpp
@@ -64,16 +64,8 @@ Pipes StorageView::read(
     QueryPipeline pipeline;
     InterpreterSelectWithUnionQuery interpreter(current_inner_query, context, {}, column_names);

-    /// FIXME res may implicitly use some objects owned by the pipeline, but they will be destructed after return
-    if (query_info.force_tree_shaped_pipeline)
-    {
-        BlockInputStreams streams = interpreter.executeWithMultipleStreams(pipeline);
-        for (auto & stream : streams)
-            pipes.emplace_back(std::make_shared(std::move(stream)));
-    }
-    else
-        /// TODO: support multiple streams here. Need more general interface than pipes.
-        pipes.emplace_back(interpreter.executeWithProcessors().getPipe());
+    /// TODO: support multiple streams here. Need more general interface than pipes.
+    pipes.emplace_back(interpreter.execute().pipeline.getPipe());

     /// It's expected that the columns read from storage are not constant.
     /// Because method 'getSampleBlockForColumns' is used to obtain a structure of result in InterpreterSelectQuery.
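Taken end to end, the storage-facing half of this change leaves exactly one read path: IStorage::read() returns Pipes (readStreams() and the force_tree_shaped_pipeline escape hatch are deleted), and the interpreter wires those pipes into the QueryPipeline itself. A condensed sketch of the surviving path, assembled from the executeFetchColumns hunk above (the empty-pipes fallback, prewhere transforms and per-pipe limits are elided; every call shown appears in the diff):

    // Sketch: how a SELECT now pulls data from any table, view or subquery.
    Pipes pipes = storage->read(required_columns, query_info, *context,
                                processing_stage, max_block_size, max_streams);

    pipeline.addTableLock(table_lock);   // the lock is stored inside the pipeline

    if (pipes.size() == 1)
        pipeline.setMaxThreads(1);       // a single pipe needs no extra threads

    for (auto & pipe : pipes)
        pipe.enableQuota();

    pipeline.init(std::move(pipes));

Since StorageView::read() now just unwraps interpreter.execute().pipeline.getPipe(), views and subqueries feed their parent query through the same Pipe interface as ordinary tables.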