diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 8c1dc845d26..dc210d7bc33 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -1237,144 +1237,186 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; - Pipe pipe; - - { - Pipes pipes; - - for (const auto & part : parts) - { - auto source_processor = std::make_shared( - data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes, - settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, - use_uncompressed_cache, - query_info.prewhere_info, true, reader_settings, - virt_columns, part.part_index_in_query); - - pipes.emplace_back(std::move(source_processor)); - } - - pipe = Pipe::unitePipes(std::move(pipes)); - } - - /// Drop temporary columns, added by 'sorting_key_expr' - if (!out_projection) - out_projection = createProjection(pipe, data); - - pipe.addSimpleTransform([&metadata_snapshot](const Block & header) - { - return std::make_shared(header, metadata_snapshot->getSortingKey().expression); - }); - - Names sort_columns = metadata_snapshot->getSortingKeyColumns(); - SortDescription sort_description; - size_t sort_columns_size = sort_columns.size(); - sort_description.reserve(sort_columns_size); - - Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; - - Block header = pipe.getHeader(); - for (size_t i = 0; i < sort_columns_size; ++i) - sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); - - auto get_merging_processor = [&]() -> MergingTransformPtr - { - switch (data.merging_params.mode) - { - case MergeTreeData::MergingParams::Ordinary: - { - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, max_block_size); - } - - case MergeTreeData::MergingParams::Collapsing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.sign_column, true, max_block_size); - - case MergeTreeData::MergingParams::Summing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size); - - case MergeTreeData::MergingParams::Aggregating: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, max_block_size); - - case MergeTreeData::MergingParams::Replacing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.version_column, max_block_size); - - case MergeTreeData::MergingParams::VersionedCollapsing: - return std::make_shared(header, pipe.numOutputPorts(), - sort_description, data.merging_params.sign_column, max_block_size); - - case MergeTreeData::MergingParams::Graphite: - throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); - } - - __builtin_unreachable(); - }; - if (num_streams > settings.max_final_threads) num_streams = settings.max_final_threads; - if (num_streams <= 1 || sort_description.empty()) + std::vector parts_to_merge_ranges; + auto it = parts.begin(); + parts_to_merge_ranges.push_back(it); + + if (data_settings->do_not_merge_across_partitions_select_final) { - pipe.addTransform(get_merging_processor()); - return pipe; - } - - ColumnNumbers key_columns; - key_columns.reserve(sort_description.size()); - - for (auto & desc : sort_description) - { - if (!desc.column_name.empty()) - key_columns.push_back(header.getPositionByName(desc.column_name)); - else - key_columns.emplace_back(desc.column_number); - } - - pipe.addSimpleTransform([&](const Block & stream_header) - { - return std::make_shared(stream_header, num_streams, key_columns); - }); - - pipe.transform([&](OutputPortRawPtrs ports) - { - Processors processors; - std::vector output_ports; - processors.reserve(ports.size() + num_streams); - output_ports.reserve(ports.size()); - - for (auto & port : ports) + while (it != parts.end()) { - auto copier = std::make_shared(header, num_streams); - connect(*port, copier->getInputPort()); - output_ports.emplace_back(copier->getOutputs().begin()); - processors.emplace_back(std::move(copier)); + it = std::find_if( + it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); + parts_to_merge_ranges.push_back(it); } + num_streams /= (parts_to_merge_ranges.size() - 1); + } + else + { + parts_to_merge_ranges.push_back(parts.end()); + } + + Pipes select_and_merge_pipes; + + for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) + { + Pipe pipe; - for (size_t i = 0; i < num_streams; ++i) { - auto merge = get_merging_processor(); - merge->setSelectorPosition(i); - auto input = merge->getInputs().begin(); + Pipes pipes; - /// Connect i-th merge with i-th input port of every copier. - for (size_t j = 0; j < ports.size(); ++j) + for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) { - connect(*output_ports[j], *input); - ++output_ports[j]; - ++input; + LOG_DEBUG(log, "Partition id: {}", part_it->data_part->info.partition_id); + auto source_processor = std::make_shared( + data, + metadata_snapshot, + part_it->data_part, + max_block_size, + settings.preferred_block_size_bytes, + settings.preferred_max_column_in_block_size_bytes, + column_names, + part_it->ranges, + use_uncompressed_cache, + query_info.prewhere_info, + true, + reader_settings, + virt_columns, + part_it->part_index_in_query); + + pipes.emplace_back(std::move(source_processor)); } - processors.emplace_back(std::move(merge)); + pipe = Pipe::unitePipes(std::move(pipes)); } - return processors; - }); + /// Drop temporary columns, added by 'sorting_key_expr' + if (!out_projection) + out_projection = createProjection(pipe, data); - return pipe; + if (std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0) + { + select_and_merge_pipes.emplace_back(std::move(pipe)); + continue; + } + + pipe.addSimpleTransform([&metadata_snapshot](const Block & header) { + return std::make_shared(header, metadata_snapshot->getSortingKey().expression); + }); + + Names sort_columns = metadata_snapshot->getSortingKeyColumns(); + SortDescription sort_description; + size_t sort_columns_size = sort_columns.size(); + sort_description.reserve(sort_columns_size); + + Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; + + Block header = pipe.getHeader(); + for (size_t i = 0; i < sort_columns_size; ++i) + sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); + + auto get_merging_processor = [&]() -> MergingTransformPtr { + switch (data.merging_params.mode) + { + case MergeTreeData::MergingParams::Ordinary: { + return std::make_shared(header, pipe.numOutputPorts(), sort_description, max_block_size); + } + + case MergeTreeData::MergingParams::Collapsing: + return std::make_shared( + header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, true, max_block_size); + + case MergeTreeData::MergingParams::Summing: + return std::make_shared( + header, + pipe.numOutputPorts(), + sort_description, + data.merging_params.columns_to_sum, + partition_key_columns, + max_block_size); + + case MergeTreeData::MergingParams::Aggregating: + return std::make_shared(header, pipe.numOutputPorts(), sort_description, max_block_size); + + case MergeTreeData::MergingParams::Replacing: + return std::make_shared( + header, pipe.numOutputPorts(), sort_description, data.merging_params.version_column, max_block_size); + + case MergeTreeData::MergingParams::VersionedCollapsing: + return std::make_shared( + header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, max_block_size); + + case MergeTreeData::MergingParams::Graphite: + throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); + } + + __builtin_unreachable(); + }; + + if (num_streams <= 1 || sort_description.empty()) + { + pipe.addTransform(get_merging_processor()); + select_and_merge_pipes.emplace_back(std::move(pipe)); + continue; + } + + ColumnNumbers key_columns; + key_columns.reserve(sort_description.size()); + + for (auto & desc : sort_description) + { + if (!desc.column_name.empty()) + key_columns.push_back(header.getPositionByName(desc.column_name)); + else + key_columns.emplace_back(desc.column_number); + } + + pipe.addSimpleTransform([&](const Block & stream_header) { + return std::make_shared(stream_header, num_streams, key_columns); + }); + + pipe.transform([&](OutputPortRawPtrs ports) { + Processors processors; + std::vector output_ports; + processors.reserve(ports.size() + num_streams); + output_ports.reserve(ports.size()); + + LOG_DEBUG(log, "Output ports size: {}", ports.size()); + + for (auto & port : ports) + { + auto copier = std::make_shared(header, num_streams); + connect(*port, copier->getInputPort()); + output_ports.emplace_back(copier->getOutputs().begin()); + processors.emplace_back(std::move(copier)); + } + + for (size_t i = 0; i < num_streams; ++i) + { + auto merge = get_merging_processor(); + merge->setSelectorPosition(i); + auto input = merge->getInputs().begin(); + + /// Connect i-th merge with i-th input port of every copier. + for (size_t j = 0; j < ports.size(); ++j) + { + connect(*output_ports[j], *input); + ++output_ports[j]; + ++input; + } + + processors.emplace_back(std::move(merge)); + } + + return processors; + }); + select_and_merge_pipes.emplace_back(std::move(pipe)); + } + + return Pipe::unitePipes(std::move(select_and_merge_pipes)); } /// Calculates a set of mark ranges, that could possibly contain keys, required by condition. diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 97bc73caf5b..a9bd4248259 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -112,6 +112,8 @@ struct Settings; /** Obsolete settings. Kept for backward compatibility only. */ \ M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \ M(UInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \ + /** Select settings */ \ + M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \ /// Settings that should not change after the creation of a table. #define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \