diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp index 8bab744459d..04eea02dc0d 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -121,13 +122,15 @@ ObjectStorageQueueMetadata::ObjectStorageQueueMetadata( const fs::path & zookeeper_path_, const ObjectStorageQueueTableMetadata & table_metadata_, size_t cleanup_interval_min_ms_, - size_t cleanup_interval_max_ms_) + size_t cleanup_interval_max_ms_, + size_t keeper_multiread_batch_size_) : table_metadata(table_metadata_) , mode(table_metadata.getMode()) , zookeeper_path(zookeeper_path_) , buckets_num(getBucketsNum(table_metadata_)) , cleanup_interval_min_ms(cleanup_interval_min_ms_) , cleanup_interval_max_ms(cleanup_interval_max_ms_) + , keeper_multiread_batch_size(keeper_multiread_batch_size_) , log(getLogger("StorageObjectStorageQueue(" + zookeeper_path_.string() + ")")) , local_file_statuses(std::make_shared()) { @@ -386,9 +389,7 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() if (code != Coordination::Error::ZOK) { if (code == Coordination::Error::ZNONODE) - { LOG_TEST(log, "Path {} does not exist", zookeeper_failed_path.string()); - } else throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code)); } @@ -438,21 +439,35 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() /// Ordered in ascending order of timestamps. std::set sorted_nodes(node_cmp); + std::vector paths; auto fetch_nodes = [&](const Strings & nodes, const fs::path & base_path) { + auto get_paths = [&] + { + auto response = zk_client->tryGet(paths); + + for (size_t i = 0; i < response.size(); ++i) + { + if (response[i].error == Coordination::Error::ZNONODE) + { + LOG_ERROR(log, "Failed to fetch node metadata {}", paths[i]); + continue; + } + + chassert(response[i].error == Coordination::Error::ZOK); + sorted_nodes.emplace(paths[i], ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(response[i].data)); + LOG_TEST(log, "Fetched metadata for node {}", paths[i]); + } + paths.clear(); + }; + for (const auto & node : nodes) { - const std::string path = base_path / node; + paths.push_back(base_path / node); try { - std::string metadata_str; - if (zk_client->tryGet(path, metadata_str)) - { - sorted_nodes.emplace(path, ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(metadata_str)); - LOG_TEST(log, "Fetched metadata for node {}", path); - } - else - LOG_ERROR(log, "Failed to fetch node metadata {}", path); + if (paths.size() == keeper_multiread_batch_size) + get_paths(); } catch (const zkutil::KeeperException & e) { @@ -466,6 +481,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() throw; } } + + if (!paths.empty()) + get_paths(); }; fetch_nodes(processed_nodes, zookeeper_processed_path); @@ -482,7 +500,56 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", table_metadata.tracked_files_limit, table_metadata.tracked_files_ttl_sec, get_nodes_str()); + static constexpr size_t keeper_multi_batch_size = 100; + Coordination::Requests remove_requests; + Coordination::Responses remove_responses; + remove_requests.reserve(keeper_multi_batch_size); + remove_responses.reserve(keeper_multi_batch_size); + size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - table_metadata.tracked_files_limit : 0; + + const auto remove_nodes = [&](bool node_limit) + { + code = zk_client->tryMulti(remove_requests, remove_responses); + + if (code == Coordination::Error::ZOK) + { + if (node_limit) + nodes_to_remove -= remove_requests.size(); + } + else + { + for (size_t i = 0; i < remove_requests.size(); ++i) + { + if (remove_responses[i]->error == Coordination::Error::ZOK) + { + if (node_limit) + --nodes_to_remove; + } + else if (remove_responses[i]->error == Coordination::Error::ZRUNTIMEINCONSISTENCY) + { + /// requests with ZRUNTIMEINCONSISTENCY were not processed because the multi request was aborted before + /// so we try removing it again without multi requests + code = zk_client->tryRemove(remove_requests[i]->getPath()); + if (code == Coordination::Error::ZOK) + { + if (node_limit) + --nodes_to_remove; + } + else + { + LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", remove_requests[i]->getPath(), code); + } + } + else + { + LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", remove_requests[i]->getPath(), remove_responses[i]->error); + } + } + } + remove_requests.clear(); + }; + for (const auto & node : sorted_nodes) { if (nodes_to_remove) @@ -491,12 +558,10 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() node.metadata.file_path, node.zk_path); local_file_statuses->remove(node.metadata.file_path, /* if_exists */true); - - code = zk_client->tryRemove(node.zk_path); - if (code == Coordination::Error::ZOK) - --nodes_to_remove; - else - LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code); + remove_requests.push_back(zkutil::makeRemoveRequest(node.zk_path, -1)); + /// we either reach max multi batch size OR we already added maximum amount of nodes we want to delete based on the node limit + if (remove_requests.size() == keeper_multi_batch_size || remove_requests.size() == nodes_to_remove) + remove_nodes(/*node_limit=*/true); } else if (check_nodes_ttl) { @@ -507,10 +572,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() node.metadata.file_path, node.zk_path); local_file_statuses->remove(node.metadata.file_path, /* if_exists */true); - - code = zk_client->tryRemove(node.zk_path); - if (code != Coordination::Error::ZOK) - LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code); + remove_requests.push_back(zkutil::makeRemoveRequest(node.zk_path, -1)); + if (remove_requests.size() == keeper_multi_batch_size) + remove_nodes(/*node_limit=*/false); } else if (!nodes_to_remove) { @@ -527,6 +591,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl() } } + if (!remove_requests.empty()) + remove_nodes(/*node_limit=*/false); + LOG_TRACE(log, "Node limits check finished"); } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h index 93e726b17ab..cea17dc3572 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h @@ -57,7 +57,8 @@ public: const fs::path & zookeeper_path_, const ObjectStorageQueueTableMetadata & table_metadata_, size_t cleanup_interval_min_ms_, - size_t cleanup_interval_max_ms_); + size_t cleanup_interval_max_ms_, + size_t keeper_multiread_batch_size_); ~ObjectStorageQueueMetadata(); @@ -98,6 +99,7 @@ private: const fs::path zookeeper_path; const size_t buckets_num; const size_t cleanup_interval_min_ms, cleanup_interval_max_ms; + const size_t keeper_multiread_batch_size; LoggerPtr log; diff --git a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp index 835c63a85e7..ef0285a0b63 100644 --- a/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp +++ b/src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -40,6 +41,11 @@ namespace Setting extern const SettingsBool use_concurrency_control; } +namespace ServerSetting +{ + extern const ServerSettingsUInt64 keeper_multiread_batch_size; +} + namespace ObjectStorageQueueSetting { extern const ObjectStorageQueueSettingsUInt32 cleanup_interval_max_ms; @@ -212,7 +218,11 @@ StorageObjectStorageQueue::StorageObjectStorageQueue( zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log); auto queue_metadata = std::make_unique( - zk_path, std::move(table_metadata), (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]); + zk_path, + std::move(table_metadata), + (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], + (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms], + getContext()->getServerSettings()[ServerSetting::keeper_multiread_batch_size]); files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata));