diff --git a/dbms/src/DataStreams/tests/union_stream2.cpp b/dbms/src/DataStreams/tests/union_stream2.cpp
index ab0b583b8e5..577d3cec41c 100644
--- a/dbms/src/DataStreams/tests/union_stream2.cpp
+++ b/dbms/src/DataStreams/tests/union_stream2.cpp
@@ -33,7 +33,7 @@ try
     Names column_names;
     column_names.push_back("WatchID");

-    StoragePtr table = context.getTable("default", "hits6");
+    StoragePtr table = DatabaseCatalog::instance().getTable({"default", "hits6"});

     QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
     BlockInputStreams streams = table->read(column_names, {}, context, stage, settings.max_block_size, settings.max_threads);
diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp
index d99781ab816..e04ca8d55c7 100644
--- a/dbms/src/Interpreters/DDLWorker.cpp
+++ b/dbms/src/Interpreters/DDLWorker.cpp
@@ -627,8 +627,8 @@ void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper)
         if (!query_with_table->table.empty())
         {
             /// It's not CREATE DATABASE
-            String database = query_with_table->database.empty() ? context.getCurrentDatabase() : query_with_table->database;
-            storage = context.tryGetTable(database, query_with_table->table);
+            StorageID table_id{*query_with_table, context};
+            storage = DatabaseCatalog::instance().tryGetTable(table_id);
         }

         /// For some reason we check consistency of cluster definition only
diff --git a/dbms/src/Interpreters/DatabaseCatalog.cpp b/dbms/src/Interpreters/DatabaseCatalog.cpp
index fbf978c5d64..05551a88d1c 100644
--- a/dbms/src/Interpreters/DatabaseCatalog.cpp
+++ b/dbms/src/Interpreters/DatabaseCatalog.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include

 namespace DB
 {
@@ -149,14 +150,13 @@ void DatabaseCatalog::attachDatabase(const String & database_name, const Databas
 }


-DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool drop)
+DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name, bool drop, bool check_empty)
 {
     std::lock_guard lock{databases_mutex};
     assertDatabaseExistsUnlocked(database_name);
     auto db = databases.find(database_name)->second;
-    if (!db->empty(*global_context))
-    if (!db->empty(*global_context))
+    if (check_empty && !db->empty(*global_context))
         throw Exception("New table appeared in database being dropped or detached. Try again.", ErrorCodes::DATABASE_NOT_EMPTY);
     databases.erase(database_name);
diff --git a/dbms/src/Interpreters/DatabaseCatalog.h b/dbms/src/Interpreters/DatabaseCatalog.h
index e02f6c80c67..0439e98198e 100644
--- a/dbms/src/Interpreters/DatabaseCatalog.h
+++ b/dbms/src/Interpreters/DatabaseCatalog.h
@@ -82,7 +82,7 @@ public:
     DatabasePtr getSystemDatabase() const;

     void attachDatabase(const String & database_name, const DatabasePtr & database);
-    DatabasePtr detachDatabase(const String & database_name, bool drop = false);
+    DatabasePtr detachDatabase(const String & database_name, bool drop = false, bool check_empty = true);

     DatabasePtr getDatabase(const String & database_name, const Context & local_context) const;
     DatabasePtr getDatabase(const String & database_name) const;
diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp
index b155019187b..d8f045b90d2 100644
--- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp
@@ -41,14 +41,12 @@ BlockIO InterpreterAlterQuery::execute()
         return executeDDLQueryOnCluster(query_ptr, context, getRequiredAccess());

     context.checkAccess(getRequiredAccess());
-
-    const String & table_name = alter.table;
-    String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;
-    StoragePtr table = context.getTable(database_name, table_name);
+    StorageID table_id{alter, context};
+    StoragePtr table = DatabaseCatalog::instance().getTable(table_id);

     /// Add default database to table identifiers that we can encounter in e.g. default expressions,
     /// mutation expression, etc.
-    AddDefaultDatabaseVisitor visitor(database_name);
+    AddDefaultDatabaseVisitor visitor(table_id.getDatabaseName());
     ASTPtr command_list_ptr = alter.command_list->ptr();
     visitor.visit(command_list_ptr);
diff --git a/dbms/src/Interpreters/InterpreterCheckQuery.cpp b/dbms/src/Interpreters/InterpreterCheckQuery.cpp
index d75777cc4ce..f1ba695789a 100644
--- a/dbms/src/Interpreters/InterpreterCheckQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterCheckQuery.cpp
@@ -38,11 +38,10 @@ InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, const Co
 BlockIO InterpreterCheckQuery::execute()
 {
     const auto & check = query_ptr->as();
-    const String & table_name = check.table;
-    String database_name = check.database.empty() ? context.getCurrentDatabase() : check.database;
+    StorageID table_id{check, context};

-    context.checkAccess(AccessType::SHOW, database_name, table_name);
-    StoragePtr table = context.getTable(database_name, table_name);
+    context.checkAccess(AccessType::SHOW, table_id.database_name, table_id.table_name);
+    StoragePtr table = DatabaseCatalog::instance().getTable(table_id);
     auto check_results = table->checkData(query_ptr, context);

     Block block;
diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp
index addb6edd836..69ccb387bd9 100644
--- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp
@@ -147,6 +147,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
     bool renamed = false;
     try
     {
+        //FIXME is it possible to attach db only after it was loaded? (no, loadStoredObjects adds view dependencies)
         DatabaseCatalog::instance().attachDatabase(database_name, database);
         added = true;

@@ -163,7 +164,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
         if (renamed)
             Poco::File(metadata_file_tmp_path).remove();
         if (added)
-            DatabaseCatalog::instance().detachDatabase(database_name);
+            DatabaseCatalog::instance().detachDatabase(database_name, false, false);

         throw;
     }
@@ -411,8 +412,8 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
     }
     else if (!create.as_table.empty())
     {
-        String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
-        StoragePtr as_storage = context.getTable(as_database_name, create.as_table);
+        String as_database_name = context.resolveDatabase(create.as_database);
+        StoragePtr as_storage = DatabaseCatalog::instance().getTable({as_database_name, create.as_table});

         /// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
         as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
diff --git a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
index 63fa814aadc..c27f07f86b6 100644
--- a/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterKillQueryQuery.cpp
@@ -236,23 +236,22 @@ BlockIO InterpreterKillQueryQuery::execute()
         header.insert(0, {ColumnString::create(), std::make_shared(), "kill_status"});

         MutableColumns res_columns = header.cloneEmptyColumns();
-        String database_name, table_name;
+        auto table_id = StorageID::createEmpty();

         for (size_t i = 0; i < mutations_block.rows(); ++i)
        {
-            database_name = database_col.getDataAt(i).toString();
-            table_name = table_col.getDataAt(i).toString();
+            table_id = StorageID{database_col.getDataAt(i).toString(), table_col.getDataAt(i).toString()};
             auto mutation_id = mutation_id_col.getDataAt(i).toString();

             CancellationCode code = CancellationCode::Unknown;
             if (!query.test)
             {
-                auto storage = context.tryGetTable(database_name, table_name);
+                auto storage = DatabaseCatalog::instance().tryGetTable(table_id);
                 if (!storage)
                     code = CancellationCode::NotFound;
                 else
                 {
-                    if (!context.getAccessRights()->isGranted(&Poco::Logger::get("InterpreterKillQueryQuery"), AccessType::KILL_MUTATION, database_name, table_name))
+                    if (!context.getAccessRights()->isGranted(&Poco::Logger::get("InterpreterKillQueryQuery"), AccessType::KILL_MUTATION, table_id.database_name, table_id.table_name))
                         continue;
                     code = storage->killMutation(mutation_id);
                 }
@@ -261,9 +260,9 @@ BlockIO InterpreterKillQueryQuery::execute()
             insertResultRow(i, code, mutations_block, header, res_columns);
         }

-        if (res_columns[0]->empty() && !table_name.empty())
+        if (res_columns[0]->empty() && table_id)
             throw Exception(
-                "Not allowed to kill mutation on " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name),
+                "Not allowed to kill mutation on " + table_id.getNameForLogs(),
                 ErrorCodes::ACCESS_DENIED);

         res_io.in = std::make_shared(header.cloneWithColumns(std::move(res_columns)));
diff --git a/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp b/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
index 74551329343..470dede3540 100644
--- a/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterOptimizeQuery.cpp
@@ -25,7 +25,7 @@ BlockIO InterpreterOptimizeQuery::execute()

     context.checkAccess(getRequiredAccess());

-    StoragePtr table = context.getTable(ast.database, ast.table);
+    StoragePtr table = DatabaseCatalog::instance().getTable(StorageID{ast, context});
     table->optimize(query_ptr, ast.partition, ast.final, ast.deduplicate, context);
     return {};
 }
diff --git a/dbms/src/Interpreters/InterpreterRenameQuery.cpp b/dbms/src/Interpreters/InterpreterRenameQuery.cpp
index 820676fa2d8..c6e2fbf48f7 100644
--- a/dbms/src/Interpreters/InterpreterRenameQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterRenameQuery.cpp
@@ -93,7 +93,7 @@ BlockIO InterpreterRenameQuery::execute()
     for (auto & elem : descriptions)
     {
         database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), context);
-        auto from_table = context.getTable(elem.from_database_name, elem.from_table_name);
+        auto from_table = database_catalog.getTable({elem.from_database_name, elem.from_table_name});
         auto from_table_lock = from_table->lockExclusively(context.getCurrentQueryId());

         database_catalog.getDatabase(elem.from_database_name)->renameTable(
diff --git a/dbms/src/Interpreters/InterpreterSystemQuery.cpp b/dbms/src/Interpreters/InterpreterSystemQuery.cpp
index ca6de13d0fd..81b22919e55 100644
--- a/dbms/src/Interpreters/InterpreterSystemQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSystemQuery.cpp
@@ -313,14 +313,14 @@ BlockIO InterpreterSystemQuery::execute()
 StoragePtr InterpreterSystemQuery::tryRestartReplica(const String & database_name, const String & table_name, Context & system_context)
 {
     context.checkAccess(AccessType::RESTART_REPLICA, database_name, table_name);
-    auto database = DatabaseCatalog::instance().getDatabase(database_name, system_context);
+    auto database = DatabaseCatalog::instance().getDatabase(database_name);
     auto table_ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, table_name);

     ASTPtr create_ast;

     /// Detach actions
     {
-        auto table = system_context.tryGetTable(database_name, table_name);
+        auto table = DatabaseCatalog::instance().tryGetTable({database_name, table_name});

         if (!table || !dynamic_cast(table.get()))
             return nullptr;
@@ -385,11 +385,11 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)

 void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
 {
-    String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
+    String database_name = context.resolveDatabase(query.database);
     const String & table_name = query.table;

     context.checkAccess(AccessType::SYNC_REPLICA, database_name, table_name);
-    StoragePtr table = context.getTable(database_name, table_name);
+    StoragePtr table = DatabaseCatalog::instance().getTable({database_name, table_name});

     if (auto storage_replicated = dynamic_cast(table.get()))
     {
@@ -409,11 +409,11 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)

 void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
 {
-    String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
+    String database_name = context.resolveDatabase(query.database);
     String & table_name = query.table;

     context.checkAccess(AccessType::FLUSH_DISTRIBUTED, database_name, table_name);
-    if (auto storage_distributed = dynamic_cast(context.getTable(database_name, table_name).get()))
+    if (auto storage_distributed = dynamic_cast(DatabaseCatalog::instance().getTable({database_name, table_name}).get()))
         storage_distributed->flushClusterNodesAllData();
     else
         throw Exception("Table " + database_name + "." + table_name + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
diff --git a/dbms/src/Interpreters/InterpreterWatchQuery.cpp b/dbms/src/Interpreters/InterpreterWatchQuery.cpp
index 6a1fac7b97b..f03b91a2e42 100644
--- a/dbms/src/Interpreters/InterpreterWatchQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterWatchQuery.cpp
@@ -41,28 +41,18 @@ BlockIO InterpreterWatchQuery::execute()
     BlockIO res;
     const ASTWatchQuery & query = typeid_cast(*query_ptr);

-    String database;
-    String table;
-    /// Get database
-    if (!query.database.empty())
-        database = query.database;
-    else
-        database = context.getCurrentDatabase();
-
-    /// Get table
-    table = query.table;
+    StorageID table_id{query, context};

     /// Get storage
-    storage = context.tryGetTable(database, table);
+    storage = DatabaseCatalog::instance().tryGetTable(table_id);

     if (!storage)
-        throw Exception("Table " + backQuoteIfNeed(database) + "." +
-            backQuoteIfNeed(table) + " doesn't exist.",
+        throw Exception("Table " + table_id.getNameForLogs() + " doesn't exist.",
             ErrorCodes::UNKNOWN_TABLE);

     /// List of columns to read to execute the query.
     Names required_columns = storage->getColumns().getNamesOfPhysical();
-    context.checkAccess(AccessType::SELECT, database, table, required_columns);
+    context.checkAccess(AccessType::SELECT, table_id.database_name, table_id.table_name, required_columns);

     /// Get context settings for this query
     const Settings & settings = context.getSettingsRef();
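
// --- Illustrative sketch, not part of the patch --------------------------------------------
// The tryGetTable / getNameForLogs / checkAccess combination used by InterpreterWatchQuery
// above, condensed into one hypothetical helper (getWatchedStorage is not a real function).
static StoragePtr getWatchedStorage(const StorageID & table_id, const Context & context)
{
    StoragePtr storage = DatabaseCatalog::instance().tryGetTable(table_id);
    if (!storage)
        throw Exception("Table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
    context.checkAccess(AccessType::SELECT, table_id.database_name, table_id.table_name);
    return storage;
}
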
diff --git a/dbms/src/Interpreters/SystemLog.h b/dbms/src/Interpreters/SystemLog.h
index 202f9ce2055..e0153506a40 100644
--- a/dbms/src/Interpreters/SystemLog.h
+++ b/dbms/src/Interpreters/SystemLog.h
@@ -127,8 +127,9 @@ protected:
 private:
     /* Saving thread data */
     Context & context;
-    const String database_name;
-    const String table_name;
+    const StorageID table_id;
+    //const String database_name;
+    //const String table_name;
     const String storage_def;
     StoragePtr table;
     bool is_prepared = false;
@@ -170,11 +171,12 @@ SystemLog::SystemLog(Context & context_,
                      const String & table_name_,
                      const String & storage_def_,
                      size_t flush_interval_milliseconds_)
-    : context(context_),
-    database_name(database_name_), table_name(table_name_), storage_def(storage_def_),
+    : context(context_)
+    , table_id(database_name_, table_name_)
+    , storage_def(storage_def_),
     flush_interval_milliseconds(flush_interval_milliseconds_)
 {
-    log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");
+    log = &Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
     saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
 }

@@ -338,8 +340,8 @@ void SystemLog::flushImpl(const std::vector & to_flush,
     /// This is needed to support DEFAULT-columns in table.
     std::unique_ptr insert = std::make_unique();
-    insert->database = database_name;
-    insert->table = table_name;
+    insert->database = table_id.database_name;
+    insert->table = table_id.table_name;
     ASTPtr query_ptr(insert.release());

     InterpreterInsertQuery interpreter(query_ptr, context);
@@ -363,9 +365,9 @@ void SystemLog::flushImpl(const std::vector & to_flush,
 template
 void SystemLog::prepareTable()
 {
-    String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
+    String description = table_id.getNameForLogs();

-    table = context.tryGetTable(database_name, table_name);
+    table = DatabaseCatalog::instance().tryGetTable(table_id);

     if (table)
     {
@@ -376,18 +378,18 @@ void SystemLog::prepareTable()
         {
             /// Rename the existing table.
             int suffix = 0;
-            while (DatabaseCatalog::instance().isTableExist({database_name, table_name + "_" + toString(suffix)}, context))
+            while (DatabaseCatalog::instance().isTableExist({table_id.database_name, table_id.table_name + "_" + toString(suffix)}, context))
                 ++suffix;

             auto rename = std::make_shared();

             ASTRenameQuery::Table from;
-            from.database = database_name;
-            from.table = table_name;
+            from.database = table_id.database_name;
+            from.table = table_id.table_name;

             ASTRenameQuery::Table to;
-            to.database = database_name;
-            to.table = table_name + "_" + toString(suffix);
+            to.database = table_id.database_name;
+            to.table = table_id.table_name + "_" + toString(suffix);

             ASTRenameQuery::Element elem;
             elem.from = from;
@@ -414,8 +416,8 @@ void SystemLog::prepareTable()

         auto create = std::make_shared();

-        create->database = database_name;
-        create->table = table_name;
+        create->database = table_id.database_name;
+        create->table = table_id.table_name;

         Block sample = LogElement::createBlock();
@@ -433,7 +435,7 @@ void SystemLog::prepareTable()
         interpreter.setInternal(true);
         interpreter.execute();

-        table = context.getTable(database_name, table_name);
+        table = DatabaseCatalog::instance().getTable(table_id);
     }

     is_prepared = true;
diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp
index 58cd66e4ee3..fbf80b3af00 100644
--- a/dbms/src/Storages/StorageBuffer.cpp
+++ b/dbms/src/Storages/StorageBuffer.cpp
@@ -5,9 +5,7 @@
 #include
 #include
 #include
-#include
 #include
-#include
 #include
 #include
 #include
@@ -25,8 +23,6 @@
 #include
 #include
 #include
-#include
-#include
 #include
 #include
 #include
@@ -68,17 +64,14 @@ StorageBuffer::StorageBuffer(
     size_t num_shards_,
     const Thresholds & min_thresholds_,
     const Thresholds & max_thresholds_,
-    const String & destination_database_,
-    const String & destination_table_,
+    const StorageID & destination_id_,
     bool allow_materialized_)
     : IStorage(table_id_)
     , global_context(context_)
     , num_shards(num_shards_), buffers(num_shards_)
     , min_thresholds(min_thresholds_)
     , max_thresholds(max_thresholds_)
-    , destination_database(destination_database_)
-    , destination_table(destination_table_)
-    , no_destination(destination_database.empty() && destination_table.empty())
+    , destination_id(destination_id_)
     , allow_materialized(allow_materialized_)
     , log(&Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
 {
@@ -142,9 +135,9 @@ private:

 QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
 {
-    if (!no_destination)
+    if (destination_id)
     {
-        auto destination = context.getTable(destination_database, destination_table);
+        auto destination = DatabaseCatalog::instance().getTable(destination_id);

         if (destination.get() == this)
             throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
@@ -186,9 +179,9 @@ Pipes StorageBuffer::readWithProcessors(
 {
     Pipes pipes_from_dst;

-    if (!no_destination)
+    if (destination_id)
     {
-        auto destination = context.getTable(destination_database, destination_table);
+        auto destination = DatabaseCatalog::instance().getTable(destination_id);

         if (destination.get() == this)
             throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
@@ -219,7 +212,7 @@ Pipes StorageBuffer::readWithProcessors(
         {
             if (!destination->hasColumn(column_name))
             {
-                LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
+                LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
                     << " doesn't have column " << backQuoteIfNeed(column_name) << ". The default values are used.");
                 boost::range::remove_erase(columns_intersection, column_name);
                 continue;
@@ -228,7 +221,7 @@ Pipes StorageBuffer::readWithProcessors(
             const auto & col = getColumn(column_name);
             if (!dst_col.type->equals(*col.type))
             {
-                LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
+                LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
                     << " has different type of column " << backQuoteIfNeed(column_name)
                     << " (" << dst_col.type->getName() << " != " << col.type->getName() << "). Data from destination table are converted.");
                 header_after_adding_defaults.getByName(column_name) = ColumnWithTypeAndName(dst_col.type, column_name);
@@ -237,7 +230,7 @@ Pipes StorageBuffer::readWithProcessors(

         if (columns_intersection.empty())
         {
-            LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
+            LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
                 << " has no common columns with block in buffer. Block of data is skipped.");
         }
         else
@@ -366,9 +359,9 @@ public:
             return;

         StoragePtr destination;
-        if (!storage.no_destination)
+        if (storage.destination_id)
         {
-            destination = storage.global_context.tryGetTable(storage.destination_database, storage.destination_table);
+            destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id);
             if (destination.get() == &storage)
                 throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
         }
@@ -378,7 +371,7 @@ public:
         /// If the block already exceeds the maximum limit, then we skip the buffer.
         if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
         {
-            if (!storage.no_destination)
+            if (storage.destination_id)
             {
                 LOG_TRACE(storage.log, "Writing block with " << rows << " rows, " << bytes << " bytes directly.");
                 storage.writeBlockToDestination(block, destination);
@@ -462,10 +455,10 @@ BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const Contex

 bool StorageBuffer::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const
 {
-    if (no_destination)
+    if (!destination_id)
         return false;

-    auto destination = global_context.getTable(destination_database, destination_table);
+    auto destination = DatabaseCatalog::instance().getTable(destination_id);

     if (destination.get() == this)
         throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
@@ -621,7 +614,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
     LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");

-    if (no_destination)
+    if (!destination_id)
         return;

     /** For simplicity, buffer is locked during write.
@@ -632,7 +625,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
      */
     try
     {
-        writeBlockToDestination(block_to_write, global_context.tryGetTable(destination_database, destination_table));
+        writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id));
     }
     catch (...)
     {
@@ -656,12 +649,12 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc

 void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
 {
-    if (no_destination || !block)
+    if (!destination_id || !block)
         return;

     if (!table)
     {
-        LOG_ERROR(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) << " doesn't exist. Block of data is discarded.");
+        LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs() << " doesn't exist. Block of data is discarded.");
         return;
     }
@@ -669,8 +662,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl

     auto insert = std::make_shared();
-    insert->database = destination_database;
-    insert->table = destination_table;
+    insert->database = destination_id.database_name;
+    insert->table = destination_id.table_name;

     /** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
      * This will support some of the cases (but not all) when the table structure does not match.
@@ -685,7 +678,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
         auto column = block.getByName(dst_col.name);
         if (!column.type->equals(*dst_col.type))
         {
-            LOG_WARNING(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
+            LOG_WARNING(log, "Destination table " << destination_id.getNameForLogs()
                 << " have different type of column " << backQuoteIfNeed(column.name) << " ("
                 << dst_col.type->getName() << " != " << column.type->getName()
                 << "). Block of data is converted.");
@@ -699,14 +692,14 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl

     if (block_to_write.columns() == 0)
     {
-        LOG_ERROR(log, "Destination table " << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table)
+        LOG_ERROR(log, "Destination table " << destination_id.getNameForLogs()
             << " have no common columns with block in buffer. Block of data is discarded.");
         return;
     }

     if (block_to_write.columns() != block.columns())
         LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
-            << backQuoteIfNeed(destination_database) << "." << backQuoteIfNeed(destination_table) << ". Some columns are discarded.");
+            << destination_id.getNameForLogs() << ". Some columns are discarded.");

     auto list_of_columns = std::make_shared();
     insert->columns = list_of_columns;
@@ -788,6 +781,7 @@ void registerStorageBuffer(StorageFactory & factory)
                 " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
                 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

+        //FIXME currentDatabase() at the moment of table creation can be different from currentDatabase() at the moment when table is loaded|used
         engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
         engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);
@@ -803,6 +797,14 @@ void registerStorageBuffer(StorageFactory & factory)
         UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber(), engine_args[7]->as().value);
         UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber(), engine_args[8]->as().value);

+        /// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
+        StorageID destination_id = StorageID::createEmpty();
+        if (!destination_table.empty())
+        {
+            destination_id.database_name = args.context.resolveDatabase(destination_database);
+            destination_id.table_name = destination_table;
+        }
+
         return StorageBuffer::create(
             args.table_id,
             args.columns,
@@ -811,7 +813,7 @@ void registerStorageBuffer(StorageFactory & factory)
             num_buckets,
             StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
             StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
-            destination_database, destination_table,
+            destination_id,
             static_cast(args.local_context.getSettingsRef().insert_allow_materialized_columns));
     });
 }
diff --git a/dbms/src/Storages/StorageBuffer.h b/dbms/src/Storages/StorageBuffer.h
index d10288dacc7..772a8af2497 100644
--- a/dbms/src/Storages/StorageBuffer.h
+++ b/dbms/src/Storages/StorageBuffer.h
@@ -76,9 +76,9 @@ public:
     bool supportsSampling() const override { return true; }
     bool supportsPrewhere() const override
     {
-        if (no_destination)
+        if (!destination_id)
             return false;
-        auto dest = global_context.tryGetTable(destination_database, destination_table);
+        auto dest = DatabaseCatalog::instance().tryGetTable(destination_id);
         if (dest && dest.get() != this)
             return dest->supportsPrewhere();
         return false;
@@ -112,9 +112,7 @@ private:
     const Thresholds min_thresholds;
     const Thresholds max_thresholds;

-    const String destination_database;
-    const String destination_table;
-    bool no_destination;    /// If set, do not write data from the buffer, but simply empty the buffer.
+    StorageID destination_id;
     bool allow_materialized;

     Poco::Logger * log;
@@ -146,8 +144,7 @@ protected:
         size_t num_shards_,
         const Thresholds & min_thresholds_,
         const Thresholds & max_thresholds_,
-        const String & destination_database_,
-        const String & destination_table_,
+        const StorageID & destination_id,
         bool allow_materialized_);
 };
"" : backQuoteIfNeed(database_name) + ".") + backQuoteIfNeed(table_name) + + (hasUUID() ? " (UUID " + toString(uuid) + ")" : ""); +} + +bool StorageID::operator<(const StorageID & rhs) const +{ + assertNotEmpty(); + /// It's needed for ViewDependencies + if (!hasUUID() && !rhs.hasUUID()) + /// If both IDs don't have UUID, compare them like pair of strings + return std::tie(database_name, table_name) < std::tie(rhs.database_name, rhs.table_name); + else if (hasUUID() && rhs.hasUUID()) + /// If both IDs have UUID, compare UUIDs and ignore database and table name + return uuid < rhs.uuid; + else + /// All IDs without UUID are less, then all IDs with UUID + return !hasUUID(); +} + +void StorageID::assertNotEmpty() const +{ + if (empty()) + throw Exception("Both table name and UUID are empty", ErrorCodes::LOGICAL_ERROR); + if (table_name == TABLE_WITH_UUID_NAME_PLACEHOLDER && !hasUUID()) + throw Exception("Table name was replaced with placeholder, but UUID is Nil", ErrorCodes::LOGICAL_ERROR); + if (table_name.empty() && !database_name.empty()) + throw Exception("Table name is empty, but database name is not", ErrorCodes::LOGICAL_ERROR); +} + +} diff --git a/dbms/src/Storages/StorageID.h b/dbms/src/Storages/StorageID.h index b93a62bd6ba..b940d3b4180 100644 --- a/dbms/src/Storages/StorageID.h +++ b/dbms/src/Storages/StorageID.h @@ -1,8 +1,6 @@ #pragma once #include #include -#include -#include #include namespace DB @@ -15,19 +13,24 @@ namespace ErrorCodes static constexpr char const * TABLE_WITH_UUID_NAME_PLACEHOLDER = "_"; +class ASTQueryWithTableAndOutput; +class Context; + struct StorageID { String database_name; String table_name; - UUID uuid = UUID{UInt128(0, 0)}; + UUID uuid = UUIDHelpers::Nil; - StorageID(const String & database, const String & table, UUID uuid_ = UUID{UInt128(0, 0)}) + StorageID(const String & database, const String & table, UUID uuid_ = UUIDHelpers::Nil) : database_name(database), table_name(table), uuid(uuid_) { assertNotEmpty(); } + StorageID(const ASTQueryWithTableAndOutput & query, const Context & local_context); + String getDatabaseName() const { assertNotEmpty(); @@ -46,12 +49,7 @@ struct StorageID return (database_name.empty() ? "" : database_name + ".") + table_name; } - String getNameForLogs() const - { - assertNotEmpty(); - return (database_name.empty() ? "" : backQuoteIfNeed(database_name) + ".") + backQuoteIfNeed(table_name) - + (hasUUID() ? 
" (UUID " + toString(uuid) + ")" : ""); - } + String getNameForLogs() const; explicit operator bool () const { @@ -68,30 +66,9 @@ struct StorageID return uuid != UUID{UInt128(0, 0)}; } - bool operator<(const StorageID & rhs) const - { - assertNotEmpty(); - /// It's needed for ViewDependencies - if (!hasUUID() && !rhs.hasUUID()) - /// If both IDs don't have UUID, compare them like pair of strings - return std::tie(database_name, table_name) < std::tie(rhs.database_name, rhs.table_name); - else if (hasUUID() && rhs.hasUUID()) - /// If both IDs have UUID, compare UUIDs and ignore database and table name - return uuid < rhs.uuid; - else - /// All IDs without UUID are less, then all IDs with UUID - return !hasUUID(); - } + bool operator<(const StorageID & rhs) const; - void assertNotEmpty() const - { - if (empty()) - throw Exception("Both table name and UUID are empty", ErrorCodes::LOGICAL_ERROR); - if (table_name == TABLE_WITH_UUID_NAME_PLACEHOLDER && !hasUUID()) - throw Exception("Table name was replaced with placeholder, but UUID is Nil", ErrorCodes::LOGICAL_ERROR); - if (table_name.empty() && !database_name.empty()) - throw Exception("Table name is empty, but database name is not", ErrorCodes::LOGICAL_ERROR); - } + void assertNotEmpty() const; /// Avoid implicit construction of empty StorageID. However, it's needed for deferred initialization. static StorageID createEmpty() { return {}; } diff --git a/dbms/src/Storages/StorageMaterializedView.cpp b/dbms/src/Storages/StorageMaterializedView.cpp index 11e9acb248a..349ff9c34da 100644 --- a/dbms/src/Storages/StorageMaterializedView.cpp +++ b/dbms/src/Storages/StorageMaterializedView.cpp @@ -146,7 +146,7 @@ StorageMaterializedView::StorageMaterializedView( create_interpreter.setInternal(true); create_interpreter.execute(); - target_table_id = global_context.getTable(manual_create_query->database, manual_create_query->table)->getStorageID(); + target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->database, manual_create_query->table})->getStorageID(); } if (!select_table_id.empty()) diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index bce6214e1ae..56f0da2eb81 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -1044,8 +1044,8 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma break; case PartitionCommand::MoveDestinationType::TABLE: checkPartitionCanBeDropped(command.partition); - String dest_database = command.to_database.empty() ? context.getCurrentDatabase() : command.to_database; - auto dest_storage = context.getTable(dest_database, command.to_table); + String dest_database = context.resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}); movePartitionToTable(dest_storage, command.partition, context); break; } @@ -1056,8 +1056,8 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma case PartitionCommand::REPLACE_PARTITION: { checkPartitionCanBeDropped(command.partition); - String from_database = command.from_database.empty() ? 
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index bce6214e1ae..56f0da2eb81 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -1044,8 +1044,8 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
                     break;
                 case PartitionCommand::MoveDestinationType::TABLE:
                     checkPartitionCanBeDropped(command.partition);
-                    String dest_database = command.to_database.empty() ? context.getCurrentDatabase() : command.to_database;
-                    auto dest_storage = context.getTable(dest_database, command.to_table);
+                    String dest_database = context.resolveDatabase(command.to_database);
+                    auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table});
                     movePartitionToTable(dest_storage, command.partition, context);
                     break;
                 }
@@ -1056,8 +1056,8 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
             case PartitionCommand::REPLACE_PARTITION:
             {
                 checkPartitionCanBeDropped(command.partition);
-                String from_database = command.from_database.empty() ? context.getCurrentDatabase() : command.from_database;
-                auto from_storage = context.getTable(from_database, command.from_table);
+                String from_database = context.resolveDatabase(command.from_database);
+                auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table});
                 replacePartitionFrom(from_storage, command.partition, command.replace, context);
             }
             break;
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index a2931a35f31..6c11b1a5662 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1670,14 +1670,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)

     StoragePtr source_table;
     TableStructureReadLockHolder table_lock_holder_src_table;
-    String source_table_name = entry_replace.from_database + "." + entry_replace.from_table;
+    StorageID source_table_id{entry_replace.from_database, entry_replace.from_table};

     auto clone_data_parts_from_source_table = [&] () -> size_t
     {
-        source_table = global_context.tryGetTable(entry_replace.from_database, entry_replace.from_table);
+        source_table = DatabaseCatalog::instance().tryGetTable(source_table_id);
         if (!source_table)
         {
-            LOG_DEBUG(log, "Can't use " << source_table_name << " as source table for REPLACE PARTITION command. It does not exist.");
+            LOG_DEBUG(log, "Can't use " << source_table_id.getNameForLogs() << " as source table for REPLACE PARTITION command. It does not exist.");
             return 0;
         }
@@ -1688,7 +1688,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
         }
         catch (Exception &)
         {
-            LOG_INFO(log, "Can't use " << source_table_name << " as source table for REPLACE PARTITION command. Will fetch all parts."
+            LOG_INFO(log, "Can't use " << source_table_id.getNameForLogs() << " as source table for REPLACE PARTITION command. Will fetch all parts."
                 << " Reason: " << getCurrentExceptionMessage(false));
             return 0;
         }
@@ -1704,7 +1704,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
             auto src_part = src_data->getPartIfExists(part_desc->src_part_info, valid_states);
             if (!src_part)
             {
-                LOG_DEBUG(log, "There is no part " << part_desc->src_part_name << " in " << source_table_name);
+                LOG_DEBUG(log, "There is no part " << part_desc->src_part_name << " in " << source_table_id.getNameForLogs());
                 continue;
             }
@@ -1716,7 +1716,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)

             if (checksum_hex != part_desc->checksum_hex)
             {
-                LOG_DEBUG(log, "Part " << part_desc->src_part_name << " of " << source_table_name << " has inappropriate checksum");
+                LOG_DEBUG(log, "Part " << part_desc->src_part_name << " of " << source_table_id.getNameForLogs() << " has inappropriate checksum");
                 /// TODO: check version
                 continue;
             }
@@ -3556,8 +3556,8 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
                     break;
                 case PartitionCommand::MoveDestinationType::TABLE:
                     checkPartitionCanBeDropped(command.partition);
-                    String dest_database = command.to_database.empty() ? query_context.getCurrentDatabase() : command.to_database;
-                    auto dest_storage = query_context.getTable(dest_database, command.to_table);
+                    String dest_database = query_context.resolveDatabase(command.to_database);
+                    auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table});
                     movePartitionToTable(dest_storage, command.partition, query_context);
                     break;
                 }
@@ -3567,8 +3567,8 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
             case PartitionCommand::REPLACE_PARTITION:
             {
                 checkPartitionCanBeDropped(command.partition);
-                String from_database = command.from_database.empty() ? query_context.getCurrentDatabase() : command.from_database;
-                auto from_storage = query_context.getTable(from_database, command.from_table);
+                String from_database = query_context.resolveDatabase(command.from_database);
+                auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table});
                 replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
             }
             break;
diff --git a/dbms/src/Storages/StorageS3.cpp b/dbms/src/Storages/StorageS3.cpp
index 4cdea957b3d..42984522d23 100644
--- a/dbms/src/Storages/StorageS3.cpp
+++ b/dbms/src/Storages/StorageS3.cpp
@@ -28,6 +28,7 @@
 #include
 #include
+#include
 #include
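
// --- Illustrative sketch, not part of the patch --------------------------------------------
// The Context::resolveDatabase() contract assumed throughout this diff: every
// "x.empty() ? context.getCurrentDatabase() : x" ternary removed above collapses into it.
static String resolveDatabaseSketch(const String & database_name, const Context & context)
{
    return database_name.empty() ? context.getCurrentDatabase() : database_name;
}
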