From 7f92e6a21f02788a043d95e86034ed51955da3b5 Mon Sep 17 00:00:00 2001 From: Vitaliy Zakaznikov Date: Thu, 5 Dec 2019 00:29:37 +0100 Subject: [PATCH] Draft of proper support for subqueries in live view tables. --- dbms/src/Storages/LiveView/StorageBlocks.h | 46 +++++++++ .../src/Storages/LiveView/StorageLiveView.cpp | 95 ++++++++++++++----- dbms/src/Storages/LiveView/StorageLiveView.h | 16 +++- 3 files changed, 134 insertions(+), 23 deletions(-) create mode 100644 dbms/src/Storages/LiveView/StorageBlocks.h diff --git a/dbms/src/Storages/LiveView/StorageBlocks.h b/dbms/src/Storages/LiveView/StorageBlocks.h new file mode 100644 index 00000000000..e41d6b405dd --- /dev/null +++ b/dbms/src/Storages/LiveView/StorageBlocks.h @@ -0,0 +1,46 @@ +#pragma once + +#include + +namespace DB +{ +class StorageBlocks : public IStorage +{ + +public: + StorageBlocks(const std::string & database_name_, const std::string & table_name_, + const ColumnsDescription & columns_, BlockInputStreams streams_, + QueryProcessingStage::Enum to_stage_) + : database_name(database_name_), table_name(table_name_), streams(streams_), to_stage(to_stage_) + { + setColumns(columns_); + } + static StoragePtr createStorage(std::string database_name, std::string table_name, const ColumnsDescription columns, BlockInputStreams streams, QueryProcessingStage::Enum to_stage) + { + return std::make_shared(std::move(database_name), std::move(table_name), std::move(columns), std::move(streams), to_stage); + } + std::string getName() const override { return "Blocks"; } + std::string getTableName() const override { return table_name; } + std::string getDatabaseName() const override { return database_name; } + QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; } + + BlockInputStreams read( + const Names & /*column_names*/, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum /*processed_stage*/, + size_t 
/*max_block_size*/, + unsigned /*num_streams*/) + { + return streams; + } + +private: + std::string database_name; + std::string table_name; + Block res_block; + BlockInputStreams streams; + QueryProcessingStage::Enum to_stage; +}; + +} diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index 0545351e47e..4e8e8d2cb30 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -33,6 +33,7 @@ limitations under the License. */ #include #include #include +#include #include #include @@ -52,13 +53,44 @@ namespace ErrorCodes extern const int SUPPORT_IS_DISABLED; } -static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name) +static ASTTableExpression * getTableExpression(ASTSelectQuery & select, size_t table_number) { - auto db_and_table = getDatabaseAndTable(query, 0); - ASTPtr subquery = extractTableExpression(query, 0); + if (!select.tables()) + return {}; + + const auto & tables_in_select_query = select.tables()->as(); + if (tables_in_select_query.children.size() <= table_number) + return {}; + + const auto & tables_element = tables_in_select_query.children[table_number]->as(); + + if (!tables_element.table_expression) + return {}; + + return tables_element.table_expression->as(); +} + + +static void extractDependentTable(ASTPtr & query, String & select_database_name, String & select_table_name, const String & database_name, const String & table_name, ASTPtr & inner_outer_query, ASTPtr & inner_subquery) +{ + ASTSelectQuery & select_query = typeid_cast(*query); + auto db_and_table = getDatabaseAndTable(select_query, 0); + ASTPtr subquery = extractTableExpression(select_query, 0); if (!db_and_table && !subquery) + { + if (inner_outer_query && inner_subquery) + { + auto table_expression = getTableExpression(inner_outer_query->as(), 0); + //String table_alias = getTableExpressionAlias(table_expression); 
+ table_expression->subquery = nullptr; + table_expression->database_and_table_name = createTableIdentifier(database_name, table_name); + + //if (!table_alias.empty()) + // table_expression->database_and_table_name->setAlias(table_alias); + } return; + } if (db_and_table) { @@ -68,7 +100,7 @@ static void extractDependentTable(ASTSelectQuery & query, String & select_databa { db_and_table->database = select_database_name; AddDefaultDatabaseVisitor visitor(select_database_name); - visitor.visit(query); + visitor.visit(select_query); } else select_database_name = db_and_table->database; @@ -78,9 +110,10 @@ static void extractDependentTable(ASTSelectQuery & query, String & select_databa if (ast_select->list_of_selects->children.size() != 1) throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW); - auto & inner_query = ast_select->list_of_selects->children.at(0); + inner_outer_query = query; + inner_subquery = ast_select->list_of_selects->children.at(0); - extractDependentTable(inner_query->as(), select_database_name, select_table_name); + extractDependentTable(inner_subquery, select_database_name, select_table_name, database_name, table_name, inner_outer_query, inner_subquery); } else throw Exception("Logical error while creating StorageLiveView." 
@@ -112,6 +145,10 @@ void StorageLiveView::writeIntoLiveView( BlockInputStreams from; BlocksPtrs mergeable_blocks; BlocksPtr new_mergeable_blocks = std::make_shared(); + ASTPtr mergeable_query = live_view.getInnerQuery(); + + if (live_view.getInnerSubQuery()) + mergeable_query = live_view.getInnerSubQuery(); { std::lock_guard lock(live_view.mutex); @@ -121,7 +158,7 @@ void StorageLiveView::writeIntoLiveView( { mergeable_blocks = std::make_shared>(); BlocksPtr base_mergeable_blocks = std::make_shared(); - InterpreterSelectQuery interpreter(live_view.getInnerQuery(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); + InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); auto view_mergeable_stream = std::make_shared( interpreter.execute().in); while (Block this_block = view_mergeable_stream->read()) @@ -148,8 +185,7 @@ void StorageLiveView::writeIntoLiveView( auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); BlockInputStreams streams = {std::make_shared(block)}; auto proxy_storage = std::make_shared(parent_storage, std::move(streams), QueryProcessingStage::FetchColumns); - InterpreterSelectQuery select_block(live_view.getInnerQuery(), - context, proxy_storage, + InterpreterSelectQuery select_block(mergeable_query, context, proxy_storage, QueryProcessingStage::WithMergeableState); auto data_mergeable_stream = std::make_shared( select_block.execute().in); @@ -178,10 +214,18 @@ void StorageLiveView::writeIntoLiveView( } auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName()); - auto proxy_storage = std::make_shared(parent_storage, std::move(from), QueryProcessingStage::WithMergeableState); - InterpreterSelectQuery select(live_view.getInnerQuery(), context, proxy_storage, QueryProcessingStage::Complete); + + auto blocks_storage = 
StorageBlocks::createStorage(live_view.database_name, live_view.table_name, parent_storage->getColumns(), std::move(from), QueryProcessingStage::WithMergeableState); + InterpreterSelectQuery select(mergeable_query->clone(), context, blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete)); BlockInputStreamPtr data = std::make_shared(select.execute().in); + if (live_view.getInnerSubQuery()) + { + auto outer_blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name, ColumnsDescription(data->getHeader().getNamesAndTypesList()), {data}, QueryProcessingStage::FetchColumns); + InterpreterSelectQuery outer_select(live_view.getInnerOuterQuery(), context, outer_blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete)); + data = std::make_shared(outer_select.execute().in); + } + /// Squashing is needed here because the view query can generate a lot of blocks /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY /// and two-level aggregation is triggered). 
@@ -216,8 +260,7 @@ StorageLiveView::StorageLiveView( inner_query = query.select->list_of_selects->children.at(0); - ASTSelectQuery & select_query = typeid_cast(*inner_query); - extractDependentTable(select_query, select_database_name, select_table_name); + extractDependentTable(inner_query, select_database_name, select_table_name, database_name, table_name, inner_outer_query, inner_subquery); /// If the table is not specified - use the table `system.one` if (select_table_name.empty()) @@ -260,9 +303,7 @@ Block StorageLiveView::getHeader() const if (!sample_block) { - auto storage = global_context.getTable(select_database_name, select_table_name); - sample_block = InterpreterSelectQuery(inner_query, *live_view_context, storage, - SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock(); + sample_block = InterpreterSelectQuery(inner_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::Complete)).getSampleBlock(); sample_block.insert({DataTypeUInt64().createColumnConst( sample_block.rows(), 0)->convertToFullColumnIfConst(), std::make_shared(), @@ -274,7 +315,6 @@ Block StorageLiveView::getHeader() const sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst(); } } - return sample_block; } @@ -285,20 +325,31 @@ bool StorageLiveView::getNewBlocks() BlocksPtr new_blocks = std::make_shared(); BlocksMetadataPtr new_blocks_metadata = std::make_shared(); BlocksPtr new_mergeable_blocks = std::make_shared(); + ASTPtr mergeable_query = inner_query; - InterpreterSelectQuery interpreter(inner_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); + if (inner_subquery) + mergeable_query = inner_subquery; + + InterpreterSelectQuery interpreter(mergeable_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names()); auto mergeable_stream = std::make_shared(interpreter.execute().in); - + while 
(Block block = mergeable_stream->read()) new_mergeable_blocks->push_back(block); mergeable_blocks = std::make_shared>(); mergeable_blocks->push_back(new_mergeable_blocks); BlockInputStreamPtr from = std::make_shared(std::make_shared(new_mergeable_blocks), mergeable_stream->getHeader()); - auto proxy_storage = ProxyStorage::createProxyStorage(global_context.getTable(select_database_name, select_table_name), {from}, QueryProcessingStage::WithMergeableState); - InterpreterSelectQuery select(inner_query->clone(), *live_view_context, proxy_storage, SelectQueryOptions(QueryProcessingStage::Complete)); + auto blocks_storage = StorageBlocks::createStorage(database_name, table_name, global_context.getTable(select_database_name, select_table_name)->getColumns(), {from}, QueryProcessingStage::WithMergeableState); + InterpreterSelectQuery select(mergeable_query->clone(), *live_view_context, blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete)); BlockInputStreamPtr data = std::make_shared(select.execute().in); - + + if (inner_subquery) + { + auto outer_blocks_storage = StorageBlocks::createStorage(database_name, table_name,ColumnsDescription(data->getHeader().getNamesAndTypesList()), {data}, QueryProcessingStage::FetchColumns); + InterpreterSelectQuery outer_select(inner_outer_query->clone(), *live_view_context, outer_blocks_storage, SelectQueryOptions(QueryProcessingStage::Complete)); + data = std::make_shared(outer_select.execute().in); + } + /// Squashing is needed here because the view query can generate a lot of blocks /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY /// and two-level aggregation is triggered). 
diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index b3773019936..d3816384e77 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -51,6 +51,18 @@ public: // const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } ASTPtr getInnerQuery() const { return inner_query->clone(); } + ASTPtr getInnerSubQuery() const + { + if (inner_subquery) + return inner_subquery->clone(); + return nullptr; + }; + ASTPtr getInnerOuterQuery() const + { + if (inner_outer_query) + return inner_outer_query->clone(); + return nullptr; + }; /// It is passed inside the query and solved at its level. bool supportsSampling() const override { return true; } @@ -146,7 +158,9 @@ private: String select_table_name; String table_name; String database_name; - ASTPtr inner_query; + ASTPtr inner_query; /// stored query : SELECT * FROM ( SELECT a FROM A) + ASTPtr inner_subquery; /// stored query's innermost subquery, if any : SELECT a FROM A + ASTPtr inner_outer_query; /// the query that directly encloses the innermost subquery : ... SELECT * FROM ( subquery ) Context & global_context; std::unique_ptr live_view_context;