From fd914f3d9715df06bef5611591b7852b8a9dc66c Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 28 Jun 2021 13:35:55 +0300 Subject: [PATCH] fix 'Not found column' error in MaterializeMySQL --- src/Interpreters/InterpreterSelectQuery.cpp | 3 +++ src/Parsers/ASTSelectQuery.cpp | 15 +++++++++++ src/Parsers/ASTSelectQuery.h | 2 ++ src/Storages/IStorage.h | 4 +++ src/Storages/StorageMaterializeMySQL.cpp | 25 ++++++------------- src/Storages/StorageMaterializeMySQL.h | 2 ++ src/Storages/StorageMerge.cpp | 7 ++++++ .../materialize_with_ddl.py | 15 +++++++++++ 8 files changed, 56 insertions(+), 17 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 7cca527cbc1..052d6cce617 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -389,6 +389,9 @@ InterpreterSelectQuery::InterpreterSelectQuery( query_info.syntax_analyzer_result = syntax_analyzer_result; + if (storage && !query.final() && storage->needRewriteQueryWithFinal(syntax_analyzer_result->requiredSourceColumns())) + query.setFinal(); + /// Save scalar sub queries's results in the query context if (!options.only_analyze && context->hasQueryContext()) for (const auto & it : syntax_analyzer_result->getScalars()) diff --git a/src/Parsers/ASTSelectQuery.cpp b/src/Parsers/ASTSelectQuery.cpp index 84a2e1070d6..2e29f779e21 100644 --- a/src/Parsers/ASTSelectQuery.cpp +++ b/src/Parsers/ASTSelectQuery.cpp @@ -438,4 +438,19 @@ ASTPtr & ASTSelectQuery::getExpression(Expression expr) return children[positions[expr]]; } +void ASTSelectQuery::setFinal() +{ + auto & tables_in_select_query = tables()->as(); + + if (tables_in_select_query.children.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Tables list is empty, it's a bug"); + + auto & tables_element = tables_in_select_query.children[0]->as(); + + if (!tables_element.table_expression) + throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no 
table expression, it's a bug"); + + tables_element.table_expression->as().final = true; +} + } diff --git a/src/Parsers/ASTSelectQuery.h b/src/Parsers/ASTSelectQuery.h index e9aaa4ab83b..0269c9f037c 100644 --- a/src/Parsers/ASTSelectQuery.h +++ b/src/Parsers/ASTSelectQuery.h @@ -92,6 +92,8 @@ public: void addTableFunction(ASTPtr & table_function_ptr); void updateTreeHashImpl(SipHash & hash_state) const override; + void setFinal(); + protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0e8c7e0a263..f2695ed133f 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -272,6 +272,10 @@ public: throw Exception("Method watch is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + /// Returns true if FINAL modifier must be added to SELECT query depending on required columns. + /// It's needed for ReplacingMergeTree wrappers such as MaterializedMySQL and MaterializedPostgreSQL + virtual bool needRewriteQueryWithFinal(const Names & /*column_names*/) const { return false; } + /** Read a set of columns from the table. 
* Accepts a list of columns to read, as well as a description of the query, * from which information can be extracted about how to retrieve data diff --git a/src/Storages/StorageMaterializeMySQL.cpp b/src/Storages/StorageMaterializeMySQL.cpp index 8e6f2e1ad63..a00efbd02b8 100644 --- a/src/Storages/StorageMaterializeMySQL.cpp +++ b/src/Storages/StorageMaterializeMySQL.cpp @@ -35,6 +35,14 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora setInMemoryMetadata(in_memory_metadata); } +bool StorageMaterializeMySQL::needRewriteQueryWithFinal(const Names & column_names) const +{ + const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr(); + Block nested_header = nested_metadata->getSampleBlock(); + ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1); + return std::find(column_names.begin(), column_names.end(), version_column.name) == column_names.end(); +} + Pipe StorageMaterializeMySQL::read( const Names & column_names, const StorageMetadataPtr & /*metadata_snapshot*/, @@ -53,20 +61,6 @@ Pipe StorageMaterializeMySQL::read( Block nested_header = nested_metadata->getSampleBlock(); ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2); - ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1); - - if (ASTSelectQuery * select_query = query_info.query->as(); select_query && !column_names_set.count(version_column.name)) - { - auto & tables_in_select_query = select_query->tables()->as(); - - if (!tables_in_select_query.children.empty()) - { - auto & tables_element = tables_in_select_query.children[0]->as(); - - if (tables_element.table_expression) - tables_element.table_expression->as().final = true; - } - } String filter_column_name; Names require_columns_name = column_names; @@ -80,9 +74,6 @@ Pipe StorageMaterializeMySQL::read( expressions->children.emplace_back(makeASTFunction("equals", 
sign_column_name, fetch_sign_value)); filter_column_name = expressions->children.back()->getColumnName(); - - for (const auto & column_name : column_names) - expressions->children.emplace_back(std::make_shared(column_name)); } Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams); diff --git a/src/Storages/StorageMaterializeMySQL.h b/src/Storages/StorageMaterializeMySQL.h index 45221ed5b76..6d9d6f95445 100644 --- a/src/Storages/StorageMaterializeMySQL.h +++ b/src/Storages/StorageMaterializeMySQL.h @@ -24,6 +24,8 @@ public: StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_); + bool needRewriteQueryWithFinal(const Names & /*column_names*/) const override; + Pipe read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override; diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 172805c08ed..5c7b6d04d9c 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -305,6 +305,13 @@ Pipe StorageMerge::createSources( return pipe; } + if (!modified_select.final() && storage->needRewriteQueryWithFinal(real_column_names)) + { + /// NOTE: It may not work correctly in some cases, because query was analyzed without final. + /// However, it's needed for MaterializeMySQL and it's unlikely that someone will use it with Merge tables. 
+ modified_select.setFinal(); + } + auto storage_stage = storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, metadata_snapshot, modified_query_info); if (processed_stage <= storage_stage) diff --git a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py index c5db90821e2..2d526477fde 100644 --- a/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialize_mysql_database/materialize_with_ddl.py @@ -860,7 +860,22 @@ def move_to_prewhere_and_column_filtering(clickhouse_node, mysql_node, service_n clickhouse_node.query("CREATE DATABASE cond_on_key_col ENGINE = MaterializeMySQL('{}:3306', 'cond_on_key_col', 'root', 'clickhouse')".format(service_name)) mysql_node.query("create table cond_on_key_col.products (id int primary key, product_id int not null, catalog_id int not null, brand_id int not null, name text)") mysql_node.query("insert into cond_on_key_col.products (id, name, catalog_id, brand_id, product_id) values (915, 'ertyui', 5287, 15837, 0), (990, 'wer', 1053, 24390, 1), (781, 'qwerty', 1041, 1176, 2);") + mysql_node.query("create table cond_on_key_col.test (id int(11) NOT NULL AUTO_INCREMENT, a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, PRIMARY KEY (id)) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;") + mysql_node.query("insert into cond_on_key_col.test values (42, 123, 1);") + mysql_node.query("CREATE TABLE cond_on_key_col.balance_change_record (id bigint(20) NOT NULL AUTO_INCREMENT, type tinyint(4) DEFAULT NULL, value decimal(10,4) DEFAULT NULL, time timestamp NULL DEFAULT NULL, " + "initiative_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, passivity_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, " + "person_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, tenant_code varchar(50) CHARACTER SET 
utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, " + "created_time timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + "value_snapshot decimal(10,4) DEFAULT NULL, PRIMARY KEY (id), KEY balance_change_record_initiative_id (person_id) USING BTREE, " + "KEY type (type) USING BTREE, KEY balance_change_record_type (time) USING BTREE, KEY initiative_id (initiative_id) USING BTREE, " + "KEY balance_change_record_tenant_code (passivity_id) USING BTREE, KEY tenant_code (tenant_code) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=1691049 DEFAULT CHARSET=utf8") + mysql_node.query("insert into cond_on_key_col.balance_change_record values (123, 1, 3.14, null, 'qwe', 'asd', 'zxc', 'rty', null, null, 2.7);") + mysql_node.query("CREATE TABLE cond_on_key_col.test1 (id int(11) NOT NULL AUTO_INCREMENT, c1 varchar(32) NOT NULL, c2 varchar(32), PRIMARY KEY (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4") + mysql_node.query("insert into cond_on_key_col.test1(c1,c2) values ('a','b'), ('c', null);") check_query(clickhouse_node, "SELECT DISTINCT P.id, P.name, P.catalog_id FROM cond_on_key_col.products P WHERE P.name ILIKE '%e%' and P.catalog_id=5287", '915\tertyui\t5287\n') + check_query(clickhouse_node, "select count(a) from cond_on_key_col.test where b = 1;", "1\n") + check_query(clickhouse_node, "select id from cond_on_key_col.balance_change_record where type=1;", "123\n") + check_query(clickhouse_node, "select count(c1) from cond_on_key_col.test1 where c2='b';", "1\n") clickhouse_node.query("DROP DATABASE cond_on_key_col") mysql_node.query("DROP DATABASE cond_on_key_col")