diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp index 8fcc088baa9..396b08bfc02 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp @@ -590,7 +590,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & else if (!candidates.real.empty()) { auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions); - size_t ordinary_reading_marks = ordinary_reading_select_result->marks(); + size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks; /// Nothing to read. Ignore projections. if (ordinary_reading_marks == 0) @@ -599,7 +599,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & return false; } - const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges(); + const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges; /// Selecting best candidate. for (auto & candidate : candidates.real) diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp index 05afc80cba0..6f714d71c7d 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp @@ -142,7 +142,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) MergeTreeDataSelectExecutor reader(reading->getMergeTreeData()); auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions); - size_t ordinary_reading_marks = ordinary_reading_select_result->marks(); + size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks; /// Nothing to read. Ignore projections. if (ordinary_reading_marks == 0) @@ -151,7 +151,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes) return false; } - const auto & parts_with_ranges = ordinary_reading_select_result->partsWithRanges(); + const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges; std::shared_ptr max_added_blocks = getMaxAddedBlocks(reading); diff --git a/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp b/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp index c3b3449857b..b5196129c5c 100644 --- a/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp +++ b/src/Processors/QueryPlan/Optimizations/projectionsCommon.cpp @@ -250,23 +250,17 @@ bool analyzeProjectionCandidate( context->getSettingsRef().max_threads, max_added_blocks); - if (projection_result_ptr->error()) - return false; - candidate.merge_tree_projection_select_result_ptr = std::move(projection_result_ptr); - candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->marks(); + candidate.sum_marks += candidate.merge_tree_projection_select_result_ptr->selected_marks; if (!normal_parts.empty()) { /// TODO: We can reuse existing analysis_result by filtering out projection parts auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions)); - if (normal_result_ptr->error()) - return false; - - if (normal_result_ptr->marks() != 0) + if (normal_result_ptr->selected_marks != 0) { - candidate.sum_marks += normal_result_ptr->marks(); + candidate.sum_marks += normal_result_ptr->selected_marks; candidate.merge_tree_ordinary_select_result_ptr = std::move(normal_result_ptr); } } diff --git a/src/Processors/QueryPlan/Optimizations/projectionsCommon.h b/src/Processors/QueryPlan/Optimizations/projectionsCommon.h index 055ca5d4084..1b3b4892cce 100644 --- a/src/Processors/QueryPlan/Optimizations/projectionsCommon.h +++ b/src/Processors/QueryPlan/Optimizations/projectionsCommon.h @@ -1,31 +1,15 @@ #pragma once #include #include +#include namespace DB { -class ReadFromMergeTree; - using PartitionIdToMaxBlock = std::unordered_map; - struct ProjectionDescription; - class MergeTreeDataSelectExecutor; -struct MergeTreeDataSelectAnalysisResult; -using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr; - -class IMergeTreeDataPart; -using DataPartPtr = std::shared_ptr; -using DataPartsVector = std::vector; -struct RangesInDataParts; - -struct StorageInMemoryMetadata; -using StorageMetadataPtr = std::shared_ptr; - -struct SelectQueryInfo; - } namespace DB::QueryPlanOptimizations @@ -61,8 +45,8 @@ struct ProjectionCandidate /// Analysis result, separate for parts with and without projection. /// Analysis is done in order to estimate the number of marks we are going to read. /// For chosen projection, it is reused for reading step. - MergeTreeDataSelectAnalysisResultPtr merge_tree_projection_select_result_ptr; - MergeTreeDataSelectAnalysisResultPtr merge_tree_ordinary_select_result_ptr; + ReadFromMergeTree::AnalysisResultPtr merge_tree_projection_select_result_ptr; + ReadFromMergeTree::AnalysisResultPtr merge_tree_ordinary_select_result_ptr; }; /// This function fills ProjectionCandidate structure for specified projection. diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index f14960bc8d1..139603543f7 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -246,7 +246,7 @@ ReadFromMergeTree::ReadFromMergeTree( bool sample_factor_column_queried_, std::shared_ptr max_block_numbers_to_read_, Poco::Logger * log_, - MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_, + AnalysisResultPtr analyzed_result_ptr_, bool enable_parallel_reading) : SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader( storage_snapshot_->getSampleBlockForColumns(real_column_names_), @@ -1254,7 +1254,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( return merging_pipes.empty() ? Pipe::unitePipes(std::move(no_merging_pipes)) : Pipe::unitePipes(std::move(merging_pipes)); } -MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead( +ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead( MergeTreeData::DataPartsVector parts, std::vector alter_conversions) const { @@ -1454,7 +1454,7 @@ void ReadFromMergeTree::applyFilters() buildIndexes(indexes, filter_actions_dag, data, prepared_parts, context, query_info, metadata_for_reading); } -MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead( +ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead( MergeTreeData::DataPartsVector parts, std::vector alter_conversions, const PrewhereInfoPtr & prewhere_info, @@ -1490,7 +1490,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead( indexes); } -MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl( +ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl( MergeTreeData::DataPartsVector parts, std::vector alter_conversions, const StorageMetadataPtr & metadata_snapshot_base, @@ -1527,27 +1527,26 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl( buildIndexes(indexes, query_info.filter_actions_dag, data, parts, context, query_info, metadata_snapshot); if (indexes->part_values && indexes->part_values->empty()) - return std::make_shared(MergeTreeDataSelectAnalysisResult{.result = std::move(result)}); + return std::make_shared(std::move(result)); if (settings.force_primary_key && indexes->key_condition.alwaysUnknownOrTrue()) { - return std::make_shared(MergeTreeDataSelectAnalysisResult{ - .result = std::make_exception_ptr(Exception( - ErrorCodes::INDEX_NOT_USED, - "Primary key ({}) is not used and setting 'force_primary_key' is set", - fmt::join(primary_key_column_names, ", ")))}); + throw Exception(ErrorCodes::INDEX_NOT_USED, + "Primary key ({}) is not used and setting 'force_primary_key' is set", + fmt::join(primary_key_column_names, ", ")); } + LOG_DEBUG(log, "Key condition: {}", indexes->key_condition.toString()); if (indexes->part_offset_condition) LOG_DEBUG(log, "Part offset condition: {}", indexes->part_offset_condition->toString()); if (indexes->key_condition.alwaysFalse()) - return std::make_shared(MergeTreeDataSelectAnalysisResult{.result = std::move(result)}); + return std::make_shared(std::move(result)); size_t total_marks_pk = 0; size_t parts_before_pk = 0; - try + { MergeTreeDataSelectExecutor::filterPartsByPartition( indexes->partition_pruner, @@ -1574,14 +1573,13 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl( log); if (result.sampling.read_nothing) - return std::make_shared(MergeTreeDataSelectAnalysisResult{.result = std::move(result)}); + return std::make_shared(std::move(result)); for (const auto & part : parts) total_marks_pk += part->index_granularity.getMarksCountWithoutFinal(); parts_before_pk = parts.size(); auto reader_settings = getMergeTreeReaderSettings(context, query_info); - result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes( std::move(parts), std::move(alter_conversions), @@ -1596,10 +1594,6 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl( result.index_stats, indexes->use_skip_indexes); } - catch (...) - { - return std::make_shared(MergeTreeDataSelectAnalysisResult{.result = std::current_exception()}); - } size_t sum_marks_pk = total_marks_pk; for (const auto & stat : result.index_stats) @@ -1631,7 +1625,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToReadImpl( ? ReadType::InOrder : ReadType::InReverseOrder; - return std::make_shared(MergeTreeDataSelectAnalysisResult{.result = std::move(result)}); + return std::make_shared(std::move(result)); } bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit) @@ -1764,11 +1758,11 @@ bool ReadFromMergeTree::requestOutputEachPartitionThroughSeparatePort() ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const { - auto result_ptr = analyzed_result_ptr ? analyzed_result_ptr : selectRangesToRead(prepared_parts, alter_conversions_for_parts); - if (std::holds_alternative(result_ptr->result)) - std::rethrow_exception(std::get(result_ptr->result)); + auto result_ptr = analyzed_result_ptr + ? analyzed_result_ptr + : selectRangesToRead(prepared_parts, alter_conversions_for_parts); - return std::get(result_ptr->result); + return *result_ptr; } bool ReadFromMergeTree::isQueryWithFinal() const @@ -2235,33 +2229,5 @@ void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const } } -bool MergeTreeDataSelectAnalysisResult::error() const -{ - return std::holds_alternative(result); -} - -size_t MergeTreeDataSelectAnalysisResult::marks() const -{ - if (std::holds_alternative(result)) - std::rethrow_exception(std::get(result)); - - return std::get(result).selected_marks; -} - -UInt64 MergeTreeDataSelectAnalysisResult::rows() const -{ - if (std::holds_alternative(result)) - std::rethrow_exception(std::get(result)); - - return std::get(result).selected_rows; -} - -const RangesInDataParts & MergeTreeDataSelectAnalysisResult::partsWithRanges() const -{ - if (std::holds_alternative(result)) - std::rethrow_exception(std::get(result)); - - return std::get(result).parts_with_ranges; -} } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 4f9406c4a85..25c2c8373c5 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -55,9 +55,6 @@ struct UsefulSkipIndexes std::vector merged_indices; }; -struct MergeTreeDataSelectAnalysisResult; -using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr; - /// This step is created to read from MergeTree* table. /// For now, it takes a list of parts and creates source from it. class ReadFromMergeTree final : public SourceStepWithFilter @@ -108,6 +105,8 @@ public: void checkLimits(const Settings & settings, const SelectQueryInfo & query_info_) const; }; + using AnalysisResultPtr = std::shared_ptr; + ReadFromMergeTree( MergeTreeData::DataPartsVector parts_, std::vector alter_conversions_, @@ -122,9 +121,8 @@ public: bool sample_factor_column_queried_, std::shared_ptr max_block_numbers_to_read_, Poco::Logger * log_, - MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_, - bool enable_parallel_reading - ); + AnalysisResultPtr analyzed_result_ptr_, + bool enable_parallel_reading); static constexpr auto name = "ReadFromMergeTree"; String getName() const override { return name; } @@ -157,7 +155,7 @@ public: std::optional> part_values; }; - static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead( + static AnalysisResultPtr selectRangesToRead( MergeTreeData::DataPartsVector parts, std::vector alter_conversions, const PrewhereInfoPtr & prewhere_info, @@ -174,7 +172,7 @@ public: Poco::Logger * log, std::optional & indexes); - MergeTreeDataSelectAnalysisResultPtr selectRangesToRead( + AnalysisResultPtr selectRangesToRead( MergeTreeData::DataPartsVector parts, std::vector alter_conversions) const; @@ -196,7 +194,7 @@ public: bool willOutputEachPartitionThroughSeparatePort() const { return output_each_partition_through_separate_port; } bool hasAnalyzedResult() const { return analyzed_result_ptr != nullptr; } - void setAnalyzedResult(MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); } + void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); } const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; } const std::vector & getAlterConvertionsForParts() const { return alter_conversions_for_parts; } @@ -209,7 +207,7 @@ public: void applyFilters() override; private: - static MergeTreeDataSelectAnalysisResultPtr selectRangesToReadImpl( + static AnalysisResultPtr selectRangesToReadImpl( MergeTreeData::DataPartsVector parts, std::vector alter_conversions, const StorageMetadataPtr & metadata_snapshot_base, @@ -294,21 +292,11 @@ private: RangesInDataParts && parts, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection); ReadFromMergeTree::AnalysisResult getAnalysisResult() const; - MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr; + AnalysisResultPtr analyzed_result_ptr; bool is_parallel_reading_from_replicas; std::optional all_ranges_callback; std::optional read_task_callback; }; -struct MergeTreeDataSelectAnalysisResult -{ - std::variant result; - - bool error() const; - size_t marks() const; - UInt64 rows() const; - const RangesInDataParts & partsWithRanges() const; -}; - } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index c8bfebc4919..1373f5bceb4 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6930,7 +6930,7 @@ UInt64 MergeTreeData::estimateNumberOfRowsToRead( query_context, query_context->getSettingsRef().max_threads); - UInt64 total_rows = result_ptr->rows(); + UInt64 total_rows = result_ptr->selected_rows; if (query_info.limit > 0 && query_info.limit < total_rows) total_rows = query_info.limit; return total_rows; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 91519d00cb6..9323126f1a2 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -135,9 +135,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( std::shared_ptr max_block_numbers_to_read, bool enable_parallel_reading) const { - if (query_info.merge_tree_empty_result) - return std::make_unique(); - const auto & snapshot_data = assert_cast(*storage_snapshot->data); const auto & parts = snapshot_data.parts; const auto & alter_conversions = snapshot_data.alter_conversions; @@ -933,7 +930,7 @@ static void selectColumnNames( } } -MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead( +ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead( MergeTreeData::DataPartsVector parts, const PrewhereInfoPtr & prewhere_info, const Names & column_names_to_return, @@ -947,8 +944,7 @@ MergeTreeDataSelectAnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar { size_t total_parts = parts.size(); if (total_parts == 0) - return std::make_shared( - MergeTreeDataSelectAnalysisResult{.result = ReadFromMergeTree::AnalysisResult()}); + return std::make_shared(); Names real_column_names; Names virt_column_names; @@ -989,13 +985,13 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts( const UInt64 max_block_size, const size_t num_streams, std::shared_ptr max_block_numbers_to_read, - MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr, + ReadFromMergeTree::AnalysisResultPtr merge_tree_select_result_ptr, bool enable_parallel_reading) const { /// If merge_tree_select_result_ptr != nullptr, we use analyzed result so parts will always be empty. if (merge_tree_select_result_ptr) { - if (merge_tree_select_result_ptr->marks() == 0) + if (merge_tree_select_result_ptr->selected_marks == 0) return {}; } else if (parts.empty()) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 9d56100a10c..8ba6358e75a 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -48,13 +48,13 @@ public: UInt64 max_block_size, size_t num_streams, std::shared_ptr max_block_numbers_to_read = nullptr, - MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr = nullptr, + ReadFromMergeTree::AnalysisResultPtr merge_tree_select_result_ptr = nullptr, bool enable_parallel_reading = false) const; /// Get an estimation for the number of marks we are going to read. /// Reads nothing. Secondary indexes are not used. /// This method is used to select best projection for table. - MergeTreeDataSelectAnalysisResultPtr estimateNumMarksToRead( + ReadFromMergeTree::AnalysisResultPtr estimateNumMarksToRead( MergeTreeData::DataPartsVector parts, const PrewhereInfoPtr & prewhere_info, const Names & column_names, diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index 6606e4d738e..bbb38346f38 100644 --- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -37,7 +37,7 @@ public: } /// Used in queries with projection. - StorageFromMergeTreeDataPart(const MergeTreeData & storage_, MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr_) + StorageFromMergeTreeDataPart(const MergeTreeData & storage_, ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr_) : IStorage(storage_.getStorageID()), storage(storage_), analysis_result_ptr(analysis_result_ptr_) { setInMemoryMetadata(storage.getInMemoryMetadata()); @@ -127,7 +127,7 @@ private: const std::vector alter_conversions; const MergeTreeData & storage; const String partition_id; - const MergeTreeDataSelectAnalysisResultPtr analysis_result_ptr; + const ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr; static StorageID getIDFromPart(const MergeTreeData::DataPartPtr & part_) { diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 69dbb64db38..401607b2cc3 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -42,9 +42,6 @@ using ReadInOrderOptimizerPtr = std::shared_ptr; class Cluster; using ClusterPtr = std::shared_ptr; -struct MergeTreeDataSelectAnalysisResult; -using MergeTreeDataSelectAnalysisResultPtr = std::shared_ptr; - struct PrewhereInfo { /// Actions for row level security filter. Applied separately before prewhere_actions. @@ -214,7 +211,6 @@ struct SelectQueryInfo ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; } - bool merge_tree_empty_result = false; bool settings_limit_offset_done = false; bool is_internal = false; bool parallel_replicas_disabled = false;