diff --git a/contrib/poco b/contrib/poco
index 930a7ec1154..2d5a158303a 160000
--- a/contrib/poco
+++ b/contrib/poco
@@ -1 +1 @@
-Subproject commit 930a7ec1154f4f9711edfb4b4a39f9fff2a5bbb5
+Subproject commit 2d5a158303adf9d47b980cdcfdb26cee1460704e
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index 9ddfe9f5d87..53ec0796ec1 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -235,9 +235,9 @@ BlockInputStreams InterpreterSelectQuery::executeWithMultipleStreams()
 }
 
-InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage,
-    ExpressionActionsChain & chain)
+InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage)
 {
+    ExpressionActionsChain chain;
     AnalysisResult res;
 
     /// Do I need to perform the first part of the pipeline - running on remote servers during distributed processing.
@@ -253,8 +253,16 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
      */
 
    std::shared_ptr<bool> remove_where_filter;
+    std::shared_ptr<bool> remove_prewhere_filter;
 
     {
+        if (query_analyzer->appendPrewhere(chain, false, remove_prewhere_filter))
+        {
+            res.prewhere_info = std::make_shared<PrewhereInfo>(
+                chain.steps.front().actions, query.prewhere_expression->getColumnName());
+            chain.addStep();
+        }
+
         res.need_aggregate = query_analyzer->hasAggregation();
 
         query_analyzer->appendArrayJoin(chain, !res.first_stage);
@@ -311,6 +319,9 @@ InterpreterSelectQuery::AnalysisResult InterpreterSelectQuery::analyzeExpression
         chain.clear();
     }
 
+    if (res.prewhere_info)
+        res.prewhere_info->remove_prewhere_column = *remove_prewhere_filter;
+
     /// Before executing WHERE and HAVING, remove the extra columns from the block (mostly the aggregation keys).
     if (res.has_where)
     {
@@ -343,13 +354,14 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
     AnalysisResult expressions;
 
     {
-        ExpressionActionsChain chain;
-        PrewhereInfoPtr prewhere_info;
-        std::shared_ptr<bool> remove_prewhere_filter;
+        QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
+        if (storage)
+            from_stage = storage->getQueryProcessingStage(context);
+
+        expressions = analyzeExpressions(from_stage);
 
         /** Read the data from Storage. from_stage - to what stage the request was completed in Storage.
           */
-        QueryProcessingStage::Enum from_stage = executeFetchColumns(pipeline, dry_run, chain,
-            prewhere_info, remove_prewhere_filter);
+        executeFetchColumns(from_stage, pipeline, dry_run, expressions.prewhere_info);
 
         if (from_stage == QueryProcessingStage::WithMergeableState &&
             to_stage == QueryProcessingStage::WithMergeableState)
@@ -358,11 +370,6 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
 
         if (!dry_run)
             LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
-
-        expressions = analyzeExpressions(from_stage, chain);
-
-        if (prewhere_info)
-            prewhere_info->remove_prewhere_column = *remove_prewhere_filter;
     }
 
     const Settings & settings = context.getSettingsRef();
@@ -528,9 +535,8 @@ static void getLimitLengthAndOffset(ASTSelectQuery & query, size_t & length, siz
     }
 }
 
-QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
-    Pipeline & pipeline, bool dry_run, ExpressionActionsChain & chain,
-    PrewhereInfoPtr & prewhere_info, std::shared_ptr<bool> & remove_prewhere_filter)
+void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum from_stage, Pipeline & pipeline,
+    bool dry_run, const PrewhereInfoPtr & prewhere_info)
 {
     /// List of columns to read to execute the query.
     Names required_columns = query_analyzer->getRequiredSourceColumns();
@@ -648,8 +654,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
         max_streams = 1;
     }
 
-    QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
-
     query_analyzer->makeSetsForIndex();
 
     /// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery or prepared input?
@@ -680,6 +684,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
         SelectQueryInfo query_info;
         query_info.query = query_ptr;
         query_info.sets = query_analyzer->getPreparedSets();
+        query_info.prewhere_info = prewhere_info;
 
         /// PREWHERE optimization
         {
@@ -696,13 +701,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
                 optimize_prewhere(*merge_tree);
         }
 
-        if (!dry_run && query_analyzer->appendPrewhere(chain, false, remove_prewhere_filter))
-        {
-            query_info.prewhere_info = prewhere_info = std::make_shared<PrewhereInfo>(
-                chain.steps.front().actions, query.prewhere_expression->getColumnName());
-            chain.addStep();
-        }
-
         if (!dry_run)
             pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
 
@@ -711,8 +709,10 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(
             pipeline.streams.emplace_back(std::make_shared<NullBlockInputStream>(storage->getSampleBlockForColumns(required_columns)));
 
             if (query_info.prewhere_info)
-                pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(pipeline.streams.back(),
-                    query_info.prewhere_info);
+                pipeline.streams.back() = std::make_shared<FilterBlockInputStream>(
+                    pipeline.streams.back(), prewhere_info->prewhere_actions,
+                    prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column
+                );
         }
 
         pipeline.transform([&](auto & stream)
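Note on the new control flow: the interpreter no longer learns the processing stage as an out-parameter of read(); it asks the storage up front and threads the answer through analysis and reading. A minimal sketch of the resulting ordering in executeImpl (simplified from the hunks above; subquery and prepared-input branches omitted):

    // Sketch only; `storage`, `context`, `pipeline`, `dry_run` as in the diff.
    QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
    if (storage)
        from_stage = storage->getQueryProcessingStage(context);   // stage decided before any reading

    AnalysisResult expressions = analyzeExpressions(from_stage);  // builds prewhere_info, if any
    executeFetchColumns(from_stage, pipeline, dry_run, expressions.prewhere_info);

Because analyzeExpressions() now owns the ExpressionActionsChain and runs before executeFetchColumns(), the PREWHERE actions already exist when read() is called, so the storage can apply them while reading via query_info.prewhere_info.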
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h
index 29cf2a86ada..2d6699d37be 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.h
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.h
@@ -144,9 +144,10 @@ private:
         bool second_stage = false;
 
         SubqueriesForSets subqueries_for_sets;
+        PrewhereInfoPtr prewhere_info;
     };
 
-    AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage, ExpressionActionsChain & chain);
+    AnalysisResult analyzeExpressions(QueryProcessingStage::Enum from_stage);
 
     /** From which table to read. With JOIN, the "left" table is returned.
      */
@@ -158,10 +159,9 @@ private:
     /// dry_run - don't read from table, use empty header block instead.
    void executeWithMultipleStreamsImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);
 
-    /// Fetch data from the table. Returns the stage to which the query was processed in Storage.
-    QueryProcessingStage::Enum executeFetchColumns(Pipeline & pipeline, bool dry_run, ExpressionActionsChain & chain,
-                                                   PrewhereInfoPtr & prewhere_info,
-                                                   std::shared_ptr<bool> & remove_prewhere_filter);
+    /// Fetch data from the table.
+    void executeFetchColumns(QueryProcessingStage::Enum processing_stage, Pipeline & pipeline, bool dry_run,
+                             const PrewhereInfoPtr & prewhere_info);
 
     void executeWhere(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool remove_filter);
     void executeAggregation(Pipeline & pipeline, const ExpressionActionsPtr & expression, bool overflow_row, bool final);
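The hunks above pass PrewhereInfoPtr around, but the struct itself is defined elsewhere (dbms/src/Storages/SelectQueryInfo.h, not touched by this patch). For orientation, a plausible shape, inferred from the fields this diff actually uses rather than quoted from that header:

    // Inferred from usage in this diff; the real definition lives in SelectQueryInfo.h.
    struct PrewhereInfo
    {
        ExpressionActionsPtr prewhere_actions;   /// actions that compute the PREWHERE filter column
        String prewhere_column_name;             /// name of the filter column in the result block
        bool remove_prewhere_column = false;     /// drop the filter column after filtering
    };
    using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;

analyzeExpressions() fills the first two fields as soon as appendPrewhere() succeeds and patches remove_prewhere_column at the end, once it is known whether the column is needed downstream.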
diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h
index 74db0604147..8dce2f8892b 100644
--- a/dbms/src/Storages/IStorage.h
+++ b/dbms/src/Storages/IStorage.h
@@ -150,6 +150,11 @@ public:
         return res;
     }
 
+    /** Returns the stage to which the query is going to be processed in the read() function.
+      * (Normally, the function only reads the columns from the list, but in other cases,
+      * for example, the query can be partially processed on a remote server.)
+      */
+    virtual QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const { return QueryProcessingStage::FetchColumns; }
+
     /** Read a set of columns from the table.
       * Accepts a list of columns to read, as well as a description of the query,
       * from which information can be extracted about how to retrieve data
      * (indexes, locks, etc.)
      * Returns a stream with which you can read data sequentially
      * or multiple streams for parallel data reading.
-     * The `processed_stage` info is also written to what stage the request was processed.
-     * (Normally, the function only reads the columns from the list, but in other cases,
-     * for example, the request can be partially processed on a remote server.)
+     * The `processed_stage` must be the result of the getQueryProcessingStage() function.
      *
      * context contains settings for one query.
      * Usually Storage does not care about these settings, since they are used in the interpreter.
@@ -174,7 +177,7 @@ public:
         const Names & /*column_names*/,
         const SelectQueryInfo & /*query_info*/,
         const Context & /*context*/,
-        QueryProcessingStage::Enum & /*processed_stage*/,
+        QueryProcessingStage::Enum /*processed_stage*/,
         size_t /*max_block_size*/,
         unsigned /*num_streams*/)
     {
@@ -303,6 +306,15 @@ protected:
     using ITableDeclaration::ITableDeclaration;
     using std::enable_shared_from_this<IStorage>::shared_from_this;
 
+    void checkQueryProcessingStage(QueryProcessingStage::Enum processed_stage, const Context & context)
+    {
+        auto expected = getQueryProcessingStage(context);
+        if (processed_stage != expected)
+            throw Exception("Unexpected query processing stage for storage " + getName() +
+                ": expected " + QueryProcessingStage::toString(expected) +
+                ", got " + QueryProcessingStage::toString(processed_stage), ErrorCodes::LOGICAL_ERROR);
+    }
+
 private:
     friend class TableStructureReadLock;
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 441d41b096c..9797613de1b 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -134,7 +134,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
     const Names & column_names_to_return,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
     const size_t max_block_size,
     const unsigned num_streams,
     Int64 max_block_number_to_read) const
@@ -193,7 +192,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
     std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
 
     data.check(real_column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
 
     const Settings & settings = context.getSettingsRef();
     SortDescription sort_descr = data.getPrimarySortDescription();
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
index 5d5faad2f32..1d6f46e718a 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
@@ -26,7 +26,6 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
         size_t max_block_size,
         unsigned num_streams,
         Int64 max_block_number_to_read) const;
diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp
index 8688d191b9a..19bf6ac30ef 100644
--- a/dbms/src/Storages/StorageBuffer.cpp
+++ b/dbms/src/Storages/StorageBuffer.cpp
@@ -102,15 +102,30 @@ private:
 };
 
 
+QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
+{
+    if (!no_destination)
+    {
+        auto destination = context.getTable(destination_database, destination_table);
+
+        if (destination.get() == this)
+            throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
+
+        return destination->getQueryProcessingStage(context);
+    }
+
+    return QueryProcessingStage::FetchColumns;
+}
+
 BlockInputStreams StorageBuffer::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned num_streams)
 {
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
 
     BlockInputStreams streams_from_dst;
diff --git a/dbms/src/Storages/StorageBuffer.h b/dbms/src/Storages/StorageBuffer.h
index ea8a0562628..2d4cb37876b 100644
--- a/dbms/src/Storages/StorageBuffer.h
+++ b/dbms/src/Storages/StorageBuffer.h
@@ -53,11 +53,13 @@ public:
     std::string getName() const override { return "Buffer"; }
     std::string getTableName() const override { return name; }
 
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageCatBoostPool.cpp b/dbms/src/Storages/StorageCatBoostPool.cpp
index 9fa8133617c..d7fb2d1d906 100644
--- a/dbms/src/Storages/StorageCatBoostPool.cpp
+++ b/dbms/src/Storages/StorageCatBoostPool.cpp
@@ -259,10 +259,12 @@ void StorageCatBoostPool::createSampleBlockAndColumns()
 BlockInputStreams StorageCatBoostPool::read(const Names & column_names,
                        const SelectQueryInfo & /*query_info*/,
                        const Context & context,
-                       QueryProcessingStage::Enum & /*processed_stage*/,
+                       QueryProcessingStage::Enum processed_stage,
                        size_t max_block_size,
                        unsigned /*threads*/)
 {
+    checkQueryProcessingStage(processed_stage, context);
+
     auto stream = std::make_shared<CatBoostDatasetBlockInputStream>(
             data_description_file_name, "TSV", sample_block, context, max_block_size);
diff --git a/dbms/src/Storages/StorageCatBoostPool.h b/dbms/src/Storages/StorageCatBoostPool.h
index 34aed30693c..9cc6e72dbdc 100644
--- a/dbms/src/Storages/StorageCatBoostPool.h
+++ b/dbms/src/Storages/StorageCatBoostPool.h
@@ -17,7 +17,7 @@ public:
     BlockInputStreams read(const Names & column_names,
                            const SelectQueryInfo & query_info,
                            const Context & context,
-                           QueryProcessingStage::Enum & processed_stage,
+                           QueryProcessingStage::Enum processed_stage,
                            size_t max_block_size,
                            unsigned threads) override;
diff --git a/dbms/src/Storages/StorageDictionary.cpp b/dbms/src/Storages/StorageDictionary.cpp
index e4f2a154474..90832549ae0 100644
--- a/dbms/src/Storages/StorageDictionary.cpp
+++ b/dbms/src/Storages/StorageDictionary.cpp
@@ -36,13 +36,13 @@ StorageDictionary::StorageDictionary(
 BlockInputStreams StorageDictionary::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned /*threads*/)
 {
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
     auto dictionary = context.getExternalDictionaries().getDictionary(dictionary_name);
     return BlockInputStreams{dictionary->getBlockInputStream(column_names, max_block_size)};
 }
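Taken together, the storage-side changes establish a simple protocol: the caller asks the storage for its stage, does its analysis, and hands the same value back to read(), which may verify it. A caller-side sketch under the interfaces introduced here (hypothetical caller, error handling elided):

    // `storage` is any StoragePtr; `query_ptr`, `column_names`, etc. as prepared by an interpreter.
    QueryProcessingStage::Enum stage = storage->getQueryProcessingStage(context);
    SelectQueryInfo query_info;
    query_info.query = query_ptr;
    BlockInputStreams streams = storage->read(
        column_names, query_info, context, stage,   // stage must be the value obtained above
        max_block_size, num_streams);

Passing anything other than the value returned by getQueryProcessingStage() for the same context trips the LOGICAL_ERROR in checkQueryProcessingStage().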
diff --git a/dbms/src/Storages/StorageDictionary.h b/dbms/src/Storages/StorageDictionary.h
index 2e47c5fbe0d..cffaf8879cd 100644
--- a/dbms/src/Storages/StorageDictionary.h
+++ b/dbms/src/Storages/StorageDictionary.h
@@ -27,7 +27,7 @@ public:
     BlockInputStreams read(const Names & column_names,
                            const SelectQueryInfo & query_info,
                            const Context & context,
-                           QueryProcessingStage::Enum & processed_stage,
+                           QueryProcessingStage::Enum processed_stage,
                            size_t max_block_size = DEFAULT_BLOCK_SIZE,
                            unsigned threads = 1) override;
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 0521a8d244f..7ef991d983c 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -175,14 +175,7 @@ StoragePtr StorageDistributed::createWithOwnCluster(
     return res;
 }
 
-
-BlockInputStreams StorageDistributed::read(
-    const Names & /*column_names*/,
-    const SelectQueryInfo & query_info,
-    const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
-    const size_t /*max_block_size*/,
-    const unsigned /*num_streams*/)
+QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context & context) const
 {
     auto cluster = getCluster();
 
@@ -193,11 +186,24 @@ BlockInputStreams StorageDistributed::read(
     size_t result_size = (num_remote_shards * settings.max_parallel_replicas) + num_local_shards;
 
     if (settings.distributed_group_by_no_merge)
-        processed_stage = QueryProcessingStage::Complete;
+        return QueryProcessingStage::Complete;
     else    /// Normal mode.
-        processed_stage = result_size == 1
-            ? QueryProcessingStage::Complete
-            : QueryProcessingStage::WithMergeableState;
+        return result_size == 1 ? QueryProcessingStage::Complete
+                                : QueryProcessingStage::WithMergeableState;
+}
+
+BlockInputStreams StorageDistributed::read(
+    const Names & /*column_names*/,
+    const SelectQueryInfo & query_info,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
+    const size_t /*max_block_size*/,
+    const unsigned /*num_streams*/)
+{
+    checkQueryProcessingStage(processed_stage, context);
+
+    auto cluster = getCluster();
+    const Settings & settings = context.getSettingsRef();
 
     const auto & modified_query_ast = rewriteSelectQuery(
         query_info.query, remote_database, remote_table);
diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h
index 41a11fa3199..27060543e48 100644
--- a/dbms/src/Storages/StorageDistributed.h
+++ b/dbms/src/Storages/StorageDistributed.h
@@ -52,11 +52,13 @@ public:
 
     bool isRemote() const override { return true; }
 
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageFile.cpp b/dbms/src/Storages/StorageFile.cpp
index 2f606d5cbcf..73177198b54 100644
--- a/dbms/src/Storages/StorageFile.cpp
+++ b/dbms/src/Storages/StorageFile.cpp
@@ -170,10 +170,11 @@ BlockInputStreams StorageFile::read(
     const Names & /*column_names*/,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & /*processed_stage*/,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned /*num_streams*/)
 {
+    checkQueryProcessingStage(processed_stage, context);
    return BlockInputStreams(1, std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size));
 }
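StorageDistributed is the storage for which the stage actually varies, so its read() body is split in two: the stage computation moves verbatim into getQueryProcessingStage(), and read() re-fetches the cluster and settings locally. The decision it encodes, restated compactly (same logic as the hunk above, no new behaviour):

    // result_size = remote shards * max_parallel_replicas + local shards
    if (settings.distributed_group_by_no_merge)
        return QueryProcessingStage::Complete;            // each shard returns a final result
    return result_size == 1
        ? QueryProcessingStage::Complete                  // single shard: nothing left to merge
        : QueryProcessingStage::WithMergeableState;       // initiator must merge partial aggregation states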
diff --git a/dbms/src/Storages/StorageFile.h b/dbms/src/Storages/StorageFile.h
index 4292ea6d7f9..6716dc306a4 100644
--- a/dbms/src/Storages/StorageFile.h
+++ b/dbms/src/Storages/StorageFile.h
@@ -35,7 +35,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageKafka.cpp b/dbms/src/Storages/StorageKafka.cpp
index 80e4942839e..cebd6f9b5c2 100644
--- a/dbms/src/Storages/StorageKafka.cpp
+++ b/dbms/src/Storages/StorageKafka.cpp
@@ -236,12 +236,12 @@ BlockInputStreams StorageKafka::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
 
     if (num_consumers == 0)
         return BlockInputStreams();
diff --git a/dbms/src/Storages/StorageKafka.h b/dbms/src/Storages/StorageKafka.h
index 45530517e94..09c00a28248 100644
--- a/dbms/src/Storages/StorageKafka.h
+++ b/dbms/src/Storages/StorageKafka.h
@@ -39,7 +39,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp
index 04d9f18ffd1..7e080687af3 100644
--- a/dbms/src/Storages/StorageLog.cpp
+++ b/dbms/src/Storages/StorageLog.cpp
@@ -491,12 +491,12 @@ BlockInputStreams StorageLog::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
     loadMarks();
 
     NamesAndTypesList all_columns = Nested::collect(getColumns().getAllPhysical().addTypes(column_names));
diff --git a/dbms/src/Storages/StorageLog.h b/dbms/src/Storages/StorageLog.h
index b52a90ec563..2e3fae4a4fb 100644
--- a/dbms/src/Storages/StorageLog.h
+++ b/dbms/src/Storages/StorageLog.h
@@ -30,7 +30,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp
index c008eecb1c4..3e5b24266ca 100644
--- a/dbms/src/Storages/StorageMaterializedView.cpp
+++ b/dbms/src/Storages/StorageMaterializedView.cpp
@@ -170,11 +170,16 @@ bool StorageMaterializedView::hasColumn(const String & column_name) const
     return getTargetTable()->hasColumn(column_name);
 }
 
+QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(const Context & context) const
+{
+    return getTargetTable()->getQueryProcessingStage(context);
+}
+
 BlockInputStreams StorageMaterializedView::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
diff --git a/dbms/src/Storages/StorageMaterializedView.h b/dbms/src/Storages/StorageMaterializedView.h
index 7edffe28d76..4e11829e902 100644
--- a/dbms/src/Storages/StorageMaterializedView.h
+++ b/dbms/src/Storages/StorageMaterializedView.h
@@ -41,11 +41,13 @@ public:
     void shutdown() override;
     bool checkTableCanBeDropped() const override;
 
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context & context) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageMemory.cpp b/dbms/src/Storages/StorageMemory.cpp
index 7b36796c324..176c275de9b 100644
--- a/dbms/src/Storages/StorageMemory.cpp
+++ b/dbms/src/Storages/StorageMemory.cpp
@@ -83,11 +83,11 @@ StorageMemory::StorageMemory(String table_name_, ColumnsDescription columns_desc
 BlockInputStreams StorageMemory::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
-    const Context & /*context*/,
-    QueryProcessingStage::Enum & processed_stage,
+    const Context & context,
+    QueryProcessingStage::Enum processed_stage,
     size_t /*max_block_size*/,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
 
diff --git a/dbms/src/Storages/StorageMemory.h b/dbms/src/Storages/StorageMemory.h
index 454fdcc2823..7aad88d980c 100644
--- a/dbms/src/Storages/StorageMemory.h
+++ b/dbms/src/Storages/StorageMemory.h
@@ -32,7 +32,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
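StorageBuffer and StorageMaterializedView do not process queries themselves, so they forward the question to the table they wrap; that keeps the declared stage consistent with whatever their read() ultimately calls. The idiom, reduced to its core (a sketch of the pattern used in both storages above, with a hypothetical getTargetTable() standing in for the wrapped table):

    QueryProcessingStage::Enum StorageWrapper::getQueryProcessingStage(const Context & context) const
    {
        /// Delegate: the answer must match what the underlying table's read() will actually do.
        return getTargetTable()->getQueryProcessingStage(context);
    }

StorageBuffer adds one wrinkle: it resolves the destination table by name at call time, so the self-reference check ("Destination table is myself") has to run here as well as in read().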
diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp
index 38a03b73637..3962a573a40 100644
--- a/dbms/src/Storages/StorageMerge.cpp
+++ b/dbms/src/Storages/StorageMerge.cpp
@@ -132,11 +132,46 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand) cons
 }
 
 
+QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & context) const
+{
+    auto stage_in_source_tables = QueryProcessingStage::FetchColumns;
+
+    auto database = context.getDatabase(source_database);
+    auto iterator = database->getIterator(context);
+
+    bool first = true;
+
+    while (iterator->isValid())
+    {
+        if (table_name_regexp.match(iterator->name()))
+        {
+            auto & table = iterator->table();
+            if (table.get() != this)
+            {
+                auto stage = table->getQueryProcessingStage(context);
+
+                if (first)
+                    stage_in_source_tables = stage;
+                else if (stage != stage_in_source_tables)
+                    throw Exception("Source tables for Merge table are processing data up to different stages",
+                        ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
+
+                first = false;
+            }
+        }
+
+        iterator->next();
+    }
+
+    return stage_in_source_tables;
+}
+
+
 BlockInputStreams StorageMerge::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
@@ -156,8 +191,6 @@ BlockInputStreams StorageMerge::read(
             real_column_names.push_back(name);
     }
 
-    std::optional<QueryProcessingStage::Enum> processed_stage_in_source_tables;
-
     /** First we make list of selected tables to find out its size.
       * This is necessary to correctly pass the recommended number of threads to each table.
       */
@@ -218,24 +251,17 @@ BlockInputStreams StorageMerge::read(
 
         if (curr_table_number < num_streams)
        {
-            QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage;
             source_streams = table->read(
                 real_column_names,
                 modified_query_info,
                 modified_context,
-                processed_stage_in_source_table,
+                processed_stage,
                 max_block_size,
                 tables_count >= num_streams ? 1 : (num_streams / tables_count));
 
-            if (!processed_stage_in_source_tables)
-                processed_stage_in_source_tables.emplace(processed_stage_in_source_table);
-            else if (processed_stage_in_source_table != *processed_stage_in_source_tables)
-                throw Exception("Source tables for Merge table are processing data up to different stages",
-                    ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
-
             if (!header)
             {
-                switch (processed_stage_in_source_table)
+                switch (processed_stage)
                 {
                     case QueryProcessingStage::FetchColumns:
                         header = getSampleBlockForColumns(column_names);
@@ -263,25 +289,17 @@ BlockInputStreams StorageMerge::read(
         }
         else
         {
-            if (!processed_stage_in_source_tables)
-                throw Exception("Logical error: unknown processed stage in source tables", ErrorCodes::LOGICAL_ERROR);
-
             /// If many streams, initialize it lazily, to avoid long delay before start of query processing.
             source_streams.emplace_back(std::make_shared<LazyBlockInputStream>(header, [=]() -> BlockInputStreamPtr
             {
-                QueryProcessingStage::Enum processed_stage_in_source_table = processed_stage;
                 BlockInputStreams streams = table->read(
                     real_column_names,
                     modified_query_info,
                     modified_context,
-                    processed_stage_in_source_table,
+                    processed_stage,
                     max_block_size,
                     1);
 
-                if (processed_stage_in_source_table != *processed_stage_in_source_tables)
-                    throw Exception("Source tables for Merge table are processing data up to different stages",
-                        ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);
-
                 if (streams.empty())
                 {
                     return std::make_shared<NullBlockInputStream>(header);
@@ -305,9 +323,6 @@ BlockInputStreams StorageMerge::read(
         res.insert(res.end(), source_streams.begin(), source_streams.end());
     }
 
-    if (processed_stage_in_source_tables)
-        processed_stage = *processed_stage_in_source_tables;
-
     if (res.empty())
         return res;
diff --git a/dbms/src/Storages/StorageMerge.h b/dbms/src/Storages/StorageMerge.h
index e48cfc1d1fc..beb85dfef5a 100644
--- a/dbms/src/Storages/StorageMerge.h
+++ b/dbms/src/Storages/StorageMerge.h
@@ -29,11 +29,13 @@ public:
     NameAndTypePair getColumn(const String & column_name) const override;
     bool hasColumn(const String & column_name) const override;
 
+    QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override;
+
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index da205e9a293..41bd62cc82b 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -101,11 +101,12 @@ BlockInputStreams StorageMergeTree::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
-    return reader.read(column_names, query_info, context, processed_stage, max_block_size, num_streams, 0);
+    checkQueryProcessingStage(processed_stage, context);
+    return reader.read(column_names, query_info, context, max_block_size, num_streams, 0);
 }
 
 BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Settings & /*settings*/)
diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h
index a2787a8bb30..2699837b445 100644
--- a/dbms/src/Storages/StorageMergeTree.h
+++ b/dbms/src/Storages/StorageMergeTree.h
@@ -56,7 +56,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageMySQL.cpp b/dbms/src/Storages/StorageMySQL.cpp
index 3c63ebe4ca5..1cd93bbe1ff 100644
--- a/dbms/src/Storages/StorageMySQL.cpp
+++ b/dbms/src/Storages/StorageMySQL.cpp
@@ -37,12 +37,12 @@ BlockInputStreams StorageMySQL::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned)
 {
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
     String query = transformQueryForExternalDatabase(*query_info.query, getColumns().ordinary, remote_database_name, remote_table_name, context);
 
     Block sample_block;
diff --git a/dbms/src/Storages/StorageMySQL.h b/dbms/src/Storages/StorageMySQL.h
index 9e2b233283e..9f10211ca92 100644
--- a/dbms/src/Storages/StorageMySQL.h
+++ b/dbms/src/Storages/StorageMySQL.h
@@ -33,7 +33,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageNull.h b/dbms/src/Storages/StorageNull.h
index cdde480a951..406cf3fc857 100644
--- a/dbms/src/Storages/StorageNull.h
+++ b/dbms/src/Storages/StorageNull.h
@@ -23,11 +23,12 @@ public:
     BlockInputStreams read(
         const Names & column_names,
         const SelectQueryInfo &,
-        const Context &,
-        QueryProcessingStage::Enum &,
+        const Context & context,
+        QueryProcessingStage::Enum processed_stage,
         size_t,
         unsigned) override
     {
+        checkQueryProcessingStage(processed_stage, context);
        return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
     }
diff --git a/dbms/src/Storages/StorageODBC.cpp b/dbms/src/Storages/StorageODBC.cpp
index 39b51d46047..9365ab57a39 100644
--- a/dbms/src/Storages/StorageODBC.cpp
+++ b/dbms/src/Storages/StorageODBC.cpp
@@ -37,12 +37,12 @@ BlockInputStreams StorageODBC::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
     unsigned /*num_streams*/)
 {
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
 
     String query = transformQueryForExternalDatabase(
         *query_info.query, getColumns().ordinary, remote_database_name, remote_table_name, context);
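For StorageMerge the refactoring is more than mechanical: the "all source tables must be at the same stage" check used to be enforced lazily while reading, and could fire inside a LazyBlockInputStream callback after streams had already been handed out; now getQueryProcessingStage() walks the matching tables up front and fails before any stream is created. The invariant in miniature (a hypothetical `tables` list standing in for the database iterator in the hunk above):

    // All underlying tables must agree; otherwise the merged streams would
    // carry blocks processed to incompatible stages.
    auto stage = tables.front()->getQueryProcessingStage(context);
    for (const auto & table : tables)
        if (table->getQueryProcessingStage(context) != stage)
            throw Exception("Source tables for Merge table are processing data up to different stages",
                ErrorCodes::INCOMPATIBLE_SOURCE_TABLES);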
diff --git a/dbms/src/Storages/StorageODBC.h b/dbms/src/Storages/StorageODBC.h
index 605d35b0202..10e13f78cfb 100644
--- a/dbms/src/Storages/StorageODBC.h
+++ b/dbms/src/Storages/StorageODBC.h
@@ -35,7 +35,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index 3e371816533..be15b0cbfb0 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2439,10 +2439,11 @@ BlockInputStreams StorageReplicatedMergeTree::read(
     const Names & column_names,
     const SelectQueryInfo & query_info,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     const Settings & settings = context.getSettingsRef();
 
     /** The `select_sequential_consistency` setting has two meanings:
@@ -2480,8 +2481,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
         }
     }
 
-    return reader.read(
-        column_names, query_info, context, processed_stage, max_block_size, num_streams, max_block_number_to_read);
+    return reader.read(column_names, query_info, context, max_block_size, num_streams, max_block_number_to_read);
 }
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h
index 457e834ea1c..7173b64ed4f 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.h
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.h
@@ -101,7 +101,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp
index 1bebca62322..93f06738db2 100644
--- a/dbms/src/Storages/StorageStripeLog.cpp
+++ b/dbms/src/Storages/StorageStripeLog.cpp
@@ -232,14 +232,14 @@ BlockInputStreams StorageStripeLog::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t /*max_block_size*/,
     unsigned num_streams)
 {
+    checkQueryProcessingStage(processed_stage, context);
     std::shared_lock<std::shared_mutex> lock(rwlock);
 
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
 
     NameSet column_names_set(column_names.begin(), column_names.end());
diff --git a/dbms/src/Storages/StorageStripeLog.h b/dbms/src/Storages/StorageStripeLog.h
index 115f070f8d0..7399cd74b65 100644
--- a/dbms/src/Storages/StorageStripeLog.h
+++ b/dbms/src/Storages/StorageStripeLog.h
@@ -32,7 +32,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp
index cb62c911e63..cd2261532bf 100644
--- a/dbms/src/Storages/StorageTinyLog.cpp
+++ b/dbms/src/Storages/StorageTinyLog.cpp
@@ -341,12 +341,12 @@ BlockInputStreams StorageTinyLog::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
     const unsigned /*num_streams*/)
 {
+    checkQueryProcessingStage(processed_stage, context);
     check(column_names);
-    processed_stage = QueryProcessingStage::FetchColumns;
     return BlockInputStreams(1, std::make_shared<TinyLogBlockInputStream>(
         max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
 }
diff --git a/dbms/src/Storages/StorageTinyLog.h b/dbms/src/Storages/StorageTinyLog.h
index acaf7944f30..109e01636d8 100644
--- a/dbms/src/Storages/StorageTinyLog.h
+++ b/dbms/src/Storages/StorageTinyLog.h
@@ -31,7 +31,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
diff --git a/dbms/src/Storages/StorageView.cpp b/dbms/src/Storages/StorageView.cpp
index 06809ffb4e3..17628681847 100644
--- a/dbms/src/Storages/StorageView.cpp
+++ b/dbms/src/Storages/StorageView.cpp
@@ -33,11 +33,11 @@ BlockInputStreams StorageView::read(
     const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
-    QueryProcessingStage::Enum & processed_stage,
+    QueryProcessingStage::Enum processed_stage,
     const size_t /*max_block_size*/,
     const unsigned /*num_streams*/)
 {
-    processed_stage = QueryProcessingStage::FetchColumns;
+    checkQueryProcessingStage(processed_stage, context);
     BlockInputStreams res = InterpreterSelectWithUnionQuery(inner_query, context, column_names).executeWithMultipleStreams();
 
     /// It's expected that the columns read from storage are not constant.
diff --git a/dbms/src/Storages/StorageView.h b/dbms/src/Storages/StorageView.h
index 20a8f76189c..12f666a0648 100644
--- a/dbms/src/Storages/StorageView.h
+++ b/dbms/src/Storages/StorageView.h
@@ -25,7 +25,7 @@ public:
         const Names & column_names,
         const SelectQueryInfo & query_info,
         const Context & context,
-        QueryProcessingStage::Enum & processed_stage,
+        QueryProcessingStage::Enum processed_stage,
         size_t max_block_size,
         unsigned num_streams) override;
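For storages outside this patch the migration recipe is mechanical: drop the `&` from the processed_stage parameter, replace any `processed_stage = ...` assignment with checkQueryProcessingStage(), and override getQueryProcessingStage() only if read() does more than fetch columns. A minimal hypothetical storage showing both halves of the contract (names are illustrative, not from this patch; the remaining pure-virtual members of ITableDeclaration are omitted for brevity):

    class StorageExample : public IStorage
    {
    public:
        std::string getName() const override { return "Example"; }
        std::string getTableName() const override { return table_name; }

        /// The base-class default already returns FetchColumns; shown here only for illustration.
        QueryProcessingStage::Enum getQueryProcessingStage(const Context &) const override
        {
            return QueryProcessingStage::FetchColumns;
        }

        BlockInputStreams read(
            const Names & column_names,
            const SelectQueryInfo & /*query_info*/,
            const Context & context,
            QueryProcessingStage::Enum processed_stage,   /// by value now, never assigned to
            size_t /*max_block_size*/,
            unsigned /*num_streams*/) override
        {
            checkQueryProcessingStage(processed_stage, context);  /// replaces `processed_stage = ...`
            return { std::make_shared<NullBlockInputStream>(getSampleBlockForColumns(column_names)) };
        }

    private:
        String table_name;
    };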