From 9ac401573f5c5db6c4ba72c1baca384be9ea0dcd Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 8 Aug 2019 18:18:28 +0300 Subject: [PATCH] Added InterpreterSelectQuery::getSampleBlockImpl. Disable dry_run. --- .../Interpreters/InterpreterSelectQuery.cpp | 98 +++++++++---------- .../src/Interpreters/InterpreterSelectQuery.h | 11 +-- .../InterpreterSelectWithUnionQuery.h | 1 + dbms/src/Storages/StorageDistributed.cpp | 18 +--- dbms/src/Storages/StorageMerge.cpp | 33 ++++--- dbms/src/Storages/StorageMerge.h | 2 +- 6 files changed, 72 insertions(+), 91 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 782dcddf580..9c10c317d28 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -270,7 +270,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( String database_name; String table_name; - getDatabaseAndTableNames(database_name, table_name); + getDatabaseAndTableNames(query, database_name, table_name, context); if (auto view_source = context.getViewSource()) { @@ -344,17 +344,16 @@ InterpreterSelectQuery::InterpreterSelectQuery( source_header = storage->getSampleBlockForColumns(required_columns); /// Calculate structure of the result. - { - Pipeline pipeline; - executeImpl(pipeline, nullptr, true); - result_header = pipeline.firstStream()->getHeader(); - } + result_header = getSampleBlockImpl(); + for (auto & col : result_header) + if (!col.column) + col.column = col.type->createColumn(); } -void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, String & table_name) +void InterpreterSelectQuery::getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context) { - if (auto db_and_table = getDatabaseAndTable(getSelectQuery(), 0)) + if (auto db_and_table = getDatabaseAndTable(query, 0)) { table_name = db_and_table->table; database_name = db_and_table->database; @@ -403,60 +402,35 @@ QueryPipeline InterpreterSelectQuery::executeWithProcessors() } -Block InterpreterSelectQuery::getHeaderForExecutionStep( - const ASTPtr & query_ptr, - const StoragePtr & storage, - QueryProcessingStage::Enum stage, - size_t subquery_depth, - const Context & context, - const PrewhereInfoPtr & prewhere_info) +Block InterpreterSelectQuery::getSampleBlockImpl() { - SelectQueryOptions options(stage, subquery_depth); - options.only_analyze = true; - - Names required_result_column_names; - - /// TODO: remove it. - auto query = query_ptr->clone(); - - auto syntax_analyzer_result = SyntaxAnalyzer(context, options).analyze( - query, {}, required_result_column_names, storage); - - auto query_analyzer = ExpressionAnalyzer( - query, syntax_analyzer_result, context, NamesAndTypesList(), - NameSet(required_result_column_names.begin(), required_result_column_names.end()), - options.subquery_depth, !options.only_analyze); - - if (stage == QueryProcessingStage::Enum::FetchColumns) - { - auto required_columns = query_analyzer.getRequiredSourceColumns(); - auto header = storage->getSampleBlockForColumns(required_columns); - - if (prewhere_info) - { - prewhere_info->prewhere_actions->execute(header); - header = materializeBlock(header); - if (prewhere_info->remove_prewhere_column) - header.erase(prewhere_info->prewhere_column_name); - } - return header; - } - FilterInfoPtr filter_info; - auto & select_query = query->as(); - auto analysis_result = analyzeExpressions( - select_query, - query_analyzer, + getSelectQuery(), + *query_analyzer, QueryProcessingStage::Enum::FetchColumns, - stage, + options.to_stage, context, storage, true, filter_info); - if (stage == QueryProcessingStage::Enum::WithMergeableState) + if (options.to_stage == QueryProcessingStage::Enum::FetchColumns) + { + auto header = source_header; + + if (analysis_result.prewhere_info) + { + analysis_result.prewhere_info->prewhere_actions->execute(header); + header = materializeBlock(header); + if (analysis_result.prewhere_info->remove_prewhere_column) + header.erase(analysis_result.prewhere_info->prewhere_column_name); + } + return header; + } + + if (options.to_stage == QueryProcessingStage::Enum::WithMergeableState) { if (!analysis_result.need_aggregate) return analysis_result.before_order_and_select->getSampleBlock(); @@ -465,7 +439,7 @@ Block InterpreterSelectQuery::getHeaderForExecutionStep( Names key_names; AggregateDescriptions aggregates; - query_analyzer.getAggregateInfo(key_names, aggregates); + query_analyzer->getAggregateInfo(key_names, aggregates); Block res; @@ -524,6 +498,24 @@ InterpreterSelectQuery::analyzeExpressions( { chain.finalize(); + /// Check that actions on current step are valid. + /// Now this in needed for mutations to check in mutation is valid before execute it in background. + /// Because some functions only checking correctness of constant arguments during execution, + /// but not in getReturnType method (e.g. compare date with constant string). + if (dry_run) + { + for (auto & step : chain.steps) + { + auto step_required_columns = step.actions->getRequiredColumnsWithTypes(); + + Block sample; + for (auto & col : step_required_columns) + sample.insert({col.type->createColumn(), col.type, col.name}); + + step.actions->execute(sample); + } + } + if (has_prewhere) { const ExpressionActionsChain::Step & step = chain.steps.at(0); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 461daeb5ec2..87a4fdbb31c 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -79,14 +79,6 @@ public: ASTPtr getQuery() const { return query_ptr; } - static Block getHeaderForExecutionStep( - const ASTPtr & query, - const StoragePtr & storage, - QueryProcessingStage::Enum stage, - size_t subquery_depth, - const Context & context, - const PrewhereInfoPtr & prewhere_info); - private: InterpreterSelectQuery( const ASTPtr & query_ptr_, @@ -98,6 +90,7 @@ private: ASTSelectQuery & getSelectQuery() { return query_ptr->as(); } + Block getSampleBlockImpl(); struct Pipeline { @@ -192,7 +185,7 @@ private: /** From which table to read. With JOIN, the "left" table is returned. */ - void getDatabaseAndTableNames(String & database_name, String & table_name); + static void getDatabaseAndTableNames(const ASTSelectQuery & query, String & database_name, String & table_name, const Context & context); /// Different stages of query execution. diff --git a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h index 9f2a4a96494..aa5a763feaa 100644 --- a/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h @@ -50,6 +50,7 @@ private: Context context; std::vector> nested_interpreters; + Blocks nested_headers; Block result_header; diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index c75db697bad..6155dabd028 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -301,22 +301,8 @@ BlockInputStreams StorageDistributed::read( const auto & modified_query_ast = rewriteSelectQuery( query_info.query, remote_database, remote_table, remote_table_function_ptr); - StoragePtr tmp_storage; - - if (remote_table_function_ptr) - tmp_storage = context.getQueryContext().executeTableFunction(remote_table_function_ptr); - else - tmp_storage = context.getTable(remote_database, remote_table); - - Block header = - //InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock()); - InterpreterSelectQuery::getHeaderForExecutionStep(query_info.query, tmp_storage, processed_stage, 0, context, query_info.prewhere_info); - - /// Create empty columns for header. - /// All columns must be empty, because otherwise (by some reason) remote query can return one excessive row. - /// So, all columns are recreated. - for (auto & col : header) - col.column = col.type->createColumn(); + Block header = materializeBlock( + InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage)).getSampleBlock()); ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr ? ClusterProxy::SelectStreamFactory( diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index 3b1a1540447..3487a1becf5 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -190,7 +190,7 @@ BlockInputStreams StorageMerge::read( modified_context.getSettingsRef().optimize_move_to_prewhere = false; /// What will be result structure depending on query processed stage in source tables? - Block header = getQueryHeader(query_info, context, processed_stage); + Block header = getQueryHeader(column_names, query_info, context, processed_stage); /** First we make list of selected tables to find out its size. * This is necessary to correctly pass the recommended number of threads to each table. @@ -407,20 +407,29 @@ void StorageMerge::alter( } Block StorageMerge::getQueryHeader( - const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage) + const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage) { - auto storage = shared_from_this(); - auto header = InterpreterSelectQuery::getHeaderForExecutionStep(query_info.query, storage, processed_stage, 0, context, query_info.prewhere_info); - - for (auto & col : header) + switch (processed_stage) { - if (!col.column) - col.column = col.type->createColumn(); - else - col.column = col.column->convertToFullColumnIfConst(); + case QueryProcessingStage::FetchColumns: + { + Block header = getSampleBlockForColumns(column_names); + if (query_info.prewhere_info) + { + query_info.prewhere_info->prewhere_actions->execute(header); + header = materializeBlock(header); + if (query_info.prewhere_info->remove_prewhere_column) + header.erase(query_info.prewhere_info->prewhere_column_name); + } + return header; + } + case QueryProcessingStage::WithMergeableState: + case QueryProcessingStage::Complete: + return materializeBlock(InterpreterSelectQuery( + query_info.query, context, std::make_shared(getSampleBlockForColumns(column_names)), + SelectQueryOptions(processed_stage).analyze()).getSampleBlock()); } - - return header; + throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR); } void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query, diff --git a/dbms/src/Storages/StorageMerge.h b/dbms/src/Storages/StorageMerge.h index 03c28fde0b4..6d02ad029cc 100644 --- a/dbms/src/Storages/StorageMerge.h +++ b/dbms/src/Storages/StorageMerge.h @@ -79,7 +79,7 @@ protected: const String & table_name_regexp_, const Context & context_); - Block getQueryHeader(const SelectQueryInfo & query_info, + Block getQueryHeader(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage); BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,