From 89e967333662c8ec4c46dfa1ba12bd10781538b9 Mon Sep 17 00:00:00 2001 From: Artem Zuikov Date: Fri, 14 Aug 2020 12:38:18 +0300 Subject: [PATCH] Hotfix for pushdown with StorageMerge (#13679) --- src/Interpreters/JoinedTables.cpp | 4 ++- .../PredicateExpressionsOptimizer.cpp | 5 +++ src/Storages/StorageMerge.cpp | 35 +++++++++++++++++-- src/Storages/VirtualColumnUtils.h | 1 - ...torage_merge_with_join_push_down.reference | 2 ++ ...1436_storage_merge_with_join_push_down.sql | 30 ++++++++++++++++ 6 files changed, 72 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/01436_storage_merge_with_join_push_down.reference create mode 100644 tests/queries/0_stateless/01436_storage_merge_with_join_push_down.sql diff --git a/src/Interpreters/JoinedTables.cpp b/src/Interpreters/JoinedTables.cpp index 127df9b5eac..d38a3fa68dc 100644 --- a/src/Interpreters/JoinedTables.cpp +++ b/src/Interpreters/JoinedTables.cpp @@ -28,6 +28,7 @@ namespace ErrorCodes { extern const int ALIAS_REQUIRED; extern const int AMBIGUOUS_COLUMN_NAME; + extern const int LOGICAL_ERROR; } namespace @@ -187,7 +188,8 @@ StoragePtr JoinedTables::getLeftTableStorage() bool JoinedTables::resolveTables() { tables_with_columns = getDatabaseAndTablesWithColumns(table_expressions, context); - assert(tables_with_columns.size() == table_expressions.size()); + if (tables_with_columns.size() != table_expressions.size()) + throw Exception("Unexpected tables count", ErrorCodes::LOGICAL_ERROR); const auto & settings = context.getSettingsRef(); if (settings.joined_subquery_requires_alias && tables_with_columns.size() > 1) diff --git a/src/Interpreters/PredicateExpressionsOptimizer.cpp b/src/Interpreters/PredicateExpressionsOptimizer.cpp index 3915a0f7f43..86bdec628cd 100644 --- a/src/Interpreters/PredicateExpressionsOptimizer.cpp +++ b/src/Interpreters/PredicateExpressionsOptimizer.cpp @@ -15,6 +15,7 @@ namespace DB namespace ErrorCodes { + extern const int LOGICAL_ERROR; } 
PredicateExpressionsOptimizer::PredicateExpressionsOptimizer( @@ -111,6 +112,10 @@ bool PredicateExpressionsOptimizer::tryRewritePredicatesToTables(ASTs & tables_e { bool is_rewrite_tables = false; + if (tables_element.size() != tables_predicates.size()) + throw Exception("Unexpected elements count in predicate push down: `set enable_optimize_predicate_expression = 0` to disable", + ErrorCodes::LOGICAL_ERROR); + for (size_t index = tables_element.size(); index > 0; --index) { size_t table_pos = index - 1; diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index a98d789a048..2d96a59392b 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -37,6 +37,27 @@ namespace ErrorCodes extern const int SAMPLING_NOT_SUPPORTED; } +namespace +{ + +/// Rewrite original query removing joined tables from it +void removeJoin(const ASTSelectQuery & select) +{ + const auto & tables = select.tables(); + if (!tables || tables->children.size() < 2) + return; + + const auto & joined_table = tables->children[1]->as(); + if (!joined_table.table_join) + return; + + /// The simplest temporary solution: leave only the first table in the query. + /// TODO: we also need to remove joined columns and related functions (taking into account aliases if any). + tables->children.resize(1); +} + +} + StorageMerge::StorageMerge( const StorageID & table_id_, @@ -243,6 +264,9 @@ Pipe StorageMerge::createSources( SelectQueryInfo modified_query_info = query_info; modified_query_info.query = query_info.query->clone(); + /// Original query could contain JOIN but we need only the first joined table and its columns.
+ removeJoin(*modified_query_info.query->as()); + VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", table_name); Pipe pipe; @@ -428,9 +452,14 @@ Block StorageMerge::getQueryHeader( } case QueryProcessingStage::WithMergeableState: case QueryProcessingStage::Complete: - return InterpreterSelectQuery( - query_info.query, context, std::make_shared(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())), - SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); + { + auto query = query_info.query->clone(); + removeJoin(*query->as()); + + auto stream = std::make_shared( + metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())); + return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); + } } throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR); } diff --git a/src/Storages/VirtualColumnUtils.h b/src/Storages/VirtualColumnUtils.h index e1c7e400249..89b69eb79e3 100644 --- a/src/Storages/VirtualColumnUtils.h +++ b/src/Storages/VirtualColumnUtils.h @@ -26,7 +26,6 @@ void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & va /// Leave in the block only the rows that fit under the WHERE clause and the PREWHERE clause of the query. /// Only elements of the outer conjunction are considered, depending only on the columns present in the block. -/// Returns true if at least one row is discarded. 
void filterBlockWithQuery(const ASTPtr & query, Block & block, const Context & context); /// Extract from the input stream a set of `name` column values diff --git a/tests/queries/0_stateless/01436_storage_merge_with_join_push_down.reference b/tests/queries/0_stateless/01436_storage_merge_with_join_push_down.reference new file mode 100644 index 00000000000..aa47d0d46d4 --- /dev/null +++ b/tests/queries/0_stateless/01436_storage_merge_with_join_push_down.reference @@ -0,0 +1,2 @@ +0 +0 diff --git a/tests/queries/0_stateless/01436_storage_merge_with_join_push_down.sql b/tests/queries/0_stateless/01436_storage_merge_with_join_push_down.sql new file mode 100644 index 00000000000..a3c598c6d83 --- /dev/null +++ b/tests/queries/0_stateless/01436_storage_merge_with_join_push_down.sql @@ -0,0 +1,30 @@ +DROP TABLE IF EXISTS test1; +DROP TABLE IF EXISTS test1_distributed; +DROP TABLE IF EXISTS test_merge; + +SET enable_optimize_predicate_expression = 1; + +CREATE TABLE test1 (id Int64, name String) ENGINE MergeTree PARTITION BY (id) ORDER BY (id); +CREATE TABLE test1_distributed AS test1 ENGINE = Distributed(test_cluster_two_shards_localhost, default, test1); +CREATE TABLE test_merge AS test1 ENGINE = Merge('default', 'test1_distributed'); + +SELECT count() FROM test_merge +JOIN (SELECT 'anystring' AS name) AS n +USING name +WHERE id = 1; + +DROP TABLE test1; +DROP TABLE test_merge; + + +CREATE TABLE test1 (id Int64, name String) ENGINE MergeTree PARTITION BY (id) ORDER BY (id); +CREATE TABLE test_merge AS test1 ENGINE = Merge('default', 'test1'); + +SELECT count() FROM test_merge +JOIN (SELECT 'anystring' AS name) AS n +USING name +WHERE id = 1; + +DROP TABLE test1; +DROP TABLE test_merge; +DROP TABLE test1_distributed;