From 1aeb705b20ef37f0608e6c639dc333b90388bdb4 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 27 May 2021 19:53:58 +0300 Subject: [PATCH] Fix some tests. --- .../QueryPlan/ReadFromMergeTree.cpp | 165 +++++++++++------- src/Processors/QueryPlan/ReadFromMergeTree.h | 8 +- src/Storages/MergeTree/MergeTreeData.cpp | 12 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 20 ++- .../MergeTree/MergeTreeDataSelectExecutor.h | 14 +- src/Storages/StorageReplicatedMergeTree.cpp | 4 +- .../0_stateless/01651_bugs_from_15889.sql | 10 +- .../01861_explain_pipeline.reference | 10 +- 8 files changed, 141 insertions(+), 102 deletions(-) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 6d6ee43acb3..2ad10461613 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -88,9 +88,20 @@ size_t minMarksForConcurrentRead( } +struct ReadFromMergeTree::AnalysisResult +{ + RangesInDataParts parts_with_ranges; + MergeTreeDataSelectSamplingData sampling; + bool sample_factor_column_queried = false; + String query_id; + IndexStats index_stats; + Names column_names_to_read; + ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default; +}; + ReadFromMergeTree::ReadFromMergeTree( const SelectQueryInfo & query_info_, - const PartitionIdToMaxBlock * max_block_numbers_to_read_, + std::shared_ptr max_block_numbers_to_read_, ContextPtr context_, const MergeTreeData & data_, StorageMetadataPtr metadata_snapshot_, @@ -107,7 +118,7 @@ ReadFromMergeTree::ReadFromMergeTree( data_.getPartitionValueType(), virt_column_names_)}) , query_info(std::move(query_info_)) - , max_block_numbers_to_read(max_block_numbers_to_read_) + , max_block_numbers_to_read(std::move(max_block_numbers_to_read_)) , context(std::move(context_)) , data(data_) , metadata_snapshot(std::move(metadata_snapshot_)) @@ -842,26 +853,26 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( return Pipe::unitePipes(std::move(partition_pipes)); } -void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) +ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts) const { - auto parts = std::move(prepared_parts); + AnalysisResult result; + size_t total_parts = parts.size(); auto part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context); if (part_values && part_values->empty()) - { - pipeline.init(Pipe(std::make_shared(getOutputStream().header))); - return; - } + return result; + + result.column_names_to_read = real_column_names; /// If there are only virtual columns in the query, you must request at least one non-virtual one. - if (real_column_names.empty()) + if (result.column_names_to_read.empty()) { NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical(); - real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns)); + result.column_names_to_read.push_back(ExpressionActions::getSmallestColumn(available_real_columns)); } - metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID()); + metadata_snapshot->check(result.column_names_to_read, data.getVirtuals(), data.getStorageID()); // Build and check if primary key is used when necessary const auto & primary_key = metadata_snapshot->getPrimaryKey(); @@ -881,28 +892,26 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build auto query_context = context->hasQueryContext() ? context->getQueryContext() : context; MergeTreeDataSelectExecutor::filterPartsByPartition( - metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read, log, index_stats); + metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, result.index_stats); - bool sample_factor_column_queried = false; for (const auto & col : virt_column_names) if (col == "_sample_factor") - sample_factor_column_queried = true; + result.sample_factor_column_queried = true; - auto sampling = MergeTreeDataSelectExecutor::getSampling( + result.sampling = MergeTreeDataSelectExecutor::getSampling( select, parts, metadata_snapshot, key_condition, - data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context); + data, log, result.sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context); - if (sampling.read_nothing) - { - pipeline.init(Pipe(std::make_shared(getOutputStream().header))); - return; - } + if (result.sampling.read_nothing) + return result; size_t total_marks_pk = 0; for (const auto & part : parts) total_marks_pk += part->index_granularity.getMarksCountWithoutFinal(); - auto parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes( + size_t parts_before_pk = parts.size(); + + result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes( std::move(parts), metadata_snapshot, query_info, @@ -911,18 +920,18 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build settings.reader_settings, log, settings.num_streams, - index_stats, + result.index_stats, true); size_t sum_marks_pk = total_marks_pk; - for (const auto & stat : index_stats) + for (const auto & stat : result.index_stats) if (stat.type == IndexType::PrimaryKey) sum_marks_pk = stat.num_granules_after; size_t sum_marks = 0; size_t sum_ranges = 0; - for (const auto & part : parts_with_ranges) + for (const auto & part : result.parts_with_ranges) { sum_ranges += part.ranges.size(); sum_marks += part.getMarksCount(); @@ -931,31 +940,53 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build LOG_DEBUG( log, "Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges", - parts.size(), + parts_before_pk, total_parts, - parts_with_ranges.size(), + result.parts_with_ranges.size(), sum_marks_pk, total_marks_pk, sum_marks, sum_ranges); - String query_id = MergeTreeDataSelectExecutor::checkLimits(data, parts_with_ranges, context); + result.query_id = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context); - ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size()); + ProfileEvents::increment(ProfileEvents::SelectedParts, result.parts_with_ranges.size()); ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges); ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks); + const auto & input_order_info = query_info.input_order_info + ? query_info.input_order_info + : (query_info.projection ? query_info.projection->input_order_info : nullptr); + + const auto & q_settings = context->getSettingsRef(); + if ((q_settings.optimize_read_in_order || q_settings.optimize_aggregation_in_order) && input_order_info) + result.read_type = (input_order_info->direction > 0) ? ReadType::InOrder + : ReadType::InReverseOrder; + + return result; +} + +void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) +{ + auto result = selectRangesToRead(prepared_parts); + if (result.parts_with_ranges.empty()) + { + pipeline.init(Pipe(std::make_shared(getOutputStream().header))); + return; + } + /// Projection, that needed to drop columns, which have appeared by execution /// of some extra expressions, and to allow execute the same expressions later. /// NOTE: It may lead to double computation of expressions. ActionsDAGPtr result_projection; - Names column_names_to_read = real_column_names; - if (!select.final() && sampling.use_sampling) + Names column_names_to_read = std::move(result.column_names_to_read); + const auto & select = query_info.query->as(); + if (!select.final() && result.sampling.use_sampling) { /// Add columns needed for `sample_by_ast` to `column_names_to_read`. /// Skip this if final was used, because such columns were already added from PK. - std::vector add_columns = sampling.filter_expression->getRequiredColumns().getNames(); + std::vector add_columns = result.sampling.filter_expression->getRequiredColumns().getNames(); column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); std::sort(column_names_to_read.begin(), column_names_to_read.end()); column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), @@ -985,7 +1016,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); pipe = spreadMarkRangesAmongStreamsFinal( - std::move(parts_with_ranges), + std::move(result.parts_with_ranges), column_names_to_read, result_projection); } @@ -999,7 +1030,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false); pipe = spreadMarkRangesAmongStreamsWithOrder( - std::move(parts_with_ranges), + std::move(result.parts_with_ranges), column_names_to_read, sorting_key_prefix_expr, result_projection, @@ -1008,7 +1039,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build else { pipe = spreadMarkRangesAmongStreams( - std::move(parts_with_ranges), + std::move(result.parts_with_ranges), column_names_to_read); } @@ -1018,15 +1049,15 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build return; } - if (sampling.use_sampling) + if (result.sampling.use_sampling) { - auto sampling_actions = std::make_shared(sampling.filter_expression); + auto sampling_actions = std::make_shared(result.sampling.filter_expression); pipe.addSimpleTransform([&](const Block & header) { return std::make_shared( header, sampling_actions, - sampling.filter_function->getColumnName(), + result.sampling.filter_function->getColumnName(), false); }); } @@ -1041,12 +1072,12 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build } /// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values. - if (sample_factor_column_queried) + if (result.sample_factor_column_queried) { ColumnWithTypeAndName column; column.name = "_sample_factor"; column.type = std::make_shared(); - column.column = column.type->createColumnConst(0, Field(sampling.used_sample_factor)); + column.column = column.type->createColumnConst(0, Field(result.sampling.used_sample_factor)); auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column)); auto adding_column_action = std::make_shared(adding_column_dag); @@ -1073,8 +1104,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build processors.emplace_back(processor); // Attach QueryIdHolder if needed - if (!query_id.empty()) - pipe.addQueryIdHolder(std::make_shared(query_id, data)); + if (!result.query_id.empty()) + pipe.addQueryIdHolder(std::make_shared(result.query_id, data)); pipeline.init(std::move(pipe)); } @@ -1098,45 +1129,50 @@ static const char * indexTypeToString(ReadFromMergeTree::IndexType type) __builtin_unreachable(); } -// static const char * readTypeToString(ReadFromMergeTree::ReadType type) -// { -// switch (type) -// { -// case ReadFromMergeTree::ReadType::Default: -// return "Default"; -// case ReadFromMergeTree::ReadType::InOrder: -// return "InOrder"; -// case ReadFromMergeTree::ReadType::InReverseOrder: -// return "InReverseOrder"; -// } +static const char * readTypeToString(ReadFromMergeTree::ReadType type) +{ + switch (type) + { + case ReadFromMergeTree::ReadType::Default: + return "Default"; + case ReadFromMergeTree::ReadType::InOrder: + return "InOrder"; + case ReadFromMergeTree::ReadType::InReverseOrder: + return "InReverseOrder"; + } -// __builtin_unreachable(); -// } + __builtin_unreachable(); +} void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const { + auto result = selectRangesToRead(prepared_parts); std::string prefix(format_settings.offset, format_settings.indent_char); - //format_settings.out << prefix << "ReadType: " << readTypeToString(read_type) << '\n'; + format_settings.out << prefix << "ReadType: " << readTypeToString(result.read_type) << '\n'; - if (!index_stats.empty()) + if (!result.index_stats.empty()) { - format_settings.out << prefix << "Parts: " << index_stats.back().num_parts_after << '\n'; - format_settings.out << prefix << "Granules: " << index_stats.back().num_granules_after << '\n'; + format_settings.out << prefix << "Parts: " << result.index_stats.back().num_parts_after << '\n'; + format_settings.out << prefix << "Granules: " << result.index_stats.back().num_granules_after << '\n'; } } void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const { - //map.add("Read Type", readTypeToString(read_type)); - if (!index_stats.empty()) + auto result = selectRangesToRead(prepared_parts); + map.add("Read Type", readTypeToString(result.read_type)); + if (!result.index_stats.empty()) { - map.add("Parts", index_stats.back().num_parts_after); - map.add("Granules", index_stats.back().num_granules_after); + map.add("Parts", result.index_stats.back().num_parts_after); + map.add("Granules", result.index_stats.back().num_granules_after); } } void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const { + auto result = selectRangesToRead(prepared_parts); + auto index_stats = std::move(result.index_stats); + std::string prefix(format_settings.offset, format_settings.indent_char); if (!index_stats.empty()) { @@ -1186,6 +1222,9 @@ void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const void ReadFromMergeTree::describeIndexes(JSONBuilder::JSONMap & map) const { + auto result = selectRangesToRead(prepared_parts); + auto index_stats = std::move(result.index_stats); + if (!index_stats.empty()) { /// Do not print anything if no indexes is applied. diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index e9341e46770..ef5cc5dc70c 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -72,7 +72,7 @@ public: ReadFromMergeTree( const SelectQueryInfo & query_info_, - const PartitionIdToMaxBlock * max_block_numbers_to_read_, + std::shared_ptr max_block_numbers_to_read_, ContextPtr context_, const MergeTreeData & data_, StorageMetadataPtr metadata_snapshot_, @@ -97,7 +97,7 @@ public: private: SelectQueryInfo query_info; - const PartitionIdToMaxBlock * max_block_numbers_to_read; + std::shared_ptr max_block_numbers_to_read; ContextPtr context; const MergeTreeData & data; StorageMetadataPtr metadata_snapshot; @@ -106,7 +106,6 @@ private: Names real_column_names; MergeTreeData::DataPartsVector prepared_parts; PrewhereInfoPtr prewhere_info; - IndexStats index_stats; Names virt_column_names; Settings settings; @@ -134,6 +133,9 @@ private: RangesInDataParts && parts, const Names & column_names, ActionsDAGPtr & out_projection); + + struct AnalysisResult; + AnalysisResult selectRangesToRead(MergeTreeData::DataPartsVector parts) const; }; } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e16bbb640e2..3f86432e2ae 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3818,7 +3818,7 @@ static void selectBestProjection( const Names & required_columns, ProjectionCandidate & candidate, ContextPtr query_context, - const PartitionIdToMaxBlock * max_added_blocks, + std::shared_ptr max_added_blocks, const Settings & settings, const MergeTreeData::DataPartsVector & parts, ProjectionCandidate *& selected_candidate, @@ -4097,11 +4097,11 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( // First build a MergeTreeDataSelectCache to check if a projection is indeed better than base // query_info.merge_tree_data_select_cache = std::make_unique(); - std::unique_ptr max_added_blocks; + std::shared_ptr max_added_blocks; if (settings.select_sequential_consistency) { if (const StorageReplicatedMergeTree * replicated = dynamic_cast(this)) - max_added_blocks = std::make_unique(replicated->getMaxAddedBlocks()); + max_added_blocks = std::make_shared(replicated->getMaxAddedBlocks()); } auto parts = getDataPartsVector(); @@ -4122,7 +4122,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( analysis_result.required_columns, candidate, query_context, - max_added_blocks.get(), + max_added_blocks, settings, parts, selected_candidate, @@ -4143,7 +4143,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( query_info, // TODO syntax_analysis_result set in index query_context, settings.max_threads, - max_added_blocks.get()); + max_added_blocks); // Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read. // NOTE: It is not clear if we need it. E.g. projections do not support skip index for now. @@ -4160,7 +4160,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( analysis_result.required_columns, candidate, query_context, - max_added_blocks.get(), + max_added_blocks, settings, parts, selected_candidate, diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 8a3550fc511..cb2ead7952a 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -139,7 +139,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( const UInt64 max_block_size, const unsigned num_streams, QueryProcessingStage::Enum processed_stage, - const PartitionIdToMaxBlock * max_block_numbers_to_read) const + std::shared_ptr max_block_numbers_to_read) const { const auto & settings = context->getSettingsRef(); auto parts = data.getDataPartsVector(); @@ -670,8 +670,8 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( const StorageMetadataPtr & metadata_snapshot, const MergeTreeData & data, const SelectQueryInfo & query_info, - ContextPtr & context, - ContextPtr & query_context, + const ContextPtr & context, + const ContextPtr & query_context, MergeTreeData::DataPartsVector & parts, const std::optional> & part_values, const PartitionIdToMaxBlock * max_block_numbers_to_read, @@ -766,7 +766,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd MergeTreeData::DataPartsVector && parts, StorageMetadataPtr metadata_snapshot, const SelectQueryInfo & query_info, - ContextPtr & context, + const ContextPtr & context, KeyCondition & key_condition, const MergeTreeReaderSettings & reader_settings, Poco::Logger * log, @@ -993,7 +993,10 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd return parts_with_ranges; } -String MergeTreeDataSelectExecutor::checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context) +String MergeTreeDataSelectExecutor::checkLimits( + const MergeTreeData & data, + const RangesInDataParts & parts_with_ranges, + const ContextPtr & context) { const auto & settings = context->getSettingsRef(); // Check limitations. query_id is used as the quota RAII's resource key. @@ -1092,7 +1095,7 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead( const SelectQueryInfo & query_info, ContextPtr context, unsigned num_streams, - const PartitionIdToMaxBlock * max_block_numbers_to_read) const + std::shared_ptr max_block_numbers_to_read) const { size_t total_parts = parts.size(); if (total_parts == 0) @@ -1137,7 +1140,7 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead( ReadFromMergeTree::IndexStats index_stats; filterPartsByPartition( - metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read, log, index_stats); + metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, index_stats); auto sampling = MergeTreeDataSelectExecutor::getSampling( select, parts, metadata_snapshot, key_condition, @@ -1173,7 +1176,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( ContextPtr context, const UInt64 max_block_size, const unsigned num_streams, - const PartitionIdToMaxBlock * max_block_numbers_to_read) const + std::shared_ptr max_block_numbers_to_read) const { size_t total_parts = parts.size(); if (total_parts == 0) @@ -1207,6 +1210,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, //.min_marks_for_concurrent_read = settings.min_marks_for_concurrent_read, .use_uncompressed_cache = settings.use_uncompressed_cache, + .force_primary_key = settings.force_primary_key, .reader_settings = reader_settings, .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), }; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 3e8076de8d3..077584039a1 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -53,7 +53,7 @@ public: UInt64 max_block_size, unsigned num_streams, QueryProcessingStage::Enum processed_stage, - const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const; + std::shared_ptr max_block_numbers_to_read = nullptr) const; size_t estimateNumMarksToRead( MergeTreeData::DataPartsVector parts, @@ -63,7 +63,7 @@ public: const SelectQueryInfo & query_info, ContextPtr context, unsigned num_streams, - const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const; + std::shared_ptr max_block_numbers_to_read = nullptr) const; QueryPlanPtr readFromParts( MergeTreeData::DataPartsVector parts, @@ -74,7 +74,7 @@ public: ContextPtr context, UInt64 max_block_size, unsigned num_streams, - const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const; + std::shared_ptr max_block_numbers_to_read = nullptr) const; private: const MergeTreeData & data; @@ -196,8 +196,8 @@ public: const StorageMetadataPtr & metadata_snapshot, const MergeTreeData & data, const SelectQueryInfo & query_info, - ContextPtr & context, - ContextPtr & query_context, + const ContextPtr & context, + const ContextPtr & query_context, MergeTreeData::DataPartsVector & parts, const std::optional> & part_values, const PartitionIdToMaxBlock * max_block_numbers_to_read, @@ -208,7 +208,7 @@ public: MergeTreeData::DataPartsVector && parts, StorageMetadataPtr metadata_snapshot, const SelectQueryInfo & query_info, - ContextPtr & context, + const ContextPtr & context, KeyCondition & key_condition, const MergeTreeReaderSettings & reader_settings, Poco::Logger * log, @@ -227,7 +227,7 @@ public: NamesAndTypesList available_real_columns, ContextPtr context); - static String checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context); + static String checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, const ContextPtr & context); }; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 3adf3026b23..e38589a5143 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4377,9 +4377,9 @@ void StorageReplicatedMergeTree::read( */ if (local_context->getSettingsRef().select_sequential_consistency) { - auto max_added_blocks = getMaxAddedBlocks(); + auto max_added_blocks = std::make_shared(getMaxAddedBlocks()); if (auto plan = reader.read( - column_names, metadata_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, &max_added_blocks)) + column_names, metadata_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, std::move(max_added_blocks))) query_plan = std::move(*plan); return; } diff --git a/tests/queries/0_stateless/01651_bugs_from_15889.sql b/tests/queries/0_stateless/01651_bugs_from_15889.sql index 1fbf669a1b8..2764ed61291 100644 --- a/tests/queries/0_stateless/01651_bugs_from_15889.sql +++ b/tests/queries/0_stateless/01651_bugs_from_15889.sql @@ -8,7 +8,7 @@ INSERT INTO xp SELECT '2020-01-01', number, '' FROM numbers(100000); CREATE TABLE xp_d AS xp ENGINE = Distributed(test_shard_localhost, currentDatabase(), xp); -SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- { serverError 20 } +SELECT count(7 = (SELECT number FROM numbers(0) ORDER BY number ASC NULLS FIRST LIMIT 7)) FROM xp_d PREWHERE toYYYYMM(A) GLOBAL IN (SELECT NULL = (SELECT number FROM numbers(1) ORDER BY number DESC NULLS LAST LIMIT 1), toYYYYMM(min(A)) FROM xp_d) WHERE B > NULL; -- { serverError 8 } SELECT count() FROM xp_d WHERE A GLOBAL IN (SELECT NULL); -- { serverError 53 } @@ -45,7 +45,7 @@ SYSTEM FLUSH LOGS; WITH concat(addressToLine(arrayJoin(trace) AS addr), '#') AS symbol SELECT count() > 7 FROM trace_log AS t -WHERE (query_id = +WHERE (query_id = ( SELECT [NULL, NULL, NULL, NULL, 0.00009999999747378752, NULL, NULL, NULL, NULL, NULL], @@ -60,7 +60,7 @@ WHERE (query_id = WITH addressToSymbol(arrayJoin(trace)) AS symbol SELECT count() > 0 FROM trace_log AS t -WHERE greaterOrEquals(event_date, ignore(ignore(ignore(NULL, '')), 256), yesterday()) AND (trace_type = 'Memory') AND (query_id = +WHERE greaterOrEquals(event_date, ignore(ignore(ignore(NULL, '')), 256), yesterday()) AND (trace_type = 'Memory') AND (query_id = ( SELECT ignore(ignore(ignore(ignore(65536)), ignore(65537), ignore(2)), ''), @@ -82,7 +82,7 @@ WITH ( WHERE current_database = currentDatabase() ORDER BY query_start_time DESC LIMIT 1 - ) AS time_with_microseconds, + ) AS time_with_microseconds, ( SELECT inf, @@ -101,7 +101,7 @@ WITH ( WHERE current_database = currentDatabase() ORDER BY query_start_time DESC LIMIT 1 - ) AS time_with_microseconds, + ) AS time_with_microseconds, ( SELECT query_start_time FROM system.query_log diff --git a/tests/queries/0_stateless/01861_explain_pipeline.reference b/tests/queries/0_stateless/01861_explain_pipeline.reference index 8d755f807c0..9d62fb9f6b8 100644 --- a/tests/queries/0_stateless/01861_explain_pipeline.reference +++ b/tests/queries/0_stateless/01861_explain_pipeline.reference @@ -1,13 +1,10 @@ (Expression) ExpressionTransform (SettingQuotaAndLimits) - (Expression) + (ReadFromMergeTree) ExpressionTransform - (MergingFinal) ReplacingSorted 2 → 1 - (Expression) ExpressionTransform × 2 - (ReadFromMergeTree) MergeTree × 2 0 → 1 0 0 1 1 @@ -19,13 +16,10 @@ ExpressionTransform (Expression) ExpressionTransform × 2 (SettingQuotaAndLimits) - (Expression) + (ReadFromMergeTree) ExpressionTransform × 2 - (MergingFinal) ReplacingSorted × 2 2 → 1 Copy × 2 1 → 2 AddingSelector × 2 - (Expression) ExpressionTransform × 2 - (ReadFromMergeTree) MergeTree × 2 0 → 1