diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 6fe98c53b3e..f2c547c87db 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -435,7 +435,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri } -std::map StorageDistributedDirectoryMonitor::getFiles() +std::map StorageDistributedDirectoryMonitor::getFiles(bool lock_metrics) const { std::map files; size_t new_bytes_count = 0; @@ -456,7 +456,9 @@ std::map StorageDistributedDirectoryMonitor::getFiles() metric_pending_files.changeTo(files.size()); { - std::unique_lock metrics_lock(metrics_mutex); + std::unique_lock metrics_lock(metrics_mutex, std::defer_lock); + if (lock_metrics) + metrics_lock.lock(); files_count = files.size(); bytes_count = new_bytes_count; } @@ -758,6 +760,9 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g { std::unique_lock metrics_lock(metrics_mutex); + /// Recalculate counters + getFiles(false /* metrics_lock already acquired */); + return Status{ path, last_exception, diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index c73b79761ca..16026e2de26 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -65,7 +65,7 @@ public: private: void run(); - std::map getFiles(); + std::map getFiles(bool lock_metrics = true) const; bool processFiles(const std::map & files); void processFile(const std::string & file_path); void processFilesWithBatching(const std::map & files); @@ -93,8 +93,8 @@ private: mutable std::mutex metrics_mutex; size_t error_count = 0; - size_t files_count = 0; - size_t bytes_count = 0; + mutable size_t files_count = 0; + mutable size_t bytes_count = 0; std::exception_ptr last_exception; const std::chrono::milliseconds default_sleep_time; @@ -108,7 +108,7 @@ private: BackgroundSchedulePoolTaskHolder task_handle; - CurrentMetrics::Increment metric_pending_files; + mutable CurrentMetrics::Increment metric_pending_files; friend class DirectoryMonitorBlockInputStream; }; diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 1d3f5968536..8b7cb2fde02 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -982,7 +982,6 @@ void StorageDistributed::throwInsertIfNeeded() const if (!distributed_settings.bytes_to_throw_insert) return; - /// TODO: update the counters UInt64 total_bytes = *totalBytes(global_context.getSettingsRef()); if (total_bytes > distributed_settings.bytes_to_throw_insert) { diff --git a/tests/queries/0_stateless/00753_system_columns_and_system_tables.reference b/tests/queries/0_stateless/00753_system_columns_and_system_tables.reference index ea105b0679c..e92582f658a 100644 --- a/tests/queries/0_stateless/00753_system_columns_and_system_tables.reference +++ b/tests/queries/0_stateless/00753_system_columns_and_system_tables.reference @@ -47,5 +47,5 @@ Check total_bytes/total_rows for Join 10240 100 Check total_bytes/total_rows for Distributed 0 \N -629 \N +581 \N 0 \N diff --git a/tests/queries/0_stateless/00753_system_columns_and_system_tables.sql b/tests/queries/0_stateless/00753_system_columns_and_system_tables.sql index 7a774cc7cab..dc0f3f96f98 100644 --- a/tests/queries/0_stateless/00753_system_columns_and_system_tables.sql +++ b/tests/queries/0_stateless/00753_system_columns_and_system_tables.sql @@ -128,16 +128,11 @@ SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tab DROP TABLE check_system_tables; SELECT 'Check total_bytes/total_rows for Distributed'; --- metrics updated only after distributed_directory_monitor_sleep_time_ms -SET distributed_directory_monitor_sleep_time_ms=10; CREATE TABLE check_system_tables_null (key Int) Engine=Null(); CREATE TABLE check_system_tables AS check_system_tables_null Engine=Distributed(test_shard_localhost, currentDatabase(), check_system_tables_null); SYSTEM STOP DISTRIBUTED SENDS check_system_tables; SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables'; INSERT INTO check_system_tables SELECT * FROM numbers(1) SETTINGS prefer_localhost_replica=0; --- 1 second should guarantee metrics update --- XXX: but this is kind of quirk, way more better will be account this metrics without any delays. -SELECT sleep(1) FORMAT Null; SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables'; SYSTEM FLUSH DISTRIBUTED check_system_tables; SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables'; diff --git a/tests/queries/0_stateless/01293_system_distribution_queue.sql b/tests/queries/0_stateless/01293_system_distribution_queue.sql index 8f84bbac41f..7b37e70df9a 100644 --- a/tests/queries/0_stateless/01293_system_distribution_queue.sql +++ b/tests/queries/0_stateless/01293_system_distribution_queue.sql @@ -10,11 +10,6 @@ select * from system.distribution_queue; select 'INSERT'; system stop distributed sends dist_01293; insert into dist_01293 select * from numbers(10); --- metrics updated only after distributed_directory_monitor_sleep_time_ms -set distributed_directory_monitor_sleep_time_ms=10; --- 1 second should guarantee metrics update --- XXX: but this is kind of quirk, way more better will be account this metrics without any delays. -select sleep(1) format Null; select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue; system flush distributed dist_01293;