fix 'Not found column' error in MaterializeMySQL

This commit is contained in:
Alexander Tokmakov 2021-06-28 13:35:55 +03:00
parent 4913b18532
commit fd914f3d97
8 changed files with 56 additions and 17 deletions

View File

@ -389,6 +389,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.syntax_analyzer_result = syntax_analyzer_result;
if (storage && !query.final() && storage->needRewriteQueryWithFinal(syntax_analyzer_result->requiredSourceColumns()))
query.setFinal();
/// Save scalar sub queries's results in the query context
if (!options.only_analyze && context->hasQueryContext())
for (const auto & it : syntax_analyzer_result->getScalars())

View File

@ -438,4 +438,19 @@ ASTPtr & ASTSelectQuery::getExpression(Expression expr)
return children[positions[expr]];
}
void ASTSelectQuery::setFinal()
{
auto & tables_in_select_query = tables()->as<ASTTablesInSelectQuery &>();
if (tables_in_select_query.children.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tables list is empty, it's a bug");
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
if (!tables_element.table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no table expression, it's a bug");
tables_element.table_expression->as<ASTTableExpression &>().final = true;
}
}

View File

@ -92,6 +92,8 @@ public:
void addTableFunction(ASTPtr & table_function_ptr);
void updateTreeHashImpl(SipHash & hash_state) const override;
void setFinal();
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -272,6 +272,10 @@ public:
throw Exception("Method watch is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/// Returns true if FINAL modifier must be added to SELECT query depending on required columns.
/// It's needed for ReplacingMergeTree wrappers such as MaterializedMySQL and MaterializedPostrgeSQL
virtual bool needRewriteQueryWithFinal(const Names & /*column_names*/) const { return false; }
/** Read a set of columns from the table.
* Accepts a list of columns to read, as well as a description of the query,
* from which information can be extracted about how to retrieve data

View File

@ -35,6 +35,14 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora
setInMemoryMetadata(in_memory_metadata);
}
bool StorageMaterializeMySQL::needRewriteQueryWithFinal(const Names & column_names) const
{
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
Block nested_header = nested_metadata->getSampleBlock();
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
return std::find(column_names.begin(), column_names.end(), version_column.name) == column_names.end();
}
Pipe StorageMaterializeMySQL::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
@ -53,20 +61,6 @@ Pipe StorageMaterializeMySQL::read(
Block nested_header = nested_metadata->getSampleBlock();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>(); select_query && !column_names_set.count(version_column.name))
{
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
if (!tables_in_select_query.children.empty())
{
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
if (tables_element.table_expression)
tables_element.table_expression->as<ASTTableExpression &>().final = true;
}
}
String filter_column_name;
Names require_columns_name = column_names;
@ -80,9 +74,6 @@ Pipe StorageMaterializeMySQL::read(
expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
filter_column_name = expressions->children.back()->getColumnName();
for (const auto & column_name : column_names)
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
}
Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);

View File

@ -24,6 +24,8 @@ public:
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_);
bool needRewriteQueryWithFinal(const Names & /*column_names*/) const override;
Pipe read(
const Names & column_names, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info,
ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;

View File

@ -305,6 +305,13 @@ Pipe StorageMerge::createSources(
return pipe;
}
if (!modified_select.final() && storage->needRewriteQueryWithFinal(real_column_names))
{
/// NOTE: It may not work correctly in some cases, because query was analyzed without final.
/// However, it's needed for MaterializeMySQL and it's unlikely that someone will use it with Merge tables.
modified_select.setFinal();
}
auto storage_stage
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, metadata_snapshot, modified_query_info);
if (processed_stage <= storage_stage)

View File

@ -860,7 +860,22 @@ def move_to_prewhere_and_column_filtering(clickhouse_node, mysql_node, service_n
clickhouse_node.query("CREATE DATABASE cond_on_key_col ENGINE = MaterializeMySQL('{}:3306', 'cond_on_key_col', 'root', 'clickhouse')".format(service_name))
mysql_node.query("create table cond_on_key_col.products (id int primary key, product_id int not null, catalog_id int not null, brand_id int not null, name text)")
mysql_node.query("insert into cond_on_key_col.products (id, name, catalog_id, brand_id, product_id) values (915, 'ertyui', 5287, 15837, 0), (990, 'wer', 1053, 24390, 1), (781, 'qwerty', 1041, 1176, 2);")
mysql_node.query("create table cond_on_key_col.test (id int(11) NOT NULL AUTO_INCREMENT, a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, PRIMARY KEY (id)) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;")
mysql_node.query("insert into cond_on_key_col.test values (42, 123, 1);")
mysql_node.query("CREATE TABLE cond_on_key_col.balance_change_record (id bigint(20) NOT NULL AUTO_INCREMENT, type tinyint(4) DEFAULT NULL, value decimal(10,4) DEFAULT NULL, time timestamp NULL DEFAULT NULL, "
"initiative_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, passivity_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, "
"person_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, tenant_code varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, "
"created_time timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
"value_snapshot decimal(10,4) DEFAULT NULL, PRIMARY KEY (id), KEY balance_change_record_initiative_id (person_id) USING BTREE, "
"KEY type (type) USING BTREE, KEY balance_change_record_type (time) USING BTREE, KEY initiative_id (initiative_id) USING BTREE, "
"KEY balance_change_record_tenant_code (passivity_id) USING BTREE, KEY tenant_code (tenant_code) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=1691049 DEFAULT CHARSET=utf8")
mysql_node.query("insert into cond_on_key_col.balance_change_record values (123, 1, 3.14, null, 'qwe', 'asd', 'zxc', 'rty', null, null, 2.7);")
mysql_node.query("CREATE TABLE cond_on_key_col.test1 (id int(11) NOT NULL AUTO_INCREMENT, c1 varchar(32) NOT NULL, c2 varchar(32), PRIMARY KEY (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
mysql_node.query("insert into cond_on_key_col.test1(c1,c2) values ('a','b'), ('c', null);")
check_query(clickhouse_node, "SELECT DISTINCT P.id, P.name, P.catalog_id FROM cond_on_key_col.products P WHERE P.name ILIKE '%e%' and P.catalog_id=5287", '915\tertyui\t5287\n')
check_query(clickhouse_node, "select count(a) from cond_on_key_col.test where b = 1;", "1\n")
check_query(clickhouse_node, "select id from cond_on_key_col.balance_change_record where type=1;", "123\n")
check_query(clickhouse_node, "select count(c1) from cond_on_key_col.test1 where c2='b';", "1\n")
clickhouse_node.query("DROP DATABASE cond_on_key_col")
mysql_node.query("DROP DATABASE cond_on_key_col")