diff --git a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h index 9d7a5f5355d..11e27f6aba0 100644 --- a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h @@ -14,8 +14,12 @@ namespace DB struct ChunkSelectFinalIndices : public ChunkInfo { - ColumnPtr select_final_indices; - explicit ChunkSelectFinalIndices(MutableColumnPtr select_final_indices_) : select_final_indices(std::move(select_final_indices_)) {} + ColumnPtr column_holder; + const ColumnUInt32 * select_final_indices; + explicit ChunkSelectFinalIndices(MutableColumnPtr select_final_indices_) : column_holder(std::move(select_final_indices_)) + { + select_final_indices = typeid_cast(column_holder.get()); + } }; /** Merges several sorted inputs into one. diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 44fd04f4caa..a4eb9421243 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -986,7 +986,8 @@ static void addMergingFinal( MergeTreeData::MergingParams merging_params, Names partition_key_columns, size_t max_block_size_rows, - bool use_skipping_final) + bool use_skipping_final, + bool query_has_where) { const auto & header = pipe.getHeader(); size_t num_outputs = pipe.numOutputPorts(); @@ -1041,9 +1042,14 @@ static void addMergingFinal( }; pipe.addTransform(get_merging_processor()); - if (use_skipping_final && merging_params.mode == MergeTreeData::MergingParams::Replacing) + if (use_skipping_final && !query_has_where) + { + /// Skipping FINAL algorithm will output the original chunk and a column indices of selected rows + /// If query has WHERE, we will merge the indices with the filter in FilterTransform later + /// Otherwise, use SelectByIndicesTransform to select rows pipe.addSimpleTransform([](const Block & header_) { return std::make_shared(header_); }); + } } @@ -1052,6 +1058,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( { const auto & settings = context->getSettingsRef(); const auto & data_settings = data.getSettings(); + const auto & select_query = query_info.query->as(); + bool query_has_where = select_query && select_query->where(); PartRangesReadInfo info(parts_with_ranges, settings, *data_settings); @@ -1184,7 +1192,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( data.merging_params, partition_key_columns, block_size.max_block_size_rows, - settings.use_skipping_final); + use_skipping_final, + query_has_where); merging_pipes.emplace_back(Pipe::unitePipes(std::move(pipes))); } diff --git a/src/Processors/Transforms/FilterTransform.cpp b/src/Processors/Transforms/FilterTransform.cpp index 089393a1430..1ac54d851c4 100644 --- a/src/Processors/Transforms/FilterTransform.cpp +++ b/src/Processors/Transforms/FilterTransform.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { @@ -34,6 +35,87 @@ static void replaceFilterToConstant(Block & block, const String & filter_column_ } } +static std::shared_ptr getSelectByFinalIndices(Chunk & chunk) +{ + if (auto select_final_indices_info = std::dynamic_pointer_cast(chunk.getChunkInfo())) + { + chunk.setChunkInfo(nullptr); + return select_final_indices_info; + } + return nullptr; +} + +static void +executeSelectByIndices(Columns & columns, std::shared_ptr & select_final_indices_info, size_t & num_rows) +{ + if (select_final_indices_info) + { + const auto & index_column = select_final_indices_info->select_final_indices; + + if (index_column && index_column->size() != num_rows) + for (auto & column : columns) + column = column->index(*index_column, 0); + + num_rows = index_column->size(); + } +} + +static std::unique_ptr combineFilterAndIndices( + std::unique_ptr description, + std::shared_ptr & select_final_indices_info, + size_t num_rows) +{ + if (select_final_indices_info) + { + const auto * index_column = select_final_indices_info->select_final_indices; + + if (description->data && index_column && index_column->size() != num_rows) + { + const auto & selected_by_indices = index_column->getData(); + const auto & selected_by_filter = *description->data; + /// At this point we know that the filter is not constant, just create a new filter + auto mutable_holder = ColumnUInt8::create(num_rows, 0); + ColumnUInt8 * concrete_column = typeid_cast(mutable_holder.get()); + auto & data = concrete_column->getData(); + for (auto idx : selected_by_indices) + data[idx] |= 1; + for (size_t i = 0; i < num_rows; ++i) + data[i] &= selected_by_filter[i]; + description->data_holder = std::move(mutable_holder); + description->data = &data; + } + } + return std::move(description); +} + +static std::unique_ptr combineFilterAndIndices( + std::unique_ptr description, + std::shared_ptr & select_final_indices_info, + size_t num_rows) +{ + if (select_final_indices_info) + { + const auto * index_column = select_final_indices_info->select_final_indices; + + if (description->filter_indices && index_column && index_column->size() != num_rows) + { + std::unique_ptr res; + const auto & selected_by_indices = index_column->getData(); + const auto & selected_by_filter = typeid_cast(description->filter_indices)->getData(); + auto mutable_holder = ColumnUInt8::create(num_rows, 0); + auto & data = mutable_holder->getData(); + for (auto idx : selected_by_indices) + data[idx] += 1; + for (auto idx : selected_by_filter) + data[idx] = (data[idx] + 1) << 1; + res->data_holder = std::move(mutable_holder); + res->data = &data; + return res; + } + } + return description; +} + Block FilterTransform::transformHeader( Block header, const ActionsDAG * expression, @@ -126,6 +208,7 @@ void FilterTransform::doTransform(Chunk & chunk) { size_t num_rows_before_filtration = chunk.getNumRows(); auto columns = chunk.detachColumns(); + auto select_final_indices_info = getSelectByFinalIndices(chunk); { Block block = getInputPort().getHeader().cloneWithColumns(columns); @@ -139,6 +222,7 @@ void FilterTransform::doTransform(Chunk & chunk) if (constant_filter_description.always_true || on_totals) { + executeSelectByIndices(columns, select_final_indices_info, num_rows_before_filtration); chunk.setColumns(std::move(columns), num_rows_before_filtration); removeFilterIfNeed(chunk); return; @@ -159,11 +243,20 @@ void FilterTransform::doTransform(Chunk & chunk) if (constant_filter_description.always_true) { + executeSelectByIndices(columns, select_final_indices_info, num_rows_before_filtration); chunk.setColumns(std::move(columns), num_rows_before_filtration); removeFilterIfNeed(chunk); return; } + std::unique_ptr filter_description; + if (filter_column->isSparse()) + filter_description = combineFilterAndIndices( + std::make_unique(*filter_column), select_final_indices_info, num_rows_before_filtration); + else + filter_description = combineFilterAndIndices( + std::make_unique(*filter_column), select_final_indices_info, num_rows_before_filtration); + /** Let's find out how many rows will be in result. * To do this, we filter out the first non-constant column * or calculate number of set bytes in the filter. @@ -178,12 +271,6 @@ void FilterTransform::doTransform(Chunk & chunk) } } - std::unique_ptr filter_description; - if (filter_column->isSparse()) - filter_description = std::make_unique(*filter_column); - else - filter_description = std::make_unique(*filter_column); - size_t num_filtered_rows = 0; if (first_non_constant_column != num_columns) {