Address PR comments

Antonio Andelic 2022-09-22 13:03:27 +00:00
parent 448f8184f3
commit 44a3d6babe
4 changed files with 40 additions and 25 deletions

View File

@@ -296,7 +296,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
snapshot_s3.startup();
snapshot_s3.startup(config);
server = std::make_unique<KeeperServer>(configuration_and_settings, config, responses_queue, snapshots_queue);
@@ -325,7 +325,6 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
/// Start it after keeper server start
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); });
updateConfiguration(config);
LOG_DEBUG(log, "Dispatcher initialized");
}
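
The intent of this hunk is that the S3 snapshot uploader now receives the configuration directly in startup(), so it can apply its settings before its background thread starts, instead of relying on the later updateConfiguration(config) call that this commit removes. A minimal sketch of that ordering, assuming stand-in names (Config, Uploader) rather than the real ClickHouse classes:

#include <atomic>
#include <chrono>
#include <string>
#include <thread>

struct Config                        // stand-in for Poco::Util::AbstractConfiguration
{
    std::string endpoint;
};

class Uploader
{
public:
    // startup() takes the configuration so settings are applied before the worker runs.
    void startup(const Config & config)
    {
        endpoint = config.endpoint;                  // configure first
        worker = std::thread([this] { run(); });     // then launch the background thread
    }

    void shutdown()
    {
        stop = true;
        if (worker.joinable())
            worker.join();
    }

private:
    void run()
    {
        while (!stop)
            std::this_thread::sleep_for(std::chrono::milliseconds(100));   // placeholder for real work
    }

    std::string endpoint;
    std::atomic<bool> stop{false};
    std::thread worker;
};

int main()
{
    Uploader uploader;
    uploader.startup(Config{"https://s3.example.com/snapshots"});   // illustrative endpoint
    uploader.shutdown();
}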

View File

@@ -66,16 +66,16 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
auto endpoint = config.getString(config_prefix + ".endpoint");
auto new_uri = S3::URI{Poco::URI(endpoint)};
std::unique_lock client_lock{snapshot_s3_client_mutex};
{
std::lock_guard client_lock{snapshot_s3_client_mutex};
// if client is not changed (same auth settings, same endpoint) we don't need to update
if (snapshot_s3_client && snapshot_s3_client->client && auth_settings == snapshot_s3_client->auth_settings
&& snapshot_s3_client->uri.uri == new_uri.uri)
return;
}
LOG_INFO(log, "S3 configuration was updated");
client_lock.unlock();
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
HeaderCollection headers = auth_settings.headers;
@@ -108,9 +108,10 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
auto new_client = std::make_shared<KeeperSnapshotManagerS3::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));
client_lock.lock();
{
std::lock_guard client_lock{snapshot_s3_client_mutex};
snapshot_s3_client = std::move(new_client);
client_lock.unlock();
}
LOG_INFO(log, "S3 client was updated");
}
catch (...)
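
Taken together, the two hunks above narrow the locking in updateS3Configuration: instead of a unique_lock that is manually unlocked and re-locked around construction of the new client, the mutex is now taken in two short scoped lock_guard blocks, one for the comparison and one for the swap, while the expensive client construction happens without the lock held. A simplified sketch of that pattern, assuming stand-in types (ClientConfig, Client, ClientHolder) rather than the real S3 classes:

#include <memory>
#include <mutex>
#include <string>

struct ClientConfig
{
    std::string endpoint;
    std::string access_key;
    bool operator==(const ClientConfig &) const = default;
};

struct Client { ClientConfig config; };

class ClientHolder
{
public:
    void update(const ClientConfig & new_config)
    {
        {
            std::lock_guard lock(mutex);          // short critical section: only the comparison
            if (client && client->config == new_config)
                return;                           // nothing changed, keep the existing client
        }

        // Expensive construction happens without holding the mutex.
        auto new_client = std::make_shared<Client>(Client{new_config});

        {
            std::lock_guard lock(mutex);          // short critical section: only the swap
            client = std::move(new_client);
        }
    }

private:
    std::mutex mutex;
    std::shared_ptr<Client> client;
};

int main()
{
    ClientHolder holder;
    holder.update({"https://s3.example.com", "key"});   // builds a new client
    holder.update({"https://s3.example.com", "key"});   // unchanged settings: early return
}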
@@ -189,6 +190,7 @@ void KeeperSnapshotManagerS3::snapshotS3Thread()
}
// First we need to verify that there isn't already a lock file for the snapshot we want to upload
// Only leader uploads a snapshot, but there can be a rare case where we have 2 leaders in NuRaft
if (file_exists(lock_file))
{
LOG_ERROR(log, "Lock file for {} already exists. Probably a different node is already uploading the snapshot", snapshot_name);
@@ -228,21 +230,30 @@ void KeeperSnapshotManagerS3::snapshotS3Thread()
continue;
}
SCOPE_EXIT(
{
LOG_INFO(log, "Removing lock file");
try
{
Aws::S3::Model::DeleteObjectRequest delete_request;
delete_request.SetBucket(s3_client->uri.bucket);
delete_request.SetKey(lock_file);
auto delete_outcome = s3_client->client->DeleteObject(delete_request);
if (!delete_outcome.IsSuccess())
throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType());
}
catch (...)
{
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});
WriteBufferFromS3 snapshot_writer = create_writer(snapshot_name);
copyData(snapshot_file, snapshot_writer);
snapshot_writer.finalize();
LOG_INFO(log, "Successfully uploaded {} to S3", snapshot_path);
LOG_INFO(log, "Removing lock file");
Aws::S3::Model::DeleteObjectRequest delete_request;
delete_request.SetBucket(s3_client->uri.bucket);
delete_request.SetKey(lock_file);
auto delete_outcome = s3_client->client->DeleteObject(delete_request);
if (!delete_outcome.IsSuccess())
throw S3Exception(delete_outcome.GetError().GetMessage(), delete_outcome.GetError().GetErrorType());
}
catch (...)
{
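
The hunk above moves the lock-file deletion into a SCOPE_EXIT block, so the lock file is removed on every exit path, including when the upload itself throws, and wraps the deletion in try/catch so a failed cleanup is only logged. A plain-C++ approximation of that idea (ClickHouse uses its own scope_guard macro; the names below are illustrative only):

#include <exception>
#include <functional>
#include <iostream>
#include <stdexcept>

class ScopeGuard
{
public:
    explicit ScopeGuard(std::function<void()> fn) : callback(std::move(fn)) {}
    ~ScopeGuard() { if (callback) callback(); }   // runs on normal return and on unwinding

private:
    std::function<void()> callback;
};

void uploadWithLockCleanup()
{
    // ... create the lock file here ...
    ScopeGuard remove_lock([]
    {
        try
        {
            // ... delete the lock file; the deletion itself may fail ...
            std::cout << "Removing lock file\n";
        }
        catch (...)
        {
            // Cleanup failures are only logged; they must not escape the destructor.
            std::cout << "Failed to delete lock file\n";
        }
    });

    // ... upload the snapshot; if this throws, the guard above still runs ...
    throw std::runtime_error("simulated upload failure");
}

int main()
{
    try { uploadWithLockCleanup(); }
    catch (const std::exception & e) { std::cout << "upload failed: " << e.what() << "\n"; }
}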
@@ -261,8 +272,9 @@ void KeeperSnapshotManagerS3::uploadSnapshot(const std::string & path)
LOG_WARNING(log, "Failed to add snapshot {} to S3 queue", path);
}
void KeeperSnapshotManagerS3::startup()
void KeeperSnapshotManagerS3::startup(const Poco::Util::AbstractConfiguration & config)
{
updateS3Configuration(config);
snapshot_s3_thread = ThreadFromGlobalPool([this] { snapshotS3Thread(); });
}

View File

@@ -24,7 +24,7 @@ public:
void updateS3Configuration(const Poco::Util::AbstractConfiguration & config);
void uploadSnapshot(const std::string & path);
void startup();
void startup(const Poco::Util::AbstractConfiguration & config);
void shutdown();
private:
using SnapshotS3Queue = ConcurrentBoundedQueue<std::string>;
@@ -54,7 +54,7 @@ public:
void updateS3Configuration(const Poco::Util::AbstractConfiguration &) {}
void uploadSnapshot(const std::string &) {}
void startup() {}
void startup(const Poco::Util::AbstractConfiguration & config) {}
void shutdown() {}
};
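
The second class in this header is the stub used when the server is built without S3 support: it keeps the same interface as KeeperSnapshotManagerS3 but every member is an empty inline body, so callers such as the dispatcher need no conditional compilation of their own. A condensed sketch of that pattern, assuming USE_AWS_S3 guards the split as elsewhere in ClickHouse and using simplified stand-in names:

#include <string>

struct Config { std::string endpoint; };   // stand-in for the Poco configuration type

#if USE_AWS_S3
class SnapshotUploader
{
public:
    void startup(const Config & config) { /* read S3 settings from config, start the worker thread */ (void)config; }
    void uploadSnapshot(const std::string & path) { /* enqueue path for upload */ (void)path; }
    void shutdown() { /* stop the worker thread */ }
};
#else
// Built without S3 support: identical interface, every call is a no-op.
class SnapshotUploader
{
public:
    void startup(const Config &) {}
    void uploadSnapshot(const std::string &) {}
    void shutdown() {}
};
#endif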

View File

@@ -77,6 +77,8 @@ def wait_node(node):
def test_s3_upload(started_cluster):
node1_zk = get_fake_zk(node1.name)
# we defined snapshot_distance as 50 in the configs
# so after 50 requests a snapshot should be generated
for _ in range(210):
node1_zk.create("/test", sequence=True)
@@ -99,6 +101,8 @@ def test_s3_upload(started_cluster):
destroy_zk_client(node1_zk)
node1.stop_clickhouse(kill=True)
# wait for a new leader to be elected and check that it continues
# uploading snapshots
wait_node(node2)
node2_zk = get_fake_zk(node2.name)
for _ in range(200):