From 7f8f379d7fe6b3368221ff97572df83eacd14e1f Mon Sep 17 00:00:00 2001
From: kssenii
Date: Wed, 24 Jan 2024 16:04:00 +0100
Subject: [PATCH] Parallel & distributed processing for ordered mode

---
 src/Storages/S3Queue/S3QueueFilesMetadata.cpp |  51 +++-
 src/Storages/S3Queue/S3QueueFilesMetadata.h   |  14 +-
 src/Storages/S3Queue/S3QueueSettings.h        |   2 +
 src/Storages/S3Queue/S3QueueSource.cpp        |  73 +++++-
 src/Storages/S3Queue/S3QueueSource.h          |  14 +-
 src/Storages/S3Queue/StorageS3Queue.cpp       |  35 +--
 src/Storages/S3Queue/StorageS3Queue.h         |   1 +
 src/Storages/StorageS3.cpp                    |  28 +-
 src/Storages/StorageS3.h                      |  14 +-
 .../integration/test_storage_s3_queue/test.py | 240 ++++++++++++++++++
 10 files changed, 414 insertions(+), 58 deletions(-)

diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
index f49e1d6f25c..02974be4c4a 100644
--- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
@@ -129,6 +129,8 @@ S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, con
     , max_loading_retries(settings_.s3queue_loading_retries.value)
     , min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value)
     , max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value)
+    , shards_num(settings_.s3queue_total_shards_num)
+    , threads_per_shard(settings_.s3queue_processing_threads_num)
     , zookeeper_processing_path(zookeeper_path_ / "processing")
     , zookeeper_processed_path(zookeeper_path_ / "processed")
     , zookeeper_failed_path(zookeeper_path_ / "failed")
@@ -197,6 +199,11 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata(
     return metadata;
 }
 
+size_t S3QueueFilesMetadata::getProcessingThreadForPath(const std::string & path) const
+{
+    return sipHash64(path.data(), path.size()) % getProcessingThreadsNum();
+}
+
 S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path)
 {
     auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds);
@@ -312,7 +319,8 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs
 }
 
 std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
-          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
+          S3QueueFilesMetadata::ProcessingNodeHolderPtr>
+S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, const FileStatusPtr & file_status)
 {
     /// In one zookeeper transaction do the following:
     /// 1. check that corresponding persistent nodes do not exist in processed/ and failed/;
@@ -339,7 +347,8 @@ std::pair(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
+        auto holder = std::make_unique<ProcessingNodeHolder>(
+            node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client);
         return std::pair{SetFileProcessingResult::Success, std::move(holder)};
     }
@@ -362,7 +371,8 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
 }
 
 std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
-          S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
+          S3QueueFilesMetadata::ProcessingNodeHolderPtr>
+S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, const FileStatusPtr & file_status)
 {
     /// Same as for Unordered mode.
     /// The only difference is the check if the file is already processed.
@@ -385,10 +395,15 @@ std::pairget(zookeeper_processed_path, &processed_node_stat);
+    auto processed_node = isShardedProcessing()
+        ?
zookeeper_processed_path / toString(getProcessingThreadForPath(path)) + : zookeeper_processed_path; + NodeMetadata processed_node_metadata; - if (!data.empty()) + Coordination::Stat processed_node_stat; + std::string data; + auto processed_node_exists = zk_client->tryGet(processed_node, data, &processed_node_stat); + if (processed_node_exists && !data.empty()) processed_node_metadata = NodeMetadata::fromString(data); auto max_processed_file_path = processed_node_metadata.file_path; @@ -403,13 +418,23 @@ std::pairtryMulti(requests, responses); if (code == Coordination::Error::ZOK) { - auto holder = std::make_unique(node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client); + auto holder = std::make_unique( + node_metadata.processing_id, path, zookeeper_processing_path / node_name, file_status, zk_client); return std::pair{SetFileProcessingResult::Success, std::move(holder)}; } @@ -500,11 +525,15 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt const auto node_metadata = createNodeMetadata(path).toString(); const auto zk_client = getZooKeeper(); + auto processed_node = isShardedProcessing() + ? zookeeper_processed_path / toString(getProcessingThreadForPath(path)) + : zookeeper_processed_path; + while (true) { std::string res; Coordination::Stat stat; - bool exists = zk_client->tryGet(zookeeper_processed_path, res, &stat); + bool exists = zk_client->tryGet(processed_node, res, &stat); Coordination::Requests requests; if (exists) { @@ -527,11 +556,11 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(ProcessingNodeHolderPt return; } } - requests.push_back(zkutil::makeSetRequest(zookeeper_processed_path, node_metadata, stat.version)); + requests.push_back(zkutil::makeSetRequest(processed_node, node_metadata, stat.version)); } else { - requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path, node_metadata, zkutil::CreateMode::Persistent)); + requests.push_back(zkutil::makeCreateRequest(processed_node, node_metadata, zkutil::CreateMode::Persistent)); } Coordination::Responses responses; diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index f3be7c5c3a0..708355e4ac3 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -80,6 +80,15 @@ public: void deactivateCleanupTask(); + bool isShardedProcessing() const { return getProcessingThreadsNum() > 1 && mode == S3QueueMode::ORDERED; } + + size_t getProcessingThreadsNum() const { return shards_num * threads_per_shard; } + + size_t getProcessingThreadForPath(const std::string & path) const; + + /// shard_id must be in range [0, shards_num - 1] + size_t getIdForProcessingThread(size_t thread_id, size_t shard_id) const { return shard_id * threads_per_shard + thread_id; } + private: const S3QueueMode mode; const UInt64 max_set_size; @@ -87,6 +96,8 @@ private: const UInt64 max_loading_retries; const size_t min_cleanup_interval_ms; const size_t max_cleanup_interval_ms; + const size_t shards_num; + const size_t threads_per_shard; const fs::path zookeeper_processing_path; const fs::path zookeeper_processed_path; @@ -117,8 +128,7 @@ private: struct NodeMetadata { - std::string file_path; - UInt64 last_processed_timestamp = 0; + std::string file_path; UInt64 last_processed_timestamp = 0; std::string last_exception; UInt64 retries = 0; std::string processing_id; /// For ephemeral processing node. 
diff --git a/src/Storages/S3Queue/S3QueueSettings.h b/src/Storages/S3Queue/S3QueueSettings.h index 66fe9b4ce31..d65b38f77f2 100644 --- a/src/Storages/S3Queue/S3QueueSettings.h +++ b/src/Storages/S3Queue/S3QueueSettings.h @@ -29,6 +29,8 @@ class ASTStorage; M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \ M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \ M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \ + M(UInt32, s3queue_total_shards_num, 1, "Value 0 means disabled", 0) \ + M(UInt32, s3queue_current_shard_num, 0, "", 0) \ #define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \ S3QUEUE_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index 27bec039f96..7d4ad64d554 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -46,29 +46,86 @@ StorageS3QueueSource::FileIterator::FileIterator( : metadata(metadata_) , glob_iterator(std::move(glob_iterator_)) , shutdown_called(shutdown_called_) + , log(&Poco::Logger::get("StorageS3QueueSource")) + , sharded_processing(metadata->isShardedProcessing()) { + if (sharded_processing) + { + for (size_t i = 0; i < metadata->getProcessingThreadsNum(); ++i) + sharded_keys.emplace(i, std::deque{}); + } } -StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next() +StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next(size_t idx) { while (!shutdown_called) { - KeyWithInfoPtr val = glob_iterator->next(); + KeyWithInfoPtr val{nullptr}; + + if (sharded_processing) + { + LOG_TEST(log, "CHECK: {}", idx); + auto & keys = sharded_keys.at(idx); + if (!keys.empty()) + { + std::lock_guard lk(sharded_keys_mutex); + val = keys.front(); + keys.pop_front(); + } + } + + if (!val) + { + std::unique_lock lk(sharded_keys_mutex, std::defer_lock); + if (sharded_processing) + { + /// To make sure order on keys in each shard in sharded_keys. + lk.lock(); + } + + val = glob_iterator->next(); + + if (val && sharded_processing) + { + auto shard = metadata->getProcessingThreadForPath(val->key); + if (shard != idx) + { + LOG_TEST(log, "Key {} is for shard {} (total: {})", val->key, shard, sharded_keys.size()); + auto & keys = sharded_keys.at(shard); + keys.push_back(val); + continue; + } + LOG_TEST(log, "Processing shard {} with key {}", shard, val->key); + } + } if (!val) return {}; if (shutdown_called) { - LOG_TEST(&Poco::Logger::get("StorageS3QueueSource"), "Shutdown was called, stopping file iterator"); + LOG_TEST(log, "Shutdown was called, stopping file iterator"); return {}; } - if (auto processing_holder = metadata->trySetFileAsProcessing(val->key); - processing_holder && !shutdown_called) + auto processing_holder = metadata->trySetFileAsProcessing(val->key); + if (shutdown_called) + { + LOG_TEST(log, "Shutdown was called, stopping file iterator"); + return {}; + } + + if (processing_holder) { return std::make_shared(val->key, val->info, processing_holder); } + else if (sharded_processing + && metadata->getFileStatus(val->key)->state == S3QueueFilesMetadata::FileStatus::State::Processing) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "File {} is processing by someone else in sharded processing. 
" + "It is a bug", val->key); + } } return {}; } @@ -83,6 +140,7 @@ StorageS3QueueSource::StorageS3QueueSource( const Block & header_, std::unique_ptr internal_source_, std::shared_ptr files_metadata_, + size_t processing_id_, const S3QueueAction & action_, RemoveFileFunc remove_file_func_, const NamesAndTypesList & requested_virtual_columns_, @@ -96,6 +154,7 @@ StorageS3QueueSource::StorageS3QueueSource( , WithContext(context_) , name(std::move(name_)) , action(action_) + , processing_id(processing_id_) , files_metadata(files_metadata_) , internal_source(std::move(internal_source_)) , requested_virtual_columns(requested_virtual_columns_) @@ -123,7 +182,7 @@ void StorageS3QueueSource::lazyInitialize() if (initialized) return; - internal_source->lazyInitialize(); + internal_source->lazyInitialize(processing_id); reader = std::move(internal_source->reader); if (reader) reader_future = std::move(internal_source->reader_future); @@ -249,7 +308,7 @@ Chunk StorageS3QueueSource::generate() /// Even if task is finished the thread may be not freed in pool. /// So wait until it will be freed before scheduling a new task. internal_source->create_reader_pool.wait(); - reader_future = internal_source->createReaderAsync(); + reader_future = internal_source->createReaderAsync(processing_id); } return {}; diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index 542f8e8fd8c..338f355974b 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -38,12 +38,15 @@ public: class FileIterator : public IIterator { public: - FileIterator(std::shared_ptr metadata_, std::unique_ptr glob_iterator_, std::atomic & shutdown_called_); + FileIterator( + std::shared_ptr metadata_, + std::unique_ptr glob_iterator_, + std::atomic & shutdown_called_); /// Note: /// List results in s3 are always returned in UTF-8 binary order. 
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html) - KeyWithInfoPtr next() override; + KeyWithInfoPtr next(size_t idx) override; size_t estimatedKeysCount() override; @@ -52,6 +55,11 @@ public: const std::unique_ptr glob_iterator; std::atomic & shutdown_called; std::mutex mutex; + Poco::Logger * log; + + const bool sharded_processing; + std::unordered_map> sharded_keys; + std::mutex sharded_keys_mutex; }; StorageS3QueueSource( @@ -59,6 +67,7 @@ public: const Block & header_, std::unique_ptr internal_source_, std::shared_ptr files_metadata_, + size_t processing_id_, const S3QueueAction & action_, RemoveFileFunc remove_file_func_, const NamesAndTypesList & requested_virtual_columns_, @@ -80,6 +89,7 @@ public: private: const String name; const S3QueueAction action; + const size_t processing_id; const std::shared_ptr files_metadata; const std::shared_ptr internal_source; const NamesAndTypesList requested_virtual_columns; diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index bc33e8cf2a9..511add4912f 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -75,14 +75,8 @@ namespace return zkutil::extractZooKeeperPath(result_zk_path, true); } - void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, Poco::Logger * log) + void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings) { - if (s3queue_settings.mode == S3QueueMode::ORDERED && s3queue_settings.s3queue_processing_threads_num > 1) - { - LOG_WARNING(log, "Parallel processing is not yet supported for Ordered mode"); - s3queue_settings.s3queue_processing_threads_num = 1; - } - if (!s3queue_settings.s3queue_processing_threads_num) { throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero"); @@ -99,6 +93,13 @@ namespace "Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})", s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms); } + + if (s3queue_settings.s3queue_current_shard_num >= s3queue_settings.s3queue_total_shards_num) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Setting `s3queue_current_shard_num` ({}) cannot exceed `{}` (`s3queue_total_shards_num` - 1)", + s3queue_settings.s3queue_current_shard_num, s3queue_settings.s3queue_total_shards_num); + + ///TODO: Add a test with different total_shards_settings for same keeper path - exception must be thrown. 
} } @@ -134,7 +135,7 @@ StorageS3Queue::StorageS3Queue( throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs"); } - checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), log); + checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef()); configuration.update(context_); FormatFactory::instance().checkFormatName(configuration.format); @@ -221,13 +222,12 @@ public: std::shared_ptr storage_, ContextPtr context_, size_t max_block_size_, - size_t num_streams_) + size_t ) : SourceStepWithFilter(DataStream{.header = std::move(sample_block)}) , info(std::move(info_)) , storage(std::move(storage_)) , context(std::move(context_)) , max_block_size(max_block_size_) - , num_streams(num_streams_) { } @@ -236,7 +236,6 @@ private: std::shared_ptr storage; ContextPtr context; size_t max_block_size; - size_t num_streams; std::shared_ptr iterator; @@ -301,11 +300,15 @@ void StorageS3Queue::read( void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { Pipes pipes; - const size_t adjusted_num_streams = std::min(num_streams, storage->s3queue_settings->s3queue_processing_threads_num); + const size_t adjusted_num_streams = storage->s3queue_settings->s3queue_processing_threads_num; createIterator(nullptr); for (size_t i = 0; i < adjusted_num_streams; ++i) - pipes.emplace_back(storage->createSource(info, iterator, max_block_size, context)); + pipes.emplace_back(storage->createSource( + info, + iterator, + storage->files_metadata->getIdForProcessingThread(i, storage->s3queue_settings->s3queue_current_shard_num), + max_block_size, context)); auto pipe = Pipe::unitePipes(std::move(pipes)); if (pipe.empty()) @@ -320,6 +323,7 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const std::shared_ptr StorageS3Queue::createSource( const ReadFromFormatInfo & info, std::shared_ptr file_iterator, + size_t processing_id, size_t max_block_size, ContextPtr local_context) { @@ -359,7 +363,7 @@ std::shared_ptr StorageS3Queue::createSource( auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? 
local_context->getS3QueueLog() : nullptr; return std::make_shared( getName(), info.source_header, std::move(internal_source), - files_metadata, after_processing, file_deleter, info.requested_virtual_columns, + files_metadata, processing_id, after_processing, file_deleter, info.requested_virtual_columns, local_context, shutdown_called, table_is_being_dropped, s3_queue_log, getStorageID(), log); } @@ -463,7 +467,8 @@ bool StorageS3Queue::streamToViews() for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i) { auto source = createSource( - read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context); + read_from_format_info, file_iterator, files_metadata->getIdForProcessingThread(i, s3queue_settings->s3queue_current_shard_num), + DBMS_DEFAULT_BUFFER_SIZE, s3queue_context); pipes.emplace_back(std::move(source)); } diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 3d3594dc2ab..f65fdf38b3c 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -91,6 +91,7 @@ private: std::shared_ptr createSource( const ReadFromFormatInfo & info, std::shared_ptr file_iterator, + size_t processing_id, size_t max_block_size, ContextPtr local_context); diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index c376af5a3d7..dcd7e13b865 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -244,7 +244,7 @@ public: fillInternalBufferAssumeLocked(); } - KeyWithInfoPtr next() + KeyWithInfoPtr next(size_t) { std::lock_guard lock(mutex); return nextAssumeLocked(); @@ -436,9 +436,9 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( { } -StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next() +StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next(size_t idx) /// NOLINT { - return pimpl->next(); + return pimpl->next(idx); } size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount() @@ -471,7 +471,7 @@ public: } } - KeyWithInfoPtr next() + KeyWithInfoPtr next(size_t) { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= keys.size()) @@ -516,9 +516,9 @@ StorageS3Source::KeysIterator::KeysIterator( { } -StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next() +StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next(size_t idx) /// NOLINT { - return pimpl->next(); + return pimpl->next(idx); } size_t StorageS3Source::KeysIterator::estimatedKeysCount() @@ -545,7 +545,7 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator( buffer.emplace_back(std::make_shared(key_future.get(), std::nullopt)); } -StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next() +StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next(size_t) /// NOLINT { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); if (current_index >= buffer.size()) @@ -599,23 +599,23 @@ StorageS3Source::StorageS3Source( { } -void StorageS3Source::lazyInitialize() +void StorageS3Source::lazyInitialize(size_t idx) { if (initialized) return; - reader = createReader(); + reader = createReader(idx); if (reader) - reader_future = createReaderAsync(); + reader_future = createReaderAsync(idx); initialized = true; } -StorageS3Source::ReaderHolder StorageS3Source::createReader() +StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx) { KeyWithInfoPtr key_with_info; do { - key_with_info = (*file_iterator)(); + key_with_info = 
file_iterator->next(idx); if (!key_with_info || key_with_info->key.empty()) return {}; @@ -689,9 +689,9 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader() return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)}; } -std::future StorageS3Source::createReaderAsync() +std::future StorageS3Source::createReaderAsync(size_t idx) { - return create_reader_scheduler([this] { return createReader(); }, Priority{}); + return create_reader_scheduler([=, this] { return createReader(idx); }, Priority{}); } std::unique_ptr StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size) diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index b90a0d394cb..5c4e4d358c0 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -61,7 +61,7 @@ public: { public: virtual ~IIterator() = default; - virtual KeyWithInfoPtr next() = 0; + virtual KeyWithInfoPtr next(size_t idx = 0) = 0; /// NOLINT /// Estimates how many streams we need to process all files. /// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys. @@ -85,7 +85,7 @@ public: const S3Settings::RequestSettings & request_settings_ = {}, std::function progress_callback_ = {}); - KeyWithInfoPtr next() override; + KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT size_t estimatedKeysCount() override; private: @@ -106,7 +106,7 @@ public: KeysWithInfo * read_keys = nullptr, std::function progress_callback_ = {}); - KeyWithInfoPtr next() override; + KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT size_t estimatedKeysCount() override; private: @@ -120,7 +120,7 @@ public: public: explicit ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count); - KeyWithInfoPtr next() override; + KeyWithInfoPtr next(size_t idx = 0) override; /// NOLINT size_t estimatedKeysCount() override; private: @@ -253,11 +253,11 @@ private: /// Notice: we should initialize reader and future_reader lazily in generate to make sure key_condition /// is set before createReader is invoked for key_condition is read in createReader. - void lazyInitialize(); + void lazyInitialize(size_t idx = 0); /// Recreate ReadBuffer and Pipeline for each file. 
- ReaderHolder createReader(); - std::future createReaderAsync(); + ReaderHolder createReader(size_t idx = 0); + std::future createReaderAsync(size_t idx = 0); std::unique_ptr createS3ReadBuffer(const String & key, size_t object_size); std::unique_ptr createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size); diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index 7d40060fec6..5209c6f9642 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -960,3 +960,243 @@ def test_s3_client_reused(started_cluster): s3_clients_after = get_created_s3_clients_count() assert s3_clients_before == s3_clients_after + + +@pytest.mark.parametrize("mode", ["unordered", "ordered"]) +def test_processing_threads(started_cluster, mode): + node = started_cluster.instances["instance"] + table_name = f"processing_threads_{mode}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 300 + processing_threads = 32 + + create_table( + started_cluster, + node, + table_name, + mode, + files_path, + additional_settings={ + "keeper_path": keeper_path, + "s3queue_processing_threads_num": processing_threads, + }, + ) + create_mv(node, table_name, dst_table_name) + + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, row_num=1 + ) + + def get_count(table_name): + return int(run_query(node, f"SELECT count() FROM {table_name}")) + + for _ in range(100): + if (get_count(f"{dst_table_name}")) == files_to_generate: + break + time.sleep(1) + + assert get_count(dst_table_name) == files_to_generate + + res = [ + list(map(int, l.split())) + for l in node.query( + f"SELECT column1, column2, column3 FROM {dst_table_name}" + ).splitlines() + ] + assert {tuple(v) for v in res} == set([tuple(i) for i in total_values]) + + if mode == "ordered": + zk = started_cluster.get_kazoo_client("zoo1") + processed_nodes = zk.get_children(f"{keeper_path}/processed/") + assert len(processed_nodes) == processing_threads + + +@pytest.mark.parametrize( + "mode, processing_threads", + [ + pytest.param("unordered", 1), + pytest.param("unordered", 8), + pytest.param("ordered", 1), + pytest.param("ordered", 8), + ], +) +def test_shards(started_cluster, mode, processing_threads): + node = started_cluster.instances["instance"] + table_name = f"test_shards_{mode}_{processing_threads}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 300 + shards_num = 3 + + for i in range(shards_num): + table = f"{table_name}_{i + 1}" + dst_table = f"{dst_table_name}_{i + 1}" + create_table( + started_cluster, + node, + table, + mode, + files_path, + additional_settings={ + "keeper_path": keeper_path, + "s3queue_processing_threads_num": processing_threads, + "s3queue_total_shards_num": shards_num, + "s3queue_current_shard_num": i, + }, + ) + create_mv(node, table, dst_table) + + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, row_num=1 + ) + + def get_count(table_name): + return int(run_query(node, f"SELECT count() FROM {table_name}")) + + for _ in range(100): + if ( + get_count(f"{dst_table_name}_1") + + get_count(f"{dst_table_name}_2") + + get_count(f"{dst_table_name}_3") + ) == files_to_generate: + break + time.sleep(1) + + if ( + 
get_count(f"{dst_table_name}_1") + + get_count(f"{dst_table_name}_2") + + get_count(f"{dst_table_name}_3") + ) != files_to_generate: + info = node.query( + f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical" + ) + logging.debug(info) + assert False + + res1 = [ + list(map(int, l.split())) + for l in node.query( + f"SELECT column1, column2, column3 FROM {dst_table_name}_1" + ).splitlines() + ] + res2 = [ + list(map(int, l.split())) + for l in node.query( + f"SELECT column1, column2, column3 FROM {dst_table_name}_2" + ).splitlines() + ] + res3 = [ + list(map(int, l.split())) + for l in node.query( + f"SELECT column1, column2, column3 FROM {dst_table_name}_3" + ).splitlines() + ] + assert {tuple(v) for v in res1 + res2 + res3} == set( + [tuple(i) for i in total_values] + ) + + # Checking that all files were processed only once + time.sleep(10) + assert ( + get_count(f"{dst_table_name}_1") + + get_count(f"{dst_table_name}_2") + + get_count(f"{dst_table_name}_3") + ) == files_to_generate + + if mode == "ordered": + zk = started_cluster.get_kazoo_client("zoo1") + processed_nodes = zk.get_children(f"{keeper_path}/processed/") + assert len(processed_nodes) == shards_num * processing_threads + + +@pytest.mark.parametrize( + "mode, processing_threads", + [ + pytest.param("unordered", 1), + pytest.param("unordered", 8), + pytest.param("ordered", 1), + pytest.param("ordered", 8), + ], +) +def test_shards_distributed(started_cluster, mode, processing_threads): + node = started_cluster.instances["instance"] + node_2 = started_cluster.instances["instance2"] + table_name = f"test_shards_distributed_{mode}_{processing_threads}" + dst_table_name = f"{table_name}_dst" + keeper_path = f"/clickhouse/test_{table_name}" + files_path = f"{table_name}_data" + files_to_generate = 300 + row_num = 50 + total_rows = row_num * files_to_generate + shards_num = 2 + + i = 0 + for instance in [node, node_2]: + create_table( + started_cluster, + instance, + table_name, + mode, + files_path, + additional_settings={ + "keeper_path": keeper_path, + "s3queue_processing_threads_num": processing_threads, + "s3queue_total_shards_num": shards_num, + "s3queue_current_shard_num": i, + }, + ) + i += 1 + + for instance in [node, node_2]: + create_mv(instance, table_name, dst_table_name) + + total_values = generate_random_files( + started_cluster, files_path, files_to_generate, row_num=row_num + ) + + def get_count(node, table_name): + return int(run_query(node, f"SELECT count() FROM {table_name}")) + + for _ in range(150): + if ( + get_count(node, dst_table_name) + get_count(node_2, dst_table_name) + ) == total_rows: + break + time.sleep(1) + + if ( + get_count(node, dst_table_name) + get_count(node_2, dst_table_name) + ) != total_rows: + info = node.query( + f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical" + ) + logging.debug(info) + assert False + + get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}" + res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()] + res2 = [ + list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines() + ] + + assert len(res1) + len(res2) == total_rows + + # Checking that all engines have made progress + assert len(res1) > 0 + assert len(res2) > 0 + + assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values]) + + # Checking that all files were processed only once + time.sleep(10) + assert ( + get_count(node, 
dst_table_name) + get_count(node_2, dst_table_name) + ) == total_rows + + if mode == "ordered": + zk = started_cluster.get_kazoo_client("zoo1") + processed_nodes = zk.get_children(f"{keeper_path}/processed/") + assert len(processed_nodes) == shards_num * processing_threads
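
For reference, a minimal sketch (not part of the patch itself) of how the new settings distribute work: each file path is hashed to one of s3queue_total_shards_num * s3queue_processing_threads_num global processing-thread ids, and a server picks up a key only if that id belongs to its own s3queue_current_shard_num. The patch uses sipHash64 for the hash; the sha1 below is only a stand-in, and the Python helper names are illustrative, mirroring getProcessingThreadForPath() and getIdForProcessingThread(). In Ordered mode each such id also gets its own child node under processed/ in ZooKeeper, which is why the tests expect shards_num * processing_threads children there.

import hashlib


def processing_thread_for_path(path: str, shards_num: int, threads_per_shard: int) -> int:
    # Global processing-thread id in [0, shards_num * threads_per_shard).
    # The real implementation uses sipHash64(path); sha1 is a stand-in here.
    digest = hashlib.sha1(path.encode()).digest()
    return int.from_bytes(digest[:8], "little") % (shards_num * threads_per_shard)


def id_for_processing_thread(thread_id: int, shard_id: int, threads_per_shard: int) -> int:
    # Mirrors getIdForProcessingThread(): the ids of shard N form one contiguous block.
    return shard_id * threads_per_shard + thread_id


if __name__ == "__main__":
    shards_num, threads_per_shard = 2, 4
    keys = [f"data/file_{i}.csv" for i in range(10)]
    for shard_id in range(shards_num):
        own_ids = {
            id_for_processing_thread(t, shard_id, threads_per_shard)
            for t in range(threads_per_shard)
        }
        mine = [
            k for k in keys
            if processing_thread_for_path(k, shards_num, threads_per_shard) in own_ids
        ]
        print(f"shard {shard_id} handles ids {sorted(own_ids)} -> {mine}")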