From 14b09d3cdcbcfbe62f9780b91eed2611b48958f1 Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 26 Sep 2023 16:23:24 +0200 Subject: [PATCH] Add caching --- src/Common/ProfileEvents.cpp | 1 + src/Interpreters/Cache/Metadata.cpp | 1 + src/Storages/S3Queue/S3QueueFilesMetadata.cpp | 283 +++++++++++++----- src/Storages/S3Queue/S3QueueFilesMetadata.h | 37 ++- .../S3Queue/S3QueueMetadataFactory.cpp | 29 ++ src/Storages/S3Queue/S3QueueMetadataFactory.h | 26 ++ src/Storages/S3Queue/StorageS3Queue.cpp | 3 +- src/Storages/S3Queue/StorageS3Queue.h | 2 - src/Storages/System/StorageSystemS3Queue.cpp | 56 ++-- 9 files changed, 315 insertions(+), 123 deletions(-) create mode 100644 src/Storages/S3Queue/S3QueueMetadataFactory.cpp create mode 100644 src/Storages/S3Queue/S3QueueMetadataFactory.h diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index dc6a3108971..a8a3a732784 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -534,6 +534,7 @@ The server successfully detected this situation and will download merged part fr M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\ M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to set file as failed")\ M(S3QueuePullMicroseconds, "Time spent to read file data")\ + M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\ \ M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\ M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \ diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index 7a9321e4215..c251cb2dec2 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include namespace fs = std::filesystem; diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index a9be348405f..009f5e3ad75 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -27,6 +27,7 @@ namespace ProfileEvents extern const Event S3QueueSetFileProcessedMicroseconds; extern const Event S3QueueSetFileFailedMicroseconds; extern const Event S3QueueCleanupMaxSetSizeOrTTLMicroseconds; + extern const Event S3QueueLockLocalFileStatusesMicroseconds; }; namespace DB @@ -35,6 +36,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; } namespace @@ -53,41 +55,54 @@ namespace } } -S3QueueFilesMetadata::S3QueueFilesMetadata( - const StorageS3Queue * storage_, - const S3QueueSettings & settings_, - ContextPtr context) - : storage(storage_) - , mode(settings_.mode) - , max_set_size(settings_.s3queue_tracked_files_limit.value) - , max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value) - , max_loading_retries(settings_.s3queue_loading_retries.value) - , min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value) - , max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value) - , zookeeper_processing_path(storage->getZooKeeperPath() / "processing") - , zookeeper_processed_path(storage->getZooKeeperPath() / "processed") - , zookeeper_failed_path(storage->getZooKeeperPath() / "failed") - , zookeeper_cleanup_lock_path(storage->getZooKeeperPath() / "cleanup_lock") - , log(&Poco::Logger::get("S3QueueFilesMetadata")) +std::unique_lock S3QueueFilesMetadata::LocalFileStatuses::lock() const { - if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec)) + auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueLockLocalFileStatusesMicroseconds); + return std::unique_lock(mutex); +} + +S3QueueFilesMetadata::FileStatus::State S3QueueFilesMetadata::LocalFileStatuses::state(const std::string & filename) const +{ + auto lk = lock(); + if (auto it = file_statuses.find(filename); it != file_statuses.end()) + return it->second->state; + else + return FileStatus::State::None; +} + +S3QueueFilesMetadata::FileStatuses S3QueueFilesMetadata::LocalFileStatuses::getAll() const +{ + auto lk = lock(); + return file_statuses; +} + +std::shared_ptr S3QueueFilesMetadata::LocalFileStatuses::get(const std::string & filename, bool create) +{ + auto lk = lock(); + auto it = file_statuses.find(filename); + if (it == file_statuses.end()) { - task = context->getSchedulePool().createTask("S3QueueCleanupFunc", [this] { cleanupThreadFunc(); }); - task->activate(); - task->scheduleAfter(generateRescheduleInterval(min_cleanup_interval_ms, max_cleanup_interval_ms)); + if (create) + it = file_statuses.emplace(filename, std::make_shared()).first; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "File status for {} doesn't exist", filename); } + return it->second; } -S3QueueFilesMetadata::~S3QueueFilesMetadata() +bool S3QueueFilesMetadata::LocalFileStatuses::remove(const std::string & filename, bool if_exists) { - deactivateCleanupTask(); -} - -void S3QueueFilesMetadata::deactivateCleanupTask() -{ - shutdown = true; - if (task) - task->deactivate(); + auto lk = lock(); + auto it = file_statuses.find(filename); + if (it == file_statuses.end()) + { + if (if_exists) + return false; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "File status for {} doesn't exist", filename); + } + file_statuses.erase(it); + return true; } std::string S3QueueFilesMetadata::NodeMetadata::toString() const @@ -117,6 +132,49 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::NodeMetadata::fromStrin return metadata; } +S3QueueFilesMetadata::S3QueueFilesMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_) + : mode(settings_.mode) + , max_set_size(settings_.s3queue_tracked_files_limit.value) + , max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value) + , max_loading_retries(settings_.s3queue_loading_retries.value) + , min_cleanup_interval_ms(settings_.s3queue_cleanup_interval_min_ms.value) + , max_cleanup_interval_ms(settings_.s3queue_cleanup_interval_max_ms.value) + , zookeeper_processing_path(zookeeper_path_ / "processing") + , zookeeper_processed_path(zookeeper_path_ / "processed") + , zookeeper_failed_path(zookeeper_path_ / "failed") + , zookeeper_cleanup_lock_path(zookeeper_path_ / "cleanup_lock") + , log(&Poco::Logger::get("S3QueueFilesMetadata")) +{ + if (mode == S3QueueMode::UNORDERED && (max_set_size || max_set_age_sec)) + { + task = Context::getGlobalContextInstance()->getSchedulePool().createTask("S3QueueCleanupFunc", [this] { cleanupThreadFunc(); }); + task->activate(); + task->scheduleAfter(generateRescheduleInterval(min_cleanup_interval_ms, max_cleanup_interval_ms)); + } +} + +S3QueueFilesMetadata::~S3QueueFilesMetadata() +{ + deactivateCleanupTask(); +} + +void S3QueueFilesMetadata::deactivateCleanupTask() +{ + shutdown = true; + if (task) + task->deactivate(); +} + +zkutil::ZooKeeperPtr S3QueueFilesMetadata::getZooKeeper() const +{ + return Context::getGlobalContextInstance()->getZooKeeper(); +} + +std::shared_ptr S3QueueFilesMetadata::getFileStatus(const std::string & path) +{ + return local_file_statuses.get(path, /* create */false); +} + std::string S3QueueFilesMetadata::getNodeName(const std::string & path) { SipHash path_hash; @@ -137,23 +195,39 @@ S3QueueFilesMetadata::NodeMetadata S3QueueFilesMetadata::createNodeMetadata( return metadata; } -std::shared_ptr S3QueueFilesMetadata::getFileStatus(const std::string & path) -{ - std::lock_guard lock(file_statuses_mutex); - return file_statuses.at(path); -} - -S3QueueFilesMetadata::FileStatuses S3QueueFilesMetadata::getFileStateses() const -{ - std::lock_guard lock(file_statuses_mutex); - return file_statuses; -} - bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path) { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessingMicroseconds); - bool result; + /// Check locally cached file status. + switch (local_file_statuses.state(path)) + { + case FileStatus::State::Processing: [[fallthrough]]; + case FileStatus::State::Processed: + { + /// File is already processes or processing by current server. + return false; + } + case FileStatus::State::Failed: + { + if (!max_loading_retries) + { + /// File was processes by current server and failed, + /// retries are disabled. + return false; + } + /// TODO save information if file is still retriable. + break; + } + case FileStatus::State::None: + { + /// The file was not processed by current server, + /// check metadata in zookeeper. + break; + } + } + + SetFileProcessingResult result; switch (mode) { case S3QueueMode::ORDERED: @@ -167,21 +241,42 @@ bool S3QueueFilesMetadata::trySetFileAsProcessing(const std::string & path) break; } } - if (result) + switch (result) { - std::lock_guard lock(file_statuses_mutex); - auto it = file_statuses.emplace(path, std::make_shared()).first; - auto & file_status = it->second; - file_status->state = FileStatus::State::Processing; - file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get()); - timer.cancel(); - if (!file_status->processing_start_time) - file_status->processing_start_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + case SetFileProcessingResult::Success: + { + auto file_status = local_file_statuses.get(path, /* create */true); + file_status->state = FileStatus::State::Processing; + file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get()); + timer.cancel(); + if (!file_status->processing_start_time) + file_status->processing_start_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + break; + } + case SetFileProcessingResult::AlreadyProcessed: + { + /// Cache the state. + auto file_status = local_file_statuses.get(path, /* create */true); + file_status->state = FileStatus::State::Processed; + break; + } + case SetFileProcessingResult::AlreadyFailed: + { + /// Cache the state. + auto file_status = local_file_statuses.get(path, /* create */true); + file_status->state = FileStatus::State::Failed; + break; + } + case SetFileProcessingResult::ProcessingByOtherNode: + { + /// We cannot save any local state. + break; + } } - return result; + return result == SetFileProcessingResult::Success; } -bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path) +S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path) { /// Create an ephemenral node in /processing /// if corresponding node does not exist in failed/, processed/ and processing/. @@ -189,7 +284,7 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::str const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); - const auto zk_client = storage->getZooKeeper(); + const auto zk_client = getZooKeeper(); Coordination::Requests requests; zkutil::addCheckNotExistsRequest(requests, *zk_client, zookeeper_processed_path / node_name); @@ -198,10 +293,26 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::str Coordination::Responses responses; auto code = zk_client->tryMulti(requests, responses); - return code == Coordination::Error::ZOK; + + if (code == Coordination::Error::ZOK) + { + return SetFileProcessingResult::Success; + } + else if (responses[0]->error == Coordination::Error::ZOK) + { + if (responses[1]->error == Coordination::Error::ZOK) + { + chassert(responses[2]->error != Coordination::Error::ZOK); + return SetFileProcessingResult::ProcessingByOtherNode; + } + else + return SetFileProcessingResult::AlreadyFailed; + } + else + return SetFileProcessingResult::AlreadyProcessed; } -bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path) +S3QueueFilesMetadata::SetFileProcessingResult S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path) { /// Create an ephemenral node in /processing /// if corresponding it does not exist in failed/, processing/ and satisfied max processed file check. @@ -209,7 +320,7 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); - const auto zk_client = storage->getZooKeeper(); + const auto zk_client = getZooKeeper(); while (true) { @@ -221,9 +332,16 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin auto code = zk_client->tryMulti(requests, responses); if (code != Coordination::Error::ZOK) { - LOG_TEST(log, "Skipping file `{}`: {}", - path, responses[0]->error != Coordination::Error::ZOK ? "failed" : "processing"); - return false; + if (responses[0]->error == Coordination::Error::ZOK) + { + LOG_TEST(log, "Skipping file `{}`: already processing", path); + return SetFileProcessingResult::ProcessingByOtherNode; + } + else + { + LOG_TEST(log, "Skipping file `{}`: failed", path); + return SetFileProcessingResult::AlreadyFailed; + } } Coordination::Stat processed_node_stat; @@ -234,7 +352,7 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin auto max_processed_file_path = processed_node_metadata.file_path; if (!max_processed_file_path.empty() && path <= max_processed_file_path) - return false; + return SetFileProcessingResult::AlreadyProcessed; requests.clear(); responses.clear(); @@ -244,14 +362,17 @@ bool S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::strin code = zk_client->tryMulti(requests, responses); if (code == Coordination::Error::ZOK) - return true; + return SetFileProcessingResult::Success; - if (responses[0]->error != Coordination::Error::ZOK - || responses[1]->error != Coordination::Error::ZOK) + if (responses[0]->error != Coordination::Error::ZOK) { - LOG_TEST(log, "Skipping file `{}`: {}", - path, responses[0]->error != Coordination::Error::ZOK ? "failed" : "processing"); - return false; + LOG_TEST(log, "Skipping file `{}`: failed", path); + return SetFileProcessingResult::AlreadyFailed; + } + else if (responses[1]->error != Coordination::Error::ZOK) + { + LOG_TEST(log, "Skipping file `{}`: already processing", path); + return SetFileProcessingResult::ProcessingByOtherNode; } else { @@ -264,8 +385,7 @@ void S3QueueFilesMetadata::setFileProcessed(const String & path) { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessedMicroseconds); SCOPE_EXIT({ - std::lock_guard lock(file_statuses_mutex); - auto & file_status = file_statuses.at(path); + auto file_status = local_file_statuses.get(path, /* create */false); file_status->state = FileStatus::State::Processed; file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessedMicroseconds, timer.get()); timer.cancel(); @@ -291,7 +411,7 @@ void S3QueueFilesMetadata::setFileProcessedForUnorderedMode(const String & path) const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); - const auto zk_client = storage->getZooKeeper(); + const auto zk_client = getZooKeeper(); Coordination::Requests requests; requests.push_back(zkutil::makeRemoveRequest(zookeeper_processing_path / node_name, -1)); @@ -316,7 +436,7 @@ void S3QueueFilesMetadata::setFileProcessedForOrderedMode(const String & path) { const auto node_name = getNodeName(path); const auto node_metadata = createNodeMetadata(path).toString(); - const auto zk_client = storage->getZooKeeper(); + const auto zk_client = getZooKeeper(); while (true) { @@ -351,8 +471,7 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileFailedMicroseconds); SCOPE_EXIT_SAFE({ - std::lock_guard lock(file_statuses_mutex); - auto & file_status = file_statuses.at(path); + auto file_status = local_file_statuses.get(path, /* create */false); file_status->state = FileStatus::State::Failed; file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get()); timer.cancel(); @@ -361,7 +480,7 @@ void S3QueueFilesMetadata::setFileFailed(const String & path, const String & exc const auto node_name = getNodeName(path); auto node_metadata = createNodeMetadata(path, exception_message); - const auto zk_client = storage->getZooKeeper(); + const auto zk_client = getZooKeeper(); if (max_loading_retries == 0) { @@ -474,7 +593,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() const bool check_nodes_limit = max_set_size > 0; const bool check_nodes_ttl = max_set_age_sec > 0; - const auto zk_client = storage->getZooKeeper(); + const auto zk_client = getZooKeeper(); auto nodes = zk_client->getChildren(zookeeper_processed_path); if (nodes.empty()) { @@ -571,6 +690,8 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() LOG_TEST(log, "Removing node at path {} ({}) because max files limit is reached", node.metadata.file_path, path.string()); + local_file_statuses.remove(node.metadata.file_path, /* if_exists */true); + code = zk_client->tryRemove(path); if (code == Coordination::Error::ZOK) --nodes_to_remove; @@ -586,6 +707,8 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() LOG_TEST(log, "Removing node at path {} ({}) because file is reached", node.metadata.file_path, path.string()); + local_file_statuses.remove(node.metadata.file_path, /* if_exists */true); + code = zk_client->tryRemove(path); if (code != Coordination::Error::ZOK) LOG_ERROR(log, "Failed to remove a node `{}` (code: {})", path.string(), code); @@ -608,6 +731,16 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() LOG_TRACE(log, "Node limits check finished"); } +bool S3QueueFilesMetadata::checkSettings(const S3QueueSettings & settings) const +{ + return mode == settings.mode + && max_set_size == settings.s3queue_tracked_files_limit.value + && max_set_age_sec == settings.s3queue_tracked_file_ttl_sec.value + && max_loading_retries == settings.s3queue_loading_retries.value + && min_cleanup_interval_ms == settings.s3queue_cleanup_interval_min_ms.value + && max_cleanup_interval_ms == settings.s3queue_cleanup_interval_max_ms.value; +} + } #endif diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index 079020514c6..5f3a6448e49 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace fs = std::filesystem; namespace Poco { class Logger; } @@ -18,7 +19,7 @@ class StorageS3Queue; class S3QueueFilesMetadata { public: - S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_, ContextPtr context); + S3QueueFilesMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_); ~S3QueueFilesMetadata(); @@ -52,10 +53,11 @@ public: std::shared_ptr getFileStatus(const std::string & path); - FileStatuses getFileStateses() const; + FileStatuses getFileStateses() const { return local_file_statuses.getAll(); } + + bool checkSettings(const S3QueueSettings & settings) const; private: - const StorageS3Queue * storage; const S3QueueMode mode; const UInt64 max_set_size; const UInt64 max_set_age_sec; @@ -73,16 +75,22 @@ private: std::atomic_bool shutdown = false; BackgroundSchedulePool::TaskHolder task; - FileStatuses file_statuses; - mutable std::mutex file_statuses_mutex; + std::string getNodeName(const std::string & path); - bool trySetFileAsProcessingForOrderedMode(const std::string & path); - bool trySetFileAsProcessingForUnorderedMode(const std::string & path); + zkutil::ZooKeeperPtr getZooKeeper() const; void setFileProcessedForOrderedMode(const std::string & path); void setFileProcessedForUnorderedMode(const std::string & path); - std::string getNodeName(const std::string & path); + enum class SetFileProcessingResult + { + Success, + ProcessingByOtherNode, + AlreadyProcessed, + AlreadyFailed, + }; + SetFileProcessingResult trySetFileAsProcessingForOrderedMode(const std::string & path); + SetFileProcessingResult trySetFileAsProcessingForUnorderedMode(const std::string & path); struct NodeMetadata { @@ -99,6 +107,19 @@ private: void cleanupThreadFunc(); void cleanupThreadFuncImpl(); + + struct LocalFileStatuses + { + FileStatuses file_statuses; + mutable std::mutex mutex; + + FileStatuses getAll() const; + std::shared_ptr get(const std::string & filename, bool create); + bool remove(const std::string & filename, bool if_exists); + FileStatus::State state(const std::string & filename) const; + std::unique_lock lock() const; + }; + LocalFileStatuses local_file_statuses; }; } diff --git a/src/Storages/S3Queue/S3QueueMetadataFactory.cpp b/src/Storages/S3Queue/S3QueueMetadataFactory.cpp new file mode 100644 index 00000000000..b1b0c4aef23 --- /dev/null +++ b/src/Storages/S3Queue/S3QueueMetadataFactory.cpp @@ -0,0 +1,29 @@ +#include + +namespace DB +{ + +S3QueueMetadataFactory & S3QueueMetadataFactory::instance() +{ + static S3QueueMetadataFactory ret; + return ret; +} + +S3QueueMetadataFactory::MetadataPtr +S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings) +{ + std::lock_guard lock(mutex); + auto it = metadata_by_path.find(zookeeper_path); + if (it == metadata_by_path.end()) + { + it = metadata_by_path.emplace(zookeeper_path, std::make_shared(fs::path(zookeeper_path), settings)).first; + } + else if (!it->second->checkSettings(settings)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with the same `s3queue_zookeeper_path` " + "was already created but with different settings"); + } + return it->second; +} + +} diff --git a/src/Storages/S3Queue/S3QueueMetadataFactory.h b/src/Storages/S3Queue/S3QueueMetadataFactory.h new file mode 100644 index 00000000000..e7a473d863d --- /dev/null +++ b/src/Storages/S3Queue/S3QueueMetadataFactory.h @@ -0,0 +1,26 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +class S3QueueMetadataFactory final : private boost::noncopyable +{ +public: + using MetadataPtr = std::shared_ptr; + using MetadataByPath = std::unordered_map; + + static S3QueueMetadataFactory & instance(); + + MetadataPtr getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings); + + MetadataByPath getAll() { return metadata_by_path; } + +private: + MetadataByPath metadata_by_path; + std::mutex mutex; +}; + +} diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index 0bc98006463..a15cc1bea9b 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -85,7 +86,7 @@ StorageS3Queue::StorageS3Queue( , s3queue_settings(std::move(s3queue_settings_)) , zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings)) , after_processing(s3queue_settings->after_processing) - , files_metadata(std::make_shared(this, *s3queue_settings, context_)) + , files_metadata(S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings)) , configuration{configuration_} , format_settings(format_settings_) , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms) diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 8ec8bfeeb0a..2a62078fcca 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -54,8 +54,6 @@ public: zkutil::ZooKeeperPtr getZooKeeper() const; - S3QueueFilesMetadata::FileStatuses getFileStatuses() const { return files_metadata->getFileStateses(); } - private: using FileIterator = StorageS3QueueSource::FileIterator; diff --git a/src/Storages/System/StorageSystemS3Queue.cpp b/src/Storages/System/StorageSystemS3Queue.cpp index 6f320bbd5f1..faba5bc671b 100644 --- a/src/Storages/System/StorageSystemS3Queue.cpp +++ b/src/Storages/System/StorageSystemS3Queue.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -24,8 +25,7 @@ namespace DB NamesAndTypesList StorageSystemS3Queue::getNamesAndTypes() { return { - {"database", std::make_shared()}, - {"table", std::make_shared()}, + {"zookeeper_path", std::make_shared()}, {"file_name", std::make_shared()}, {"rows_processed", std::make_shared()}, {"status", std::make_shared()}, @@ -40,47 +40,29 @@ StorageSystemS3Queue::StorageSystemS3Queue(const StorageID & table_id_) { } -void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const +void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const { - const auto access = context->getAccess(); - const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES); - - if (show_tables_granted) + for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll()) { - auto databases = DatabaseCatalog::instance().getDatabases(); - for (const auto & db : databases) + for (const auto & [file_name, file_status] : metadata->getFileStateses()) { - for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) - { - StoragePtr storage = iterator->table(); - if (auto * s3queue_table = dynamic_cast(storage.get())) - { - const auto & table_id = s3queue_table->getStorageID(); - auto file_statuses = s3queue_table->getFileStatuses(); - for (const auto & [file_name, file_status] : file_statuses) - { - size_t i = 0; - res_columns[i++]->insert(table_id.database_name); - res_columns[i++]->insert(table_id.table_name); - res_columns[i++]->insert(file_name); - res_columns[i++]->insert(file_status->processed_rows); - res_columns[i++]->insert(magic_enum::enum_name(file_status->state)); + size_t i = 0; + res_columns[i++]->insert(zookeeper_path); + res_columns[i++]->insert(file_name); + res_columns[i++]->insert(file_status->processed_rows); + res_columns[i++]->insert(magic_enum::enum_name(file_status->state)); - if (file_status->processing_start_time) - res_columns[i++]->insert(file_status->processing_start_time); - else - res_columns[i++]->insertDefault(); - if (file_status->processing_end_time) - res_columns[i++]->insert(file_status->processing_end_time); - else - res_columns[i++]->insertDefault(); + if (file_status->processing_start_time) + res_columns[i++]->insert(file_status->processing_start_time); + else + res_columns[i++]->insertDefault(); + if (file_status->processing_end_time) + res_columns[i++]->insert(file_status->processing_end_time); + else + res_columns[i++]->insertDefault(); - ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true); - } - } - } + ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true); } - } }