From cfbf1cc1e2b0a2cba581a36f9fc8298b143adf89 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 15 May 2024 13:41:23 +0200 Subject: [PATCH] S3Queue rework ordered mode --- src/Storages/S3Queue/S3QueueFilesMetadata.cpp | 455 +++++++++--------- src/Storages/S3Queue/S3QueueFilesMetadata.h | 64 +-- src/Storages/S3Queue/S3QueueSettings.h | 5 +- src/Storages/S3Queue/S3QueueSource.cpp | 239 ++++++--- src/Storages/S3Queue/S3QueueSource.h | 28 +- src/Storages/S3Queue/S3QueueTableMetadata.cpp | 21 +- src/Storages/S3Queue/S3QueueTableMetadata.h | 2 +- src/Storages/S3Queue/StorageS3Queue.cpp | 42 +- src/Storages/S3Queue/StorageS3Queue.h | 2 - src/Storages/StorageS3.cpp | 30 +- src/Storages/StorageS3.h | 16 +- .../configs/zookeeper.xml | 15 + .../integration/test_storage_s3_queue/test.py | 97 ++-- 13 files changed, 570 insertions(+), 446 deletions(-) diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index e1583b8329c..fd293759462 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -54,6 +55,15 @@ namespace pcg64 rng(randomSeed()); return min + rng() % (max - min + 1); } + + size_t getBucketsNum(const S3QueueSettings & settings) + { + if (settings.s3queue_buckets) + return settings.s3queue_buckets; + if (settings.s3queue_processing_threads_num) + return settings.s3queue_processing_threads_num; + return 1; + } } std::unique_lock S3QueueFilesMetadata::LocalFileStatuses::lock() const @@ -133,12 +143,11 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con , max_loading_retries(settings_.s3queue_loading_retries.value) , min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value) , max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value) - , shards_num(settings_.s3queue_total_shards_num) - , threads_per_shard(settings_.s3queue_processing_threads_num) + , buckets_num(getBucketsNum(settings_)) + , zookeeper_path(zookeeper_path_) , zookeeper_processing_path(zookeeper_path_ / "processing") - , zookeeper_processed_path(zookeeper_path_ / "processed") , zookeeper_failed_path(zookeeper_path_ / "failed") - , zookeeper_shards_path(zookeeper_path_ / "shards") + , zookeeper_buckets_path(zookeeper_path_ / "buckets") , zookeeper_cleanup_lock_path(zookeeper_path_ / "cleanup_lock") , log(getLogger("StorageS3Queue(" + zookeeper_path_.string() + ")")) { @@ -148,6 +157,8 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con task->activate(); task->scheduleAfter(generateRescheduleInterval(min_cleanup_interval_ms, max_cleanup_interval_ms)); } + + LOG_TEST(log, "Using {} buckets", buckets_num); } S3QueueFilesMetadata::~S3QueueFilesMetadata() @@ -173,6 +184,57 @@ S3QueueFilesMetadata::FileStatusPtr S3QueueFilesMetadata::getFileStatus(const st return local_file_statuses.get(path, /* create */false); } +bool S3QueueFilesMetadata::useBucketsForProcessing() const +{ + return mode == S3QueueMode::ORDERED && (buckets_num > 1); +} + +S3QueueFilesMetadata::Bucket S3QueueFilesMetadata::getBucketForPath(const std::string & path) const +{ + return sipHash64(path) % buckets_num; +} + +std::string S3QueueFilesMetadata::getProcessorInfo(const std::string & processor_id) +{ + Poco::JSON::Object json; + json.set("hostname", DNSResolver::instance().getHostName()); + json.set("processor_id", processor_id); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + Poco::JSON::Stringifier::stringify(json, oss); + return oss.str(); +} + +bool S3QueueFilesMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor) +{ + const auto zk_client = getZooKeeper(); + const auto bucket_lock_path = getBucketLockPath(bucket); + const auto processor_info = getProcessorInfo(processor); + + zk_client->createAncestors(bucket_lock_path); + + auto code = zk_client->tryCreate(bucket_lock_path, processor_info, zkutil::CreateMode::Ephemeral); + if (code == Coordination::Error::ZOK) + return true; + + if (code == Coordination::Error::ZNODEEXISTS) + return false; + + if (Coordination::isHardwareError(code)) + return false; + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code)); +} + +void S3QueueFilesMetadata::releaseBucket(const Bucket & bucket) +{ + const auto zk_client = getZooKeeper(); + const auto bucket_lock_path = getBucketLockPath(bucket); + zk_client->remove(bucket_lock_path); /// TODO: Add version + LOG_TEST(log, "Released the bucket: {}", bucket); +} + std::string S3QueueFilesMetadata::getNodeName(const std::string & path) { /// Since with are dealing with paths in s3 which can have "/", @@ -184,6 +246,38 @@ std::string S3QueueFilesMetadata::getNodeName(const std::string & path) return toString(path_hash.get64()); } +std::string S3QueueFilesMetadata::getProcessingPath(const std::string & path_hash) const +{ + return zookeeper_processing_path / path_hash; +} + +std::string S3QueueFilesMetadata::getFailedPath(const std::string & path_hash) const +{ + return zookeeper_failed_path / path_hash; +} + + +std::string S3QueueFilesMetadata::getProcessedPath(const std::string & path, const std::string & path_hash) const +{ + if (mode == S3QueueMode::UNORDERED) + { + return zookeeper_path / "processed" / path_hash; + } + else if (useBucketsForProcessing()) + { + return zookeeper_path / "buckets" / toString(getBucketForPath(path)) / "processed"; + } + else + { + return zookeeper_path / "processed"; + } +} + +fs::path S3QueueFilesMetadata::getBucketLockPath(const Bucket & bucket) const +{ + return zookeeper_path / "buckets" / toString(bucket) / "lock"; +} + S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata( const std::string & path, const std::string & exception, @@ -204,126 +298,6 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata( return metadata; } -bool S3QueueFilesMetadata::isShardedProcessing() const -{ - return getProcessingIdsNum() > 1 && mode == S3QueueMode::ORDERED; -} - -size_t S3QueueFilesMetadata::registerNewShard() -{ - if (!isShardedProcessing()) - { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cannot register a new shard, because processing is not sharded"); - } - - const auto zk_client = getZooKeeper(); - zk_client->createIfNotExists(zookeeper_shards_path, ""); - - std::string shard_node_path; - size_t shard_id = 0; - for (size_t i = 0; i < shards_num; ++i) - { - const auto node_path = getZooKeeperPathForShard(i); - auto err = zk_client->tryCreate(node_path, "", zkutil::CreateMode::Persistent); - if (err == Coordination::Error::ZOK) - { - shard_node_path = node_path; - shard_id = i; - break; - } - else if (err == Coordination::Error::ZNODEEXISTS) - continue; - else - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Unexpected error: {}", magic_enum::enum_name(err)); - } - - if (shard_node_path.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to register a new shard"); - - LOG_TRACE(log, "Using shard {} (zk node: {})", shard_id, shard_node_path); - return shard_id; -} - -std::string S3QueueFilesMetadata::getZooKeeperPathForShard(size_t shard_id) const -{ - return zookeeper_shards_path / ("shard" + toString(shard_id)); -} - -void S3QueueFilesMetadata::registerNewShard(size_t shard_id) -{ - if (!isShardedProcessing()) - { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cannot register a new shard, because processing is not sharded"); - } - - const auto zk_client = getZooKeeper(); - const auto node_path = getZooKeeperPathForShard(shard_id); - zk_client->createAncestors(node_path); - - auto err = zk_client->tryCreate(node_path, "", zkutil::CreateMode::Persistent); - if (err != Coordination::Error::ZOK) - { - if (err == Coordination::Error::ZNODEEXISTS) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot register shard {}: already exists", shard_id); - else - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Unexpected error: {}", magic_enum::enum_name(err)); - } -} - -bool S3QueueFilesMetadata::isShardRegistered(size_t shard_id) -{ - const auto zk_client = getZooKeeper(); - const auto node_path = getZooKeeperPathForShard(shard_id); - return zk_client->exists(node_path); -} - -void S3QueueFilesMetadata::unregisterShard(size_t shard_id) -{ - if (!isShardedProcessing()) - { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cannot unregister a shard, because processing is not sharded"); - } - - const auto zk_client = getZooKeeper(); - const auto node_path = getZooKeeperPathForShard(shard_id); - auto error_code = zk_client->tryRemove(node_path); - if (error_code != Coordination::Error::ZOK - && error_code != Coordination::Error::ZNONODE) - throw zkutil::KeeperException::fromPath(error_code, node_path); -} - -size_t S3QueueFilesMetadata::getProcessingIdsNum() const -{ - return shards_num * threads_per_shard; -} - -std::vector S3QueueFilesMetadata::getProcessingIdsForShard(size_t shard_id) const -{ - std::vector res(threads_per_shard); - std::iota(res.begin(), res.end(), shard_id * threads_per_shard); - return res; -} - -bool S3QueueFilesMetadata::isProcessingIdBelongsToShard(size_t id, size_t shard_id) const -{ - return shard_id * threads_per_shard <= id && id < (shard_id + 1) * threads_per_shard; -} - -size_t S3QueueFilesMetadata::getIdForProcessingThread(size_t thread_id, size_t shard_id) const -{ - return shard_id * threads_per_shard + thread_id; -} - -size_t S3QueueFilesMetadata::getProcessingIdForPath(const std::string & path) const -{ - return sipHash64(path) % getProcessingIdsNum(); -} - S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path) { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds); @@ -413,6 +387,8 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs { case SetFileProcessingResult::Success: { + LOG_TEST(log, "Path {} successfully acquired for processing", path); + std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Processing; @@ -426,18 +402,23 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs } case SetFileProcessingResult::AlreadyProcessed: { + LOG_TEST(log, "Path {} is already processed", path); + std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Processed; return {}; } case SetFileProcessingResult::AlreadyFailed: { + LOG_TEST(log, "Path {} is already failed and not retriable", path); + std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Failed; return {}; } case SetFileProcessingResult::ProcessingByOtherNode: { + LOG_TEST(log, "Path {} is being processing already", path); /// We cannot save any local state here, see comment above. return {}; } @@ -448,138 +429,135 @@ std::pair S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status) { - /// In one zookeeper transaction do the following: - /// 1. check that corresponding persistent nodes do not exist in processed/ and failed/; - /// 2. create an ephemenral node in /processing if it does not exist; - /// Return corresponding status if any of the step failed. - const auto node_name = getNodeName(path); - const auto zk_client = getZooKeeper(); + const auto processed_node_path = getProcessedPath(path, node_name); + const auto processing_node_path = getProcessingPath(node_name); + const auto failed_node_path = getFailedPath(node_name); + + /// In one zookeeper transaction do the following: + enum RequestType + { + /// node_name is not within processed persistent nodes + PROCESSED_PATH_DOESNT_EXIST = 0, + /// node_name is not within failed persistent nodes + FAILED_PATH_DOESNT_EXIST = 2, + /// node_name ephemeral processing node was successfully created + CREATED_PROCESSING_PATH = 4, + }; + auto node_metadata = createNodeMetadata(path); node_metadata.processing_id = getRandomASCIIString(10); Coordination::Requests requests; - - requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, "", zkutil::CreateMode::Persistent)); - requests.push_back(zkutil::makeRemoveRequest(zookeeper_processed_path / node_name, -1)); - - requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent)); - requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1)); - - requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral)); - Coordination::Responses responses; - auto code = zk_client->tryMulti(requests, responses); + auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; }; + + requests.push_back(zkutil::makeCreateRequest(processed_node_path, "", zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeRemoveRequest(processed_node_path, -1)); + + requests.push_back(zkutil::makeCreateRequest(failed_node_path, "", zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeRemoveRequest(failed_node_path, -1)); + + requests.push_back(zkutil::makeCreateRequest(processing_node_path, node_metadata.toString(), zkutil::CreateMode::Ephemeral)); + + const auto zk_client = getZooKeeper(); + const auto code = zk_client->tryMulti(requests, responses); if (code == Coordination::Error::ZOK) { auto holder = std::make_unique( - node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log); + node_metadata.processing_id, path, processing_node_path, file_status, zk_client, log); return std::pair{SetFileProcessingResult::Success, std::move(holder)}; } - if (responses[0]->error != Coordination::Error::ZOK) - { + if (is_request_failed(PROCESSED_PATH_DOESNT_EXIST)) return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr}; - } - else if (responses[2]->error != Coordination::Error::ZOK) - { + + if (is_request_failed(FAILED_PATH_DOESNT_EXIST)) return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr}; - } - else if (responses[4]->error != Coordination::Error::ZOK) - { + + if (is_request_failed(CREATED_PROCESSING_PATH)) return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr}; - } - else - { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", magic_enum::enum_name(code)); - } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", magic_enum::enum_name(code)); } std::pair S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status) { - /// Same as for Unordered mode. - /// The only difference is the check if the file is already processed. - /// For Ordered mode we do not keep a separate /processed/hash_node for each file - /// but instead we only keep a maximum processed file - /// (since all files are ordered and new files have a lexically bigger name, it makes sense). - const auto node_name = getNodeName(path); - const auto zk_client = getZooKeeper(); + const auto processed_node_path = getProcessedPath(path, node_name); + const auto processing_node_path = getProcessingPath(node_name); + const auto failed_node_path = getFailedPath(node_name); + + /// In one zookeeper transaction do the following: + enum RequestType + { + /// node_name is not within failed persistent nodes + FAILED_PATH_DOESNT_EXIST = 0, + /// node_name ephemeral processing node was successfully created + CREATED_PROCESSING_PATH = 2, + /// max_processed_node version did not change + CHECKED_MAX_PROCESSED_PATH = 3, + }; + auto node_metadata = createNodeMetadata(path); node_metadata.processing_id = getRandomASCIIString(10); + const auto zk_client = getZooKeeper(); while (true) { - /// Get a /processed node content - max_processed path. - /// Compare our path to it. - /// If file is not yet processed, check corresponding /failed node and try create /processing node - /// and in the same zookeeper transaction also check that /processed node did not change - /// in between, e.g. that stat.version remained the same. - /// If the version did change - retry (since we cannot do Get and Create requests - /// in the same zookeeper transaction, so we use a while loop with tries). - - auto processed_node = isShardedProcessing() - ? zookeeper_processed_path / toString(getProcessingIdForPath(path)) - : zookeeper_processed_path; - - NodeMetadata processed_node_metadata; - Coordination::Stat processed_node_stat; + std::optional max_processed_node_version; std::string data; - auto processed_node_exists = zk_client->tryGet(processed_node, data, &processed_node_stat); - if (processed_node_exists && !data.empty()) - processed_node_metadata = NodeMetadata::fromString(data); - - auto max_processed_file_path = processed_node_metadata.file_path; - if (!max_processed_file_path.empty() && path <= max_processed_file_path) + Coordination::Stat processed_node_stat; + if (zk_client->tryGet(processed_node_path, data, &processed_node_stat) && !data.empty()) { - LOG_TEST(log, "File {} is already processed, max processed file: {}", path, max_processed_file_path); - return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr}; + auto processed_node_metadata = NodeMetadata::fromString(data); + LOG_TEST(log, "Current max processed file {} from path: {}", processed_node_metadata.file_path, processed_node_path); + + if (!processed_node_metadata.file_path.empty() && path <= processed_node_metadata.file_path) + { + LOG_TEST(log, "File {} is already processed, max processed file: {}", + path, processed_node_metadata.file_path); + return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr}; + } + max_processed_node_version = processed_node_stat.version; } Coordination::Requests requests; - requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent)); - requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1)); + Coordination::Responses responses; + auto is_request_failed = [&](RequestType type) { return responses[type]->error != Coordination::Error::ZOK; }; - requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral)); + requests.push_back(zkutil::makeCreateRequest(failed_node_path, "", zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeRemoveRequest(failed_node_path, -1)); - if (processed_node_exists) + requests.push_back(zkutil::makeCreateRequest(processing_node_path, node_metadata.toString(), zkutil::CreateMode::Ephemeral)); + + if (max_processed_node_version.has_value()) { - requests.push_back(zkutil::makeCheckRequest(processed_node, processed_node_stat.version)); + requests.push_back(zkutil::makeCheckRequest(processed_node_path, max_processed_node_version.value())); } else { - requests.push_back(zkutil::makeCreateRequest(processed_node, "", zkutil::CreateMode::Persistent)); - requests.push_back(zkutil::makeRemoveRequest(processed_node, -1)); + requests.push_back(zkutil::makeCreateRequest(processed_node_path, "", zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeRemoveRequest(processed_node_path, -1)); } - Coordination::Responses responses; - auto code = zk_client->tryMulti(requests, responses); + const auto code = zk_client->tryMulti(requests, responses); if (code == Coordination::Error::ZOK) { - auto holder = std::make_unique( - node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client, log); - - LOG_TEST(log, "File {} is ready to be processed", path); + auto holder = std::make_unique(node_metadata.processing_id, path, processing_node_path, file_status, zk_client, log); return std::pair{SetFileProcessingResult::Success, std::move(holder)}; } - if (responses[0]->error != Coordination::Error::ZOK) - { - LOG_TEST(log, "Skipping file `{}`: failed", path); + if (is_request_failed(FAILED_PATH_DOESNT_EXIST)) return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr}; - } - else if (responses[2]->error != Coordination::Error::ZOK) - { - LOG_TEST(log, "Skipping file `{}`: already processing", path); + + if (is_request_failed(CREATED_PROCESSING_PATH)) return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr}; - } - else - { - LOG_TEST(log, "Version of max processed file changed. Retrying the check for file `{}`", path); - } + + LOG_TEST(log, "Version of max processed file changed. Retrying the check for file `{}`", path); } } @@ -615,6 +593,14 @@ void S3QueueFilesMetadata::setFileProcessed(ProcessingNodeHolderPtr holder) ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles); } +void S3QueueFilesMetadata::setFileProcessed(const std::string & path) +{ + if (mode != S3QueueMode::ORDERED) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as preprocessed only for Ordered mode"); + + setFileProcessedForOrderedModeImpl(path, nullptr); +} + void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder) { /// Create a persistent node in /processed and remove ephemeral node from /processing. @@ -623,14 +609,15 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); const auto zk_client = getZooKeeper(); + const auto processed_node_path = getProcessedPath(path, node_name); Coordination::Requests requests; - requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, node_metadata, zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeCreateRequest(processed_node_path, node_metadata, zkutil::CreateMode::Persistent)); Coordination::Responses responses; if (holder->remove(&requests, &responses)) { - LOG_TRACE(log, "Moved file `{}` to processed", path); + LOG_TRACE(log, "Moved file `{}` to processed (node path: {})", path, processed_node_path); if (max_loading_retries) zk_client->tryRemove(zookeeper_failed_path / (node_name + ".retriable"), -1); return; @@ -643,29 +630,24 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(ProcessingNodeHolder } LOG_WARNING(log, - "Cannot set file ({}) as processed since ephemeral node in /processing" + "Cannot set file ({}) as processed since ephemeral node in /processing (code: {})" "does not exist with expected id, " - "this could be a result of expired zookeeper session", path); + "this could be a result of expired zookeeper session", path, responses[1]->error); } - void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder) { - auto processed_node_path = isShardedProcessing() - ? zookeeper_processed_path / toString(getProcessingIdForPath(holder->path)) - : zookeeper_processed_path; - - setFileProcessedForOrderedModeImpl(holder->path, holder, processed_node_path); + setFileProcessedForOrderedModeImpl(holder->path, holder); } -void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( - const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path) +void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl(const std::string & path, ProcessingNodeHolderPtr holder) { /// Update a persistent node in /processed and remove ephemeral node from /processing. const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); const auto zk_client = getZooKeeper(); + const auto processed_node_path = getProcessedPath(path, node_name); LOG_TRACE(log, "Setting file `{}` as processed (at {})", path, processed_node_path); while (true) @@ -695,6 +677,13 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( Coordination::Responses responses; if (holder) { + // if (useBucketsForProcessing()) + // { + // auto bucket_lock_path = getBucketLockPath(getBucketForPath(path)); + // /// TODO: add version + // requests.push_back(zkutil::makeCheckRequest(bucket_lock_path, -1)); + // } + if (holder->remove(&requests, &responses)) { LOG_TRACE(log, "Moved file `{}` to processed", path); @@ -728,22 +717,6 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedModeImpl( } } -void S3QueueFilesMetadata::setFileProcessed(const std::string & path, size_t shard_id) -{ - if (mode != S3QueueMode::ORDERED) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Can set file as preprocessed only for Ordered mode"); - - if (isShardedProcessing()) - { - for (const auto & processor : getProcessingIdsForShard(shard_id)) - setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path / toString(processor)); - } - else - { - setFileProcessedForOrderedModeImpl(path, nullptr, zookeeper_processed_path); - } -} - void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const String & exception_message) { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds); @@ -767,6 +740,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S const auto node_name = getNodeName(path); auto node_metadata = createNodeMetadata(path, exception_message); const auto zk_client = getZooKeeper(); + const auto processing_node_path = getProcessingPath(node_name); /// Is file retriable? if (max_loading_retries == 0) @@ -830,7 +804,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S /// Make a persistent node /failed/node_hash, remove /failed/node_hash.retriable node and node in /processing. Coordination::Requests requests; - requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1)); + requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1)); requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name_with_retriable_suffix, stat.version)); requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, @@ -849,7 +823,7 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S /// File is still retriable, update retries count and remove node from /processing. Coordination::Requests requests; - requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1)); + requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1)); if (node_metadata.retries == 0) { requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name_with_retriable_suffix, @@ -980,6 +954,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueCleanupMaxSetSizeOrTTLMicroseconds); const auto zk_client = getZooKeeper(); + const std::string zookeeper_processed_path = zookeeper_path / "processed"; Strings processed_nodes; auto code = zk_client->tryGetChildren(zookeeper_processed_path, processed_nodes); @@ -987,7 +962,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() { if (code == Coordination::Error::ZNONODE) { - LOG_TEST(log, "Path {} does not exist", zookeeper_processed_path.string()); + LOG_TEST(log, "Path {} does not exist", zookeeper_processed_path); } else throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", magic_enum::enum_name(code)); @@ -1051,7 +1026,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() for (const auto & node : processed_nodes) { - const std::string path = zookeeper_processed_path / node; + const std::string path = getProcessedPath("", node); /// TODO: try { std::string metadata_str; diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index e26af1d25c5..c90d599e837 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -18,8 +18,14 @@ class StorageS3Queue; /** * A class for managing S3Queue metadata in zookeeper, e.g. * the following folders: - * - /processing * - /processed + * - /processing + * - /failed + * + * In case we use buckets for processing for Ordered mode, the structure looks like: + * - /buckets//processed -- persistent node, information about last processed file. + * - /buckets//lock -- ephemeral node, used for acquiring bucket lock. + * - /processing * - /failed * * Depending on S3Queue processing mode (ordered or unordered) @@ -37,12 +43,15 @@ public: class ProcessingNodeHolder; using ProcessingNodeHolderPtr = std::shared_ptr; + using Bucket = size_t; + using Processor = std::string; + S3QueueFilesMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_); ~S3QueueFilesMetadata(); void setFileProcessed(ProcessingNodeHolderPtr holder); - void setFileProcessed(const std::string & path, size_t shard_id); + void setFileProcessed(const std::string & path); void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message); @@ -81,37 +90,13 @@ public: void deactivateCleanupTask(); - /// Should the table use sharded processing? - /// We use sharded processing for Ordered mode of S3Queue table. - /// It allows to parallelize processing within a single server - /// and to allow distributed processing. - bool isShardedProcessing() const; - - /// Register a new shard for processing. - /// Return a shard id of registered shard. - size_t registerNewShard(); - /// Register a new shard for processing by given id. - /// Throws exception if shard by this id is already registered. - void registerNewShard(size_t shard_id); - /// Unregister shard from keeper. - void unregisterShard(size_t shard_id); - bool isShardRegistered(size_t shard_id); - - /// Total number of processing ids. - /// A processing id identifies a single processing thread. - /// There might be several processing ids per shard. - size_t getProcessingIdsNum() const; - /// Get processing ids identified with requested shard. - std::vector getProcessingIdsForShard(size_t shard_id) const; - /// Check if given processing id belongs to a given shard. - bool isProcessingIdBelongsToShard(size_t id, size_t shard_id) const; - /// Get a processing id for processing thread by given thread id. - /// thread id is a value in range [0, threads_per_shard]. - size_t getIdForProcessingThread(size_t thread_id, size_t shard_id) const; - + bool useBucketsForProcessing() const; /// Calculate which processing id corresponds to a given file path. /// The file will be processed by a thread related to this processing id. - size_t getProcessingIdForPath(const std::string & path) const; + Bucket getBucketForPath(const std::string & path) const; + + bool tryAcquireBucket(const Bucket & bucket, const Processor & processor); + void releaseBucket(const Bucket & bucket); private: const S3QueueMode mode; @@ -120,13 +105,12 @@ private: const UInt64 max_loading_retries; const size_t min_cleanup_interval_ms; const size_t max_cleanup_interval_ms; - const size_t shards_num; - const size_t threads_per_shard; + const size_t buckets_num; + const fs::path zookeeper_path; const fs::path zookeeper_processing_path; - const fs::path zookeeper_processed_path; const fs::path zookeeper_failed_path; - const fs::path zookeeper_shards_path; + const fs::path zookeeper_buckets_path; const fs::path zookeeper_cleanup_lock_path; LoggerPtr log; @@ -135,15 +119,19 @@ private: BackgroundSchedulePool::TaskHolder task; std::string getNodeName(const std::string & path); + fs::path getBucketLockPath(const Bucket & bucket) const; + std::string getProcessorInfo(const std::string & processor_id); + + std::string getProcessedPath(const std::string & path, const std::string & path_hash) const; + std::string getProcessingPath(const std::string & path_hash) const; + std::string getFailedPath(const std::string & path_hash) const; zkutil::ZooKeeperPtr getZooKeeper() const; void setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder); void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder); - std::string getZooKeeperPathForShard(size_t shard_id) const; - void setFileProcessedForOrderedModeImpl( - const std::string & path, ProcessingNodeHolderPtr holder, const std::string & processed_node_path); + void setFileProcessedForOrderedModeImpl(const std::string & path, ProcessingNodeHolderPtr holder); enum class SetFileProcessingResult : uint8_t { diff --git a/src/Storages/S3Queue/S3QueueSettings.h b/src/Storages/S3Queue/S3QueueSettings.h index c26e973a1c0..c486a7fbb5d 100644 --- a/src/Storages/S3Queue/S3QueueSettings.h +++ b/src/Storages/S3Queue/S3QueueSettings.h @@ -13,7 +13,7 @@ class ASTStorage; #define S3QUEUE_RELATED_SETTINGS(M, ALIAS) \ M(S3QueueMode, \ mode, \ - S3QueueMode::ORDERED, \ + S3QueueMode::UNORDERED, \ "With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \ "With ordered mode, only the max name of the successfully consumed file stored.", \ 0) \ @@ -30,8 +30,7 @@ class ASTStorage; M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \ M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \ M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \ - M(UInt32, s3queue_total_shards_num, 1, "Value 0 means disabled", 0) \ - M(UInt32, s3queue_current_shard_num, 0, "", 0) \ + M(UInt32, s3queue_buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \ #define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \ S3QUEUE_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index b5bee2cc8da..f60d4e18de3 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -43,85 +43,207 @@ StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo( StorageS3QueueSource::FileIterator::FileIterator( std::shared_ptr metadata_, std::unique_ptr glob_iterator_, - size_t current_shard_, std::atomic & shutdown_called_, LoggerPtr logger_) : metadata(metadata_) , glob_iterator(std::move(glob_iterator_)) + , current_processor(getRandomASCIIString(10)) /// TODO: add server uuid? , shutdown_called(shutdown_called_) , log(logger_) - , sharded_processing(metadata->isShardedProcessing()) - , current_shard(current_shard_) { - if (sharded_processing) +} + +StorageS3QueueSource::FileIterator::~FileIterator() +{ + releaseAndResetCurrentBucket(); +} + +void StorageS3QueueSource::FileIterator::releaseAndResetCurrentBucket() +{ + try { - for (const auto & id : metadata->getProcessingIdsForShard(current_shard)) - sharded_keys.emplace(id, std::deque{}); + if (current_bucket.has_value()) + { + metadata->releaseBucket(current_bucket.value()); + current_bucket.reset(); + } + } + catch (const zkutil::KeeperException & e) + { + if (e.code == Coordination::Error::ZSESSIONEXPIRED) + { + LOG_TRACE(log, "Session expired while releasing bucket"); + } + if (e.code == Coordination::Error::ZNONODE) + { + LOG_TRACE(log, "Bucket {} does not exist. " + "This could happen because of an exprired session", + current_bucket.value()); + } + else + { + LOG_ERROR(log, "Got unexpected exception while releasing bucket: {}", + getCurrentExceptionMessage(true)); + chassert(false); + } + current_bucket.reset(); } } -StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(size_t idx) +StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket() { - while (!shutdown_called) + /// We need this lock to maintain consistency between listing s3 directory + /// and getting/putting result into listed_keys_cache. + std::lock_guard lock(buckets_mutex); + + while (true) { - KeyWithInfoPtr val{nullptr}; - + /// Each processing thread gets next path from glob_iterator->next() + /// and checks if corresponding bucket is already acquired by someone. + /// In case it is already acquired, they put the key into listed_keys_cache, + /// so that the thread who acquired the bucket will be able to see + /// those keys without the need to list s3 directory once again. + if (current_bucket.has_value()) { - std::unique_lock lk(sharded_keys_mutex, std::defer_lock); - if (sharded_processing) + auto it = listed_keys_cache.find(current_bucket.value()); + if (it != listed_keys_cache.end()) { - /// To make sure order on keys in each shard in sharded_keys - /// we need to check sharded_keys and to next() under lock. - lk.lock(); + /// `bucket_keys` -- keys we iterated so far and which were not taken for processing. + /// `processor` -- processor id of the thread which has acquired the bucket. + auto & [bucket_keys, processor] = it->second; - if (auto it = sharded_keys.find(idx); it != sharded_keys.end()) + /// Check correctness just in case. + if (!processor.has_value() || processor.value() != current_processor) { - auto & keys = it->second; - if (!keys.empty()) - { - val = keys.front(); - keys.pop_front(); - chassert(idx == metadata->getProcessingIdForPath(val->key)); - } + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected current processor {} to be equal to bucket's {} processor, " + "but have {}", current_processor, current_bucket.value(), + processor.has_value() ? processor.value() : Processor{}); } - else + + /// Take next key to process + if (!bucket_keys.empty()) { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Processing id {} does not exist (Expected ids: {})", - idx, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", ")); + /// Take the key from the front, the order is important. + auto key_with_info = bucket_keys.front(); + bucket_keys.pop_front(); + return key_with_info; } + + /// No more keys in bucket, remove it from cache. + listed_keys_cache.erase(it); } - if (!val) + if (iterator_finished) { - val = glob_iterator->next(); - if (val && sharded_processing) - { - const auto processing_id_for_key = metadata->getProcessingIdForPath(val->key); - if (idx != processing_id_for_key) - { - if (metadata->isProcessingIdBelongsToShard(processing_id_for_key, current_shard)) - { - LOG_TEST(log, "Putting key {} into queue of processor {} (total: {})", - val->key, processing_id_for_key, sharded_keys.size()); + /// Bucket is fully processed - release the bucket. + releaseAndResetCurrentBucket(); + } + } + /// If processing thread has already acquired some bucket + /// and while listing s3 directory gets a key which is in a different bucket, + /// it puts the key into listed_keys_cache to allow others to process it, + /// because one processing thread can acquire only one bucket at a time. + /// Once a thread is finished with its acquired bucket, it checks listed_keys_cache + /// to see if there are keys from buckets not acquired by anyone. + if (!current_bucket.has_value()) + { + for (auto it = listed_keys_cache.begin(); it != listed_keys_cache.end();) + { + auto & [bucket, bucket_info] = *it; + auto & [bucket_keys, processor] = bucket_info; - if (auto it = sharded_keys.find(processing_id_for_key); it != sharded_keys.end()) - { - it->second.push_back(val); - } - else - { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Processing id {} does not exist (Expected ids: {})", - processing_id_for_key, fmt::join(metadata->getProcessingIdsForShard(current_shard), ", ")); - } - } - continue; - } + if (processor.has_value()) + { + LOG_TEST(log, "Bucket {} is already locked for processing by {} (keys: {})", + bucket, processor.value(), bucket_keys.size()); + ++it; + continue; } + + if (bucket_keys.empty()) + { + /// No more keys in bucket, remove it from cache. + /// We still might add new keys to this bucket if !iterator_finished. + it = listed_keys_cache.erase(it); + continue; + } + + if (!metadata->tryAcquireBucket(bucket, current_processor)) + { + LOG_TEST(log, "Bucket {} is already locked for processing (keys: {})", + bucket, bucket_keys.size()); + ++it; + continue; + } + + current_bucket = bucket; + processor = current_processor; + + /// Take the key from the front, the order is important. + auto key_with_info = bucket_keys.front(); + bucket_keys.pop_front(); + return key_with_info; } } + if (iterator_finished) + { + LOG_TEST(log, "Reached the end of file iterator and nothing left in keys cache"); + return {}; + } + + auto key_with_info = glob_iterator->next(); + if (key_with_info) + { + const auto bucket = metadata->getBucketForPath(key_with_info->key); + + LOG_TEST(log, "Found next file: {}, bucket: {}, current bucket: {}", + key_with_info->getFileName(), bucket, + current_bucket.has_value() ? toString(current_bucket.value()) : "None"); + + if (current_bucket.has_value()) + { + if (current_bucket.value() != bucket) + { + listed_keys_cache[bucket].keys.emplace_back(key_with_info); + continue; + } + return key_with_info; + } + else + { + if (!metadata->tryAcquireBucket(bucket, current_processor)) + { + LOG_TEST(log, "Bucket {} is already locked for processing", bucket); + continue; + } + + current_bucket = bucket; + return key_with_info; + } + } + else + { + releaseAndResetCurrentBucket(); + + LOG_TEST(log, "Reached the end of file iterator"); + iterator_finished = true; + + if (listed_keys_cache.empty()) + return {}; + else + continue; + } + } +} + +StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next() +{ + while (!shutdown_called) + { + auto val = metadata->useBucketsForProcessing() ? getNextKeyFromAcquiredBucket() : glob_iterator->next(); if (!val) return {}; @@ -138,19 +260,12 @@ StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(si return {}; } - LOG_TEST(log, "Checking if can process key {} for processing_id {}", val->key, idx); + LOG_TEST(log, "Checking if can process key {}", val->key); if (processing_holder) { return std::make_shared(val->key, val->info, processing_holder); } - else if (sharded_processing - && metadata->getFileStatus(val->key)->state == S3QueueFilesMetadata::FileStatus::State::Processing) - { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "File {} is processing by someone else in sharded processing. " - "It is a bug", val->key); - } } return {}; } @@ -165,7 +280,6 @@ StorageS3QueueSource::StorageS3QueueSource( const Block & header_, std::unique_ptr internal_source_, std::shared_ptr files_metadata_, - size_t processing_id_, const S3QueueAction & action_, RemoveFileFunc remove_file_func_, const NamesAndTypesList & requested_virtual_columns_, @@ -179,7 +293,6 @@ StorageS3QueueSource::StorageS3QueueSource( , WithContext(context_) , name(std::move(name_)) , action(action_) - , processing_id(processing_id_) , files_metadata(files_metadata_) , internal_source(std::move(internal_source_)) , requested_virtual_columns(requested_virtual_columns_) @@ -207,7 +320,7 @@ void StorageS3QueueSource::lazyInitialize() if (initialized) return; - internal_source->lazyInitialize(processing_id); + internal_source->lazyInitialize(); reader = std::move(internal_source->reader); if (reader) reader_future = std::move(internal_source->reader_future); @@ -335,7 +448,7 @@ Chunk StorageS3QueueSource::generate() /// Even if task is finished the thread may be not freed in pool. /// So wait until it will be freed before scheduling a new task. internal_source->create_reader_pool.wait(); - reader_future = internal_source->createReaderAsync(processing_id); + reader_future = internal_source->createReaderAsync(); } return {}; diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index a657459ed9d..3056ccecb11 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -41,28 +41,42 @@ public: FileIterator( std::shared_ptr metadata_, std::unique_ptr glob_iterator_, - size_t current_shard_, std::atomic & shutdown_called_, LoggerPtr logger_); + ~FileIterator() override; + /// Note: /// List results in s3 are always returned in UTF-8 binary order. /// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html) - KeyWithInfoPtr next(size_t idx) override; + KeyWithInfoPtr next() override; size_t estimatedKeysCount() override; private: + using Bucket = S3QueueFilesMetadata::Bucket; + using Processor = S3QueueFilesMetadata::Processor; + const std::shared_ptr metadata; const std::unique_ptr glob_iterator; + const Processor current_processor; + std::atomic & shutdown_called; std::mutex mutex; LoggerPtr log; - const bool sharded_processing; - const size_t current_shard; - std::unordered_map> sharded_keys; - std::mutex sharded_keys_mutex; + std::optional current_bucket; + std::mutex buckets_mutex; + struct ListedKeys + { + std::deque keys; + std::optional processor; + }; + std::unordered_map listed_keys_cache; + bool iterator_finished = false; + + KeyWithInfoPtr getNextKeyFromAcquiredBucket(); + void releaseAndResetCurrentBucket(); }; StorageS3QueueSource( @@ -70,7 +84,6 @@ public: const Block & header_, std::unique_ptr internal_source_, std::shared_ptr files_metadata_, - size_t processing_id_, const S3QueueAction & action_, RemoveFileFunc remove_file_func_, const NamesAndTypesList & requested_virtual_columns_, @@ -92,7 +105,6 @@ public: private: const String name; const S3QueueAction action; - const size_t processing_id; const std::shared_ptr files_metadata; const std::shared_ptr internal_source; const NamesAndTypesList requested_virtual_columns; diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.cpp b/src/Storages/S3Queue/S3QueueTableMetadata.cpp index 1830bac4743..6e42831ee43 100644 --- a/src/Storages/S3Queue/S3QueueTableMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueTableMetadata.cpp @@ -38,11 +38,15 @@ S3QueueTableMetadata::S3QueueTableMetadata( const StorageInMemoryMetadata & storage_metadata) { format_name = configuration.format; + LOG_TEST(getLogger("KSSENII"), "KSSENII SEEEE: {}", engine_settings.after_processing.value); after_processing = engine_settings.after_processing.toString(); + LOG_TEST(getLogger("KSSENII"), "KSSENII SEE 2: {}", after_processing); mode = engine_settings.mode.toString(); + LOG_TEST(getLogger("KSSENII"), "KSSENII SEE 2: {}", mode); s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit; s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec; - s3queue_total_shards_num = engine_settings.s3queue_total_shards_num; + s3queue_buckets = engine_settings.s3queue_buckets; + LOG_TEST(getLogger("KSSENII"), "KSSENII SEE 2: {}", s3queue_buckets); s3queue_processing_threads_num = engine_settings.s3queue_processing_threads_num; columns = storage_metadata.getColumns().toString(); } @@ -54,7 +58,7 @@ String S3QueueTableMetadata::toString() const json.set("mode", mode); json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit); json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec); - json.set("s3queue_total_shards_num", s3queue_total_shards_num); + json.set("s3queue_buckets", s3queue_buckets); json.set("s3queue_processing_threads_num", s3queue_processing_threads_num); json.set("format_name", format_name); json.set("columns", columns); @@ -77,10 +81,10 @@ void S3QueueTableMetadata::read(const String & metadata_str) format_name = json->getValue("format_name"); columns = json->getValue("columns"); - if (json->has("s3queue_total_shards_num")) - s3queue_total_shards_num = json->getValue("s3queue_total_shards_num"); + if (json->has("s3queue_buckets")) + s3queue_buckets = json->getValue("s3queue_buckets"); else - s3queue_total_shards_num = 1; + s3queue_buckets = 1; if (json->has("s3queue_processing_threads_num")) s3queue_processing_threads_num = json->getValue("s3queue_processing_threads_num"); @@ -148,14 +152,13 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata from_zk.s3queue_processing_threads_num, s3queue_processing_threads_num); } - if (s3queue_total_shards_num != from_zk.s3queue_total_shards_num) + if (s3queue_buckets != from_zk.s3queue_buckets) { throw Exception( ErrorCodes::METADATA_MISMATCH, - "Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. " + "Existing table metadata in ZooKeeper differs in s3queue_buckets setting. " "Stored in ZooKeeper: {}, local: {}", - from_zk.s3queue_total_shards_num, - s3queue_total_shards_num); + from_zk.s3queue_buckets, s3queue_buckets); } } } diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.h b/src/Storages/S3Queue/S3QueueTableMetadata.h index 84087f72a6a..b32478dac62 100644 --- a/src/Storages/S3Queue/S3QueueTableMetadata.h +++ b/src/Storages/S3Queue/S3QueueTableMetadata.h @@ -23,7 +23,7 @@ struct S3QueueTableMetadata String mode; UInt64 s3queue_tracked_files_limit = 0; UInt64 s3queue_tracked_file_ttl_sec = 0; - UInt64 s3queue_total_shards_num = 1; + UInt64 s3queue_buckets = 0; UInt64 s3queue_processing_threads_num = 1; S3QueueTableMetadata() = default; diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index c3a772e532c..cf59bbd46dd 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -107,18 +107,19 @@ StorageS3Queue::StorageS3Queue( const String & comment, ContextPtr context_, std::optional format_settings_, - ASTStorage * engine_args, + ASTStorage * /* engine_args */, LoadingStrictnessLevel mode) : IStorage(table_id_) , WithContext(context_) , s3queue_settings(std::move(s3queue_settings_)) , zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings)) - , after_processing(s3queue_settings->after_processing) , configuration{configuration_} , format_settings(format_settings_) , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms) , log(getLogger("StorageS3Queue (" + table_id_.getFullTableName() + ")")) { + LOG_TEST(log, "KSSENII SEE: {}", s3queue_settings->after_processing.value); + if (configuration.url.key.empty()) { configuration.url.key = "/*"; @@ -135,7 +136,7 @@ StorageS3Queue::StorageS3Queue( if (mode == LoadingStrictnessLevel::CREATE && !context_->getSettingsRef().s3queue_allow_experimental_sharded_mode && s3queue_settings->mode == S3QueueMode::ORDERED - && (s3queue_settings->s3queue_total_shards_num > 1 || s3queue_settings->s3queue_processing_threads_num > 1)) + && (s3queue_settings->s3queue_buckets > 1 || s3queue_settings->s3queue_processing_threads_num > 1)) { throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue sharded mode is not allowed. To enable use `s3queue_allow_experimental_sharded_mode`"); } @@ -178,21 +179,9 @@ StorageS3Queue::StorageS3Queue( /// The ref count is decreased when StorageS3Queue::drop() method is called. files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings); - if (files_metadata->isShardedProcessing()) - { - if (!s3queue_settings->s3queue_current_shard_num.changed) - { - s3queue_settings->s3queue_current_shard_num = static_cast(files_metadata->registerNewShard()); - engine_args->settings->changes.setSetting("s3queue_current_shard_num", s3queue_settings->s3queue_current_shard_num.value); - } - else if (!files_metadata->isShardRegistered(s3queue_settings->s3queue_current_shard_num)) - { - files_metadata->registerNewShard(s3queue_settings->s3queue_current_shard_num); - } - } if (s3queue_settings->mode == S3QueueMode::ORDERED && !s3queue_settings->s3queue_last_processed_path.value.empty()) { - files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value, s3queue_settings->s3queue_current_shard_num); + files_metadata->setFileProcessed(s3queue_settings->s3queue_last_processed_path.value); } } @@ -216,13 +205,6 @@ void StorageS3Queue::shutdown(bool is_drop) if (files_metadata) { files_metadata->deactivateCleanupTask(); - - if (is_drop && files_metadata->isShardedProcessing()) - { - files_metadata->unregisterShard(s3queue_settings->s3queue_current_shard_num); - LOG_TRACE(log, "Unregistered shard {} from zookeeper", s3queue_settings->s3queue_current_shard_num); - } - files_metadata.reset(); } LOG_TRACE(log, "Shut down storage"); @@ -343,7 +325,6 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const pipes.emplace_back(storage->createSource( info, iterator, - storage->files_metadata->getIdForProcessingThread(i, storage->s3queue_settings->s3queue_current_shard_num), max_block_size, context)); auto pipe = Pipe::unitePipes(std::move(pipes)); @@ -359,7 +340,6 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const std::shared_ptr StorageS3Queue::createSource( const ReadFromFormatInfo & info, std::shared_ptr file_iterator, - size_t processing_id, size_t max_block_size, ContextPtr local_context) { @@ -405,7 +385,7 @@ std::shared_ptr StorageS3Queue::createSource( auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr; return std::make_shared( getName(), info.source_header, std::move(internal_source), - files_metadata, processing_id, after_processing, file_deleter, info.requested_virtual_columns, + files_metadata, s3queue_settings->after_processing, file_deleter, info.requested_virtual_columns, local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log); } @@ -508,10 +488,7 @@ bool StorageS3Queue::streamToViews() pipes.reserve(s3queue_settings->s3queue_processing_threads_num); for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i) { - auto source = createSource( - read_from_format_info, file_iterator, files_metadata->getIdForProcessingThread(i, s3queue_settings->s3queue_current_shard_num), - DBMS_DEFAULT_BUFFER_SIZE, s3queue_context); - + auto source = createSource(read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context); pipes.emplace_back(std::move(source)); } auto pipe = Pipe::unitePipes(std::move(pipes)); @@ -551,6 +528,8 @@ void StorageS3Queue::createOrCheckMetadata(const StorageInMemoryMetadata & stora if (zookeeper->exists(zk_path / "metadata")) { checkTableStructure(zk_path, storage_metadata); + checkTableStructure(zk_path, storage_metadata); + checkTableStructure(zk_path, storage_metadata); } else { @@ -623,8 +602,7 @@ std::shared_ptr StorageS3Queue::createFileIterator /* read_keys */ nullptr, configuration.request_settings); - return std::make_shared( - files_metadata, std::move(glob_iterator), s3queue_settings->s3queue_current_shard_num, shutdown_called, log); + return std::make_shared(files_metadata, std::move(glob_iterator), shutdown_called, log); } void registerStorageS3Queue(StorageFactory & factory) diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 1f735b47819..28fd09b8add 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -59,7 +59,6 @@ private: const std::unique_ptr s3queue_settings; const fs::path zk_path; - const S3QueueAction after_processing; std::shared_ptr files_metadata; Configuration configuration; @@ -86,7 +85,6 @@ private: std::shared_ptr createSource( const ReadFromFormatInfo & info, std::shared_ptr file_iterator, - size_t processing_id, size_t max_block_size, ContextPtr local_context); diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 9768653f3fe..48477345507 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -230,7 +230,7 @@ public: expanded_keys_iter++; } - KeyWithInfoPtr next(size_t) + KeyWithInfoPtr next() { std::lock_guard lock(mutex); return nextAssumeLocked(); @@ -491,9 +491,9 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( { } -StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next(size_t idx) /// NOLINT +StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next() /// NOLINT { - return pimpl->next(idx); + return pimpl->next(); } size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount() @@ -535,7 +535,7 @@ public: } } - KeyWithInfoPtr next(size_t) + KeyWithInfoPtr next() { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= keys.size()) @@ -579,9 +579,9 @@ StorageS3Source::KeysIterator::KeysIterator( { } -StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next(size_t idx) /// NOLINT +StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next() /// NOLINT { - return pimpl->next(idx); + return pimpl->next(); } size_t StorageS3Source::KeysIterator::estimatedKeysCount() @@ -608,7 +608,7 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator( buffer.emplace_back(std::make_shared(key_future.get())); } -StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next(size_t) /// NOLINT +StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next() { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= buffer.size()) @@ -663,7 +663,7 @@ StorageS3Source::ArchiveIterator::ArchiveIterator( } } -StorageS3Source::KeyWithInfoPtr StorageS3Source::ArchiveIterator::next(size_t) +StorageS3Source::KeyWithInfoPtr StorageS3Source::ArchiveIterator::next() { if (!path_in_archive.empty()) { @@ -789,23 +789,23 @@ StorageS3Source::StorageS3Source( { } -void StorageS3Source::lazyInitialize(size_t idx) +void StorageS3Source::lazyInitialize() { if (initialized) return; - reader = createReader(idx); + reader = createReader(); if (reader) - reader_future = createReaderAsync(idx); + reader_future = createReaderAsync(); initialized = true; } -StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx) +StorageS3Source::ReaderHolder StorageS3Source::createReader() { KeyWithInfoPtr key_with_info; do { - key_with_info = file_iterator->next(idx); + key_with_info = file_iterator->next(); if (!key_with_info || key_with_info->key.empty()) return {}; @@ -888,9 +888,9 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx) return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)}; } -std::future StorageS3Source::createReaderAsync(size_t idx) +std::future StorageS3Source::createReaderAsync() { - return create_reader_scheduler([=, this] { return createReader(idx); }, Priority{}); + return create_reader_scheduler([=, this] { return createReader(); }, Priority{}); } std::unique_ptr createS3ReadBuffer( diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 606c677f915..3f3d4346bbd 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -76,7 +76,7 @@ public: { public: virtual ~IIterator() = default; - virtual KeyWithInfoPtr next(size_t idx = 0) = 0; /// NOLINT + virtual KeyWithInfoPtr next() = 0; /// NOLINT /// Estimates how many streams we need to process all files. /// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys. @@ -100,7 +100,7 @@ public: const S3Settings::RequestSettings & request_settings_ = {}, std::function progress_callback_ = {}); - KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT + KeyWithInfoPtr next() override; /// NOLINT size_t estimatedKeysCount() override; private: @@ -121,7 +121,7 @@ public: KeysWithInfo * read_keys = nullptr, std::function progress_callback_ = {}); - KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT + KeyWithInfoPtr next() override; /// NOLINT size_t estimatedKeysCount() override; private: @@ -135,7 +135,7 @@ public: public: explicit ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count); - KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT + KeyWithInfoPtr next() override; /// NOLINT size_t estimatedKeysCount() override; private: @@ -158,7 +158,7 @@ public: ContextPtr context_, KeysWithInfo * read_keys_); - KeyWithInfoPtr next(size_t) override; /// NOLINT + KeyWithInfoPtr next() override; /// NOLINT size_t estimatedKeysCount() override; void refreshArchiveReader(); @@ -301,11 +301,11 @@ private: /// Notice: we should initialize reader and future_reader lazily in generate to make sure key_condition /// is set before createReader is invoked for key_condition is read in createReader. - void lazyInitialize(size_t idx = 0); + void lazyInitialize(); /// Recreate ReadBuffer and Pipeline for each file. - ReaderHolder createReader(size_t idx = 0); - std::future createReaderAsync(size_t idx = 0); + ReaderHolder createReader(); + std::future createReaderAsync(); void addNumRowsToCache(const String & bucket_with_key, size_t num_rows); std::optional tryGetNumRowsFromCache(const KeyWithInfo & key_with_info); diff --git a/tests/integration/test_storage_s3_queue/configs/zookeeper.xml b/tests/integration/test_storage_s3_queue/configs/zookeeper.xml index 27334dca590..1115d335f4f 100644 --- a/tests/integration/test_storage_s3_queue/configs/zookeeper.xml +++ b/tests/integration/test_storage_s3_queue/configs/zookeeper.xml @@ -13,4 +13,19 @@ 2181 + + + + + + instance + 9000 + + + instance2 + 9000 + + + + diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index e7925d55d00..ca1e9eb5a48 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -1061,7 +1061,7 @@ def test_processing_threads(started_cluster, mode): def get_count(table_name): return int(run_query(node, f"SELECT count() FROM {table_name}")) - for _ in range(100): + for _ in range(30): if (get_count(f"{dst_table_name}")) == files_to_generate: break time.sleep(1) @@ -1078,7 +1078,7 @@ def test_processing_threads(started_cluster, mode): if mode == "ordered": zk = started_cluster.get_kazoo_client("zoo1") - processed_nodes = zk.get_children(f"{keeper_path}/processed/") + processed_nodes = zk.get_children(f"{keeper_path}/buckets/") assert len(processed_nodes) == processing_threads @@ -1112,7 +1112,7 @@ def test_shards(started_cluster, mode, processing_threads): additional_settings={ "keeper_path": keeper_path, "s3queue_processing_threads_num": processing_threads, - "s3queue_total_shards_num": shards_num, + "s3queue_buckets": shards_num, }, ) create_mv(node, table, dst_table) @@ -1125,12 +1125,10 @@ def test_shards(started_cluster, mode, processing_threads): return int(run_query(node, f"SELECT count() FROM {table_name}")) for _ in range(100): - if ( - get_count(f"{dst_table_name}_1") - + get_count(f"{dst_table_name}_2") - + get_count(f"{dst_table_name}_3") - ) == files_to_generate: + count = get_count(f"{dst_table_name}_1") + get_count(f"{dst_table_name}_2") + get_count(f"{dst_table_name}_3") + if count == files_to_generate: break + print(f"Current {count}/{files_to_generate}") time.sleep(1) if ( @@ -1138,10 +1136,22 @@ def test_shards(started_cluster, mode, processing_threads): + get_count(f"{dst_table_name}_2") + get_count(f"{dst_table_name}_3") ) != files_to_generate: + processed_files = node.query( + f"select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' order by file" + ).strip().split('\n') + logging.debug(f"Processed files: {len(processed_files)}/{files_to_generate}") + + count = get_count(f"{dst_table_name}_1") + get_count(f"{dst_table_name}_2") + get_count(f"{dst_table_name}_3") + logging.debug(f"Processed rows: {count}/{files_to_generate}") + info = node.query( - f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical" + f""" + select concat('test_', toString(number), '.csv') as file from numbers(300) + where file not in (select splitByChar('/', file_name)[-1] from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed') + """ ) - logging.debug(info) + logging.debug(f"Unprocessed files: {info}") + assert False res1 = [ @@ -1176,10 +1186,8 @@ def test_shards(started_cluster, mode, processing_threads): if mode == "ordered": zk = started_cluster.get_kazoo_client("zoo1") - processed_nodes = zk.get_children(f"{keeper_path}/processed/") - assert len(processed_nodes) == shards_num * processing_threads - shard_nodes = zk.get_children(f"{keeper_path}/shards/") - assert len(shard_nodes) == shards_num + processed_nodes = zk.get_children(f"{keeper_path}/buckets/") + assert len(processed_nodes) == shards_num @pytest.mark.parametrize( @@ -1214,7 +1222,7 @@ def test_shards_distributed(started_cluster, mode, processing_threads): additional_settings={ "keeper_path": keeper_path, "s3queue_processing_threads_num": processing_threads, - "s3queue_total_shards_num": shards_num, + "s3queue_buckets": shards_num, }, ) i += 1 @@ -1229,7 +1237,7 @@ def test_shards_distributed(started_cluster, mode, processing_threads): def get_count(node, table_name): return int(run_query(node, f"SELECT count() FROM {table_name}")) - for _ in range(150): + for _ in range(10): if ( get_count(node, dst_table_name) + get_count(node_2, dst_table_name) ) == total_rows: @@ -1239,10 +1247,47 @@ def test_shards_distributed(started_cluster, mode, processing_threads): if ( get_count(node, dst_table_name) + get_count(node_2, dst_table_name) ) != total_rows: + processed_files = node.query( + f""" +select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file + """ + ).strip().split('\n') + logging.debug(f"Processed files by node 1: {len(processed_files)}/{files_to_generate}") + processed_files = node_2.query( + f""" +select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file + """ + ).strip().split('\n') + logging.debug(f"Processed files by node 2: {len(processed_files)}/{files_to_generate}") + + count = get_count(node, dst_table_name) + get_count(node_2, dst_table_name) + logging.debug(f"Processed rows: {count}/{files_to_generate}") + info = node.query( - f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical" + f""" + select concat('test_', toString(number), '.csv') as file from numbers(300) + where file not in (select splitByChar('/', file_name)[-1] from clusterAllReplicas(default, system.s3queue) + where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0) + """ ) - logging.debug(info) + logging.debug(f"Unprocessed files: {info}") + + files1 = node.query( + f""" + select splitByChar('/', file_name)[-1] from system.s3queue + where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 + """ + ).strip().split("\n") + files2 = node_2.query( + f""" + select splitByChar('/', file_name)[-1] from system.s3queue + where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 + """ + ).strip().split("\n") + def intersection(list_a, list_b): + return [ e for e in list_a if e in list_b ] + logging.debug(f"Intersecting files: {intersection(files1, files2)}") + assert False get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}" @@ -1267,10 +1312,8 @@ def test_shards_distributed(started_cluster, mode, processing_threads): if mode == "ordered": zk = started_cluster.get_kazoo_client("zoo1") - processed_nodes = zk.get_children(f"{keeper_path}/processed/") - assert len(processed_nodes) == shards_num * processing_threads - shard_nodes = zk.get_children(f"{keeper_path}/shards/") - assert len(shard_nodes) == shards_num + processed_nodes = zk.get_children(f"{keeper_path}/buckets/") + assert len(processed_nodes) == shards_num node.restart_clickhouse() time.sleep(10) @@ -1297,12 +1340,12 @@ def test_settings_check(started_cluster): additional_settings={ "keeper_path": keeper_path, "s3queue_processing_threads_num": 5, - "s3queue_total_shards_num": 2, + "s3queue_buckets": 2, }, ) assert ( - "Existing table metadata in ZooKeeper differs in s3queue_total_shards_num setting. Stored in ZooKeeper: 2, local: 3" + "Existing table metadata in ZooKeeper differs in s3queue_buckets setting. Stored in ZooKeeper: 2, local: 3" in create_table( started_cluster, node_2, @@ -1312,7 +1355,7 @@ def test_settings_check(started_cluster): additional_settings={ "keeper_path": keeper_path, "s3queue_processing_threads_num": 5, - "s3queue_total_shards_num": 3, + "s3queue_buckets": 3, }, expect_error=True, ) @@ -1329,7 +1372,7 @@ def test_settings_check(started_cluster): additional_settings={ "keeper_path": keeper_path, "s3queue_processing_threads_num": 2, - "s3queue_total_shards_num": 2, + "s3queue_buckets": 2, }, expect_error=True, ) @@ -1419,7 +1462,7 @@ def test_processed_file_setting_distributed(started_cluster, processing_threads) "keeper_path": keeper_path, "s3queue_processing_threads_num": processing_threads, "s3queue_last_processed_path": f"{files_path}/test_5.csv", - "s3queue_total_shards_num": 2, + "s3queue_buckets": 2, }, )