Backport #73357 to 24.10: Use multi requests in cleanup thread of ObjectStorageQueueMetadata

This commit is contained in:
robot-clickhouse 2024-12-17 15:08:00 +00:00
parent 739b6aa6ea
commit e5a307adbc
3 changed files with 103 additions and 24 deletions

View File

@ -17,6 +17,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/getRandomASCIIString.h>
#include <Common/randomSeed.h>
#include <Common/DNSResolver.h>
#include <numeric>
@ -121,13 +122,15 @@ ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(
const fs::path & zookeeper_path_,
const ObjectStorageQueueTableMetadata & table_metadata_,
size_t cleanup_interval_min_ms_,
size_t cleanup_interval_max_ms_)
size_t cleanup_interval_max_ms_,
size_t keeper_multiread_batch_size_)
: table_metadata(table_metadata_)
, mode(table_metadata.getMode())
, zookeeper_path(zookeeper_path_)
, buckets_num(getBucketsNum(table_metadata_))
, cleanup_interval_min_ms(cleanup_interval_min_ms_)
, cleanup_interval_max_ms(cleanup_interval_max_ms_)
, keeper_multiread_batch_size(keeper_multiread_batch_size_)
, log(getLogger("StorageObjectStorageQueue(" + zookeeper_path_.string() + ")"))
, local_file_statuses(std::make_shared<LocalFileStatuses>())
{
@ -386,9 +389,7 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
if (code != Coordination::Error::ZOK)
{
if (code == Coordination::Error::ZNONODE)
{
LOG_TEST(log, "Path {} does not exist", zookeeper_failed_path.string());
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code));
}
@ -438,21 +439,35 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
/// Ordered in ascending order of timestamps.
std::set<Node, decltype(node_cmp)> sorted_nodes(node_cmp);
std::vector<std::string> paths;
auto fetch_nodes = [&](const Strings & nodes, const fs::path & base_path)
{
auto get_paths = [&]
{
auto response = zk_client->tryGet(paths);
for (size_t i = 0; i < response.size(); ++i)
{
if (response[i].error == Coordination::Error::ZNONODE)
{
LOG_ERROR(log, "Failed to fetch node metadata {}", paths[i]);
continue;
}
chassert(response[i].error == Coordination::Error::ZOK);
sorted_nodes.emplace(paths[i], ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(response[i].data));
LOG_TEST(log, "Fetched metadata for node {}", paths[i]);
}
paths.clear();
};
for (const auto & node : nodes)
{
const std::string path = base_path / node;
paths.push_back(base_path / node);
try
{
std::string metadata_str;
if (zk_client->tryGet(path, metadata_str))
{
sorted_nodes.emplace(path, ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(metadata_str));
LOG_TEST(log, "Fetched metadata for node {}", path);
}
else
LOG_ERROR(log, "Failed to fetch node metadata {}", path);
if (paths.size() == keeper_multiread_batch_size)
get_paths();
}
catch (const zkutil::KeeperException & e)
{
@ -466,6 +481,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
throw;
}
}
if (!paths.empty())
get_paths();
};
fetch_nodes(processed_nodes, zookeeper_processed_path);
@ -482,7 +500,56 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}",
table_metadata.tracked_files_limit, table_metadata.tracked_files_ttl_sec, get_nodes_str());
static constexpr size_t keeper_multi_batch_size = 100;
Coordination::Requests remove_requests;
Coordination::Responses remove_responses;
remove_requests.reserve(keeper_multi_batch_size);
remove_responses.reserve(keeper_multi_batch_size);
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - table_metadata.tracked_files_limit : 0;
const auto remove_nodes = [&](bool node_limit)
{
code = zk_client->tryMulti(remove_requests, remove_responses);
if (code == Coordination::Error::ZOK)
{
if (node_limit)
nodes_to_remove -= remove_requests.size();
}
else
{
for (size_t i = 0; i < remove_requests.size(); ++i)
{
if (remove_responses[i]->error == Coordination::Error::ZOK)
{
if (node_limit)
--nodes_to_remove;
}
else if (remove_responses[i]->error == Coordination::Error::ZRUNTIMEINCONSISTENCY)
{
/// requests with ZRUNTIMEINCONSISTENCY were not processed because the multi request was aborted before
/// so we try removing it again without multi requests
code = zk_client->tryRemove(remove_requests[i]->getPath());
if (code == Coordination::Error::ZOK)
{
if (node_limit)
--nodes_to_remove;
}
else
{
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", remove_requests[i]->getPath(), code);
}
}
else
{
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", remove_requests[i]->getPath(), remove_responses[i]->error);
}
}
}
remove_requests.clear();
};
for (const auto & node : sorted_nodes)
{
if (nodes_to_remove)
@ -491,12 +558,10 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
node.metadata.file_path, node.zk_path);
local_file_statuses->remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(node.zk_path);
if (code == Coordination::Error::ZOK)
--nodes_to_remove;
else
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code);
remove_requests.push_back(zkutil::makeRemoveRequest(node.zk_path, -1));
/// we either reach max multi batch size OR we already added maximum amount of nodes we want to delete based on the node limit
if (remove_requests.size() == keeper_multi_batch_size || remove_requests.size() == nodes_to_remove)
remove_nodes(/*node_limit=*/true);
}
else if (check_nodes_ttl)
{
@ -507,10 +572,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
node.metadata.file_path, node.zk_path);
local_file_statuses->remove(node.metadata.file_path, /* if_exists */true);
code = zk_client->tryRemove(node.zk_path);
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", node.zk_path, code);
remove_requests.push_back(zkutil::makeRemoveRequest(node.zk_path, -1));
if (remove_requests.size() == keeper_multi_batch_size)
remove_nodes(/*node_limit=*/false);
}
else if (!nodes_to_remove)
{
@ -527,6 +591,9 @@ void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
}
}
if (!remove_requests.empty())
remove_nodes(/*node_limit=*/false);
LOG_TRACE(log, "Node limits check finished");
}

View File

@ -57,7 +57,8 @@ public:
const fs::path & zookeeper_path_,
const ObjectStorageQueueTableMetadata & table_metadata_,
size_t cleanup_interval_min_ms_,
size_t cleanup_interval_max_ms_);
size_t cleanup_interval_max_ms_,
size_t keeper_multiread_batch_size_);
~ObjectStorageQueueMetadata();
@ -98,6 +99,7 @@ private:
const fs::path zookeeper_path;
const size_t buckets_num;
const size_t cleanup_interval_min_ms, cleanup_interval_max_ms;
const size_t keeper_multiread_batch_size;
LoggerPtr log;

View File

@ -2,6 +2,7 @@
#include <Common/ProfileEvents.h>
#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <IO/CompressionMethod.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterInsertQuery.h>
@ -40,6 +41,11 @@ namespace Setting
extern const SettingsBool use_concurrency_control;
}
/// Server-level settings referenced from this file (declared here, defined elsewhere).
namespace ServerSetting
{
/// Read from the server settings and passed to the ObjectStorageQueueMetadata constructor.
extern const ServerSettingsUInt64 keeper_multiread_batch_size;
}
namespace ObjectStorageQueueSetting
{
extern const ObjectStorageQueueSettingsUInt32 cleanup_interval_max_ms;
@ -212,7 +218,11 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
zk_path, *queue_settings_, storage_metadata.getColumns(), configuration_->format, context_, is_attach, log);
auto queue_metadata = std::make_unique<ObjectStorageQueueMetadata>(
zk_path, std::move(table_metadata), (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms], (*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms]);
zk_path,
std::move(table_metadata),
(*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_min_ms],
(*queue_settings_)[ObjectStorageQueueSetting::cleanup_interval_max_ms],
getContext()->getServerSettings()[ServerSetting::keeper_multiread_batch_size]);
files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, std::move(queue_metadata));