From 882125dd6d9fb64638902b5e6a025147f47aa1fb Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Tue, 18 Sep 2018 19:09:21 +0800 Subject: [PATCH] ISSUES-3134 fix merge and distributed engine query stage --- dbms/src/Storages/StorageMerge.cpp | 277 ++++++++++-------- dbms/src/Storages/StorageMerge.h | 10 +- .../00717_merge_and_distributed.reference | 57 +++- .../00717_merge_and_distributed.sql | 46 ++- 4 files changed, 261 insertions(+), 129 deletions(-) diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index 5b8dbc80d21..63a2a66afaa 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB @@ -138,8 +139,8 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & { auto stage_in_source_tables = QueryProcessingStage::FetchColumns; - auto database = context.getDatabase(source_database); - auto iterator = database->getIterator(context); + DatabasePtr database = context.getDatabase(source_database); + DatabaseIteratorPtr iterator = database->getIterator(context); size_t selected_table_size = 0; @@ -149,14 +150,18 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context & { auto & table = iterator->table(); if (table.get() != this) + { ++selected_table_size; + stage_in_source_tables = table->getQueryProcessingStage(context); + } + } iterator->next(); } - auto fetch_or_mergeable_stage = std::min(stage_in_source_tables, QueryProcessingStage::WithMergeableState); - return selected_table_size == 1 ? stage_in_source_tables : fetch_or_mergeable_stage; + auto fetch_stage = std::max(stage_in_source_tables, QueryProcessingStage::WithMergeableState); + return selected_table_size == 1 ? fetch_stage : QueryProcessingStage::WithMergeableState; } @@ -184,34 +189,6 @@ BlockInputStreams StorageMerge::read( real_column_names.push_back(name); } - /** First we make list of selected tables to find out its size. - * This is necessary to correctly pass the recommended number of threads to each table. - */ - StorageListWithLocks selected_tables = getSelectedTables(); - - const ASTPtr & query = query_info.query; - - for (const auto & elem : selected_tables) - { - /// If PREWHERE is used in query, you need to make sure that all tables support this. - if (typeid_cast(*query).prewhere_expression) - if (!elem.first->supportsPrewhere()) - throw Exception("Storage " + elem.first->getName() + " doesn't support PREWHERE.", - ErrorCodes::ILLEGAL_PREWHERE); - } - - Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables); - - /// If _table column is requested, try filtering - if (has_table_virtual_column) - { - VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context); - auto values = VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_table"); - - /// Remove unused tables from the list - selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); }); - } - /** Just in case, turn off optimization "transfer to PREWHERE", * since there is no certainty that it works when one of table is MergeTree and other is not. */ @@ -219,111 +196,48 @@ BlockInputStreams StorageMerge::read( modified_context.getSettingsRef().optimize_move_to_prewhere = false; /// What will be result structure depending on query processed stage in source tables? - Block header; + Block header = getQueryHeader(column_names, query_info, context, processed_stage); + /** First we make list of selected tables to find out its size. + * This is necessary to correctly pass the recommended number of threads to each table. + */ + StorageListWithLocks selected_tables = getSelectedTables(query_info.query, has_table_virtual_column, true); + + size_t remaining_streams = num_streams; size_t tables_count = selected_tables.size(); - size_t curr_table_number = 0; - for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it, ++curr_table_number) + for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it) { - StoragePtr table = it->first; - auto & table_lock = it->second; + size_t current_need_streams = tables_count >= num_streams ? 1 : (num_streams / tables_count); + size_t current_streams = std::min(current_need_streams, remaining_streams); + remaining_streams -= current_streams; - /// If there are only virtual columns in query, you must request at least one other column. - if (real_column_names.size() == 0) - real_column_names.push_back(ExpressionActions::getSmallestColumn(table->getColumns().getAllPhysical())); - - /// Substitute virtual column for its value when querying tables. - ASTPtr modified_query_ast = query->clone(); - VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_table", table->getTableName()); - - SelectQueryInfo modified_query_info; - modified_query_info.query = modified_query_ast; - modified_query_info.prewhere_info = query_info.prewhere_info; - modified_query_info.sets = query_info.sets; + StoragePtr storage = it->first; + TableStructureReadLockPtr struct_lock = it->second; BlockInputStreams source_streams; - if (curr_table_number < num_streams) + if (current_streams) { - source_streams = table->read( - real_column_names, - modified_query_info, - modified_context, - processed_stage, - max_block_size, - tables_count >= num_streams ? 1 : (num_streams / tables_count)); - - if (!header) - { - switch (processed_stage) - { - case QueryProcessingStage::FetchColumns: - { - header = getSampleBlockForColumns(column_names); - - if (query_info.prewhere_info) - { - query_info.prewhere_info->prewhere_actions->execute(header); - header = materializeBlock(header); - if (query_info.prewhere_info->remove_prewhere_column) - header.erase(query_info.prewhere_info->prewhere_column_name); - } - - break; - } - case QueryProcessingStage::WithMergeableState: - case QueryProcessingStage::Complete: - header = materializeBlock(InterpreterSelectQuery( - query_info.query, context, std::make_shared(getSampleBlockForColumns(column_names)), - processed_stage, true).getSampleBlock()); - break; - } - } - - if (has_table_virtual_column) - for (auto & stream : source_streams) - stream = std::make_shared>( - stream, std::make_shared(), table->getTableName(), "_table"); - - /// Subordinary tables could have different but convertible types, like numeric types of different width. - /// We must return streams with structure equals to structure of Merge table. - for (auto & stream : source_streams) - stream = std::make_shared(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); + source_streams = createSourceStreams( + query_info, processed_stage, max_block_size, real_column_names, modified_context, header, storage, + struct_lock, current_streams, has_table_virtual_column); } else { - /// If many streams, initialize it lazily, to avoid long delay before start of query processing. source_streams.emplace_back(std::make_shared(header, [=]() -> BlockInputStreamPtr { - BlockInputStreams streams = table->read( - real_column_names, - modified_query_info, - modified_context, - processed_stage, - max_block_size, - 1); + BlockInputStreams streams = createSourceStreams(query_info, processed_stage, max_block_size, real_column_names, + modified_context, header, storage, + struct_lock, current_streams, has_table_virtual_column); - if (streams.empty()) - { - return std::make_shared(header); - } - else - { - BlockInputStreamPtr stream = streams.size() > 1 ? std::make_shared(streams) : streams[0]; + if (streams.size() != 1) + throw Exception("LogicalError: the lazy stream size must to be one.", ErrorCodes::LOGICAL_ERROR); - if (has_table_virtual_column) - stream = std::make_shared>( - stream, std::make_shared(), table->getTableName(), "_table"); - - return std::make_shared(context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); - } + return streams[0]; })); } - for (auto & stream : source_streams) - stream->addTableLock(table_lock); - res.insert(res.end(), source_streams.begin(), source_streams.end()); } @@ -334,15 +248,63 @@ BlockInputStreams StorageMerge::read( return res; } -/// Construct a block consisting only of possible values of virtual columns -Block StorageMerge::getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const +BlockInputStreams StorageMerge::createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage, + const size_t max_block_size, const Names & real_column_names, + const Context & modified_context, const Block & header, const StoragePtr & storage, + const TableStructureReadLockPtr & struct_lock, size_t current_streams, + bool has_table_virtual_column) const { - auto column = ColumnString::create(); + SelectQueryInfo modified_query_info; + modified_query_info.query = query_info.query->clone(); + VirtualColumnUtils::rewriteEntityInAst(modified_query_info.query, "_table", storage->getTableName()); + modified_query_info.prewhere_info = query_info.prewhere_info; + modified_query_info.sets = query_info.sets; - for (const auto & elem : selected_tables) - column->insert(elem.first->getTableName()); + BlockInputStreams source_streams; - return Block{ColumnWithTypeAndName(std::move(column), std::make_shared(), "_table")}; + if (processed_stage <= storage->getQueryProcessingStage(modified_context)) + { + source_streams = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size, + current_streams); + } + else if (processed_stage > storage->getQueryProcessingStage(modified_context)) + { + typeid_cast(modified_query_info.query.get())->replaceDatabaseAndTable(source_database, storage->getTableName()); + + /// TODO: set num_streams + InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, Names{}, processed_stage}; + BlockInputStreamPtr interpreter_stream = interpreter.execute().in; + + /** Materialization is needed, since from distributed storage the constants come materialized. + * If you do not do this, different types (Const and non-Const) columns will be produced in different threads, + * And this is not allowed, since all code is based on the assumption that in the block stream all types are the same. + */ + source_streams.emplace_back(std::make_shared(interpreter_stream)); + } + + if (!current_streams) + { + BlockInputStreamPtr stream = + source_streams.size() > 1 ? std::make_shared(source_streams) : source_streams[0]; + + source_streams.resize(1); + source_streams[0] = stream; + } + + for (BlockInputStreamPtr & source_stream : source_streams) + { + if (has_table_virtual_column) + source_stream = std::make_shared>( + source_stream, std::make_shared(), storage->getTableName(), "_table"); + + /// Subordinary tables could have different but convertible types, like numeric types of different width. + /// We must return streams with structure equals to structure of Merge table. + source_stream = std::make_shared( + modified_context, source_stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name); + + source_stream->addTableLock(struct_lock); + } + return source_streams; } StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const @@ -367,6 +329,47 @@ StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables() const } +StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock) const +{ + StorageListWithLocks selected_tables; + DatabasePtr database = context.getDatabase(source_database); + DatabaseIteratorPtr iterator = database->getIterator(context); + + auto virtual_column = ColumnString::create(); + + while (iterator->isValid()) + { + if (table_name_regexp.match(iterator->name())) + { + StoragePtr storage = iterator->table(); + + if (query && typeid_cast(query.get())->prewhere_expression && !storage->supportsPrewhere()) + throw Exception("Storage " + storage->getName() + " doesn't support PREWHERE.", + ErrorCodes::ILLEGAL_PREWHERE); + + if (storage.get() != this) + { + virtual_column->insert(storage->getTableName()); + selected_tables.emplace_back(storage, get_lock ? storage->lockStructure(false, __PRETTY_FUNCTION__) : TableStructureReadLockPtr{}); + } + } + + iterator->next(); + } + + if (has_virtual_column) + { + Block virtual_columns_block = Block{ColumnWithTypeAndName(std::move(virtual_column), std::make_shared(), "_table")}; + VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context); + auto values = VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_table"); + + /// Remove unused tables from the list + selected_tables.remove_if([&] (const auto & elem) { return values.find(elem.first->getTableName()) == values.end(); }); + } + + return selected_tables; +} + void StorageMerge::alter(const AlterCommands & params, const String & database_name, const String & table_name, const Context & context) { for (const auto & param : params) @@ -381,6 +384,32 @@ void StorageMerge::alter(const AlterCommands & params, const String & database_n setColumns(new_columns); } +Block StorageMerge::getQueryHeader( + const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage) +{ + switch (processed_stage) + { + case QueryProcessingStage::FetchColumns: + { + Block header = getSampleBlockForColumns(column_names); + if (query_info.prewhere_info) + { + query_info.prewhere_info->prewhere_actions->execute(header); + header = materializeBlock(header); + if (query_info.prewhere_info->remove_prewhere_column) + header.erase(query_info.prewhere_info->prewhere_column_name); + } + return header; + } + case QueryProcessingStage::WithMergeableState: + case QueryProcessingStage::Complete: + return materializeBlock(InterpreterSelectQuery( + query_info.query, context, std::make_shared(getSampleBlockForColumns(column_names)), + processed_stage, true).getSampleBlock()); + } + throw Exception("Logical Error: unknow processed stage.", ErrorCodes::LOGICAL_ERROR); +} + void registerStorageMerge(StorageFactory & factory) { diff --git a/dbms/src/Storages/StorageMerge.h b/dbms/src/Storages/StorageMerge.h index 7ebc59947c6..a4570bacf30 100644 --- a/dbms/src/Storages/StorageMerge.h +++ b/dbms/src/Storages/StorageMerge.h @@ -58,7 +58,7 @@ private: StorageListWithLocks getSelectedTables() const; - Block getBlockWithVirtualColumns(const StorageListWithLocks & selected_tables) const; + StorageMerge::StorageListWithLocks getSelectedTables(const ASTPtr & query, bool has_virtual_column, bool get_lock) const; template StoragePtr getFirstTable(F && predicate) const; @@ -70,6 +70,14 @@ protected: const String & source_database_, const String & table_name_regexp_, const Context & context_); + + Block getQueryHeader(const Names & column_names, const SelectQueryInfo & query_info, + const Context & context, QueryProcessingStage::Enum processed_stage); + + BlockInputStreams createSourceStreams(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage, + const size_t max_block_size, const Names &real_column_names, const Context & modified_context, + const Block & header, const StoragePtr & storage, const TableStructureReadLockPtr & struct_lock, + size_t current_streams, bool has_table_virtual_column) const; }; } diff --git a/dbms/tests/queries/0_stateless/00717_merge_and_distributed.reference b/dbms/tests/queries/0_stateless/00717_merge_and_distributed.reference index e582eb77773..f64ded6ac5d 100644 --- a/dbms/tests/queries/0_stateless/00717_merge_and_distributed.reference +++ b/dbms/tests/queries/0_stateless/00717_merge_and_distributed.reference @@ -1,2 +1,55 @@ -300 -300 +--------------Single Local------------ +2018-08-01 100 +2018-08-01 100 test_local_1 +100 test_local_1 +2018-08-01 100 +2018-08-01 100 +2018-08-01 100 +2018-08-01 100 +--------------Single Distributed------------ +2018-08-01 100 +2018-08-01 100 test_distributed_1 +100 test_distributed_1 +2018-08-01 100 +2018-08-01 100 +2018-08-01 100 +2018-08-01 100 +--------------Local Merge Local------------ +2018-08-01 100 +2018-08-01 200 +2018-08-01 100 test_local_1 +2018-08-01 200 test_local_2 +100 test_local_1 +200 test_local_2 +2018-08-01 100 +2018-08-01 100 +2018-08-01 100 +2018-08-01 200 +2018-08-01 100 +2018-08-01 200 +--------------Local Merge Distributed------------ +2018-08-01 200 +2018-08-01 100 +2018-08-01 200 test_distributed_2 +2018-08-01 100 test_local_1 +200 test_distributed_2 +100 test_local_1 +2018-08-01 100 +2018-08-01 100 +2018-08-01 100 +2018-08-01 200 +2018-08-01 100 +2018-08-01 200 +--------------Distributed Merge Distributed------------ +2018-08-01 100 +2018-08-01 200 +2018-08-01 100 test_distributed_1 +2018-08-01 200 test_distributed_2 +100 test_distributed_1 +200 test_distributed_2 +2018-08-01 100 +2018-08-01 100 +2018-08-01 100 +2018-08-01 200 +2018-08-01 100 +2018-08-01 200 diff --git a/dbms/tests/queries/0_stateless/00717_merge_and_distributed.sql b/dbms/tests/queries/0_stateless/00717_merge_and_distributed.sql index 0cb83772ea1..5a46431c055 100644 --- a/dbms/tests/queries/0_stateless/00717_merge_and_distributed.sql +++ b/dbms/tests/queries/0_stateless/00717_merge_and_distributed.sql @@ -11,8 +11,50 @@ CREATE TABLE test.test_distributed_2 AS test.test_local_2 ENGINE = Distributed(' INSERT INTO test.test_local_1 VALUES ('2018-08-01',100); INSERT INTO test.test_local_2 VALUES ('2018-08-01',200); -SELECT sum(value) FROM merge('test', 'test_local_1|test_distributed_2'); -SELECT sum(value) FROM merge('test', 'test_distributed_1|test_distributed_2'); +SELECT '--------------Single Local------------'; +SELECT * FROM merge('test', 'test_local_1'); +SELECT *, _table FROM merge('test', 'test_local_1') ORDER BY _table; +SELECT sum(value), _table FROM merge('test', 'test_local_1') GROUP BY _table ORDER BY _table; +SELECT * FROM merge('test', 'test_local_1') WHERE _table = 'test_local_1'; +SELECT * FROM merge('test', 'test_local_1') PREWHERE _table = 'test_local_1'; +SELECT * FROM merge('test', 'test_local_1') WHERE _table in ('test_local_1', 'test_local_2'); +SELECT * FROM merge('test', 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); + +SELECT '--------------Single Distributed------------'; +SELECT * FROM merge('test', 'test_distributed_1'); +SELECT *, _table FROM merge('test', 'test_distributed_1') ORDER BY _table; +SELECT sum(value), _table FROM merge('test', 'test_distributed_1') GROUP BY _table ORDER BY _table; +SELECT * FROM merge('test', 'test_distributed_1') WHERE _table = 'test_distributed_1'; +SELECT * FROM merge('test', 'test_distributed_1') PREWHERE _table = 'test_distributed_1'; +SELECT * FROM merge('test', 'test_distributed_1') WHERE _table in ('test_distributed_1', 'test_distributed_2'); +SELECT * FROM merge('test', 'test_distributed_1') PREWHERE _table in ('test_distributed_1', 'test_distributed_2'); + +SELECT '--------------Local Merge Local------------'; +SELECT * FROM merge('test', 'test_local_1|test_local_2'); +SELECT *, _table FROM merge('test', 'test_local_1|test_local_2') ORDER BY _table; +SELECT sum(value), _table FROM merge('test', 'test_local_1|test_local_2') GROUP BY _table ORDER BY _table; +SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table = 'test_local_1'; +SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; +SELECT * FROM merge('test', 'test_local_1|test_local_2') WHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; +SELECT * FROM merge('test', 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; + +SELECT '--------------Local Merge Distributed------------'; +SELECT * FROM merge('test', 'test_local_1|test_distributed_2'); +SELECT *, _table FROM merge('test', 'test_local_1|test_distributed_2') ORDER BY _table; +SELECT sum(value), _table FROM merge('test', 'test_local_1|test_distributed_2') GROUP BY _table ORDER BY _table; +SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table = 'test_local_1'; +SELECT * FROM merge('test', 'test_local_1|test_distributed_2') PREWHERE _table = 'test_local_1'; +SELECT * FROM merge('test', 'test_local_1|test_distributed_2') WHERE _table in ('test_local_1', 'test_distributed_2') ORDER BY value; +SELECT * FROM merge('test', 'test_local_1|test_distributed_2') PREWHERE _table in ('test_local_1', 'test_distributed_2') ORDER BY value; + +SELECT '--------------Distributed Merge Distributed------------'; +SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2'); +SELECT *, _table FROM merge('test', 'test_distributed_1|test_distributed_2') ORDER BY _table; +SELECT sum(value), _table FROM merge('test', 'test_distributed_1|test_distributed_2') GROUP BY _table ORDER BY _table; +SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') WHERE _table = 'test_distributed_1'; +SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') PREWHERE _table = 'test_distributed_1'; +SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') WHERE _table in ('test_distributed_1', 'test_distributed_2') ORDER BY value; +SELECT * FROM merge('test', 'test_distributed_1|test_distributed_2') PREWHERE _table in ('test_distributed_1', 'test_distributed_2') ORDER BY value; DROP TABLE IF EXISTS test.test_local_1; DROP TABLE IF EXISTS test.test_local_2;