This commit is contained in:
Alexander Tokmakov 2021-07-06 13:26:03 +03:00
parent 5e44fa32b4
commit c0bbe67cc9
3 changed files with 13 additions and 3 deletions

View File

@ -362,7 +362,7 @@ ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_conte
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setCurrentDatabase(database->getDatabaseName());
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(zookeeper, database->zookeeper_path, is_initial_query);
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(zookeeper, database->zookeeper_path, is_initial_query, entry_path);
query_context->initZooKeeperMetadataTransaction(txn);
if (is_initial_query)

View File

@ -169,13 +169,15 @@ class ZooKeeperMetadataTransaction
ZooKeeperPtr current_zookeeper;
String zookeeper_path;
bool is_initial_query;
String task_path;
Coordination::Requests ops;
public:
ZooKeeperMetadataTransaction(const ZooKeeperPtr & current_zookeeper_, const String & zookeeper_path_, bool is_initial_query_)
ZooKeeperMetadataTransaction(const ZooKeeperPtr & current_zookeeper_, const String & zookeeper_path_, bool is_initial_query_, const String & task_path_)
: current_zookeeper(current_zookeeper_)
, zookeeper_path(zookeeper_path_)
, is_initial_query(is_initial_query_)
, task_path(task_path_)
{
}
@ -185,6 +187,8 @@ public:
String getDatabaseZooKeeperPath() const { return zookeeper_path; }
String getTaskZooKeeperPath() const { return task_path; }
ZooKeeperPtr getZooKeeper() const { return current_zookeeper; }
void addOp(Coordination::RequestPtr && op)

View File

@ -1098,7 +1098,8 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
/// Execute drop as separate query, because [CREATE OR] REPLACE query can be considered as
/// successfully executed after RENAME/EXCHANGE query.
drop_context->resetZooKeeperMetadataTransaction();
auto drop_txn = std::make_shared<ZooKeeperMetadataTransaction>(txn->getZooKeeper(), txn->getDatabaseZooKeeperPath(), txn->isInitialQuery());
auto drop_txn = std::make_shared<ZooKeeperMetadataTransaction>(txn->getZooKeeper(), txn->getDatabaseZooKeeperPath(),
txn->isInitialQuery(), txn->getTaskZooKeeperPath());
drop_context->initZooKeeperMetadataTransaction(drop_txn);
}
return drop_context;
@ -1117,6 +1118,11 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
UInt64 name_hash = sipHash64(create.database + create.table);
UInt16 random_suffix = thread_local_rng();
if (auto txn = current_context->getZooKeeperMetadataTransaction())
{
/// Avoid different table name on database replicas
random_suffix = sipHash64(txn->getTaskZooKeeperPath());
}
create.table = fmt::format("_tmp_replace_{}_{}",
getHexUIntLowercase(name_hash),
getHexUIntLowercase(random_suffix));