diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index de858bdbdc5..ccd92fb650c 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -31,6 +32,7 @@ #include #include +#include #include #include #include @@ -796,36 +798,6 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data create.uuid = UUIDHelpers::Nil; create.to_inner_uuid = UUIDHelpers::Nil; } - - if (create.replace_table) - { - if (database->getUUID() == UUIDHelpers::Nil) - throw Exception(ErrorCodes::INCORRECT_QUERY, - "{} query is supported only for Atomic databases", - create.create_or_replace ? "CREATE OR REPLACE TABLE" : "REPLACE TABLE"); - - UUID uuid_of_table_to_replace; - if (create.create_or_replace) - { - uuid_of_table_to_replace = getContext()->tryResolveStorageID(StorageID(create.database, create.table)).uuid; - if (uuid_of_table_to_replace == UUIDHelpers::Nil) - { - /// Convert to usual CREATE - create.replace_table = false; - assert(!database->isTableExist(create.table, getContext())); - } - else - create.table = "_tmp_replace_" + toString(uuid_of_table_to_replace); - } - else - { - uuid_of_table_to_replace = getContext()->resolveStorageID(StorageID(create.database, create.table)).uuid; - if (uuid_of_table_to_replace == UUIDHelpers::Nil) - throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", - backQuoteIfNeed(create.database), backQuoteIfNeed(create.table)); - create.table = "_tmp_replace_" + toString(uuid_of_table_to_replace); - } - } } @@ -1105,11 +1077,27 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create, { auto ast_drop = std::make_shared(); String table_to_replace_name = create.table; - bool created = false; - bool replaced = false; + { + auto database = DatabaseCatalog::instance().getDatabase(create.database); + if (database->getUUID() == UUIDHelpers::Nil) + throw Exception(ErrorCodes::INCORRECT_QUERY, + "{} query is supported only for Atomic databases", + create.create_or_replace ? "CREATE OR REPLACE TABLE" : "REPLACE TABLE"); + + + UInt64 name_hash = sipHash64(create.database + create.table); + UInt16 random_suffix = thread_local_rng(); + create.table = fmt::format("_tmp_replace_{}_{}", + getHexUIntLowercase(name_hash), + getHexUIntLowercase(random_suffix)); + } + + bool created = false; + bool renamed = false; try { + /// Create temporary table (random name will be generated) [[maybe_unused]] bool done = doCreateTable(create, properties); assert(done); ast_drop->table = create.table; @@ -1117,9 +1105,12 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create, ast_drop->database = create.database; ast_drop->kind = ASTDropQuery::Drop; created = true; - if (!create.replace_table) - return fillTableIfNeeded(create); + /// Try fill temporary table + BlockIO fill_io = fillTableIfNeeded(create); + executeTrivialBlockIO(fill_io, getContext()); + + /// Replace target table with created one auto ast_rename = std::make_shared(); ASTRenameQuery::Element elem { @@ -1130,19 +1121,27 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create, ast_rename->elements.push_back(std::move(elem)); ast_rename->exchange = true; ast_rename->dictionary = create.is_dictionary; + /// Will execute ordinary RENAME instead of EXCHANGE if the target table does not exist + ast_rename->rename_if_cannot_exchange = create.create_or_replace; - InterpreterRenameQuery(ast_rename, getContext()).execute(); - replaced = true; + InterpreterRenameQuery interpreter_rename{ast_rename, getContext()}; + interpreter_rename.execute(); + renamed = true; - InterpreterDropQuery(ast_drop, getContext()).execute(); + if (!interpreter_rename.renamedInsteadOfExchange()) + { + /// Target table was replaced with new one, drop old table + InterpreterDropQuery(ast_drop, getContext()).execute(); + } create.table = table_to_replace_name; - return fillTableIfNeeded(create); + return {}; } catch (...) { - if (created && create.replace_table && !replaced) + /// Drop temporary table if it was successfully created, but was not renamed to target name + if (created && !renamed) InterpreterDropQuery(ast_drop, getContext()).execute(); throw; } diff --git a/src/Interpreters/InterpreterRenameQuery.cpp b/src/Interpreters/InterpreterRenameQuery.cpp index 515559ad903..989cdad93d5 100644 --- a/src/Interpreters/InterpreterRenameQuery.cpp +++ b/src/Interpreters/InterpreterRenameQuery.cpp @@ -76,8 +76,20 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c for (const auto & elem : descriptions) { - if (!rename.exchange) + bool exchange_tables = rename.exchange; + if (exchange_tables) + { + bool allow_rename_instead_of_exchange = descriptions.size() == 1 && rename.rename_if_cannot_exchange; + if (allow_rename_instead_of_exchange && !database_catalog.isTableExist(StorageID(elem.to_database_name, elem.to_table_name), getContext())) + { + exchange_tables = false; + renamed_instead_of_exchange = true; + } + } + else + { database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), getContext()); + } DatabasePtr database = database_catalog.getDatabase(elem.from_database_name); if (typeid_cast(database.get()) @@ -100,7 +112,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c elem.from_table_name, *database_catalog.getDatabase(elem.to_database_name), elem.to_table_name, - rename.exchange, + exchange_tables, rename.dictionary); } } diff --git a/src/Interpreters/InterpreterRenameQuery.h b/src/Interpreters/InterpreterRenameQuery.h index 49fdd50f52d..dfcd741754e 100644 --- a/src/Interpreters/InterpreterRenameQuery.h +++ b/src/Interpreters/InterpreterRenameQuery.h @@ -55,6 +55,8 @@ public: BlockIO execute() override; void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr) const override; + bool renamedInsteadOfExchange() const { return renamed_instead_of_exchange; } + private: BlockIO executeToTables(const ASTRenameQuery & rename, const RenameDescriptions & descriptions, TableGuards & ddl_guards); static BlockIO executeToDatabase(const ASTRenameQuery & rename, const RenameDescriptions & descriptions); @@ -62,6 +64,7 @@ private: AccessRightsElements getRequiredAccess() const; ASTPtr query_ptr; + bool renamed_instead_of_exchange{false}; }; } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index c69a5bcd3e1..8f9a8b8a3e9 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -1060,26 +1060,30 @@ void executeQuery( } else if (pipeline.initialized()) { - const ASTQueryWithOutput * ast_query_with_output = dynamic_cast(ast.get()); - - WriteBuffer * out_buf = &ostr; - std::optional out_file_buf; - if (ast_query_with_output && ast_query_with_output->out_file) + if (pipeline.isCompleted()) { - if (!allow_into_outfile) - throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED); - - const auto & out_file = typeid_cast(*ast_query_with_output->out_file).value.safeGet(); - out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); - out_buf = &*out_file_buf; + pipeline.setProgressCallback(context->getProgressCallback()); } - - String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr) - ? getIdentifierName(ast_query_with_output->format) - : context->getDefaultFormat(); - - if (!pipeline.isCompleted()) + else { + const ASTQueryWithOutput * ast_query_with_output = dynamic_cast(ast.get()); + + WriteBuffer * out_buf = &ostr; + std::optional out_file_buf; + if (ast_query_with_output && ast_query_with_output->out_file) + { + if (!allow_into_outfile) + throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED); + + const auto & out_file = typeid_cast(*ast_query_with_output->out_file).value.safeGet(); + out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); + out_buf = &*out_file_buf; + } + + String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr) + ? getIdentifierName(ast_query_with_output->format) + : context->getDefaultFormat(); + pipeline.addSimpleTransform([](const Block & header) { return std::make_shared(header); @@ -1105,10 +1109,6 @@ void executeQuery( pipeline.setOutputFormat(std::move(out)); } - else - { - pipeline.setProgressCallback(context->getProgressCallback()); - } { auto executor = pipeline.execute(); @@ -1125,4 +1125,32 @@ void executeQuery( streams.onFinish(); } +void executeTrivialBlockIO(BlockIO & streams, ContextPtr context) +{ + try + { + if (streams.out) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Query stream requires input, but no input buffer provided, it's a bug"); + if (streams.in) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Query stream requires output, but no output buffer provided, it's a bug"); + + if (!streams.pipeline.initialized()) + return; + + if (!streams.pipeline.isCompleted()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Query pipeline requires output, but no output buffer provided, it's a bug"); + + streams.pipeline.setProgressCallback(context->getProgressCallback()); + auto executor = streams.pipeline.execute(); + executor->execute(streams.pipeline.getNumThreads()); + } + catch (...) + { + streams.onException(); + throw; + } + + streams.onFinish(); +} + } diff --git a/src/Interpreters/executeQuery.h b/src/Interpreters/executeQuery.h index 6448b26a652..5eebb09c36b 100644 --- a/src/Interpreters/executeQuery.h +++ b/src/Interpreters/executeQuery.h @@ -53,4 +53,8 @@ BlockIO executeQuery( bool allow_processors /// If can use processors pipeline ); +/// Executes BlockIO returned from executeQuery(...) +/// if built pipeline does not require any input and does not produce any output. +void executeTrivialBlockIO(BlockIO & streams, ContextPtr context); + } diff --git a/src/Parsers/ASTRenameQuery.h b/src/Parsers/ASTRenameQuery.h index 611f81dc9e9..e0c58e3462e 100644 --- a/src/Parsers/ASTRenameQuery.h +++ b/src/Parsers/ASTRenameQuery.h @@ -34,6 +34,9 @@ public: bool database{false}; /// For RENAME DATABASE bool dictionary{false}; /// For RENAME DICTIONARY + /// Special flag for CREATE OR REPLACE. Do not throw if the second table does not exist. + bool rename_if_cannot_exchange{false}; + /** Get the text that identifies this element. */ String getID(char) const override { return "Rename"; } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index f4d6ec5c6f7..f30f2fb8e09 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1137,6 +1137,11 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type) return {}; } +void StorageDistributed::flush() +{ + flushClusterNodesAllData(getContext()); +} + void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context) { /// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index c63abbc6aa4..1a74b48a615 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -98,6 +98,7 @@ public: void startup() override; void shutdown() override; + void flush() override; void drop() override; bool storesDataOnDisk() const override { return true; } diff --git a/tests/queries/0_stateless/01157_replace_table.reference b/tests/queries/0_stateless/01157_replace_table.reference new file mode 100644 index 00000000000..9fddaf99847 --- /dev/null +++ b/tests/queries/0_stateless/01157_replace_table.reference @@ -0,0 +1,20 @@ +test flush on replace +1 s1 +2 s2 +3 s3 +exception on create and fill +0 +1 1 s1 +2 2 s2 +3 3 s3 +1 1 s1 +2 2 s2 +3 3 s3 +1 1 s1 +2 2 s2 +3 3 s3 +4 4 s4 +buf +dist +join +t diff --git a/tests/queries/0_stateless/01157_replace_table.sql b/tests/queries/0_stateless/01157_replace_table.sql new file mode 100644 index 00000000000..337290bb4d2 --- /dev/null +++ b/tests/queries/0_stateless/01157_replace_table.sql @@ -0,0 +1,48 @@ +drop table if exists t; +drop table if exists dist; +drop table if exists buf; +drop table if exists join; + +select 'test flush on replace'; +create table t (n UInt64, s String default 's' || toString(n)) engine=Memory; +create table dist (n int) engine=Distributed(test_shard_localhost, currentDatabase(), t); +create table buf (n int) engine=Buffer(currentDatabase(), dist, 1, 10, 100, 10, 100, 1000, 1000); + +insert into buf values (1); +replace table buf (n int) engine=Distributed(test_shard_localhost, currentDatabase(), dist); +replace table dist (n int) engine=Buffer(currentDatabase(), t, 1, 10, 100, 10, 100, 1000, 1000); + +insert into buf values (2); +replace table buf (n int) engine=Buffer(currentDatabase(), dist, 1, 10, 100, 10, 100, 1000, 1000); +replace table dist (n int) engine=Distributed(test_shard_localhost, currentDatabase(), t); + +insert into buf values (3); +replace table buf (n int) engine=Null; +replace table dist (n int) engine=Null; + +select * from t order by n; + +select 'exception on create and fill'; +-- table is not created if select fails +create or replace table join engine=Join(ANY, INNER, n) as select * from t where throwIf(n); -- { serverError 395 } +select count() from system.tables where database=currentDatabase() and name='join'; + +-- table is created and filled +create or replace table join engine=Join(ANY, INNER, n) as select * from t; +select * from numbers(10) as t any join join on t.number=join.n order by n; + +-- table is not replaced if select fails +insert into t(n) values (4); +replace table join engine=Join(ANY, INNER, n) as select * from t where throwIf(n); -- { serverError 395 } +select * from numbers(10) as t any join join on t.number=join.n order by n; + +-- table is replaced +replace table join engine=Join(ANY, INNER, n) as select * from t; +select * from numbers(10) as t any join join on t.number=join.n order by n; + +select name from system.tables where database=currentDatabase() order by name; + +drop table t; +drop table dist; +drop table buf; +drop table join; diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json index 7c1f998e91d..c9d517b0285 100644 --- a/tests/queries/skip_list.json +++ b/tests/queries/skip_list.json @@ -109,6 +109,7 @@ "00510_materizlized_view_and_deduplication_zookeeper", "00738_lock_for_inner_table", "01153_attach_mv_uuid", + "01157_replace_table", /// Sometimes cannot lock file most likely due to concurrent or adjacent tests, but we don't care how it works in Ordinary database. "rocksdb", "01914_exchange_dictionaries" /// Requires Atomic database