From 5bfb15262c1ba2ffb68ec7003edc9a4fa6356b9d Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Wed, 15 Jun 2022 17:25:38 +0300 Subject: [PATCH] Revert "More parallel execution for queries with `FINAL` (#36396)" This reverts commit c8afeafe0e974a0070f04a056705fabc75cc998e. --- src/Processors/Merges/IMergingTransform.cpp | 65 +++++ src/Processors/Merges/IMergingTransform.h | 10 + src/Processors/QueryPlan/IQueryPlanStep.cpp | 3 - src/Processors/QueryPlan/PartsSplitter.cpp | 274 ------------------ src/Processors/QueryPlan/PartsSplitter.h | 25 -- .../QueryPlan/ReadFromMergeTree.cpp | 138 +++++---- .../Transforms/AddingSelectorTransform.cpp | 76 +++++ .../Transforms/AddingSelectorTransform.h | 26 ++ .../Transforms/FilterSortedStreamByRange.h | 66 ----- src/Processors/Transforms/FilterTransform.h | 1 + src/Processors/Transforms/SelectorInfo.h | 14 + src/QueryPipeline/printPipeline.h | 5 +- tests/performance/parallel_final.xml | 5 - .../01861_explain_pipeline.reference | 17 +- .../02286_parallel_final.reference | 9 - .../0_stateless/02286_parallel_final.sh | 31 -- 16 files changed, 280 insertions(+), 485 deletions(-) delete mode 100644 src/Processors/QueryPlan/PartsSplitter.cpp delete mode 100644 src/Processors/QueryPlan/PartsSplitter.h create mode 100644 src/Processors/Transforms/AddingSelectorTransform.cpp create mode 100644 src/Processors/Transforms/AddingSelectorTransform.h delete mode 100644 src/Processors/Transforms/FilterSortedStreamByRange.h create mode 100644 src/Processors/Transforms/SelectorInfo.h delete mode 100644 tests/queries/0_stateless/02286_parallel_final.reference delete mode 100755 tests/queries/0_stateless/02286_parallel_final.sh diff --git a/src/Processors/Merges/IMergingTransform.cpp b/src/Processors/Merges/IMergingTransform.cpp index 226f55b3e92..f09c7c5339f 100644 --- a/src/Processors/Merges/IMergingTransform.cpp +++ b/src/Processors/Merges/IMergingTransform.cpp @@ -1,4 +1,5 @@ #include +#include namespace DB { @@ -180,4 +181,68 @@ IProcessor::Status IMergingTransformBase::prepare() return Status::Ready; } +static void filterChunk(IMergingAlgorithm::Input & input, size_t selector_position) +{ + if (!input.chunk.getChunkInfo()) + throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR); + + const auto * chunk_info = typeid_cast(input.chunk.getChunkInfo().get()); + if (!chunk_info) + throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR); + + const auto & selector = chunk_info->selector; + + IColumn::Filter filter; + filter.resize_fill(selector.size()); + + size_t num_rows = input.chunk.getNumRows(); + auto columns = input.chunk.detachColumns(); + + size_t num_result_rows = 0; + + for (size_t row = 0; row < num_rows; ++row) + { + if (selector[row] == selector_position) + { + ++num_result_rows; + filter[row] = 1; + } + } + + if (!filter.empty() && filter.back() == 0) + { + filter.back() = 1; + ++num_result_rows; + input.skip_last_row = true; + } + + for (auto & column : columns) + column = column->filter(filter, num_result_rows); + + input.chunk.clear(); + input.chunk.setColumns(std::move(columns), num_result_rows); +} + +void IMergingTransformBase::filterChunks() +{ + if (state.selector_position < 0) + return; + + if (!state.init_chunks.empty()) + { + for (size_t i = 0; i < input_states.size(); ++i) + { + auto & input = state.init_chunks[i]; + if (!input.chunk) + continue; + + filterChunk(input, state.selector_position); + } + } + + if (state.has_input) + 
filterChunk(state.input_chunk, state.selector_position); +} + + } diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index 144c47c96f5..ea6f6aed37f 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -28,10 +28,17 @@ public: Status prepare() override; + /// Set position which will be used in selector if input chunk has attached SelectorInfo (see SelectorInfo.h). + /// Columns will be filtered, keep only rows labeled with this position. + /// It is used in parallel final. + void setSelectorPosition(size_t position) { state.selector_position = position; } + protected: virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false. virtual void onFinish() {} /// Is called when all data is processed. + void filterChunks(); /// Filter chunks if selector position was set. For parallel final. + /// Processor state. struct State { @@ -43,6 +50,7 @@ protected: size_t next_input_to_read = 0; IMergingAlgorithm::Inputs init_chunks; + ssize_t selector_position = -1; }; State state; @@ -84,6 +92,8 @@ public: void work() override { + filterChunks(); + if (!state.init_chunks.empty()) algorithm.initialize(std::move(state.init_chunks)); diff --git a/src/Processors/QueryPlan/IQueryPlanStep.cpp b/src/Processors/QueryPlan/IQueryPlanStep.cpp index b36d1f0e12f..f06897e8488 100644 --- a/src/Processors/QueryPlan/IQueryPlanStep.cpp +++ b/src/Processors/QueryPlan/IQueryPlanStep.cpp @@ -86,9 +86,6 @@ static void doDescribeProcessor(const IProcessor & processor, size_t count, IQue doDescribeHeader(*last_header, num_equal_headers, settings); } - if (!processor.getDescription().empty()) - settings.out << String(settings.offset, settings.indent_char) << "Description: " << processor.getDescription() << '\n'; - settings.offset += settings.indent; } diff --git a/src/Processors/QueryPlan/PartsSplitter.cpp b/src/Processors/QueryPlan/PartsSplitter.cpp deleted file mode 100644 index 25574c6dcc5..00000000000 --- a/src/Processors/QueryPlan/PartsSplitter.cpp +++ /dev/null @@ -1,274 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace DB; - -namespace -{ - -using Value = std::vector; - -std::string toString(const Value & value) -{ - return fmt::format("({})", fmt::join(value, ", ")); -} - -/// Adaptor to access PK values from index. -class IndexAccess -{ -public: - explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_) { } - - Value getValue(size_t part_idx, size_t mark) const - { - const auto & index = parts[part_idx].data_part->index; - Value value(index.size()); - for (size_t i = 0; i < value.size(); ++i) - index[i]->get(mark, value[i]); - return value; - } - - size_t getMarkRows(size_t part_idx, size_t mark) const { return parts[part_idx].data_part->index_granularity.getMarkRows(mark); } - - size_t getTotalRowCount() const - { - size_t total = 0; - for (const auto & part : parts) - total += part.getRowsCount(); - return total; - } - -private: - const RangesInDataParts & parts; -}; - - -/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range. -/// Will try to produce exactly max_layer layers but may return less if data is distributed in not a very parallelizable way. 
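// Aside, a standalone illustration of the layering idea described above (MarkInfo and
// chooseBorders are made-up names, not ClickHouse types): walk the marks in PK order
// and start a new layer roughly every total_rows / max_layers rows. The real split()
// below additionally tracks open ranges per part and range begin/end events.
#include <algorithm>
#include <cstddef>
#include <vector>

struct MarkInfo { int pk_value; size_t rows; };   // one granule, already sorted by pk_value

std::vector<size_t> chooseBorders(const std::vector<MarkInfo> & marks, size_t max_layers)
{
    size_t total_rows = 0;
    for (const auto & mark : marks)
        total_rows += mark.rows;
    const size_t rows_per_layer = std::max<size_t>(total_rows / max_layers, 1);

    std::vector<size_t> borders;                  // mark indexes where a new layer starts
    size_t rows_in_layer = 0;
    for (size_t i = 0; i < marks.size() && borders.size() + 1 < max_layers; ++i)
    {
        rows_in_layer += marks[i].rows;
        if (rows_in_layer >= rows_per_layer)
        {
            borders.push_back(i + 1);             // next layer begins after this mark
            rows_in_layer = 0;
        }
    }
    return borders;                               // borders.size() + 1 layers in total
}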
-std::pair, std::vector> split(RangesInDataParts parts, size_t max_layers) -{ - // We will advance the iterator pointing to the mark with the smallest PK value until there will be not less than rows_per_layer rows in the current layer (roughly speaking). - // Then we choose the last observed value as the new border, so the current layer will consists of granules with values greater than the previous mark and less or equal - // than the new border. - - struct PartsRangesIterator - { - struct RangeInDataPart : MarkRange - { - size_t part_idx; - }; - - enum class EventType - { - RangeBeginning, - RangeEnding, - }; - - bool operator<(const PartsRangesIterator & other) const { return std::tie(value, event) > std::tie(other.value, other.event); } - - Value value; - RangeInDataPart range; - EventType event; - }; - - const auto index_access = std::make_unique(parts); - std::priority_queue parts_ranges_queue; - for (size_t part_idx = 0; part_idx < parts.size(); ++part_idx) - { - for (const auto & range : parts[part_idx].ranges) - { - parts_ranges_queue.push( - {index_access->getValue(part_idx, range.begin), {range, part_idx}, PartsRangesIterator::EventType::RangeBeginning}); - const auto & index_granularity = parts[part_idx].data_part->index_granularity; - if (index_granularity.hasFinalMark() && range.end + 1 == index_granularity.getMarksCount()) - parts_ranges_queue.push( - {index_access->getValue(part_idx, range.end), {range, part_idx}, PartsRangesIterator::EventType::RangeEnding}); - } - } - - /// The beginning of currently started (but not yet finished) range of marks of a part in the current layer. - std::unordered_map current_part_range_begin; - /// The current ending of a range of marks of a part in the current layer. - std::unordered_map current_part_range_end; - - /// Determine borders between layers. - std::vector borders; - std::vector result_layers; - - const size_t rows_per_layer = std::max(index_access->getTotalRowCount() / max_layers, 1); - - while (!parts_ranges_queue.empty()) - { - // New layer should include last granules of still open ranges from the previous layer, - // because they may already contain values greater than the last border. - size_t rows_in_current_layer = 0; - size_t marks_in_current_layer = 0; - - // Intersection between the current and next layers is just the last observed marks of each still open part range. Ratio is empirical. - auto layers_intersection_is_too_big = [&]() - { - const auto intersected_parts = current_part_range_end.size(); - return marks_in_current_layer < intersected_parts * 2; - }; - - result_layers.emplace_back(); - - while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers) - { - // We're advancing iterators until a new value showed up. 
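// Aside, a toy version of "advance until a new value shows up": a k-way walk over
// several sorted sequences using a min-heap keyed by the current value (Cursor and
// walkSorted are illustrative names; the real loop below also handles range endings
// and accumulates row counts per layer).
#include <cstddef>
#include <queue>
#include <vector>

struct Cursor { int value; size_t source; size_t pos; };
struct GreaterByValue { bool operator()(const Cursor & a, const Cursor & b) const { return a.value > b.value; } };

std::vector<int> walkSorted(const std::vector<std::vector<int>> & sources)
{
    std::priority_queue<Cursor, std::vector<Cursor>, GreaterByValue> queue;
    for (size_t i = 0; i < sources.size(); ++i)
        if (!sources[i].empty())
            queue.push({sources[i][0], i, 0});

    std::vector<int> values_in_order;             // candidate border values, ascending
    while (!queue.empty())
    {
        const int current_value = queue.top().value;
        values_in_order.push_back(current_value);
        // Pop every cursor still sitting on current_value and re-insert it advanced by one.
        while (!queue.empty() && queue.top().value == current_value)
        {
            Cursor cursor = queue.top();
            queue.pop();
            if (cursor.pos + 1 < sources[cursor.source].size())
                queue.push({sources[cursor.source][cursor.pos + 1], cursor.source, cursor.pos + 1});
        }
    }
    return values_in_order;
}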
- Value last_value; - while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().value)) - { - auto current = parts_ranges_queue.top(); - parts_ranges_queue.pop(); - const auto part_idx = current.range.part_idx; - - if (current.event == PartsRangesIterator::EventType::RangeEnding) - { - result_layers.back().emplace_back( - parts[part_idx].data_part, - parts[part_idx].part_index_in_query, - MarkRanges{{current_part_range_begin[part_idx], current.range.end}}); - current_part_range_begin.erase(part_idx); - current_part_range_end.erase(part_idx); - continue; - } - - last_value = std::move(current.value); - rows_in_current_layer += index_access->getMarkRows(part_idx, current.range.begin); - marks_in_current_layer++; - current_part_range_begin.try_emplace(part_idx, current.range.begin); - current_part_range_end[part_idx] = current.range.begin; - if (current.range.begin + 1 < current.range.end) - { - current.range.begin++; - current.value = index_access->getValue(part_idx, current.range.begin); - parts_ranges_queue.push(std::move(current)); - } - } - if (parts_ranges_queue.empty()) - break; - if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers) - borders.push_back(last_value); - } - for (const auto & [part_idx, last_mark] : current_part_range_end) - { - result_layers.back().emplace_back( - parts[part_idx].data_part, - parts[part_idx].part_index_in_query, - MarkRanges{{current_part_range_begin[part_idx], last_mark + 1}}); - current_part_range_begin[part_idx] = current_part_range_end[part_idx]; - } - } - for (auto & layer : result_layers) - { - std::stable_sort( - layer.begin(), - layer.end(), - [](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; }); - } - - return std::make_pair(std::move(borders), std::move(result_layers)); -} - - -/// Will return borders.size()+1 filters in total, i-th filter will accept rows with PK values within the range [borders[i-1], borders[i]). -std::vector buildFilters(const KeyDescription & primary_key, const std::vector & borders) -{ - auto add_and_condition = [&](ASTPtr & result, const ASTPtr & foo) { result = !result ? foo : makeASTFunction("and", result, foo); }; - - /// Produces ASTPtr to predicate (pk_col0, pk_col1, ... , pk_colN) > (value[0], value[1], ... , value[N]) - auto lexicographically_greater = [&](const Value & value) - { - // PK may contain functions of the table columns, so we need the actual PK AST with all expressions it contains. - ASTPtr pk_columns_as_tuple = makeASTFunction("tuple", primary_key.expression_list_ast->children); - - ASTPtr value_ast = std::make_shared(); - for (size_t i = 0; i < value.size(); ++i) - { - const auto & types = primary_key.data_types; - ASTPtr component_ast = std::make_shared(value[i]); - // Values of some types (e.g. Date, DateTime) are stored in columns as numbers and we get them as just numbers from the index. - // So we need an explicit Cast for them. 
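// Aside, an example of the predicate shape this produces, with an assumed schema that
// is not part of this patch: for a table ordered by (d Date, id UInt32) the index holds
// the Date as a bare day number such as 18500, so a border {18500, 42} must be rendered
// with an explicit cast. A toy renderer (PkColumn, lexGreaterSql are made-up names):
#include <cstddef>
#include <string>
#include <vector>

struct PkColumn { std::string name; std::string type; bool stored_as_bare_number; };

std::string lexGreaterSql(const std::vector<PkColumn> & pk, const std::vector<long> & border)
{
    std::string lhs = "(", rhs = "(";
    for (size_t i = 0; i < pk.size(); ++i)
    {
        lhs += (i ? ", " : "") + pk[i].name;
        std::string value = std::to_string(border[i]);
        if (pk[i].stored_as_bare_number)          // e.g. Date kept as a UInt16 day number
            value = "CAST(" + value + ", '" + pk[i].type + "')";
        rhs += (i ? ", " : "") + value;
    }
    return lhs + ") > " + rhs + ")";
}
// lexGreaterSql({{"d", "Date", true}, {"id", "UInt32", false}}, {18500, 42})
//   -> "(d, id) > (CAST(18500, 'Date'), 42)"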
- if (isColumnedAsNumber(types.at(i)->getTypeId()) && !isNumber(types.at(i)->getTypeId())) - component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared(types.at(i)->getName())); - value_ast->children.push_back(std::move(component_ast)); - } - ASTPtr value_as_tuple = makeASTFunction("tuple", value_ast->children); - - return makeASTFunction("greater", pk_columns_as_tuple, value_as_tuple); - }; - - std::vector filters(borders.size() + 1); - for (size_t layer = 0; layer <= borders.size(); ++layer) - { - if (layer > 0) - add_and_condition(filters[layer], lexicographically_greater(borders[layer - 1])); - if (layer < borders.size()) - add_and_condition(filters[layer], makeASTFunction("not", lexicographically_greater(borders[layer]))); - } - return filters; -} -} - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -Pipes buildPipesForReadingByPKRanges( - const KeyDescription & primary_key, - RangesInDataParts parts, - size_t max_layers, - ContextPtr context, - ReadingInOrderStepGetter && reading_step_getter) -{ - if (max_layers <= 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1."); - - auto && [borders, result_layers] = split(std::move(parts), max_layers); - auto filters = buildFilters(primary_key, borders); - Pipes pipes(result_layers.size()); - for (size_t i = 0; i < result_layers.size(); ++i) - { - pipes[i] = reading_step_getter(std::move(result_layers[i])); - auto & filter_function = filters[i]; - if (!filter_function) - continue; - auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes()); - auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false); - ExpressionActionsPtr expression_actions = std::make_shared(std::move(actions)); - auto description = fmt::format( - "filter values in [{}, {})", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf"); - auto pk_expression = std::make_shared(primary_key.expression->getActionsDAG().clone()); - pipes[i].addSimpleTransform([pk_expression](const Block & header) - { return std::make_shared(header, pk_expression); }); - pipes[i].addSimpleTransform( - [&](const Block & header) - { - auto step = std::make_shared(header, expression_actions, filter_function->getColumnName(), true); - step->setDescription(description); - return step; - }); - } - return pipes; -} - -} diff --git a/src/Processors/QueryPlan/PartsSplitter.h b/src/Processors/QueryPlan/PartsSplitter.h deleted file mode 100644 index 56bca688c2d..00000000000 --- a/src/Processors/QueryPlan/PartsSplitter.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - - -namespace DB -{ - -using ReadingInOrderStepGetter = std::function; - -/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range. -/// A separate pipe will be constructed for each layer with a reading step (provided by the reading_step_getter) and a filter for this layer's range of PK values. -/// Will try to produce exactly max_layer pipes but may return less if data is distributed in not a very parallelizable way. 
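// Aside, a plain-int sketch of how borders become borders.size() + 1 layer filters,
// following the predicates built above (strictly greater than the previous border,
// not greater than the layer's own border): layer 0 has only an upper bound, the last
// layer only a lower bound, and every key matches exactly one layer. buildLayerPredicates
// is an illustrative name, not ClickHouse code.
#include <cstddef>
#include <functional>
#include <vector>

std::vector<std::function<bool(int)>> buildLayerPredicates(const std::vector<int> & borders)
{
    std::vector<std::function<bool(int)>> predicates;
    for (size_t layer = 0; layer <= borders.size(); ++layer)
    {
        predicates.push_back([layer, borders](int pk)
        {
            const bool above_lower = layer == 0 || pk > borders[layer - 1];
            const bool below_upper = layer == borders.size() || !(pk > borders[layer]);
            return above_lower && below_upper;
        });
    }
    return predicates;
}
// With borders {10, 20}: layer 0 accepts pk <= 10, layer 1 accepts 10 < pk <= 20,
// layer 2 accepts pk > 20.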
-Pipes buildPipesForReadingByPKRanges( - const KeyDescription & primary_key, - RangesInDataParts parts, - size_t max_layers, - ContextPtr context, - ReadingInOrderStepGetter && reading_step_getter); -} diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 1caee41160f..8adaf2f1027 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1,16 +1,14 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include #include +#include +#include #include +#include +#include +#include +#include +#include +#include #include #include #include @@ -18,22 +16,17 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include #include -#include #include #include +#include +#include #include +#include +#include +#include #include #include -#include #include namespace ProfileEvents @@ -567,6 +560,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( static void addMergingFinal( Pipe & pipe, + size_t num_output_streams, const SortDescription & sort_description, MergeTreeData::MergingParams merging_params, Names partition_key_columns, @@ -613,7 +607,56 @@ static void addMergingFinal( __builtin_unreachable(); }; - pipe.addTransform(get_merging_processor()); + if (num_output_streams <= 1 || sort_description.empty()) + { + pipe.addTransform(get_merging_processor()); + return; + } + + ColumnNumbers key_columns; + key_columns.reserve(sort_description.size()); + for (const auto & desc : sort_description) + key_columns.push_back(header.getPositionByName(desc.column_name)); + + pipe.addSimpleTransform([&](const Block & stream_header) + { + return std::make_shared(stream_header, num_output_streams, key_columns); + }); + + pipe.transform([&](OutputPortRawPtrs ports) + { + Processors transforms; + std::vector output_ports; + transforms.reserve(ports.size() + num_output_streams); + output_ports.reserve(ports.size()); + + for (auto & port : ports) + { + auto copier = std::make_shared(header, num_output_streams); + connect(*port, copier->getInputPort()); + output_ports.emplace_back(copier->getOutputs().begin()); + transforms.emplace_back(std::move(copier)); + } + + for (size_t i = 0; i < num_output_streams; ++i) + { + auto merge = get_merging_processor(); + merge->setSelectorPosition(i); + auto input = merge->getInputs().begin(); + + /// Connect i-th merge with i-th input port of every copier. + for (size_t j = 0; j < ports.size(); ++j) + { + connect(*output_ports[j], *input); + ++output_ports[j]; + ++input; + } + + transforms.emplace_back(std::move(merge)); + } + + return transforms; + }); } @@ -667,7 +710,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) { - Pipes pipes; + Pipe pipe; + { RangesInDataParts new_parts; @@ -694,39 +738,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( if (new_parts.empty()) continue; - if (num_streams > 1 && metadata_for_reading->hasPrimaryKey()) - { - // Let's split parts into layers to ensure data parallelism of final. 
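// Aside, the hash-to-bucket mapping used by the selector path that this revert restores:
// a 32-bit weak hash of the key columns is scaled into [0, num_outputs), so equal keys
// always land in the same parallel merge. A standalone version of that arithmetic
// (bucketOf is a made-up name):
#include <cstddef>
#include <cstdint>

size_t bucketOf(uint32_t weak_hash, size_t num_outputs)
{
    uint64_t scaled = static_cast<uint64_t>(weak_hash) * num_outputs;  // [0, num_outputs * 2^32)
    return static_cast<size_t>(scaled >> 32);                          // [0, num_outputs)
}
// Rows whose hash falls in [i * 2^32 / num_outputs, (i + 1) * 2^32 / num_outputs) all map
// to bucket i; merge #i later keeps only rows labeled with its own position.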
- auto reading_step_getter = [this, &column_names, &info](auto parts) - { - return read( - std::move(parts), - column_names, - ReadFromMergeTree::ReadType::InOrder, - 1 /* num_streams */, - 0 /* min_marks_for_concurrent_read */, - info.use_uncompressed_cache); - }; - pipes = buildPipesForReadingByPKRanges( - metadata_for_reading->getPrimaryKey(), std::move(new_parts), num_streams, context, std::move(reading_step_getter)); - } - else - { - pipes.emplace_back(read( - std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder, num_streams, 0, info.use_uncompressed_cache)); - } + pipe = read(std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder, + num_streams, 0, info.use_uncompressed_cache); /// Drop temporary columns, added by 'sorting_key_expr' if (!out_projection) - out_projection = createProjection(pipes.front().getHeader()); + out_projection = createProjection(pipe.getHeader()); } auto sorting_expr = std::make_shared( metadata_for_reading->getSortingKey().expression->getActionsDAG().clone()); - for (auto & pipe : pipes) - pipe.addSimpleTransform([sorting_expr](const Block & header) - { return std::make_shared(header, sorting_expr); }); + pipe.addSimpleTransform([sorting_expr](const Block & header) + { + return std::make_shared(header, sorting_expr); + }); /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition /// with level > 0 then we won't postprocess this part @@ -734,7 +760,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0) { - partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes))); + partition_pipes.emplace_back(std::move(pipe)); continue; } @@ -751,21 +777,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( for (size_t i = 0; i < sort_columns_size; ++i) sort_description.emplace_back(sort_columns[i], 1, 1); - for (auto & pipe : pipes) - addMergingFinal( - pipe, - sort_description, - data.merging_params, - partition_key_columns, - max_block_size); + addMergingFinal( + pipe, + std::min(num_streams, settings.max_final_threads), + sort_description, data.merging_params, partition_key_columns, max_block_size); - partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes))); + partition_pipes.emplace_back(std::move(pipe)); } if (!lonely_parts.empty()) { + RangesInDataParts new_parts; + size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size(); + const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead( settings.merge_tree_min_rows_for_concurrent_read, settings.merge_tree_min_bytes_for_concurrent_read, diff --git a/src/Processors/Transforms/AddingSelectorTransform.cpp b/src/Processors/Transforms/AddingSelectorTransform.cpp new file mode 100644 index 00000000000..f75a5920072 --- /dev/null +++ b/src/Processors/Transforms/AddingSelectorTransform.cpp @@ -0,0 +1,76 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +AddingSelectorTransform::AddingSelectorTransform( + const Block & header, size_t num_outputs_, ColumnNumbers key_columns_) + : ISimpleTransform(header, header, false) + , num_outputs(num_outputs_) + , key_columns(std::move(key_columns_)) + , hash(0) +{ + setInputNotNeededAfterRead(false); + + if (num_outputs <= 1) + throw Exception("SplittingByHashTransform expects more than 1 outputs, got " + 
std::to_string(num_outputs), + ErrorCodes::LOGICAL_ERROR); + + if (key_columns.empty()) + throw Exception("SplittingByHashTransform cannot split by empty set of key columns", + ErrorCodes::LOGICAL_ERROR); + + for (auto & column : key_columns) + if (column >= header.columns()) + throw Exception("Invalid column number: " + std::to_string(column) + + ". There is only " + std::to_string(header.columns()) + " columns in header", + ErrorCodes::LOGICAL_ERROR); +} + +static void calculateWeakHash32(const Chunk & chunk, const ColumnNumbers & key_columns, WeakHash32 & hash) +{ + auto num_rows = chunk.getNumRows(); + const auto & columns = chunk.getColumns(); + + hash.reset(num_rows); + + for (const auto & column_number : key_columns) + columns[column_number]->updateWeakHash32(hash); +} + +static IColumn::Selector fillSelector(const WeakHash32 & hash, size_t num_outputs) +{ + /// Row from interval [(2^32 / num_outputs) * i, (2^32 / num_outputs) * (i + 1)) goes to bucket with number i. + + const auto & hash_data = hash.getData(); + size_t num_rows = hash_data.size(); + IColumn::Selector selector(num_rows); + + for (size_t row = 0; row < num_rows; ++row) + { + selector[row] = hash_data[row]; /// [0, 2^32) + selector[row] *= num_outputs; /// [0, num_outputs * 2^32), selector stores 64 bit values. + selector[row] >>= 32u; /// [0, num_outputs) + } + + return selector; +} + +void AddingSelectorTransform::transform(Chunk & input_chunk, Chunk & output_chunk) +{ + auto chunk_info = std::make_shared(); + + calculateWeakHash32(input_chunk, key_columns, hash); + chunk_info->selector = fillSelector(hash, num_outputs); + + input_chunk.swap(output_chunk); + output_chunk.setChunkInfo(std::move(chunk_info)); +} + +} diff --git a/src/Processors/Transforms/AddingSelectorTransform.h b/src/Processors/Transforms/AddingSelectorTransform.h new file mode 100644 index 00000000000..bad97adfa76 --- /dev/null +++ b/src/Processors/Transforms/AddingSelectorTransform.h @@ -0,0 +1,26 @@ +#pragma once +#include +#include +#include +#include + +namespace DB +{ + +/// Add IColumn::Selector to chunk (see SelectorInfo.h). +/// Selector is filled by formula (WeakHash(key_columns) * num_outputs / MAX_INT). +class AddingSelectorTransform : public ISimpleTransform +{ +public: + AddingSelectorTransform(const Block & header, size_t num_outputs_, ColumnNumbers key_columns_); + String getName() const override { return "AddingSelector"; } + void transform(Chunk & input_chunk, Chunk & output_chunk) override; + +private: + size_t num_outputs; + ColumnNumbers key_columns; + + WeakHash32 hash; +}; + +} diff --git a/src/Processors/Transforms/FilterSortedStreamByRange.h b/src/Processors/Transforms/FilterSortedStreamByRange.h deleted file mode 100644 index adbc0626abb..00000000000 --- a/src/Processors/Transforms/FilterSortedStreamByRange.h +++ /dev/null @@ -1,66 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace DB -{ - -/// Could be used when the predicate given by expression_ is true only on one continuous range of values and input is monotonous by that value. -/// The following optimization applies: when a new chunk of data comes in we firstly execute the expression_ only on the first and the last row. -/// If it evaluates to true on both rows then the whole chunk is immediately passed to further steps. -/// Otherwise, we apply the expression_ to all rows. 
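// Aside, the first/last-row shortcut described above, shown on plain vectors: because the
// input is sorted by the value the range predicate looks at, checking only the first and
// last row is enough to accept a whole chunk. filterSortedChunk is an illustrative name.
#include <functional>
#include <vector>

std::vector<int> filterSortedChunk(const std::vector<int> & rows, const std::function<bool(int)> & pred)
{
    if (rows.size() >= 2 && pred(rows.front()) && pred(rows.back()))
        return rows;                              // fast path: whole chunk passes untouched

    std::vector<int> result;                      // slow path: evaluate every row
    for (int value : rows)
        if (pred(value))
            result.push_back(value);
    return result;
}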
-class FilterSortedStreamByRange : public ISimpleTransform -{ -public: - FilterSortedStreamByRange( - const Block & header_, - ExpressionActionsPtr expression_, - String filter_column_name_, - bool remove_filter_column_, - bool on_totals_ = false) - : ISimpleTransform( - header_, - FilterTransform::transformHeader(header_, expression_->getActionsDAG(), filter_column_name_, remove_filter_column_), - true) - , filter_transform(header_, expression_, filter_column_name_, remove_filter_column_, on_totals_) - { - } - - String getName() const override { return "FilterSortedStreamByRange"; } - - void transform(Chunk & chunk) override - { - int rows_before_filtration = chunk.getNumRows(); - if (rows_before_filtration < 2) - { - filter_transform.transform(chunk); - return; - } - - // Evaluate expression on just the first and the last row. - // If both of them satisfies conditions, than skip calculation for all the rows in between. - auto quick_check_columns = chunk.cloneEmptyColumns(); - auto src_columns = chunk.detachColumns(); - for (auto row : {0, rows_before_filtration - 1}) - for (size_t col = 0; col < quick_check_columns.size(); ++col) - quick_check_columns[col]->insertFrom(*src_columns[col].get(), row); - chunk.setColumns(std::move(quick_check_columns), 2); - filter_transform.transform(chunk); - const bool all_rows_will_pass_filter = chunk.getNumRows() == 2; - - chunk.setColumns(std::move(src_columns), rows_before_filtration); - - // Not all rows satisfy conditions. - if (!all_rows_will_pass_filter) - filter_transform.transform(chunk); - } - -private: - FilterTransform filter_transform; -}; - - -} diff --git a/src/Processors/Transforms/FilterTransform.h b/src/Processors/Transforms/FilterTransform.h index 3340fe230b7..39f1f1c42db 100644 --- a/src/Processors/Transforms/FilterTransform.h +++ b/src/Processors/Transforms/FilterTransform.h @@ -32,6 +32,7 @@ public: Status prepare() override; +protected: void transform(Chunk & chunk) override; private: diff --git a/src/Processors/Transforms/SelectorInfo.h b/src/Processors/Transforms/SelectorInfo.h new file mode 100644 index 00000000000..2876d64ed28 --- /dev/null +++ b/src/Processors/Transforms/SelectorInfo.h @@ -0,0 +1,14 @@ +#pragma once +#include +#include + +namespace DB +{ + +/// ChunkInfo with IColumn::Selector. It is added by AddingSelectorTransform. 
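// Aside, how a consumer uses the attached selector (compare filterChunk() in
// IMergingTransform.cpp earlier in this patch): keep only rows labeled with this merge's
// position. Plain vectors stand in for the real chunk and columns; keepBucket is a
// made-up name. The real filterChunk() additionally keeps a filtered-out last row and
// sets skip_last_row so the merging algorithm still sees the stream's upper bound.
#include <cstddef>
#include <cstdint>
#include <vector>

std::vector<int64_t> keepBucket(const std::vector<int64_t> & column,
                                const std::vector<size_t> & selector,  // one bucket id per row
                                size_t my_position)
{
    std::vector<int64_t> kept;
    for (size_t row = 0; row < column.size(); ++row)
        if (selector[row] == my_position)
            kept.push_back(column[row]);
    return kept;
}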
+struct SelectorInfo : public ChunkInfo +{ + IColumn::Selector selector; +}; + +} diff --git a/src/QueryPipeline/printPipeline.h b/src/QueryPipeline/printPipeline.h index 4b947149c7c..6ff5fb24c37 100644 --- a/src/QueryPipeline/printPipeline.h +++ b/src/QueryPipeline/printPipeline.h @@ -28,10 +28,7 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri /// Nodes // TODO quoting and escaping for (const auto & processor : processors) { - auto description = processor->getDescription(); - if (!description.empty()) - description = ": " + description; - out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << description; + out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << processor->getDescription(); if (statuses_iter != statuses.end()) { diff --git a/tests/performance/parallel_final.xml b/tests/performance/parallel_final.xml index ca84ed52a04..775926d1ee8 100644 --- a/tests/performance/parallel_final.xml +++ b/tests/performance/parallel_final.xml @@ -18,7 +18,6 @@ collapsing_final_16p_str_keys_rnd collapsing_final_1024p_ord collapsing_final_1024p_rnd - collapsing_final_1p_ord @@ -31,7 +30,6 @@ create table collapsing_final_16p_str_keys_rnd (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 create table collapsing_final_1024p_ord (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by intDiv(key1, 8192 * 2) create table collapsing_final_1024p_rnd (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024 - create table collapsing_final_1p_ord (key1 UInt64, key2 UInt64, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2) insert into collapsing_final_16p_ord select number, number, 1, number from numbers_mt(8388608) @@ -45,9 +43,6 @@ insert into collapsing_final_1024p_ord select number, 1, number from numbers_mt(16777216) insert into collapsing_final_1024p_rnd select number, 1, number from numbers_mt(16777216) - - insert into collapsing_final_1p_ord select number, number + 1, 1, number from numbers_mt(5e7) - optimize table {collapsing} final SELECT count() FROM {collapsing} final diff --git a/tests/queries/0_stateless/01861_explain_pipeline.reference b/tests/queries/0_stateless/01861_explain_pipeline.reference index aec3ae06dce..2ba294d7e4d 100644 --- a/tests/queries/0_stateless/01861_explain_pipeline.reference +++ b/tests/queries/0_stateless/01861_explain_pipeline.reference @@ -16,15 +16,8 @@ ExpressionTransform ExpressionTransform × 2 (ReadFromMergeTree) ExpressionTransform × 2 - ReplacingSorted - ExpressionTransform - FilterSortedStreamByRange - Description: filter values in [(5), +inf) - ExpressionTransform - MergeTreeInOrder 0 → 1 - ReplacingSorted 2 → 1 - ExpressionTransform × 2 - FilterSortedStreamByRange × 2 - Description: filter values in [-inf, (5)) - ExpressionTransform × 2 - MergeTreeInOrder × 2 0 → 1 + ReplacingSorted × 2 2 → 1 + Copy × 2 1 → 2 + AddingSelector × 2 + ExpressionTransform × 2 + MergeTreeInOrder × 2 0 → 1 diff --git a/tests/queries/0_stateless/02286_parallel_final.reference b/tests/queries/0_stateless/02286_parallel_final.reference deleted file mode 100644 index f6573cb9042..00000000000 --- a/tests/queries/0_stateless/02286_parallel_final.reference +++ /dev/null @@ -1,9 +0,0 @@ -2 -2 
-3 -5 -8 -8 -8 -8 -8 diff --git a/tests/queries/0_stateless/02286_parallel_final.sh b/tests/queries/0_stateless/02286_parallel_final.sh deleted file mode 100755 index 6686a5d3e33..00000000000 --- a/tests/queries/0_stateless/02286_parallel_final.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env bash - -CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. "$CURDIR"/../shell_config.sh - -test_random_values() { - layers=$1 - $CLICKHOUSE_CLIENT -n -q " - create table tbl_8parts_${layers}granules_rnd (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 % 8); - insert into tbl_8parts_${layers}granules_rnd select number, 1 from numbers_mt($((layers * 8 * 8192))); - explain pipeline select * from tbl_8parts_${layers}granules_rnd final settings max_threads = 16;" 2>&1 | - grep -c "CollapsingSortedTransform" -} - -for layers in 2 3 5 8; do - test_random_values $layers -done; - -test_sequential_values() { - layers=$1 - $CLICKHOUSE_CLIENT -n -q " - create table tbl_8parts_${layers}granules_seq (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 / $((layers * 8192)))::UInt64; - insert into tbl_8parts_${layers}granules_seq select number, 1 from numbers_mt($((layers * 8 * 8192))); - explain pipeline select * from tbl_8parts_${layers}granules_seq final settings max_threads = 8;" 2>&1 | - grep -c "CollapsingSortedTransform" -} - -for layers in 2 3 5 8 16; do - test_sequential_values $layers -done;