diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 24bf5b1eb81..52290555ad4 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -157,10 +157,14 @@ void StorageDistributedDirectoryMonitor::run() try { do_sleep = !processFiles(files, metric_pending_files); + + std::unique_lock metrics_lock(metrics_mutex); last_exception = std::exception_ptr{}; } catch (...) { + std::unique_lock metrics_lock(metrics_mutex); + do_sleep = true; ++error_count; sleep_time = std::min( @@ -178,6 +182,8 @@ void StorageDistributedDirectoryMonitor::run() const auto now = std::chrono::system_clock::now(); if (now - last_decrease_time > decrease_error_count_period) { + std::unique_lock metrics_lock(metrics_mutex); + error_count /= 2; last_decrease_time = now; } @@ -262,13 +268,16 @@ std::map StorageDistributedDirectoryMonitor::getFiles(Curre } } - files_count = files.size(); - bytes_count = new_bytes_count; - /// Note: the value of this metric will be kept if this function will throw an exception. /// This is needed, because in case of exception, files still pending. metric_pending_files.changeTo(files.size()); + { + std::unique_lock metrics_lock(metrics_mutex); + files_count = files.size(); + bytes_count = new_bytes_count; + } + return files; } bool StorageDistributedDirectoryMonitor::processFiles(const std::map & files, CurrentMetrics::Increment & metric_pending_files) @@ -619,6 +628,36 @@ bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms) return task_handle->scheduleAfter(ms, false); } +std::string StorageDistributedDirectoryMonitor::getPath() const +{ + std::unique_lock metrics_lock(metrics_mutex); + return path; +} +std::exception_ptr StorageDistributedDirectoryMonitor::getLastException() const +{ + std::unique_lock metrics_lock(metrics_mutex); + return last_exception; +} +size_t StorageDistributedDirectoryMonitor::getErrorCount() const +{ + std::unique_lock metrics_lock(metrics_mutex); + return error_count; +} +size_t StorageDistributedDirectoryMonitor::getFilesCount() const +{ + std::unique_lock metrics_lock(metrics_mutex); + return files_count; +} +size_t StorageDistributedDirectoryMonitor::getBytesCount() const +{ + std::unique_lock metrics_lock(metrics_mutex); + return bytes_count; +} +bool StorageDistributedDirectoryMonitor::isBlocked() const +{ + return monitor_blocker.isCancelled(); +} + void StorageDistributedDirectoryMonitor::processFilesWithBatching( const std::map & files, CurrentMetrics::Increment & metric_pending_files) @@ -760,7 +799,10 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path task_handle->deactivate(); - path = new_path; + { + std::unique_lock metrics_lock(metrics_mutex); + path = new_path; + } current_batch_file_path = path + "current_batch.txt"; task_handle->activateAndSchedule(); diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index a610efeb7fb..7725a01b6b4 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -39,13 +39,12 @@ public: bool scheduleAfter(size_t ms); /// system.distribution_queue interface - std::string getPath() const { return path; } - /// Racy but ok - std::exception_ptr getLastException() const { return last_exception; } - size_t getErrorCount() const { return error_count; } - size_t getFilesCount() const { return files_count; } - size_t getBytesCount() const { return bytes_count; } - size_t isBlocked() const { return monitor_blocker.isCancelled(); } + std::string getPath() const; + std::exception_ptr getLastException() const; + size_t getErrorCount() const; + size_t getFilesCount() const; + size_t getBytesCount() const; + bool isBlocked() const; private: void run(); @@ -73,6 +72,7 @@ private: struct BatchHeader; struct Batch; + mutable std::mutex metrics_mutex; size_t error_count = 0; size_t files_count = 0; size_t bytes_count = 0;