improve CREATE OR REPLACE query

This commit is contained in:
Alexander Tokmakov 2021-07-01 16:21:38 +03:00
parent 956b1f588d
commit d9a77e3a1a
11 changed files with 186 additions and 62 deletions

View File

@ -8,6 +8,7 @@
#include <Common/Macros.h>
#include <Common/randomSeed.h>
#include <Common/renameat2.h>
#include <Common/hex.h>
#include <Core/Defines.h>
#include <Core/Settings.h>
@ -31,6 +32,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterCreateQuery.h>
@ -796,36 +798,6 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
}
if (create.replace_table)
{
if (database->getUUID() == UUIDHelpers::Nil)
throw Exception(ErrorCodes::INCORRECT_QUERY,
"{} query is supported only for Atomic databases",
create.create_or_replace ? "CREATE OR REPLACE TABLE" : "REPLACE TABLE");
UUID uuid_of_table_to_replace;
if (create.create_or_replace)
{
uuid_of_table_to_replace = getContext()->tryResolveStorageID(StorageID(create.database, create.table)).uuid;
if (uuid_of_table_to_replace == UUIDHelpers::Nil)
{
/// Convert to usual CREATE
create.replace_table = false;
assert(!database->isTableExist(create.table, getContext()));
}
else
create.table = "_tmp_replace_" + toString(uuid_of_table_to_replace);
}
else
{
uuid_of_table_to_replace = getContext()->resolveStorageID(StorageID(create.database, create.table)).uuid;
if (uuid_of_table_to_replace == UUIDHelpers::Nil)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
backQuoteIfNeed(create.database), backQuoteIfNeed(create.table));
create.table = "_tmp_replace_" + toString(uuid_of_table_to_replace);
}
}
}
@ -1105,11 +1077,27 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
{
auto ast_drop = std::make_shared<ASTDropQuery>();
String table_to_replace_name = create.table;
bool created = false;
bool replaced = false;
{
auto database = DatabaseCatalog::instance().getDatabase(create.database);
if (database->getUUID() == UUIDHelpers::Nil)
throw Exception(ErrorCodes::INCORRECT_QUERY,
"{} query is supported only for Atomic databases",
create.create_or_replace ? "CREATE OR REPLACE TABLE" : "REPLACE TABLE");
UInt64 name_hash = sipHash64(create.database + create.table);
UInt16 random_suffix = thread_local_rng();
create.table = fmt::format("_tmp_replace_{}_{}",
getHexUIntLowercase(name_hash),
getHexUIntLowercase(random_suffix));
}
bool created = false;
bool renamed = false;
try
{
/// Create temporary table (random name will be generated)
[[maybe_unused]] bool done = doCreateTable(create, properties);
assert(done);
ast_drop->table = create.table;
@ -1117,9 +1105,12 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
ast_drop->database = create.database;
ast_drop->kind = ASTDropQuery::Drop;
created = true;
if (!create.replace_table)
return fillTableIfNeeded(create);
/// Try fill temporary table
BlockIO fill_io = fillTableIfNeeded(create);
executeTrivialBlockIO(fill_io, getContext());
/// Replace target table with created one
auto ast_rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Element elem
{
@ -1130,19 +1121,27 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
ast_rename->elements.push_back(std::move(elem));
ast_rename->exchange = true;
ast_rename->dictionary = create.is_dictionary;
/// Will execute ordinary RENAME instead of EXCHANGE if the target table does not exist
ast_rename->rename_if_cannot_exchange = create.create_or_replace;
InterpreterRenameQuery(ast_rename, getContext()).execute();
replaced = true;
InterpreterRenameQuery interpreter_rename{ast_rename, getContext()};
interpreter_rename.execute();
renamed = true;
InterpreterDropQuery(ast_drop, getContext()).execute();
if (!interpreter_rename.renamedInsteadOfExchange())
{
/// Target table was replaced with new one, drop old table
InterpreterDropQuery(ast_drop, getContext()).execute();
}
create.table = table_to_replace_name;
return fillTableIfNeeded(create);
return {};
}
catch (...)
{
if (created && create.replace_table && !replaced)
/// Drop temporary table if it was successfully created, but was not renamed to target name
if (created && !renamed)
InterpreterDropQuery(ast_drop, getContext()).execute();
throw;
}

View File

@ -76,8 +76,20 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
for (const auto & elem : descriptions)
{
if (!rename.exchange)
bool exchange_tables = rename.exchange;
if (exchange_tables)
{
bool allow_rename_instead_of_exchange = descriptions.size() == 1 && rename.rename_if_cannot_exchange;
if (allow_rename_instead_of_exchange && !database_catalog.isTableExist(StorageID(elem.to_database_name, elem.to_table_name), getContext()))
{
exchange_tables = false;
renamed_instead_of_exchange = true;
}
}
else
{
database_catalog.assertTableDoesntExist(StorageID(elem.to_database_name, elem.to_table_name), getContext());
}
DatabasePtr database = database_catalog.getDatabase(elem.from_database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
@ -100,7 +112,7 @@ BlockIO InterpreterRenameQuery::executeToTables(const ASTRenameQuery & rename, c
elem.from_table_name,
*database_catalog.getDatabase(elem.to_database_name),
elem.to_table_name,
rename.exchange,
exchange_tables,
rename.dictionary);
}
}

View File

@ -55,6 +55,8 @@ public:
BlockIO execute() override;
void extendQueryLogElemImpl(QueryLogElement & elem, const ASTPtr & ast, ContextPtr) const override;
bool renamedInsteadOfExchange() const { return renamed_instead_of_exchange; }
private:
BlockIO executeToTables(const ASTRenameQuery & rename, const RenameDescriptions & descriptions, TableGuards & ddl_guards);
static BlockIO executeToDatabase(const ASTRenameQuery & rename, const RenameDescriptions & descriptions);
@ -62,6 +64,7 @@ private:
AccessRightsElements getRequiredAccess() const;
ASTPtr query_ptr;
bool renamed_instead_of_exchange{false};
};
}

View File

@ -1060,26 +1060,30 @@ void executeQuery(
}
else if (pipeline.initialized())
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
if (ast_query_with_output && ast_query_with_output->out_file)
if (pipeline.isCompleted())
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
pipeline.setProgressCallback(context->getProgressCallback());
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
if (!pipeline.isCompleted())
else
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
WriteBuffer * out_buf = &ostr;
std::optional<WriteBufferFromFile> out_file_buf;
if (ast_query_with_output && ast_query_with_output->out_file)
{
if (!allow_into_outfile)
throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
out_buf = &*out_file_buf;
}
String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
? getIdentifierName(ast_query_with_output->format)
: context->getDefaultFormat();
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
@ -1105,10 +1109,6 @@ void executeQuery(
pipeline.setOutputFormat(std::move(out));
}
else
{
pipeline.setProgressCallback(context->getProgressCallback());
}
{
auto executor = pipeline.execute();
@ -1125,4 +1125,32 @@ void executeQuery(
streams.onFinish();
}
void executeTrivialBlockIO(BlockIO & streams, ContextPtr context)
{
try
{
if (streams.out)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query stream requires input, but no input buffer provided, it's a bug");
if (streams.in)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query stream requires output, but no output buffer provided, it's a bug");
if (!streams.pipeline.initialized())
return;
if (!streams.pipeline.isCompleted())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query pipeline requires output, but no output buffer provided, it's a bug");
streams.pipeline.setProgressCallback(context->getProgressCallback());
auto executor = streams.pipeline.execute();
executor->execute(streams.pipeline.getNumThreads());
}
catch (...)
{
streams.onException();
throw;
}
streams.onFinish();
}
}

View File

@ -53,4 +53,8 @@ BlockIO executeQuery(
bool allow_processors /// If can use processors pipeline
);
/// Executes BlockIO returned from executeQuery(...)
/// if built pipeline does not require any input and does not produce any output.
void executeTrivialBlockIO(BlockIO & streams, ContextPtr context);
}

View File

@ -34,6 +34,9 @@ public:
bool database{false}; /// For RENAME DATABASE
bool dictionary{false}; /// For RENAME DICTIONARY
/// Special flag for CREATE OR REPLACE. Do not throw if the second table does not exist.
bool rename_if_cannot_exchange{false};
/** Get the text that identifies this element. */
String getID(char) const override { return "Rename"; }

View File

@ -1137,6 +1137,11 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
return {};
}
void StorageDistributed::flush()
{
flushClusterNodesAllData(getContext());
}
void StorageDistributed::flushClusterNodesAllData(ContextPtr local_context)
{
/// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE

View File

@ -98,6 +98,7 @@ public:
void startup() override;
void shutdown() override;
void flush() override;
void drop() override;
bool storesDataOnDisk() const override { return true; }

View File

@ -0,0 +1,20 @@
test flush on replace
1 s1
2 s2
3 s3
exception on create and fill
0
1 1 s1
2 2 s2
3 3 s3
1 1 s1
2 2 s2
3 3 s3
1 1 s1
2 2 s2
3 3 s3
4 4 s4
buf
dist
join
t

View File

@ -0,0 +1,48 @@
drop table if exists t;
drop table if exists dist;
drop table if exists buf;
drop table if exists join;
select 'test flush on replace';
create table t (n UInt64, s String default 's' || toString(n)) engine=Memory;
create table dist (n int) engine=Distributed(test_shard_localhost, currentDatabase(), t);
create table buf (n int) engine=Buffer(currentDatabase(), dist, 1, 10, 100, 10, 100, 1000, 1000);
insert into buf values (1);
replace table buf (n int) engine=Distributed(test_shard_localhost, currentDatabase(), dist);
replace table dist (n int) engine=Buffer(currentDatabase(), t, 1, 10, 100, 10, 100, 1000, 1000);
insert into buf values (2);
replace table buf (n int) engine=Buffer(currentDatabase(), dist, 1, 10, 100, 10, 100, 1000, 1000);
replace table dist (n int) engine=Distributed(test_shard_localhost, currentDatabase(), t);
insert into buf values (3);
replace table buf (n int) engine=Null;
replace table dist (n int) engine=Null;
select * from t order by n;
select 'exception on create and fill';
-- table is not created if select fails
create or replace table join engine=Join(ANY, INNER, n) as select * from t where throwIf(n); -- { serverError 395 }
select count() from system.tables where database=currentDatabase() and name='join';
-- table is created and filled
create or replace table join engine=Join(ANY, INNER, n) as select * from t;
select * from numbers(10) as t any join join on t.number=join.n order by n;
-- table is not replaced if select fails
insert into t(n) values (4);
replace table join engine=Join(ANY, INNER, n) as select * from t where throwIf(n); -- { serverError 395 }
select * from numbers(10) as t any join join on t.number=join.n order by n;
-- table is replaced
replace table join engine=Join(ANY, INNER, n) as select * from t;
select * from numbers(10) as t any join join on t.number=join.n order by n;
select name from system.tables where database=currentDatabase() order by name;
drop table t;
drop table dist;
drop table buf;
drop table join;

View File

@ -109,6 +109,7 @@
"00510_materizlized_view_and_deduplication_zookeeper",
"00738_lock_for_inner_table",
"01153_attach_mv_uuid",
"01157_replace_table",
/// Sometimes cannot lock file most likely due to concurrent or adjacent tests, but we don't care how it works in Ordinary database.
"rocksdb",
"01914_exchange_dictionaries" /// Requires Atomic database