From ca1f336c4679fa2ac64ce6f025ad37f19c8b8a74 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 24 Jun 2021 23:25:06 +0000 Subject: [PATCH 1/4] Aliases for storageMerge fix --- src/Storages/StorageMerge.cpp | 101 +++++++++++++++--- src/Storages/StorageMerge.h | 12 ++- ...01925_test_storage_merge_aliases.reference | 10 ++ .../01925_test_storage_merge_aliases.sql | 57 ++++++++++ tests/queries/skip_list.json | 3 +- 5 files changed, 169 insertions(+), 14 deletions(-) create mode 100644 tests/queries/0_stateless/01925_test_storage_merge_aliases.reference create mode 100644 tests/queries/0_stateless/01925_test_storage_merge_aliases.sql diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 172805c08ed..7960ce19262 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -205,7 +206,7 @@ Pipe StorageMerge::read( if (selected_tables.empty()) /// FIXME: do we support sampling in this case? return createSources( - {}, query_info, processed_stage, max_block_size, header, {}, real_column_names, modified_context, 0, has_table_virtual_column); + {}, query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column); size_t tables_count = selected_tables.size(); Float64 num_streams_multiplier @@ -233,6 +234,9 @@ Pipe StorageMerge::read( query_info.input_order_info = input_sorting_info; } + auto sample_block = getInMemoryMetadataPtr()->getSampleBlock(); + Names required_columns; + for (const auto & table : selected_tables) { size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count); @@ -246,12 +250,60 @@ Pipe StorageMerge::read( if (query_info.query->as()->sampleSize() && !storage->supportsSampling()) throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED); + Aliases aliases; auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); + auto storage_columns = storage_metadata_snapshot->getColumns(); + + if (processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty()) + { + NameSet required_columns_set; + std::function extract_columns_from_alias_expression = [&](ASTPtr expr) + { + if (!expr) + return; + + if (typeid_cast(expr.get())) + return; + + if (const auto * ast_function = typeid_cast(expr.get())) + { + for (const auto & arg : ast_function->arguments->children) + extract_columns_from_alias_expression(arg); + } + else if (const auto * ast_identifier = typeid_cast(expr.get())) + { + auto column = ast_identifier->name(); + const auto column_default = storage_columns.getDefault(column); + bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias; + + if (is_alias) + { + auto alias_expression = column_default->expression; + auto type = sample_block.getByName(column).type; + aliases.push_back({ .name = column, .type = type, .expression = alias_expression }); + extract_columns_from_alias_expression(alias_expression); + } + else + { + required_columns_set.insert(column); + } + } + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected expression: {}", expr->getID()); + } + }; + + for (const auto & column : real_column_names) + extract_columns_from_alias_expression(std::make_shared(column)); + + required_columns = std::vector(required_columns_set.begin(), required_columns_set.end()); + } auto source_pipe = createSources( storage_metadata_snapshot, query_info, processed_stage, - max_block_size, header, table, real_column_names, modified_context, - current_streams, has_table_virtual_column); + max_block_size, header, aliases, table, required_columns.empty() ? real_column_names : required_columns, + modified_context, current_streams, has_table_virtual_column); pipes.emplace_back(std::move(source_pipe)); } @@ -272,6 +324,7 @@ Pipe StorageMerge::createSources( const QueryProcessingStage::Enum & processed_stage, const UInt64 max_block_size, const Block & header, + const Aliases & aliases, const StorageWithLockAndName & storage_with_lock, Names & real_column_names, ContextMutablePtr modified_context, @@ -369,7 +422,7 @@ Pipe StorageMerge::createSources( /// Subordinary tables could have different but convertible types, like numeric types of different width. /// We must return streams with structure equals to structure of Merge table. - convertingSourceStream(header, metadata_snapshot, modified_context, modified_query_info.query, pipe, processed_stage); + convertingSourceStream(header, metadata_snapshot, aliases, modified_context, modified_query_info.query, pipe, processed_stage); pipe.addTableLock(struct_lock); pipe.addStorageHolder(storage); @@ -492,6 +545,7 @@ void StorageMerge::alter( void StorageMerge::convertingSourceStream( const Block & header, const StorageMetadataPtr & metadata_snapshot, + const Aliases & aliases, ContextPtr local_context, ASTPtr & query, Pipe & pipe, @@ -499,16 +553,39 @@ void StorageMerge::convertingSourceStream( { Block before_block_header = pipe.getHeader(); - auto convert_actions_dag = ActionsDAG::makeConvertingActions( - pipe.getHeader().getColumnsWithTypeAndName(), - header.getColumnsWithTypeAndName(), - ActionsDAG::MatchColumnsMode::Name); - auto convert_actions = std::make_shared(convert_actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); + auto storage_sample_block = metadata_snapshot->getSampleBlock(); + auto pipe_columns = pipe.getHeader().getNamesAndTypesList(); - pipe.addSimpleTransform([&](const Block & stream_header) + for (const auto & alias : aliases) { - return std::make_shared(stream_header, convert_actions); - }); + pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type)); + ASTPtr expr = std::move(alias.expression); + expr->setAlias(alias.name); + + auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns); + auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context}; + + auto dag = std::make_shared(pipe_columns); + auto actions_dag = expression_analyzer.getActionsDAG(true, false); + auto actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); + + pipe.addSimpleTransform([&](const Block & stream_header) + { + return std::make_shared(stream_header, actions); + }); + } + + { + auto convert_actions_dag = ActionsDAG::makeConvertingActions(pipe.getHeader().getColumnsWithTypeAndName(), + header.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Name); + auto actions = std::make_shared(convert_actions_dag, ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); + pipe.addSimpleTransform([&](const Block & stream_header) + { + return std::make_shared(stream_header, actions); + }); + } + auto where_expression = query->as()->where(); diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index 2339716519c..a9f5b4b8a86 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -84,12 +84,22 @@ protected: const String & source_table_regexp_, ContextPtr context_); + struct AliasData + { + String name; + DataTypePtr type; + ASTPtr expression; + }; + + using Aliases = std::vector; + Pipe createSources( const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage, UInt64 max_block_size, const Block & header, + const Aliases & aliaes, const StorageWithLockAndName & storage_with_lock, Names & real_column_names, ContextMutablePtr modified_context, @@ -98,7 +108,7 @@ protected: bool concat_streams = false); void convertingSourceStream( - const Block & header, const StorageMetadataPtr & metadata_snapshot, + const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases, ContextPtr context, ASTPtr & query, Pipe & pipe, QueryProcessingStage::Enum processed_stage); }; diff --git a/tests/queries/0_stateless/01925_test_storage_merge_aliases.reference b/tests/queries/0_stateless/01925_test_storage_merge_aliases.reference new file mode 100644 index 00000000000..b0fea25ed4b --- /dev/null +++ b/tests/queries/0_stateless/01925_test_storage_merge_aliases.reference @@ -0,0 +1,10 @@ +alias1 +1 4 16 23 +23 16 4 1 +2020-02-02 1 4 2 16 3 23 +alias2 +1 3 4 4 +4 4 3 1 +23 16 4 1 +2020-02-01 1 3 2 4 3 4 +2020-02-02 1 4 2 16 3 23 diff --git a/tests/queries/0_stateless/01925_test_storage_merge_aliases.sql b/tests/queries/0_stateless/01925_test_storage_merge_aliases.sql new file mode 100644 index 00000000000..b441358fd40 --- /dev/null +++ b/tests/queries/0_stateless/01925_test_storage_merge_aliases.sql @@ -0,0 +1,57 @@ +drop table if exists merge; +create table merge +( + dt Date, + colAlias0 Int32, + colAlias1 Int32, + col2 Int32, + colAlias2 UInt32, + col3 Int32, + colAlias3 UInt32 +) +engine = Merge(currentDatabase(), '^alias_'); + +drop table if exists alias_1; +drop table if exists alias_2; + +create table alias_1 +( + dt Date, + col Int32, + colAlias0 UInt32 alias col, + colAlias1 UInt32 alias col3 + colAlias0, + col2 Int32, + colAlias2 Int32 alias colAlias1 + col2 + 10, + col3 Int32, + colAlias3 Int32 alias colAlias2 + colAlias1 + col3 +) +engine = MergeTree() +order by (dt); + +insert into alias_1 (dt, col, col2, col3) values ('2020-02-02', 1, 2, 3); + +select 'alias1'; +select colAlias0, colAlias1, colAlias2, colAlias3 from alias_1; +select colAlias3, colAlias2, colAlias1, colAlias0 from merge; +select * from merge; + +create table alias_2 +( + dt Date, + col Int32, + col2 Int32, + colAlias0 UInt32 alias col, + colAlias3 Int32 alias col3 + colAlias0, + colAlias1 UInt32 alias colAlias0 + col2, + colAlias2 Int32 alias colAlias0 + colAlias1, + col3 Int32 +) +engine = MergeTree() +order by (dt); + +insert into alias_2 (dt, col, col2, col3) values ('2020-02-01', 1, 2, 3); + +select 'alias2'; +select colAlias0, colAlias1, colAlias2, colAlias3 from alias_2; +select colAlias3, colAlias2, colAlias1, colAlias0 from merge; +select * from merge; diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 78b8e3065ff..6b1b566aab7 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -842,6 +842,7 @@ "01870_modulo_partition_key", "01870_buffer_flush", // creates database "01889_postgresql_protocol_null_fields", - "01889_check_row_policy_defined_using_user_function" + "01889_check_row_policy_defined_using_user_function", + "01925_test_storage_merge_aliases" ] } From ded9007ca0b402481cf5ccc3f00f0aea078685a0 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 25 Jun 2021 07:00:15 +0000 Subject: [PATCH 2/4] Fix --- src/Storages/StorageMerge.cpp | 4 +++- .../queries/0_stateless/01925_test_storage_merge_aliases.sql | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 7960ce19262..e6c310544f1 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include @@ -267,6 +266,9 @@ Pipe StorageMerge::read( if (const auto * ast_function = typeid_cast(expr.get())) { + if (!ast_function->arguments) + return; + for (const auto & arg : ast_function->arguments->children) extract_columns_from_alias_expression(arg); } diff --git a/tests/queries/0_stateless/01925_test_storage_merge_aliases.sql b/tests/queries/0_stateless/01925_test_storage_merge_aliases.sql index b441358fd40..f3a5b2db62e 100644 --- a/tests/queries/0_stateless/01925_test_storage_merge_aliases.sql +++ b/tests/queries/0_stateless/01925_test_storage_merge_aliases.sql @@ -53,5 +53,5 @@ insert into alias_2 (dt, col, col2, col3) values ('2020-02-01', 1, 2, 3); select 'alias2'; select colAlias0, colAlias1, colAlias2, colAlias3 from alias_2; -select colAlias3, colAlias2, colAlias1, colAlias0 from merge; -select * from merge; +select colAlias3, colAlias2, colAlias1, colAlias0 from merge order by dt; +select * from merge order by dt; From c2ac9b6027e0bb9a0fc966b0287cf1b5b575b660 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 25 Jun 2021 14:30:58 +0000 Subject: [PATCH 3/4] Better --- src/Storages/StorageMerge.cpp | 64 ++++++++++++++--------------------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index e6c310544f1..a076393e7e6 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -11,6 +11,8 @@ #include #include #include +#include +#include #include #include #include @@ -255,51 +257,39 @@ Pipe StorageMerge::read( if (processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty()) { - NameSet required_columns_set; - std::function extract_columns_from_alias_expression = [&](ASTPtr expr) + auto syntax_result = TreeRewriter(local_context).analyzeSelect(query_info.query, TreeRewriterResult({}, storage, storage_metadata_snapshot)); + ASTPtr required_columns_expr_list = std::make_shared(); + + ASTPtr column_expr; + for (const auto & column : real_column_names) { - if (!expr) - return; + const auto column_default = storage_columns.getDefault(column); + bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias; - if (typeid_cast(expr.get())) - return; - - if (const auto * ast_function = typeid_cast(expr.get())) + if (is_alias) { - if (!ast_function->arguments) - return; + column_expr = column_default->expression->clone(); + replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(), + syntax_result->array_join_result_to_source, local_context); - for (const auto & arg : ast_function->arguments->children) - extract_columns_from_alias_expression(arg); - } - else if (const auto * ast_identifier = typeid_cast(expr.get())) - { - auto column = ast_identifier->name(); - const auto column_default = storage_columns.getDefault(column); - bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias; + auto column_description = storage_columns.get(column); + column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(), + storage_metadata_snapshot->getColumns().getAll(), local_context); + column_expr = setAlias(column_expr, column); - if (is_alias) - { - auto alias_expression = column_default->expression; - auto type = sample_block.getByName(column).type; - aliases.push_back({ .name = column, .type = type, .expression = alias_expression }); - extract_columns_from_alias_expression(alias_expression); - } - else - { - required_columns_set.insert(column); - } + auto type = sample_block.getByName(column).type; + aliases.push_back({ .name = column, .type = type, .expression = column_expr->clone() }); } else - { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected expression: {}", expr->getID()); - } - }; + column_expr = std::make_shared(column); - for (const auto & column : real_column_names) - extract_columns_from_alias_expression(std::make_shared(column)); + required_columns_expr_list->children.emplace_back(std::move(column_expr)); + } - required_columns = std::vector(required_columns_set.begin(), required_columns_set.end()); + syntax_result = TreeRewriter(local_context).analyze(required_columns_expr_list, storage_columns.getAllPhysical(), + storage, storage_metadata_snapshot); + auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, local_context).getActionsDAG(true); + required_columns = alias_actions->getRequiredColumns().getNames(); } auto source_pipe = createSources( @@ -562,8 +552,6 @@ void StorageMerge::convertingSourceStream( { pipe_columns.emplace_back(NameAndTypePair(alias.name, alias.type)); ASTPtr expr = std::move(alias.expression); - expr->setAlias(alias.name); - auto syntax_result = TreeRewriter(local_context).analyze(expr, pipe_columns); auto expression_analyzer = ExpressionAnalyzer{alias.expression, syntax_result, local_context}; From 28c5a14922a2e578a0fbc616189b5cc439efc237 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Sat, 26 Jun 2021 15:50:15 +0300 Subject: [PATCH 4/4] Fix clang-tidy --- src/Storages/StorageMerge.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index a9f5b4b8a86..068008170ca 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -99,7 +99,7 @@ protected: const QueryProcessingStage::Enum & processed_stage, UInt64 max_block_size, const Block & header, - const Aliases & aliaes, + const Aliases & aliases, const StorageWithLockAndName & storage_with_lock, Names & real_column_names, ContextMutablePtr modified_context,