From 5b6bf58730827a48cf162942ecc3ff38caaa0999 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 2 Nov 2023 11:19:37 +0000 Subject: [PATCH 1/5] Proper process aliases for aggregation-by-partition optimization. --- src/Interpreters/ActionsDAG.cpp | 96 +++++++++++++++---- src/Interpreters/ActionsDAG.h | 5 + .../useDataParallelAggregation.cpp | 13 +-- 3 files changed, 91 insertions(+), 23 deletions(-) diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 04dee2ed6e6..f681b4ce5cc 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -335,6 +335,28 @@ const ActionsDAG::Node * ActionsDAG::tryFindInOutputs(const std::string & name) return nullptr; } +ActionsDAG::NodeRawConstPtrs ActionsDAG::findInOutpus(const Names & names) const +{ + NodeRawConstPtrs required_nodes; + required_nodes.reserve(names.size()); + + std::unordered_map names_map; + for (const auto * node : outputs) + names_map[node->result_name] = node; + + for (const auto & name : names) + { + auto it = names_map.find(name); + if (it == names_map.end()) + throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, + "Unknown column: {}, there are only columns {}", name, dumpDAG()); + + required_nodes.push_back(it->second); + } + + return required_nodes; +} + void ActionsDAG::addOrReplaceInOutputs(const Node & node) { for (auto & output_node : outputs) @@ -441,23 +463,7 @@ void ActionsDAG::removeUnusedActions(const NameSet & required_names, bool allow_ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_remove_inputs, bool allow_constant_folding) { - NodeRawConstPtrs required_nodes; - required_nodes.reserve(required_names.size()); - - std::unordered_map names_map; - for (const auto * node : outputs) - names_map[node->result_name] = node; - - for (const auto & name : required_names) - { - auto it = names_map.find(name); - if (it == names_map.end()) - throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, - "Unknown column: {}, there are only columns {}", name, dumpDAG()); - - required_nodes.push_back(it->second); - } - + auto required_nodes = findInOutpus(required_names); outputs.swap(required_nodes); removeUnusedActions(allow_remove_inputs, allow_constant_folding); } @@ -528,6 +534,62 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_consta std::erase_if(inputs, [&](const Node * node) { return !visited_nodes.contains(node); }); } +ActionsDAGPtr ActionsDAG::cloneSubDAG(const NodeRawConstPtrs & outputs, bool remove_aliases) +{ + auto actions = std::make_shared(); + std::unordered_map copy_map; + + struct Frame + { + const Node * node = nullptr; + size_t next_child = 0; + }; + + std::stack stack; + + for (const auto * output : outputs) + { + if (copy_map.contains(output)) + continue; + + stack.push(Frame{output}); + while (!stack.empty()) + { + auto & frame = stack.top(); + const auto & children = frame.node->children; + while (frame.next_child < children.size() && copy_map.contains(children[frame.next_child])) + ++frame.next_child; + + if (frame.next_child < children.size()) + { + stack.push(Frame{children[frame.next_child]}); + continue; + } + + auto & copy_node = copy_map[frame.node]; + + if (remove_aliases && frame.node->type == ActionType::ALIAS) + copy_node = copy_map[frame.node->children.front()]; + else + copy_node = &actions->nodes.emplace_back(*frame.node); + + if (frame.node->type == ActionType::INPUT) + actions->inputs.push_back(copy_node); + + stack.pop(); + } + } + + for (auto & node : actions->nodes) + for (auto & child : node.children) + child = copy_map[child]; + + for (const auto * output : outputs) + actions->outputs.push_back(copy_map[output]); + + return actions; +} + static ColumnWithTypeAndName executeActionForHeader(const ActionsDAG::Node * node, ColumnsWithTypeAndName arguments) { ColumnWithTypeAndName res_column; diff --git a/src/Interpreters/ActionsDAG.h b/src/Interpreters/ActionsDAG.h index 48ed03d7347..2f7e2f109df 100644 --- a/src/Interpreters/ActionsDAG.h +++ b/src/Interpreters/ActionsDAG.h @@ -157,6 +157,9 @@ public: /// Same, but return nullptr if node not found. const Node * tryFindInOutputs(const std::string & name) const; + /// Same, but for the list of names. + NodeRawConstPtrs findInOutpus(const Names & names) const; + /// Find first node with the same name in output nodes and replace it. /// If was not found, add node to outputs end. void addOrReplaceInOutputs(const Node & node); @@ -257,6 +260,8 @@ public: ActionsDAGPtr clone() const; + static ActionsDAGPtr cloneSubDAG(const NodeRawConstPtrs & outputs, bool remove_aliases); + /// Execute actions for header. Input block must have empty columns. /// Result should be equal to the execution of ExpressionActions built from this DAG. /// Actions are not changed, no expressions are compiled. diff --git a/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp b/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp index f90d10b31d5..0b53b6dd8a6 100644 --- a/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp +++ b/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp @@ -95,7 +95,7 @@ bool allOutputsDependsOnlyOnAllowedNodes( { const auto & match = matches.at(node); /// Function could be mapped into its argument. In this case .monotonicity != std::nullopt (see matchTrees) - if (match.node && match.node->result_name == node->result_name && !match.monotonicity) + if (match.node && !match.monotonicity) res = irreducible_nodes.contains(match.node); } @@ -155,9 +155,10 @@ bool isPartitionKeySuitsGroupByKey( return false; /// We are interested only in calculations required to obtain group by keys (and not aggregate function arguments for example). - group_by_actions->removeUnusedActions(aggregating.getParams().keys); + auto key_nodes = group_by_actions->findInOutpus(aggregating.getParams().keys); + auto group_by_key_actions = ActionsDAG::cloneSubDAG(key_nodes, true); - const auto & gb_key_required_columns = group_by_actions->getRequiredColumnsNames(); + const auto & gb_key_required_columns = group_by_key_actions->getRequiredColumnsNames(); const auto & partition_actions = reading.getStorageMetadata()->getPartitionKey().expression->getActionsDAG(); @@ -166,9 +167,9 @@ bool isPartitionKeySuitsGroupByKey( if (std::ranges::find(gb_key_required_columns, col) == gb_key_required_columns.end()) return false; - const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_actions); + const auto irreducibe_nodes = removeInjectiveFunctionsFromResultsRecursively(group_by_key_actions); - const auto matches = matchTrees(group_by_actions->getOutputs(), partition_actions); + const auto matches = matchTrees(group_by_key_actions->getOutputs(), partition_actions); return allOutputsDependsOnlyOnAllowedNodes(partition_actions, irreducibe_nodes, matches); } @@ -206,7 +207,7 @@ size_t tryAggregatePartitionsIndependently(QueryPlan::Node * node, QueryPlan::No return 0; if (!reading->willOutputEachPartitionThroughSeparatePort() - && isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression()->clone(), *aggregating_step)) + && isPartitionKeySuitsGroupByKey(*reading, expression_step->getExpression(), *aggregating_step)) { if (reading->requestOutputEachPartitionThroughSeparatePort()) aggregating_step->skipMerging(); From 5095a2175874bb2dbae254f78c8ff71b4ac1d37f Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 2 Nov 2023 11:23:12 +0000 Subject: [PATCH 2/5] Update tests list. --- tests/analyzer_tech_debt.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/analyzer_tech_debt.txt b/tests/analyzer_tech_debt.txt index ff93d2f4e30..7a73958c0f6 100644 --- a/tests/analyzer_tech_debt.txt +++ b/tests/analyzer_tech_debt.txt @@ -47,7 +47,6 @@ 02458_use_structure_from_insertion_table 02479_race_condition_between_insert_and_droppin_mv 02493_inconsistent_hex_and_binary_number -02521_aggregation_by_partitions 02554_fix_grouping_sets_predicate_push_down 02575_merge_prewhere_different_default_kind 02713_array_low_cardinality_string From 34fdb8a7c634b81c2cf5534f3a13ab77813752ed Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 6 Nov 2023 18:46:39 +0000 Subject: [PATCH 3/5] Fix rocksdb with analyzer. --- src/Storages/KVStorageUtils.cpp | 117 ++++++++++++++++++ src/Storages/KVStorageUtils.h | 3 + .../RocksDB/StorageEmbeddedRocksDB.cpp | 89 +++++++++++-- src/Storages/RocksDB/StorageEmbeddedRocksDB.h | 4 +- tests/analyzer_tech_debt.txt | 2 - 5 files changed, 199 insertions(+), 16 deletions(-) diff --git a/src/Storages/KVStorageUtils.cpp b/src/Storages/KVStorageUtils.cpp index 16ab99d03b4..8238886db4e 100644 --- a/src/Storages/KVStorageUtils.cpp +++ b/src/Storages/KVStorageUtils.cpp @@ -1,5 +1,7 @@ #include +#include + #include #include #include @@ -121,6 +123,121 @@ bool traverseASTFilter( } return false; } + +bool traverseDAGFilter( + const std::string & primary_key, const DataTypePtr & primary_key_type, const ActionsDAG::Node * elem, const ContextPtr & context, FieldVectorPtr & res) +{ + if (elem->type == ActionsDAG::ActionType::ALIAS) + return traverseDAGFilter(primary_key, primary_key_type, elem->children.at(0), context, res); + + if (elem->type != ActionsDAG::ActionType::FUNCTION) + return false; + + auto func_name = elem->function_base->getName(); + + if (func_name == "and") + { + // one child has the key filter condition is ok + for (const auto * child : elem->children) + if (traverseDAGFilter(primary_key, primary_key_type, child, context, res)) + return true; + return false; + } + else if (func_name == "or") + { + // make sure every child has the key filter condition + for (const auto * child : elem->children) + if (!traverseDAGFilter(primary_key, primary_key_type, child, context, res)) + return false; + return true; + } + else if (func_name == "equals" || func_name == "in") + { + if (elem->children.size() != 2) + return false; + + if (func_name == "in") + { + const auto * key = elem->children.at(0); + while (key->type == ActionsDAG::ActionType::ALIAS) + key = key->children.at(0); + + if (key->type != ActionsDAG::ActionType::INPUT) + return false; + + if (key->result_name != primary_key) + return false; + + const auto * value = elem->children.at(1); + if (value->type != ActionsDAG::ActionType::COLUMN) + return false; + + const IColumn * value_col = value->column.get(); + if (const auto * col_const = typeid_cast(value_col)) + value_col = &col_const->getDataColumn(); + + const auto * col_set = typeid_cast(value_col); + if (!col_set) + return false; + + auto future_set = col_set->getData(); + future_set->buildOrderedSetInplace(context); + + auto set = future_set->get(); + if (!set) + return false; + + if (!set->hasExplicitSetElements()) + return false; + + set->checkColumnsNumber(1); + const auto & set_column = *set->getSetElements()[0]; + + if (set_column.getDataType() != primary_key_type->getTypeId()) + return false; + + for (size_t row = 0; row < set_column.size(); ++row) + res->push_back(set_column[row]); + return true; + } + else + { + const auto * key = elem->children.at(0); + while (key->type == ActionsDAG::ActionType::ALIAS) + key = key->children.at(0); + + if (key->type != ActionsDAG::ActionType::INPUT) + return false; + + if (key->result_name != primary_key) + return false; + + const auto * value = elem->children.at(1); + if (value->type != ActionsDAG::ActionType::COLUMN) + return false; + + auto converted_field = convertFieldToType((*value->column)[0], *primary_key_type); + if (!converted_field.isNull()) + res->push_back(converted_field); + return true; + } + } + return false; +} +} + +std::pair getFilterKeys( + const String & primary_key, const DataTypePtr & primary_key_type, const ActionDAGNodes & filter_nodes, const ContextPtr & context) +{ + if (filter_nodes.nodes.empty()) + return {{}, true}; + + auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes, {}, context); + const auto * predicate = filter_actions_dag->getOutputs().at(0); + + FieldVectorPtr res = std::make_shared(); + auto matched_keys = traverseDAGFilter(primary_key, primary_key_type, predicate, context, res); + return std::make_pair(res, !matched_keys); } std::pair getFilterKeys( diff --git a/src/Storages/KVStorageUtils.h b/src/Storages/KVStorageUtils.h index c3bb2aefa62..c6d63b800df 100644 --- a/src/Storages/KVStorageUtils.h +++ b/src/Storages/KVStorageUtils.h @@ -21,6 +21,9 @@ using DataTypePtr = std::shared_ptr; std::pair getFilterKeys( const std::string & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context); +std::pair getFilterKeys( + const String & primary_key, const DataTypePtr & primary_key_type, const ActionDAGNodes & filter_nodes, const ContextPtr & context); + template void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns) { diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index 5e8d54bcdf1..76f001509c0 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -20,6 +20,8 @@ #include #include +#include +#include #include #include @@ -415,7 +417,46 @@ void StorageEmbeddedRocksDB::initDB() } } -Pipe StorageEmbeddedRocksDB::read( +class ReadFromEmbeddedRocksDB : public SourceStepWithFilter +{ +public: + std::string getName() const override { return "ReadFromEmbeddedRocksDB"; } + void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; + void applyFilters() override; + + ReadFromEmbeddedRocksDB( + Block sample_block, + StorageSnapshotPtr storage_snapshot_, + const StorageEmbeddedRocksDB & storage_, + SelectQueryInfo query_info_, + ContextPtr context_, + size_t max_block_size_, + size_t num_streams_) + : SourceStepWithFilter(DataStream{.header = std::move(sample_block)}) + , storage_snapshot(std::move(storage_snapshot_)) + , storage(storage_) + , query_info(std::move(query_info_)) + , context(std::move(context_)) + , max_block_size(max_block_size_) + , num_streams(num_streams_) + { + } + +private: + StorageSnapshotPtr storage_snapshot; + const StorageEmbeddedRocksDB & storage; + SelectQueryInfo query_info; + ContextPtr context; + + size_t max_block_size; + size_t num_streams; + + FieldVectorPtr keys; + bool all_scan = false; +}; + +void StorageEmbeddedRocksDB::read( + QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, @@ -425,23 +466,36 @@ Pipe StorageEmbeddedRocksDB::read( size_t num_streams) { storage_snapshot->check(column_names); - - FieldVectorPtr keys; - bool all_scan = false; - Block sample_block = storage_snapshot->metadata->getSampleBlock(); - auto primary_key_data_type = sample_block.getByName(primary_key).type; - std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info, context_); + + auto reading = std::make_unique( + std::move(sample_block), + storage_snapshot, + *this, + query_info, + context_, + max_block_size, + num_streams); + + query_plan.addStep(std::move(reading)); +} + +void ReadFromEmbeddedRocksDB::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) +{ + const auto & sample_block = getOutputStream().header; + if (all_scan) { - auto iterator = std::unique_ptr(rocksdb_ptr->NewIterator(rocksdb::ReadOptions())); + auto iterator = std::unique_ptr(storage.rocksdb_ptr->NewIterator(rocksdb::ReadOptions())); iterator->SeekToFirst(); - return Pipe(std::make_shared(*this, sample_block, std::move(iterator), max_block_size)); + auto source = std::make_shared(storage, sample_block, std::move(iterator), max_block_size); + source->setStorageLimits(query_info.storage_limits); + pipeline.init(Pipe(std::move(source))); } else { if (keys->empty()) - return {}; + return; ::sort(keys->begin(), keys->end()); keys->erase(std::unique(keys->begin(), keys->end()), keys->end()); @@ -459,13 +513,22 @@ Pipe StorageEmbeddedRocksDB::read( size_t begin = num_keys * thread_idx / num_threads; size_t end = num_keys * (thread_idx + 1) / num_threads; - pipes.emplace_back(std::make_shared( - *this, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size)); + auto source = std::make_shared( + storage, sample_block, keys, keys->begin() + begin, keys->begin() + end, max_block_size); + source->setStorageLimits(query_info.storage_limits); + pipes.emplace_back(std::move(source)); } - return Pipe::unitePipes(std::move(pipes)); + pipeline.init(Pipe::unitePipes(std::move(pipes))); } } +void ReadFromEmbeddedRocksDB::applyFilters() +{ + const auto & sample_block = getOutputStream().header; + auto primary_key_data_type = sample_block.getByName(storage.primary_key).type; + std::tie(keys, all_scan) = getFilterKeys(storage.primary_key, primary_key_data_type, filter_nodes, context); +} + SinkToStoragePtr StorageEmbeddedRocksDB::write( const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/, bool /*async_insert*/) { diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h index 336f6a8abe3..11eba607c3a 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h @@ -26,6 +26,7 @@ class Context; class StorageEmbeddedRocksDB final : public IStorage, public IKeyValueEntity, WithContext { friend class EmbeddedRocksDBSink; + friend class ReadFromEmbeddedRocksDB; public: StorageEmbeddedRocksDB(const StorageID & table_id_, const String & relative_data_path_, @@ -39,7 +40,8 @@ public: std::string getName() const override { return "EmbeddedRocksDB"; } - Pipe read( + void read( + QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, diff --git a/tests/analyzer_tech_debt.txt b/tests/analyzer_tech_debt.txt index ff93d2f4e30..64d4cd531e0 100644 --- a/tests/analyzer_tech_debt.txt +++ b/tests/analyzer_tech_debt.txt @@ -14,7 +14,6 @@ 01268_shard_avgweighted 01455_shard_leaf_max_rows_bytes_to_read 01495_subqueries_in_with_statement -01504_rocksdb 01560_merge_distributed_join 01584_distributed_buffer_cannot_find_column 01586_columns_pruning @@ -39,7 +38,6 @@ 02352_grouby_shadows_arg 02354_annoy 02366_union_decimal_conversion -02375_rocksdb_with_filters 02402_merge_engine_with_view 02404_memory_bound_merging 02426_orc_bug From c58c583043aaa0897ce95a00a4b200bd32f0b81d Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 7 Nov 2023 11:42:08 +0000 Subject: [PATCH 4/5] Fix tests. --- src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index cf225322840..42519c84f35 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -520,7 +521,10 @@ void ReadFromEmbeddedRocksDB::initializePipeline(QueryPipelineBuilder & pipeline else { if (keys->empty()) + { + pipeline.init(Pipe(std::make_shared(sample_block))); return; + } ::sort(keys->begin(), keys->end()); keys->erase(std::unique(keys->begin(), keys->end()), keys->end()); From 7eacea4c518a46664f6f388143bcd74c9c0c9e71 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 8 Nov 2023 17:28:57 +0100 Subject: [PATCH 5/5] Update src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp Co-authored-by: Nikita Taranov --- .../QueryPlan/Optimizations/useDataParallelAggregation.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp b/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp index 0b53b6dd8a6..124cb735d5a 100644 --- a/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp +++ b/src/Processors/QueryPlan/Optimizations/useDataParallelAggregation.cpp @@ -156,7 +156,7 @@ bool isPartitionKeySuitsGroupByKey( /// We are interested only in calculations required to obtain group by keys (and not aggregate function arguments for example). auto key_nodes = group_by_actions->findInOutpus(aggregating.getParams().keys); - auto group_by_key_actions = ActionsDAG::cloneSubDAG(key_nodes, true); + auto group_by_key_actions = ActionsDAG::cloneSubDAG(key_nodes, /*remove_aliases=*/ true); const auto & gb_key_required_columns = group_by_key_actions->getRequiredColumnsNames();