Add bucket versioning

This commit is contained in:
kssenii 2024-05-30 16:24:03 +02:00
parent b3a58a6294
commit 9238045eea
9 changed files with 109 additions and 44 deletions

View File

@ -181,7 +181,6 @@ S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::createNodeMetadata(
std::string S3QueueIFileMetadata::getProcessorInfo(const std::string & processor_id)
{
/// Add information which will be useful for debugging just in case.
/// TODO: add it for Unordered mode as well.
Poco::JSON::Object json;
json.set("hostname", DNSResolver::instance().getHostName());
json.set("processor_id", processor_id);
@ -241,7 +240,6 @@ void S3QueueIFileMetadata::setFailed(const std::string & exception)
ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
file_status->onFailed(exception);
node_metadata.last_exception = exception;
if (max_loading_retries == 0)

View File

@ -91,8 +91,17 @@ protected:
NodeMetadata node_metadata;
LoggerPtr log;
/// processing node is ephemeral, so we cannot verify with it if
/// this node was created by a certain processor on a previous s3 queue processing stage,
/// because we could get a session expired in between the stages
/// and someone else could just create this processing node.
/// Therefore we also create a persistent processing node
/// which is updated on each creation of ephemeral processing node.
/// We use the version of this node to verify the version of the processing ephemeral node.
const std::string processing_node_id_path;
/// Id of the processor.
std::optional<std::string> processing_id;
/// Version of the processing id persistent node.
std::optional<int> processing_id_version;
static std::string getNodeName(const std::string & path);

View File

@ -118,6 +118,7 @@ S3QueueMetadata::S3QueueMetadata(const fs::path & zookeeper_path_, const S3Queue
, zookeeper_path(zookeeper_path_)
, buckets_num(getBucketsNum(settings_))
, log(getLogger("StorageS3Queue(" + zookeeper_path_.string() + ")"))
, local_file_statuses(std::make_shared<LocalFileStatuses>())
{
if (settings.mode == S3QueueMode::UNORDERED
&& (settings.s3queue_tracked_files_limit || settings.s3queue_tracked_file_ttl_sec))
@ -160,7 +161,9 @@ S3QueueMetadata::FileStatuses S3QueueMetadata::getFileStateses() const
return local_file_statuses->getAll();
}
S3QueueMetadata::FileMetadataPtr S3QueueMetadata::getFileMetadata(const std::string & path)
S3QueueMetadata::FileMetadataPtr S3QueueMetadata::getFileMetadata(
const std::string & path,
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info)
{
auto file_status = local_file_statuses->get(path, /* create */true);
switch (settings.mode)
@ -170,6 +173,7 @@ S3QueueMetadata::FileMetadataPtr S3QueueMetadata::getFileMetadata(const std::str
zookeeper_path,
path,
file_status,
bucket_info,
buckets_num,
settings.s3queue_loading_retries,
log);

View File

@ -61,7 +61,7 @@ public:
void checkSettings(const S3QueueSettings & settings) const;
void shutdown();
FileMetadataPtr getFileMetadata(const std::string & path);
FileMetadataPtr getFileMetadata(const std::string & path, S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
FileStatusPtr getFileStatus(const std::string & path);
FileStatuses getFileStateses() const;

View File

@ -44,6 +44,7 @@ S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
const std::filesystem::path & zk_path_,
const std::string & path_,
FileStatusPtr file_status_,
BucketInfoPtr bucket_info_,
size_t buckets_num_,
size_t max_loading_retries_,
LoggerPtr log_)
@ -57,6 +58,7 @@ S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
log_)
, buckets_num(buckets_num_)
, zk_path(zk_path_)
, bucket_info(bucket_info_)
{
}
@ -85,16 +87,41 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
{
const auto zk_client = getZooKeeper();
const auto bucket_lock_path = zk_path / "buckets" / toString(bucket) / "lock";
const auto bucket_lock_id_path = zk_path / "buckets" / toString(bucket) / "lock_id";
const auto processor_info = getProcessorInfo(processor);
auto code = zk_client->tryCreate(bucket_lock_path, processor_info, zkutil::CreateMode::Ephemeral);
Coordination::Requests requests;
/// Create bucket lock node as ephemeral node.
requests.push_back(zkutil::makeCreateRequest(bucket_lock_path, "", zkutil::CreateMode::Ephemeral));
/// Create bucket lock id node as persistent node if it does not exist yet.
requests.push_back(
zkutil::makeCreateRequest(
bucket_lock_id_path, processor_info, zkutil::CreateMode::Persistent, /* ignore_if_exists */true));
/// Update bucket lock id path. We use its version as a version of ephemeral bucket lock node.
/// (See comment near S3QueueIFileMetadata::processing_id_version).
requests.push_back(zkutil::makeSetRequest(bucket_lock_id_path, processor_info, -1));
Coordination::Responses responses;
const auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
const auto * set_response = dynamic_cast<const Coordination::SetResponse *>(responses[2].get());
const auto bucket_lock_version = set_response->stat.version;
LOG_TEST(
getLogger("S3QueueOrderedFileMetadata"),
"Processor {} acquired bucket {} for processing", processor, bucket);
"Processor {} acquired bucket {} for processing (bucket lock version: {})",
processor, bucket, bucket_lock_version);
return std::make_shared<BucketHolder>(bucket, bucket_lock_path, zk_client);
return std::make_shared<BucketHolder>(
bucket,
bucket_lock_version,
bucket_lock_path,
bucket_lock_id_path,
zk_client);
}
if (code == Coordination::Error::ZNODEEXISTS)
@ -117,8 +144,10 @@ std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueOrderedFileMetad
CREATED_PROCESSING_PATH = 2,
/// update processing id
SET_PROCESSING_ID = 4,
/// bucket version did not change
CHECKED_BUCKET_VERSION = 5,
/// max_processed_node version did not change
CHECKED_MAX_PROCESSED_PATH = 5,
CHECKED_MAX_PROCESSED_PATH = 6,
};
const auto zk_client = getZooKeeper();
@ -151,6 +180,12 @@ std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueOrderedFileMetad
processing_node_id_path, processor_info, zkutil::CreateMode::Persistent, /* ignore_if_exists */true));
requests.push_back(zkutil::makeSetRequest(processing_node_id_path, processor_info, -1));
if (bucket_info)
requests.push_back(zkutil::makeCheckRequest(bucket_info->bucket_lock_id_path, bucket_info->bucket_version));
/// TODO: for ordered processing with buckets it should be enough to check only the bucket lock version,
/// so maybe we can remove the creation and check of processing_node_id when bucket_info is set?
if (has_processed_node)
{
requests.push_back(zkutil::makeCheckRequest(processed_node_path, processed_node_stat.version));
@ -178,7 +213,13 @@ std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueOrderedFileMetad
if (is_request_failed(CREATED_PROCESSING_PATH))
return {false, FileStatus::State::Processing};
if (is_request_failed(CHECKED_MAX_PROCESSED_PATH))
if (bucket_info && is_request_failed(CHECKED_BUCKET_VERSION))
{
LOG_TEST(log, "Version of bucket lock changed: {}. Will retry for file `{}`", code, path);
continue;
}
if (is_request_failed(bucket_info ? CHECKED_MAX_PROCESSED_PATH : CHECKED_BUCKET_VERSION))
{
LOG_TEST(log, "Version of max processed file changed: {}. Will retry for file `{}`", code, path);
continue;

View File

@ -12,11 +12,20 @@ class S3QueueOrderedFileMetadata : public S3QueueIFileMetadata
public:
using Processor = std::string;
using Bucket = size_t;
struct BucketInfo
{
Bucket bucket;
int bucket_version;
std::string bucket_lock_path;
std::string bucket_lock_id_path;
};
using BucketInfoPtr = std::shared_ptr<const BucketInfo>;
explicit S3QueueOrderedFileMetadata(
const std::filesystem::path & zk_path_,
const std::string & path_,
FileStatusPtr file_status_,
BucketInfoPtr bucket_info_,
size_t buckets_num_,
size_t max_loading_retries_,
LoggerPtr log_);
@ -40,6 +49,7 @@ public:
private:
const size_t buckets_num;
const std::string zk_path;
const BucketInfoPtr bucket_info;
std::pair<bool, FileStatus::State> setProcessingImpl() override;
void setProcessedImpl() override;
@ -79,11 +89,19 @@ struct S3QueueOrderedFileMetadata::BucketHolder
{
BucketHolder(
const Bucket & bucket_,
int bucket_version_,
const std::string & bucket_lock_path_,
const std::string & bucket_lock_id_path_,
zkutil::ZooKeeperPtr zk_client_)
: bucket(bucket_), bucket_lock_path(bucket_lock_path_), zk_client(zk_client_) {}
: bucket_info(std::make_shared<BucketInfo>(BucketInfo{
.bucket = bucket_,
.bucket_version = bucket_version_,
.bucket_lock_path = bucket_lock_path_,
.bucket_lock_id_path = bucket_lock_id_path_}))
, zk_client(zk_client_) {}
Bucket getBucket() const { return bucket; }
Bucket getBucket() const { return bucket_info->bucket; }
BucketInfoPtr getBucketInfo() const { return bucket_info; }
void release()
{
@ -91,9 +109,18 @@ struct S3QueueOrderedFileMetadata::BucketHolder
return;
released = true;
LOG_TEST(getLogger("S3QueueBucketHolder"), "Releasing bucket {}", bucket);
LOG_TEST(getLogger("S3QueueBucketHolder"), "Releasing bucket {}", bucket_info->bucket);
zk_client->remove(bucket_lock_path);
Coordination::Requests requests;
/// Check that bucket lock version has not changed
/// (which could happen if session had expired as bucket_lock_path is ephemeral node).
requests.push_back(zkutil::makeCheckRequest(bucket_info->bucket_lock_id_path, bucket_info->bucket_version));
/// Remove bucket lock.
requests.push_back(zkutil::makeRemoveRequest(bucket_info->bucket_lock_path, -1));
Coordination::Responses responses;
const auto code = zk_client->tryMulti(requests, responses);
zkutil::KeeperMultiException::check(code, requests, responses);
}
~BucketHolder()
@ -109,8 +136,7 @@ struct S3QueueOrderedFileMetadata::BucketHolder
}
private:
const Bucket bucket;
const std::string bucket_lock_path;
BucketInfoPtr bucket_info;
const zkutil::ZooKeeperPtr zk_client;
bool released = false;
};

View File

@ -60,13 +60,17 @@ size_t StorageS3QueueSource::FileIterator::estimatedKeysCount()
StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
{
ObjectInfoPtr object_info;
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info;
while (!shutdown_called)
{
auto val = metadata->useBucketsForProcessing()
? getNextKeyFromAcquiredBucket(processor)
: glob_iterator->next(processor);
if (metadata->useBucketsForProcessing())
std::tie(object_info, bucket_info) = getNextKeyFromAcquiredBucket(processor);
else
object_info = glob_iterator->next(processor);
if (!val)
if (!object_info)
return {};
if (shutdown_called)
@ -75,14 +79,14 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
return {};
}
auto file_metadata = metadata->getFileMetadata(val->relative_path);
auto file_metadata = metadata->getFileMetadata(object_info->relative_path, bucket_info);
if (file_metadata->setProcessing())
return std::make_shared<S3QueueObjectInfo>(*val, file_metadata);
return std::make_shared<S3QueueObjectInfo>(*object_info, file_metadata);
}
return {};
}
StorageS3QueueSource::ObjectInfoPtr
std::pair<StorageS3QueueSource::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{
/// We need this lock to maintain consistency between listing s3 directory
@ -138,7 +142,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Current bucket: {}, will process file: {}",
bucket, object_info->getFileName());
return object_info;
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
}
LOG_TEST(log, "Cache of bucket {} is empty", bucket);
@ -208,7 +212,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
LOG_TEST(log, "Acquired bucket: {}, will process file: {}",
bucket, object_info->getFileName());
return object_info;
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
}
}
@ -239,7 +243,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
continue;
}
/// Bucket is already acquired, process the file.
return object_info;
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
}
else
{
@ -255,7 +259,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
object_info = bucket_cache.keys.front();
bucket_cache.keys.pop_front();
}
return object_info;
return std::pair{object_info, bucket_holder_it->second->getBucketInfo()};
}
else
{

View File

@ -78,7 +78,7 @@ public:
bool iterator_finished = false;
std::unordered_map<size_t, S3QueueOrderedFileMetadata::BucketHolderPtr> bucket_holders;
ObjectInfoPtr getNextKeyFromAcquiredBucket(size_t processor);
std::pair<ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
};
StorageS3QueueSource(

View File

@ -1428,23 +1428,6 @@ def test_settings_check(started_cluster):
)
)
assert (
"Existing table metadata in ZooKeeper differs in s3queue_processing_threads_num setting. Stored in ZooKeeper: 5, local: 2"
in create_table(
started_cluster,
node_2,
table_name,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 2,
"s3queue_buckets": 2,
},
expect_error=True,
)
)
node.query(f"DROP TABLE {table_name} SYNC")