diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index f3996975882..b67d3283ac9 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -94,6 +94,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( , log{&Poco::Logger::get(getLoggerName())} , monitor_blocker(monitor_blocker_) , bg_pool(bg_pool_) + , metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0) { task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); }); task_handle->activateAndSchedule(); @@ -114,16 +115,15 @@ void StorageDistributedDirectoryMonitor::flushAllData() if (quit) return; - CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0}; std::unique_lock lock{mutex}; - const auto & files = getFiles(metric_pending_files); + const auto & files = getFiles(); if (!files.empty()) { - processFiles(files, metric_pending_files); + processFiles(files); /// Update counters - getFiles(metric_pending_files); + getFiles(); } } @@ -143,15 +143,12 @@ void StorageDistributedDirectoryMonitor::run() { std::unique_lock lock{mutex}; - /// This metric will be updated with the number of pending files later. - CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0}; - bool do_sleep = false; while (!quit) { do_sleep = true; - const auto & files = getFiles(metric_pending_files); + const auto & files = getFiles(); if (files.empty()) break; @@ -159,7 +156,7 @@ void StorageDistributedDirectoryMonitor::run() { try { - do_sleep = !processFiles(files, metric_pending_files); + do_sleep = !processFiles(files); std::unique_lock metrics_lock(metrics_mutex); last_exception = std::exception_ptr{}; @@ -196,7 +193,7 @@ void StorageDistributedDirectoryMonitor::run() } /// Update counters - getFiles(metric_pending_files); + getFiles(); if (!quit && do_sleep) task_handle->scheduleAfter(sleep_time.count()); @@ -253,7 +250,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri } -std::map StorageDistributedDirectoryMonitor::getFiles(CurrentMetrics::Increment & metric_pending_files) +std::map StorageDistributedDirectoryMonitor::getFiles() { std::map files; size_t new_bytes_count = 0; @@ -271,8 +268,6 @@ std::map StorageDistributedDirectoryMonitor::getFiles(Curre } } - /// Note: the value of this metric will be kept if this function will throw an exception. - /// This is needed, because in case of exception, files still pending. metric_pending_files.changeTo(files.size()); { @@ -283,11 +278,11 @@ std::map StorageDistributedDirectoryMonitor::getFiles(Curre return files; } -bool StorageDistributedDirectoryMonitor::processFiles(const std::map & files, CurrentMetrics::Increment & metric_pending_files) +bool StorageDistributedDirectoryMonitor::processFiles(const std::map & files) { if (should_batch_inserts) { - processFilesWithBatching(files, metric_pending_files); + processFilesWithBatching(files); } else { @@ -296,14 +291,14 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::mapgetSettingsRef()); @@ -645,9 +640,7 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g }; } -void StorageDistributedDirectoryMonitor::processFilesWithBatching( - const std::map & files, - CurrentMetrics::Increment & metric_pending_files) +void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map & files) { std::unordered_set file_indices_to_skip; diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 960d82f0716..a6fb78c8db3 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -57,10 +57,10 @@ public: private: void run(); - std::map getFiles(CurrentMetrics::Increment & metric_pending_files); - bool processFiles(const std::map & files, CurrentMetrics::Increment & metric_pending_files); - void processFile(const std::string & file_path, CurrentMetrics::Increment & metric_pending_files); - void processFilesWithBatching(const std::map & files, CurrentMetrics::Increment & metric_pending_files); + std::map getFiles(); + bool processFiles(const std::map & files); + void processFile(const std::string & file_path); + void processFilesWithBatching(const std::map & files); static bool isFileBrokenErrorCode(int code); void markAsBroken(const std::string & file_path) const; @@ -98,6 +98,8 @@ private: BackgroundSchedulePool & bg_pool; BackgroundSchedulePoolTaskHolder task_handle; + CurrentMetrics::Increment metric_pending_files; + /// Read insert query and insert settings for backward compatible. static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Poco::Logger * log);