diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index d36fe45f748..2630c896a7a 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -273,19 +273,11 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP __builtin_unreachable(); } -void DatabaseReplicated::createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper) +void DatabaseReplicated::createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper) { /// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info). DDLLogEntry entry{}; - - String query_path_prefix = zookeeper_path + "/log/query-"; - String counter_prefix = zookeeper_path + "/counter/cnt-"; - String counter_path = current_zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential); - String query_path = query_path_prefix + counter_path.substr(counter_prefix.size()); - - ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(query_path + "/committed", getFullReplicaName(), zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1)); + DatabaseReplicatedDDLWorker::enqueueQueryImpl(current_zookeeper, entry, this, true); } void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper) @@ -296,8 +288,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt Coordination::Requests ops; ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent)); - createEmptyLogEntry(ops, current_zookeeper); current_zookeeper->multi(ops); + createEmptyLogEntry(current_zookeeper); } void DatabaseReplicated::loadStoredObjects(ContextPtr local_context, bool has_force_restore_data_flag, bool force_attach) @@ -659,10 +651,8 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node void DatabaseReplicated::drop(ContextPtr context_) { auto current_zookeeper = getZooKeeper(); - Coordination::Requests ops; - ops.emplace_back(zkutil::makeSetRequest(replica_path, DROPPED_MARK, -1)); - createEmptyLogEntry(ops, current_zookeeper); - current_zookeeper->multi(ops); + current_zookeeper->set(replica_path, DROPPED_MARK, -1); + createEmptyLogEntry(current_zookeeper); DatabaseAtomic::drop(context_); diff --git a/src/Databases/DatabaseReplicated.h b/src/Databases/DatabaseReplicated.h index b930d27c19b..e6c3fb00eb6 100644 --- a/src/Databases/DatabaseReplicated.h +++ b/src/Databases/DatabaseReplicated.h @@ -78,7 +78,7 @@ private: ClusterPtr getClusterImpl() const; void setCluster(ClusterPtr && new_cluster); - void createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper); + void createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper); String zookeeper_path; String shard_name; diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index 9ae4d026bf0..548f7f6f882 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace DB { @@ -69,25 +70,63 @@ void DatabaseReplicatedDDLWorker::initializeReplication() String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry) { auto zookeeper = getAndSetZooKeeper(); - const String query_path_prefix = queue_dir + "/query-"; + return enqueueQueryImpl(zookeeper, entry, database); +} + +String DatabaseReplicatedDDLWorker::enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry, + DatabaseReplicated * const database, bool committed) +{ + const String query_path_prefix = database->zookeeper_path + "/log/query-"; /// We cannot create sequential node and it's ephemeral child in a single transaction, so allocate sequential number another way String counter_prefix = database->zookeeper_path + "/counter/cnt-"; - String counter_path = zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential); + String counter_lock_path = database->zookeeper_path + "/counter_lock"; + + String counter_path; + size_t iters = 1000; + while (--iters) + { + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCreateRequest(counter_lock_path, database->getFullReplicaName(), zkutil::CreateMode::Ephemeral)); + ops.emplace_back(zkutil::makeCreateRequest(counter_prefix, "", zkutil::CreateMode::EphemeralSequential)); + Coordination::Responses res; + + Coordination::Error code = zookeeper->tryMulti(ops, res); + if (code == Coordination::Error::ZOK) + { + counter_path = dynamic_cast(*res.back()).path_created; + break; + } + else if (code != Coordination::Error::ZNODEEXISTS) + zkutil::KeeperMultiException::check(code, ops, res); + } + + if (iters == 0) + throw Exception(ErrorCodes::UNFINISHED, + "Cannot enqueue query, because some replica are trying to enqueue another query. " + "It may happen on high queries rate or, in rare cases, after connection loss. Client should retry."); + String node_path = query_path_prefix + counter_path.substr(counter_prefix.size()); + /// Now create task in queue Coordination::Requests ops; /// Query is not committed yet, but we have to write it into log to avoid reordering ops.emplace_back(zkutil::makeCreateRequest(node_path, entry.toString(), zkutil::CreateMode::Persistent)); /// '/try' will be replaced with '/committed' or will be removed due to expired session or other error - ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral)); + if (committed) + ops.emplace_back(zkutil::makeCreateRequest(node_path + "/committed", database->getFullReplicaName(), zkutil::CreateMode::Persistent)); + else + ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral)); /// We don't need it anymore ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1)); + /// Unlock counters + ops.emplace_back(zkutil::makeRemoveRequest(counter_lock_path, -1)); /// Create status dirs ops.emplace_back(zkutil::makeCreateRequest(node_path + "/active", "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest(node_path + "/finished", "", zkutil::CreateMode::Persistent)); zookeeper->multi(ops); + return node_path; } diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index 16ad100b81a..4020906f9b2 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -29,6 +29,9 @@ public: void shutdown() override; + static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry, + DatabaseReplicated * const database, bool committed = false); + private: bool initializeMainThread() override; void initializeReplication();