make some replicated DDL faster

This commit is contained in:
Alexander Tokmakov 2022-04-05 00:51:48 +02:00
parent e6c9a36ac7
commit 4e9ec5dc2f
7 changed files with 200 additions and 54 deletions

View File

@ -701,24 +701,34 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path, const String &
}
}
void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node)
bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, const String & keep_child_node)
{
Strings children;
if (tryGetChildren(path, children) != Coordination::Error::ZOK)
return;
return false;
bool removed_as_expected = true;
while (!children.empty())
{
Coordination::Requests ops;
Strings batch;
ops.reserve(MULTI_BATCH_SIZE);
batch.reserve(MULTI_BATCH_SIZE);
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
String child_path = fs::path(path) / children.back();
tryRemoveChildrenRecursive(child_path);
/// Will try to avoid recursive getChildren calls if child_path probably has no children.
/// It may be extremely slow when path contain a lot of leaf children.
if (!probably_flat)
tryRemoveChildrenRecursive(child_path);
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
{
batch.push_back(child_path);
ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1));
}
children.pop_back();
}
@ -726,10 +736,39 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const Strin
/// this means someone is concurrently removing these children and we will have
/// to remove them one by one.
Coordination::Responses responses;
if (tryMulti(ops, responses) != Coordination::Error::ZOK)
for (const std::string & child : batch)
tryRemove(child);
if (tryMulti(ops, responses) == Coordination::Error::ZOK)
continue;
removed_as_expected = false;
std::vector<zkutil::ZooKeeper::FutureRemove> futures;
futures.reserve(batch.size());
for (const std::string & child : batch)
futures.push_back(asyncTryRemoveNoThrow(child, -1));
for (size_t i = 0; i < batch.size(); ++i)
{
auto res = futures[i].get();
if (res.error == Coordination::Error::ZOK)
continue;
if (res.error == Coordination::Error::ZNONODE)
continue;
if (res.error == Coordination::Error::ZNOTEMPTY)
{
if (probably_flat)
{
/// It actually has children, let's remove them
tryRemoveChildrenRecursive(batch[i]);
tryRemove(batch[i]);
}
continue;
}
throw KeeperException(res.error, batch[i]);
}
}
return removed_as_expected;
}
void ZooKeeper::removeRecursive(const std::string & path)

View File

@ -225,7 +225,10 @@ public:
/// If keep_child_node is not empty, this method will not remove path/keep_child_node (but will remove its subtree).
/// It can be useful to keep some child node as a flag which indicates that path is currently removing.
void removeChildrenRecursive(const std::string & path, const String & keep_child_node = {});
void tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node = {});
/// If probably_flat is true, this method will optimistically try to remove children non-recursive
/// and will fall back to recursive removal if it gets ZNOTEMPTY for some child.
/// Returns true if no kind of fallback happened.
bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, const String & keep_child_node = {});
/// Remove all children nodes (non recursive).
void removeChildren(const std::string & path);

View File

@ -675,6 +675,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
}
}
current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr));
ddl_worker->updateLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr));
}
std::map<String, String> DatabaseReplicated::tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr)

View File

@ -66,9 +66,14 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
logs_to_keep = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/logs_to_keep"));
if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
{
database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr);
}
else
{
last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr));
updateLogPointer(DDLTaskBase::getLogEntryName(our_log_ptr));
}
}
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
@ -140,10 +145,10 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr
/// but it requires more complex logic around /try node.
auto zookeeper = getAndSetZooKeeper();
UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(database->replica_path + "/log_ptr"));
UInt32 our_log_ptr = getLogPointer();
UInt32 max_log_ptr = parse<UInt32>(zookeeper->get(database->zookeeper_path + "/max_log_ptr"));
assert(our_log_ptr <= max_log_ptr);
if (database->db_settings.max_replication_lag_to_enqueue < max_log_ptr - our_log_ptr)
if (our_log_ptr + database->db_settings.max_replication_lag_to_enqueue < max_log_ptr)
throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, "
"because it has replication lag of {} queries. Try other replica.", max_log_ptr - our_log_ptr);
@ -203,7 +208,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
}
}
UInt32 our_log_ptr = parse<UInt32>(zookeeper->get(fs::path(database->replica_path) / "log_ptr"));
UInt32 our_log_ptr = getLogPointer();
UInt32 entry_num = DatabaseReplicatedTask::getLogEntryNumber(entry_name);
if (entry_num <= our_log_ptr)
@ -308,4 +313,18 @@ bool DatabaseReplicatedDDLWorker::canRemoveQueueEntry(const String & entry_name,
return entry_number + logs_to_keep < max_log_ptr;
}
void DatabaseReplicatedDDLWorker::updateLogPointer(const String & processed_entry_name)
{
updateMaxDDLEntryID(processed_entry_name);
assert(max_id.load() == parse<UInt32>(getAndSetZooKeeper()->get(fs::path(database->replica_path) / "log_ptr")));
}
UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const
{
/// NOTE it main not be equal to the log_ptr in zk:
/// - max_id can be equal to log_ptr - 1 due to race condition (when it's updated in zk, but not updated in memory yet)
/// - max_id can be greater than log_ptr, because log_ptr is not updated for failed and dummy entries
return max_id.load();
}
}

View File

@ -32,6 +32,8 @@ public:
static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed = false); /// NOLINT
void updateLogPointer(const String & processed_entry_name);
UInt32 getLogPointer() const;
private:
bool initializeMainThread() override;
void initializeReplication();

View File

@ -876,7 +876,7 @@ void DDLWorker::cleanupQueue(Int64, const ZooKeeperPtr & zookeeper)
/// We recursively delete all nodes except node_path/finished to prevent staled hosts from
/// creating node_path/active node (see createStatusDirs(...))
zookeeper->tryRemoveChildrenRecursive(node_path, "finished");
zookeeper->tryRemoveChildrenRecursive(node_path, /* probably_flat */ false, "finished");
/// And then we remove node_path and node_path/finished in a single transaction
Coordination::Requests ops;

View File

@ -318,19 +318,22 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
bool skip_sanity_checks = false;
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
/// It does not make sense for CREATE query
if (attach)
{
skip_sanity_checks = true;
current_zookeeper->remove(replica_path + "/flags/force_restore_data");
if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
{
skip_sanity_checks = true;
current_zookeeper->remove(replica_path + "/flags/force_restore_data");
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag {}/flags/force_restore_data).", replica_path);
}
else if (has_force_restore_data_flag)
{
skip_sanity_checks = true;
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag {}/flags/force_restore_data).", replica_path);
}
else if (has_force_restore_data_flag)
{
skip_sanity_checks = true;
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
}
}
loadDataParts(skip_sanity_checks);
@ -568,35 +571,31 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
auto zookeeper = getZooKeeper();
/// Working with quorum.
zookeeper->createIfNotExists(zookeeper_path + "/quorum", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/parallel", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String());
/// Tracking lag of replicas.
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String());
zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", String());
/// Mutations
zookeeper->createIfNotExists(zookeeper_path + "/mutations", String());
zookeeper->createIfNotExists(replica_path + "/mutation_pointer", String());
std::vector<zkutil::ZooKeeper::FutureCreate> futures;
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/quorum/parallel", String(), zkutil::CreateMode::Persistent));
/// Nodes for remote fs zero-copy replication
const auto settings = getSettings();
if (settings->allow_remote_fs_zero_copy_replication)
{
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/shared", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs", String());
zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs/shared", String());
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_s3", String(), zkutil::CreateMode::Persistent));
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_s3/shared", String(), zkutil::CreateMode::Persistent));
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_hdfs", String(), zkutil::CreateMode::Persistent));
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_hdfs/shared", String(), zkutil::CreateMode::Persistent));
}
/// Part movement.
zookeeper->createIfNotExists(zookeeper_path + "/part_moves_shard", String());
zookeeper->createIfNotExists(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString());
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/part_moves_shard", String(), zkutil::CreateMode::Persistent));
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString(), zkutil::CreateMode::Persistent));
/// For ALTER PARTITION with multi-leaders
zookeeper->createIfNotExists(zookeeper_path + "/alter_partition_version", String());
futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/alter_partition_version", String(), zkutil::CreateMode::Persistent));
for (auto & future : futures)
{
auto res = future.get();
if (res.error != Coordination::Error::ZOK && res.error != Coordination::Error::ZNODEEXISTS)
throw Coordination::Exception(fmt::format("Failed to create new nodes at {}", zookeeper_path), res.error);
}
}
@ -671,6 +670,16 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name,
zkutil::CreateMode::Persistent));
/// The following 4 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum/last_part", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum/failed_parts", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/mutations", "",
zkutil::CreateMode::Persistent));
/// And create first replica atomically. See also "createReplica" method that is used to create not the first replicas.
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "",
@ -694,6 +703,14 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
zkutil::CreateMode::Persistent));
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/min_unprocessed_insert_time", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/max_processed_insert_time", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "",
zkutil::CreateMode::Persistent));
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
@ -759,6 +776,14 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version),
zkutil::CreateMode::Persistent));
/// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes()
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/min_unprocessed_insert_time", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/max_processed_insert_time", "",
zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "",
zkutil::CreateMode::Persistent));
/// Check version of /replicas to see if there are any replicas created at the same moment of time.
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version));
@ -840,18 +865,42 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
/// "The local set of parts of table X doesn't look like the set of parts in ZooKeeper"
///
{
Strings children = zookeeper->getChildren(remote_replica_path);
/// Remove metadata first
[[maybe_unused]] auto code = zookeeper->tryRemove(fs::path(remote_replica_path) / "metadata");
assert(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE);
if (std::find(children.begin(), children.end(), "metadata") != children.end())
zookeeper->remove(fs::path(remote_replica_path) / "metadata");
for (const auto & child : children)
/// Then try to remove paths that are known to be flat (all children are leafs)
Strings flat_nodes = {"flags", "parts", "queue"};
for (const auto & node : flat_nodes)
{
if (child != "metadata")
zookeeper->removeRecursive(fs::path(remote_replica_path) / child);
bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true);
if (!removed_quickly)
LOG_WARNING(logger, "Cannot quickly remove node {} and its children (replica: {}). Will remove recursively.",
node, remote_replica_path);
}
zookeeper->remove(remote_replica_path);
/// Then try to remove nodes that are known to have no children (and should always exist)
Coordination::Requests ops;
for (const auto & node : flat_nodes)
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/" + node, -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/columns", -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/host", -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/is_lost", -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/log_pointer", -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/max_processed_insert_time", -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/min_unprocessed_insert_time", -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/metadata_version", -1));
ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/mutation_pointer", -1));
Coordination::Responses res;
code = zookeeper->tryMulti(ops, res);
if (code != Coordination::Error::ZOK)
LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (replica: {}). Will remove recursively.",
Coordination::errorMessage(code), remote_replica_path);
/// And finally remove everything else recursively
zookeeper->tryRemoveRecursive(remote_replica_path);
}
/// It may left some garbage if replica_path subtree are concurrently modified
@ -911,17 +960,50 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger)
{
bool completely_removed = false;
Strings flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids", "log"};
/// First try to remove paths that are known to be flat
for (const auto & node : flat_nodes)
{
bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true);
if (!removed_quickly)
LOG_WARNING(logger, "Cannot quickly remove node {} and its children (table: {}). Will remove recursively.",
node, zookeeper_path);
}
/// Then try to remove nodes that are known to have no children (and should always exist)
Coordination::Requests ops;
for (const auto & node : flat_nodes)
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/" + node, -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/alter_partition_version", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/columns", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/max_processed_insert_time", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/min_unprocessed_insert_time", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata_version", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/mutation_pointer", -1));
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1));
Coordination::Responses res;
auto code = zookeeper->tryMulti(ops, res);
if (code != Coordination::Error::ZOK)
LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (table: {}). Will remove recursively.",
Coordination::errorMessage(code), zookeeper_path);
Strings children;
Coordination::Error code = zookeeper->tryGetChildren(zookeeper_path, children);
code = zookeeper->tryGetChildren(zookeeper_path, children);
if (code == Coordination::Error::ZNONODE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal of replicated table. It's a bug");
for (const auto & child : children)
{
if (child != "dropped")
zookeeper->tryRemoveRecursive(fs::path(zookeeper_path) / child);
}
Coordination::Requests ops;
ops.clear();
Coordination::Responses responses;
ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1));
ops.emplace_back(zkutil::makeRemoveRequest(fs::path(zookeeper_path) / "dropped", -1));
@ -4327,7 +4409,7 @@ std::optional<UInt64> StorageReplicatedMergeTree::totalBytes(const Settings & se
void StorageReplicatedMergeTree::assertNotReadonly() const
{
if (is_readonly)
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (zookeeper path: {})", zookeeper_path);
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {})", replica_path);
}