From 8b3a245a1dcc0af1e0c95ea09a96d549a17df378 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 10 Mar 2020 22:36:17 +0300 Subject: [PATCH] improve temporary tables --- dbms/programs/server/TCPHandler.cpp | 13 +-- dbms/src/Core/ExternalTable.cpp | 6 +- .../DataStreams/tests/expression_stream.cpp | 2 +- dbms/src/DataStreams/tests/filter_stream.cpp | 2 +- dbms/src/Databases/DatabaseOnDisk.cpp | 1 + dbms/src/Interpreters/Context.cpp | 87 +++---------------- dbms/src/Interpreters/Context.h | 10 +-- dbms/src/Interpreters/DatabaseCatalog.cpp | 79 ++++++++++++++++- dbms/src/Interpreters/DatabaseCatalog.h | 27 ++++++ dbms/src/Interpreters/ExpressionAnalyzer.cpp | 5 -- dbms/src/Interpreters/ExpressionAnalyzer.h | 5 +- dbms/src/Interpreters/ExternalTablesVisitor.h | 43 --------- .../Interpreters/GlobalSubqueriesVisitor.h | 10 +-- .../Interpreters/InterpreterCreateQuery.cpp | 17 ++-- .../Interpreters/InterpreterSelectQuery.cpp | 2 +- .../tests/expression_analyzer.cpp | 2 +- .../src/Storages/LiveView/StorageLiveView.cpp | 23 ++--- dbms/src/Storages/LiveView/StorageLiveView.h | 4 +- dbms/src/Storages/StorageInput.cpp | 4 +- dbms/src/Storages/StorageInput.h | 2 +- .../Storages/System/StorageSystemNumbers.cpp | 4 +- .../Storages/System/StorageSystemNumbers.h | 2 +- .../Storages/System/attachSystemTables.cpp | 4 +- dbms/src/Storages/tests/system_numbers.cpp | 2 +- .../src/TableFunctions/TableFunctionInput.cpp | 2 +- .../TableFunctions/TableFunctionNumbers.cpp | 2 +- .../TableFunctions/TableFunctionRemote.cpp | 9 +- .../0_stateless/00979_live_view_watch_live.py | 2 +- 28 files changed, 189 insertions(+), 182 deletions(-) delete mode 100644 dbms/src/Interpreters/ExternalTablesVisitor.h diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index fe5ec5a3c6b..f2b59894113 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -992,14 +992,15 @@ bool TCPHandler::receiveData(bool scalar) auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal); StoragePtr storage; /// If such a table does not exist, create it. - if (resolved.empty()) + if (resolved) + storage = DatabaseCatalog::instance().getTable(resolved); + else { NamesAndTypesList columns = block.getNamesAndTypesList(); - storage = StorageMemory::create(temporary_id, ColumnsDescription{columns}, ConstraintsDescription{}); - storage->startup(); - query_context->addExternalTable(temporary_id.table_name, storage); - } else - storage = DatabaseCatalog::instance().getTable(resolved); + auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}); + storage = temporary_table.getTable(); + query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table)); + } /// The data will be written directly to the table. state.io.out = storage->write(ASTPtr(), *query_context); } diff --git a/dbms/src/Core/ExternalTable.cpp b/dbms/src/Core/ExternalTable.cpp index 3858054bcb2..fc075aae6b3 100644 --- a/dbms/src/Core/ExternalTable.cpp +++ b/dbms/src/Core/ExternalTable.cpp @@ -170,9 +170,9 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, /// Create table NamesAndTypesList columns = sample_block.getNamesAndTypesList(); - StoragePtr storage = StorageMemory::create(StorageID("_external", data->table_name), ColumnsDescription{columns}, ConstraintsDescription{}); - storage->startup(); - context.addExternalTable(data->table_name, storage); + auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns}); + auto storage = temporary_table.getTable(); + context.addExternalTable(data->table_name, std::move(temporary_table)); BlockOutputStreamPtr output = storage->write(ASTPtr(), context); /// Write data diff --git a/dbms/src/DataStreams/tests/expression_stream.cpp b/dbms/src/DataStreams/tests/expression_stream.cpp index 655e64c8afd..bd4117f5aab 100644 --- a/dbms/src/DataStreams/tests/expression_stream.cpp +++ b/dbms/src/DataStreams/tests/expression_stream.cpp @@ -47,7 +47,7 @@ try chain.finalize(); ExpressionActionsPtr expression = chain.getLastActions(); - StoragePtr table = StorageSystemNumbers::create("numbers", false); + StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false); Names column_names; column_names.push_back("number"); diff --git a/dbms/src/DataStreams/tests/filter_stream.cpp b/dbms/src/DataStreams/tests/filter_stream.cpp index fa3217f2022..5e324251440 100644 --- a/dbms/src/DataStreams/tests/filter_stream.cpp +++ b/dbms/src/DataStreams/tests/filter_stream.cpp @@ -52,7 +52,7 @@ try chain.finalize(); ExpressionActionsPtr expression = chain.getLastActions(); - StoragePtr table = StorageSystemNumbers::create("numbers", false); + StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false); Names column_names; column_names.push_back("number"); diff --git a/dbms/src/Databases/DatabaseOnDisk.cpp b/dbms/src/Databases/DatabaseOnDisk.cpp index 4ca866b006f..c005e3160c0 100644 --- a/dbms/src/Databases/DatabaseOnDisk.cpp +++ b/dbms/src/Databases/DatabaseOnDisk.cpp @@ -53,6 +53,7 @@ std::pair createTableFromAST( { const auto & table_function = ast_create_query.as_table_function->as(); const auto & factory = TableFunctionFactory::instance(); + //FIXME storage will have wrong database name StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table); return {ast_create_query.table, storage}; } diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 7bd2354f304..38a921379c6 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -274,62 +274,6 @@ void NamedSession::release() parent.releaseSession(*this); } -struct TemporaryTableHolder : boost::noncopyable -{ - TemporaryTableHolder(const Context & context_, - IDatabase & external_tables_, - const StoragePtr & table, - const ASTPtr & query = {}) - : context(context_), external_tables(external_tables_) - { - ASTCreateQuery * create = dynamic_cast(query.get()); - if (create) - { - if (create->uuid == UUIDHelpers::Nil) - create->uuid = UUIDHelpers::generateV4(); - id = create->uuid; - } - else - id = UUIDHelpers::generateV4(); - String global_name = "_data_" + toString(id); - external_tables.createTable(context, global_name, table, query ? query->clone() : query); - if (create) - { - create->database = DatabaseCatalog::TEMPORARY_DATABASE; - create->table = global_name; - create->uuid = id; - } - } - - TemporaryTableHolder(TemporaryTableHolder && other) - : context(other.context), external_tables(other.external_tables), id(other.id) - { - other.id = UUIDHelpers::Nil; - } - - ~TemporaryTableHolder() - { - external_tables.removeTable(context, "_data_" + toString(id)); - } - - StorageID getGlobalTableID() const - { - return StorageID{DatabaseCatalog::TEMPORARY_DATABASE, "_data_" + toString(id), id}; - } - - StoragePtr getTable() const - { - auto table = external_tables.tryGetTable(context, "_data_" + toString(id)); - if (!table) - throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR); - return table; - } - - const Context & context; - IDatabase & external_tables; - UUID id; -}; - /** Set of known objects (environment), that could be used in query. * Shared (global) part. Order of members (especially, order of destruction) is very important. @@ -915,27 +859,12 @@ Tables Context::getExternalTables() const } -void Context::addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast) +void Context::addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table) { - auto external_db = DatabaseCatalog::instance().getDatabaseForTemporaryTables(); - auto holder = std::make_shared(*this, *external_db, storage, ast); auto lock = getLock(); if (external_tables_mapping.end() != external_tables_mapping.find(table_name)) throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); - - external_tables_mapping.emplace(table_name, std::move(holder)); -} - - -void Context::addScalar(const String & name, const Block & block) -{ - scalars[name] = block; -} - - -bool Context::hasScalar(const String & name) const -{ - return scalars.count(name); + external_tables_mapping.emplace(table_name, std::make_shared(std::move(temporary_table))); } @@ -954,6 +883,18 @@ bool Context::removeExternalTable(const String & table_name) } +void Context::addScalar(const String & name, const Block & block) +{ + scalars[name] = block; +} + + +bool Context::hasScalar(const String & name) const +{ + return scalars.count(name); +} + + StoragePtr Context::executeTableFunction(const ASTPtr & table_expression) { /// Slightly suboptimal. diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 76f30572426..52a8364fc70 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -130,8 +130,6 @@ struct IHostContext using IHostContextPtr = std::shared_ptr; -struct TemporaryTableHolder; - /** A set of known objects that can be used in the query. * Consists of a shared part (always common to all sessions and queries) * and copied part (which can be its own for each session or query). @@ -165,7 +163,6 @@ private: String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification. /// Thus, used in HTTP interface. If not specified - then some globally default format is used. - using TemporaryTablesMapping = std::map>; TemporaryTablesMapping external_tables_mapping; Scalars scalars; @@ -309,13 +306,14 @@ public: StorageID tryResolveStorageID(StorageID storage_id, StorageNamespace where = StorageNamespace::ResolveAll) const; StorageID resolveStorageIDImpl(StorageID storage_id, StorageNamespace where, std::optional * exception) const; + Tables getExternalTables() const; + void addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table); + bool removeExternalTable(const String & table_name); + const Scalars & getScalars() const; const Block & getScalar(const String & name) const; - Tables getExternalTables() const; - void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {}); void addScalar(const String & name, const Block & block); bool hasScalar(const String & name) const; - bool removeExternalTable(const String & table_name); StoragePtr executeTableFunction(const ASTPtr & table_expression); diff --git a/dbms/src/Interpreters/DatabaseCatalog.cpp b/dbms/src/Interpreters/DatabaseCatalog.cpp index 4cca73143f5..bbf2839ba26 100644 --- a/dbms/src/Interpreters/DatabaseCatalog.cpp +++ b/dbms/src/Interpreters/DatabaseCatalog.cpp @@ -1,12 +1,12 @@ #include #include #include -#include -#include +#include #include #include #include #include +#include namespace DB { @@ -22,6 +22,81 @@ namespace ErrorCodes extern const int DATABASE_ACCESS_DENIED; } +TemporaryTableHolder::TemporaryTableHolder(const Context & context_, + const TemporaryTableHolder::Creator & creator, const ASTPtr & query) + : context(context_.getGlobalContext()) + , temporary_tables(*DatabaseCatalog::instance().getDatabaseForTemporaryTables()) +{ + ASTPtr original_create; + ASTCreateQuery * create = dynamic_cast(query.get()); + String global_name; + if (query) + { + original_create = create->clone(); + if (create->uuid == UUIDHelpers::Nil) + create->uuid = UUIDHelpers::generateV4(); + id = create->uuid; + create->table = "_tmp_" + toString(id); + global_name = create->table; + create->database = DatabaseCatalog::TEMPORARY_DATABASE; + } + else + { + id = UUIDHelpers::generateV4(); + global_name = "_tmp_" + toString(id); + } + auto table_id = StorageID(DatabaseCatalog::TEMPORARY_DATABASE, global_name, id); + auto table = creator(table_id); + temporary_tables.createTable(context, global_name, table, original_create); + table->startup(); +} + + +TemporaryTableHolder::TemporaryTableHolder(const Context & context_, const ColumnsDescription & columns, const ASTPtr & query) + : TemporaryTableHolder + ( + context_, + [&](const StorageID & table_id) + { + return StorageMemory::create(table_id, ColumnsDescription{columns}, ConstraintsDescription{}); + }, + query + ) +{ +} + +TemporaryTableHolder::TemporaryTableHolder(TemporaryTableHolder && rhs) + : context(rhs.context), temporary_tables(rhs.temporary_tables), id(rhs.id) +{ + rhs.id = UUIDHelpers::Nil; +} + +TemporaryTableHolder & TemporaryTableHolder::operator = (TemporaryTableHolder && rhs) +{ + id = rhs.id; + rhs.id = UUIDHelpers::Nil; + return *this; +} + +TemporaryTableHolder::~TemporaryTableHolder() +{ + if (id != UUIDHelpers::Nil) + temporary_tables.removeTable(context, "_tmp_" + toString(id)); +} + +StorageID TemporaryTableHolder::getGlobalTableID() const +{ + return StorageID{DatabaseCatalog::TEMPORARY_DATABASE, "_tmp_" + toString(id), id}; +} + +StoragePtr TemporaryTableHolder::getTable() const +{ + auto table = temporary_tables.tryGetTable(context, "_tmp_" + toString(id)); + if (!table) + throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR); + return table; +} + void DatabaseCatalog::loadDatabases() { diff --git a/dbms/src/Interpreters/DatabaseCatalog.h b/dbms/src/Interpreters/DatabaseCatalog.h index aadb51c37ca..998639285b5 100644 --- a/dbms/src/Interpreters/DatabaseCatalog.h +++ b/dbms/src/Interpreters/DatabaseCatalog.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include #include #include @@ -54,6 +55,32 @@ private: std::unique_lock table_lock; }; +class ColumnsDescription; + +struct TemporaryTableHolder : boost::noncopyable +{ + typedef std::function Creator; + + TemporaryTableHolder(const Context & context, const Creator & creator, const ASTPtr & query = {}); + + /// Creates temporary table with Engine=Memory + TemporaryTableHolder(const Context & context, const ColumnsDescription & columns, const ASTPtr & query = {}); + + TemporaryTableHolder(TemporaryTableHolder && rhs); + TemporaryTableHolder & operator = (TemporaryTableHolder && rhs); + + ~TemporaryTableHolder(); + + StorageID getGlobalTableID() const; + + StoragePtr getTable() const; + + const Context & context; + IDatabase & temporary_tables; + UUID id; +}; + +using TemporaryTablesMapping = std::map>; class DatabaseCatalog : boost::noncopyable { diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index a6565912f6d..f4bd3b5a908 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -59,7 +59,6 @@ #include -#include #include #include @@ -261,10 +260,6 @@ void ExpressionAnalyzer::analyzeAggregation() void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global) { - /// Adds existing external tables (not subqueries) to the external_tables dictionary. - ExternalTablesVisitor::Data tables_data{context, external_tables}; - ExternalTablesVisitor(tables_data).visit(query); - if (do_global) { GlobalSubqueriesVisitor::Data subqueries_data(context, subquery_depth, isRemoteStorage(), diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.h b/dbms/src/Interpreters/ExpressionAnalyzer.h index ac48bfbd5cd..a9b43d3610b 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.h +++ b/dbms/src/Interpreters/ExpressionAnalyzer.h @@ -9,6 +9,7 @@ #include #include #include +#include namespace DB @@ -51,7 +52,7 @@ struct ExpressionAnalyzerData bool has_global_subqueries = false; /// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries. - Tables external_tables; + TemporaryTablesMapping external_tables; }; @@ -247,7 +248,7 @@ public: const PreparedSets & getPreparedSets() const { return prepared_sets; } /// Tables that will need to be sent to remote servers for distributed query processing. - const Tables & getExternalTables() const { return external_tables; } + const TemporaryTablesMapping & getExternalTables() const { return external_tables; } ExpressionActionsPtr simpleSelectActions(); diff --git a/dbms/src/Interpreters/ExternalTablesVisitor.h b/dbms/src/Interpreters/ExternalTablesVisitor.h deleted file mode 100644 index 73fd40e8f98..00000000000 --- a/dbms/src/Interpreters/ExternalTablesVisitor.h +++ /dev/null @@ -1,43 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -/// If node is ASTIdentifier try to extract external_storage. -class ExternalTablesMatcher -{ -public: - struct Data - { - const Context & context; - Tables & external_tables; - }; - - static void visit(ASTPtr & ast, Data & data) - { - if (const auto * t = ast->as()) - visit(*t, ast, data); - } - - static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; } - -private: - static void visit(const ASTIdentifier & node, ASTPtr &, Data & data) - { - if (auto opt_name = IdentifierSemantic::getTableName(node)) - if (auto resolved_id = data.context.tryResolveStorageID(StorageID("", *opt_name), Context::ResolveExternal)) - data.external_tables[*opt_name] = DatabaseCatalog::instance().getTable(resolved_id); - } -}; - -/// Finds in the query the usage of external tables. Fills in external_tables. -using ExternalTablesVisitor = InDepthNodeVisitor; - -} diff --git a/dbms/src/Interpreters/GlobalSubqueriesVisitor.h b/dbms/src/Interpreters/GlobalSubqueriesVisitor.h index 08f727cea64..18d1b1b31cd 100644 --- a/dbms/src/Interpreters/GlobalSubqueriesVisitor.h +++ b/dbms/src/Interpreters/GlobalSubqueriesVisitor.h @@ -34,12 +34,12 @@ public: size_t subquery_depth; bool is_remote; size_t external_table_id; - Tables & external_tables; + TemporaryTablesMapping & external_tables; SubqueriesForSets & subqueries_for_sets; bool & has_global_subqueries; Data(const Context & context_, size_t subquery_depth_, bool is_remote_, - Tables & tables, SubqueriesForSets & subqueries_for_sets_, bool & has_global_subqueries_) + TemporaryTablesMapping & tables, SubqueriesForSets & subqueries_for_sets_, bool & has_global_subqueries_) : context(context_), subquery_depth(subquery_depth_), is_remote(is_remote_), @@ -99,8 +99,8 @@ public: Block sample = interpreter->getSampleBlock(); NamesAndTypesList columns = sample.getNamesAndTypesList(); - StoragePtr external_storage = StorageMemory::create(StorageID("_external", external_table_name), ColumnsDescription{columns}, ConstraintsDescription{}); - external_storage->startup(); + auto external_storage_holder = std::make_shared(context, ColumnsDescription{columns}); + StoragePtr external_storage = external_storage_holder->getTable(); /** We replace the subquery with the name of the temporary table. * It is in this form, the request will go to the remote server. @@ -129,7 +129,7 @@ public: else ast = database_and_table_name; - external_tables[external_table_name] = external_storage; + external_tables[external_table_name] = external_storage_holder; subqueries_for_sets[external_table_name].source = interpreter->execute().in; subqueries_for_sets[external_table_name].table = external_storage; diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index a4f7d0f84b6..8f39fdc8715 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -606,8 +606,15 @@ bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create, throw Exception("Table " + create.database + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); } } - else if (context.isExternalTableExist(table_name) && create.if_not_exists) - return false; + else + { + if (context.isExternalTableExist(table_name) && create.if_not_exists) + return false; + + auto temporary_table = TemporaryTableHolder(context, properties.columns, query_ptr); + context.getSessionContext().addExternalTable(table_name, std::move(temporary_table)); + return true; + } StoragePtr res; /// NOTE: CREATE query may be rewritten by Storage creator or table function @@ -615,6 +622,7 @@ bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create, { const auto & table_function = create.as_table_function->as(); const auto & factory = TableFunctionFactory::instance(); + //FIXME storage will have wrong database name res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table); } else @@ -628,10 +636,7 @@ bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create, false); } - if (need_add_to_database) - database->createTable(context, table_name, res, query_ptr); - else - context.getSessionContext().addExternalTable(table_name, res, query_ptr); + database->createTable(context, table_name, res, query_ptr); /// We must call "startup" and "shutdown" while holding DDLGuard. /// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 16f8d0f2236..2f88a0f70ac 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -322,7 +322,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( /// Save the new temporary tables in the query context for (const auto & it : query_analyzer->getExternalTables()) if (!context->isExternalTableExist(it.first)) - context->addExternalTable(it.first, it.second); + context->addExternalTable(it.first, std::move(*it.second)); } if (!options.only_analyze || options.modify_inplace) diff --git a/dbms/src/Interpreters/tests/expression_analyzer.cpp b/dbms/src/Interpreters/tests/expression_analyzer.cpp index 356b557e85f..2f8b6b2aef2 100644 --- a/dbms/src/Interpreters/tests/expression_analyzer.cpp +++ b/dbms/src/Interpreters/tests/expression_analyzer.cpp @@ -102,7 +102,7 @@ int main() DatabaseCatalog::instance().attachDatabase("system", system_database); //context.setCurrentDatabase("system"); system_database->attachTable("one", StorageSystemOne::create("one")); - system_database->attachTable("numbers", StorageSystemNumbers::create("numbers", false)); + system_database->attachTable("numbers", StorageSystemNumbers::create(StorageID("system", "numbers"), false)); size_t success = 0; for (auto & entry : queries) diff --git a/dbms/src/Storages/LiveView/StorageLiveView.cpp b/dbms/src/Storages/LiveView/StorageLiveView.cpp index d7567eaafe0..a2362b74a94 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.cpp +++ b/dbms/src/Storages/LiveView/StorageLiveView.cpp @@ -140,11 +140,12 @@ BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes) auto block_context = std::make_unique(global_context); block_context->makeQueryContext(); - auto blocks_storage_id = getBlocksStorageID(); - auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, getParentStorage()->getColumns(), - std::move(pipes), QueryProcessingStage::WithMergeableState); - - block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage); + auto creator = [&](const StorageID & blocks_id_global) + { + return StorageBlocks::createStorage(blocks_id_global, getParentStorage()->getColumns(), + std::move(pipes), QueryProcessingStage::WithMergeableState); + }; + block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(global_context, creator)); InterpreterSelectQuery select(getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete)); BlockInputStreamPtr data = std::make_shared(select.execute().in); @@ -196,8 +197,6 @@ void StorageLiveView::writeIntoLiveView( } } - auto blocks_storage_id = live_view.getBlocksStorageID(); - if (!is_block_processed) { ASTPtr mergeable_query = live_view.getInnerQuery(); @@ -208,10 +207,14 @@ void StorageLiveView::writeIntoLiveView( Pipes pipes; pipes.emplace_back(std::make_shared(block.cloneEmpty(), Chunk(block.getColumns(), block.rows()))); - auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, - live_view.getParentStorage()->getColumns(), std::move(pipes), QueryProcessingStage::FetchColumns); + auto creator = [&](const StorageID & blocks_id_global) + { + return StorageBlocks::createStorage(blocks_id_global, live_view.getParentStorage()->getColumns(), + std::move(pipes), QueryProcessingStage::FetchColumns); + }; + TemporaryTableHolder blocks_storage(context, creator); - InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage, + InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage.getTable(), QueryProcessingStage::WithMergeableState); auto data_mergeable_stream = std::make_shared( diff --git a/dbms/src/Storages/LiveView/StorageLiveView.h b/dbms/src/Storages/LiveView/StorageLiveView.h index 0d5b2874869..8402d447279 100644 --- a/dbms/src/Storages/LiveView/StorageLiveView.h +++ b/dbms/src/Storages/LiveView/StorageLiveView.h @@ -49,9 +49,9 @@ public: ~StorageLiveView() override; String getName() const override { return "LiveView"; } bool isView() const override { return true; } - StorageID getBlocksStorageID() const + String getBlocksTableName() const { - return StorageID("", getStorageID().table_name + "_blocks"); + return getStorageID().table_name + "_blocks"; } StoragePtr getParentStorage() const { return DatabaseCatalog::instance().getTable(select_table_id); } diff --git a/dbms/src/Storages/StorageInput.cpp b/dbms/src/Storages/StorageInput.cpp index b44f11bae70..e30ae55e715 100644 --- a/dbms/src/Storages/StorageInput.cpp +++ b/dbms/src/Storages/StorageInput.cpp @@ -18,8 +18,8 @@ namespace ErrorCodes extern const int INVALID_USAGE_OF_INPUT; } -StorageInput::StorageInput(const String & table_name_, const ColumnsDescription & columns_) - : IStorage({"", table_name_}) +StorageInput::StorageInput(const StorageID & table_id, const ColumnsDescription & columns_) + : IStorage(table_id) { setColumns(columns_); } diff --git a/dbms/src/Storages/StorageInput.h b/dbms/src/Storages/StorageInput.h index 0a724df10ad..16d195682e9 100644 --- a/dbms/src/Storages/StorageInput.h +++ b/dbms/src/Storages/StorageInput.h @@ -29,6 +29,6 @@ private: BlockInputStreamPtr input_stream; protected: - StorageInput(const String & table_name_, const ColumnsDescription & columns_); + StorageInput(const StorageID & table_id, const ColumnsDescription & columns_); }; } diff --git a/dbms/src/Storages/System/StorageSystemNumbers.cpp b/dbms/src/Storages/System/StorageSystemNumbers.cpp index f5a5bb58b72..0f7ea8bee38 100644 --- a/dbms/src/Storages/System/StorageSystemNumbers.cpp +++ b/dbms/src/Storages/System/StorageSystemNumbers.cpp @@ -115,8 +115,8 @@ private: } -StorageSystemNumbers::StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional limit_, UInt64 offset_, bool even_distribution_) - : IStorage({"system", name_}), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_) +StorageSystemNumbers::StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, std::optional limit_, UInt64 offset_, bool even_distribution_) + : IStorage(table_id), multithreaded(multithreaded_), even_distribution(even_distribution_), limit(limit_), offset(offset_) { setColumns(ColumnsDescription({{"number", std::make_shared()}})); } diff --git a/dbms/src/Storages/System/StorageSystemNumbers.h b/dbms/src/Storages/System/StorageSystemNumbers.h index 178284f2d3f..8482e5eeca9 100644 --- a/dbms/src/Storages/System/StorageSystemNumbers.h +++ b/dbms/src/Storages/System/StorageSystemNumbers.h @@ -48,7 +48,7 @@ private: protected: /// If even_distribution is true, numbers are distributed evenly between streams. /// Otherwise, streams concurrently increment atomic. - StorageSystemNumbers(const std::string & name_, bool multithreaded_, std::optional limit_ = std::nullopt, UInt64 offset_ = 0, bool even_distribution_ = true); + StorageSystemNumbers(const StorageID & table_id, bool multithreaded_, std::optional limit_ = std::nullopt, UInt64 offset_ = 0, bool even_distribution_ = true); }; } diff --git a/dbms/src/Storages/System/attachSystemTables.cpp b/dbms/src/Storages/System/attachSystemTables.cpp index 06f00783384..511bf31d881 100644 --- a/dbms/src/Storages/System/attachSystemTables.cpp +++ b/dbms/src/Storages/System/attachSystemTables.cpp @@ -51,8 +51,8 @@ namespace DB void attachSystemTablesLocal(IDatabase & system_database) { system_database.attachTable("one", StorageSystemOne::create("one")); - system_database.attachTable("numbers", StorageSystemNumbers::create("numbers", false)); - system_database.attachTable("numbers_mt", StorageSystemNumbers::create("numbers_mt", true)); + system_database.attachTable("numbers", StorageSystemNumbers::create(StorageID("system", "numbers"), false)); + system_database.attachTable("numbers_mt", StorageSystemNumbers::create(StorageID("system", "numbers_mt"), true)); system_database.attachTable("databases", StorageSystemDatabases::create("databases")); system_database.attachTable("tables", StorageSystemTables::create("tables")); system_database.attachTable("columns", StorageSystemColumns::create("columns")); diff --git a/dbms/src/Storages/tests/system_numbers.cpp b/dbms/src/Storages/tests/system_numbers.cpp index 0abb7ce7b4e..9fc84d76f5c 100644 --- a/dbms/src/Storages/tests/system_numbers.cpp +++ b/dbms/src/Storages/tests/system_numbers.cpp @@ -15,7 +15,7 @@ try { using namespace DB; - StoragePtr table = StorageSystemNumbers::create("numbers", false); + StoragePtr table = StorageSystemNumbers::create(StorageID("test", "numbers"), false); Names column_names; column_names.push_back("number"); diff --git a/dbms/src/TableFunctions/TableFunctionInput.cpp b/dbms/src/TableFunctions/TableFunctionInput.cpp index 45aca31de0e..41bb292c2b2 100644 --- a/dbms/src/TableFunctions/TableFunctionInput.cpp +++ b/dbms/src/TableFunctions/TableFunctionInput.cpp @@ -41,7 +41,7 @@ StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Co String structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context)->as().value.safeGet(); auto columns = parseColumnsListFromString(structure, context); - StoragePtr storage = StorageInput::create(table_name, columns); + StoragePtr storage = StorageInput::create(StorageID(getDatabaseName(), table_name), columns); storage->startup(); diff --git a/dbms/src/TableFunctions/TableFunctionNumbers.cpp b/dbms/src/TableFunctions/TableFunctionNumbers.cpp index 615a54dd1b4..bb414f4783f 100644 --- a/dbms/src/TableFunctions/TableFunctionNumbers.cpp +++ b/dbms/src/TableFunctions/TableFunctionNumbers.cpp @@ -35,7 +35,7 @@ StoragePtr TableFunctionNumbers::executeImpl(const ASTPtr & ast_f context.checkAccess(AccessType::numbers); - auto res = StorageSystemNumbers::create(table_name, multithreaded, length, offset, false); + auto res = StorageSystemNumbers::create(StorageID(getDatabaseName(), table_name), multithreaded, length, offset, false); res->startup(); return res; } diff --git a/dbms/src/TableFunctions/TableFunctionRemote.cpp b/dbms/src/TableFunctions/TableFunctionRemote.cpp index 7d86c6b4579..502e5afd9c3 100644 --- a/dbms/src/TableFunctions/TableFunctionRemote.cpp +++ b/dbms/src/TableFunctions/TableFunctionRemote.cpp @@ -188,17 +188,20 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C secure); } - auto structure_remote_table = getStructureOfRemoteTable(*cluster, {remote_database, remote_table}, context, remote_table_function_ptr); + auto remote_table_id = StorageID::createEmpty(); + remote_table_id.database_name = remote_database; + remote_table_id.table_name = remote_table; + auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr); StoragePtr res = remote_table_function_ptr ? StorageDistributed::createWithOwnCluster( - StorageID("", table_name), + StorageID(getDatabaseName(), table_name), structure_remote_table, remote_table_function_ptr, cluster, context) : StorageDistributed::createWithOwnCluster( - StorageID("", table_name), + StorageID(getDatabaseName(), table_name), structure_remote_table, remote_database, remote_table, diff --git a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py index 8c5bc5b8eb2..901852f3408 100755 --- a/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py +++ b/dbms/tests/queries/0_stateless/00979_live_view_watch_live.py @@ -23,7 +23,7 @@ with client(name='client1>', log=log) as client1, client(name='client2>', log=lo client1.send('DROP TABLE IF EXISTS test.lv') client1.expect(prompt) - client1.send(' DROP TABLE IF EXISTS test.mt') + client1.send('DROP TABLE IF EXISTS test.mt') client1.expect(prompt) client1.send('CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple()') client1.expect(prompt)