From 74269882f7f353bef0b7dc5603510d765d84c9aa Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 4 May 2021 22:16:36 +0300 Subject: [PATCH] Add broken_data_files/broken_data_compressed_bytes into distribution_queue --- .../system-tables/distribution_queue.md | 4 ++ src/Storages/Distributed/DirectoryMonitor.cpp | 67 +++++++++---------- src/Storages/Distributed/DirectoryMonitor.h | 27 +++++--- .../System/StorageSystemDistributionQueue.cpp | 4 ++ .../01293_system_distribution_queue.reference | 6 +- .../01293_system_distribution_queue.sql | 6 +- 6 files changed, 62 insertions(+), 52 deletions(-) diff --git a/docs/en/operations/system-tables/distribution_queue.md b/docs/en/operations/system-tables/distribution_queue.md index fdc6a134da2..3b09c20874c 100644 --- a/docs/en/operations/system-tables/distribution_queue.md +++ b/docs/en/operations/system-tables/distribution_queue.md @@ -18,6 +18,10 @@ Columns: - `data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in local files, in bytes. +- `broken_data_files` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of files that has been marked as broken (due to an error). + +- `broken_data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in broken files, in bytes. + - `last_exception` ([String](../../sql-reference/data-types/string.md)) — Text message about the last error that occurred (if any). **Example** diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 2b7599ffa20..e75d05a1226 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -368,20 +368,20 @@ void StorageDistributedDirectoryMonitor::run() { do_sleep = !processFiles(files); - std::lock_guard metrics_lock(metrics_mutex); - last_exception = std::exception_ptr{}; + std::lock_guard status_lock(status_mutex); + status.last_exception = std::exception_ptr{}; } catch (...) { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); do_sleep = true; - ++error_count; + ++status.error_count; sleep_time = std::min( - std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))}, + std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(status.error_count))}, max_sleep_time); tryLogCurrentException(getLoggerName().data()); - last_exception = std::current_exception(); + status.last_exception = std::current_exception(); } } else @@ -392,9 +392,9 @@ void StorageDistributedDirectoryMonitor::run() const auto now = std::chrono::system_clock::now(); if (now - last_decrease_time > decrease_error_count_period) { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); - error_count /= 2; + status.error_count /= 2; last_decrease_time = now; } @@ -502,16 +502,16 @@ std::map StorageDistributedDirectoryMonitor::getFiles() } { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); - if (files_count != files.size()) - LOG_TRACE(log, "Files set to {} (was {})", files.size(), files_count); - if (bytes_count != new_bytes_count) - LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, bytes_count); + if (status.files_count != files.size()) + LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count); + if (status.bytes_count != new_bytes_count) + LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count); metric_pending_files.changeTo(files.size()); - files_count = files.size(); - bytes_count = new_bytes_count; + status.files_count = files.size(); + status.bytes_count = new_bytes_count; } return files; @@ -828,10 +828,10 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t return false; { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); metric_pending_files.add(); - bytes_count += file_size; - ++files_count; + status.bytes_count += file_size; + ++status.files_count; } return task_handle->scheduleAfter(ms, false); @@ -839,16 +839,9 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus() { - std::lock_guard metrics_lock(metrics_mutex); - - return Status{ - path, - last_exception, - error_count, - files_count, - bytes_count, - monitor_blocker.isCancelled(), - }; + std::lock_guard status_lock(status_mutex); + Status current_status{status, path, monitor_blocker.isCancelled()}; + return current_status; } void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map & files) @@ -977,11 +970,15 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p Poco::File file(file_path); { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); size_t file_size = file.getSize(); - --files_count; - bytes_count -= file_size; + + --status.files_count; + status.bytes_count -= file_size; + + ++status.broken_files_count; + status.broken_bytes_count += file_size; } file.renameTo(broken_file_path); @@ -995,10 +992,10 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat size_t file_size = file.getSize(); { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); metric_pending_files.sub(); - --files_count; - bytes_count -= file_size; + --status.files_count; + status.bytes_count -= file_size; } file.remove(); @@ -1027,7 +1024,7 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_rela std::lock_guard lock{mutex}; { - std::lock_guard metrics_lock(metrics_mutex); + std::lock_guard status_lock(status_mutex); relative_path = new_relative_path; path = disk->getPath() + relative_path + '/'; } diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 1ccac4522d7..e4139f79722 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -50,15 +50,23 @@ public: /// For scheduling via DistributedBlockOutputStream bool addAndSchedule(size_t file_size, size_t ms); + struct InternalStatus + { + std::exception_ptr last_exception; + + size_t error_count = 0; + + size_t files_count = 0; + size_t bytes_count = 0; + + size_t broken_files_count = 0; + size_t broken_bytes_count = 0; + }; /// system.distribution_queue interface - struct Status + struct Status : InternalStatus { std::string path; - std::exception_ptr last_exception; - size_t error_count; - size_t files_count; - size_t bytes_count; - bool is_blocked; + bool is_blocked = false; }; Status getStatus(); @@ -92,11 +100,8 @@ private: struct BatchHeader; struct Batch; - std::mutex metrics_mutex; - size_t error_count = 0; - size_t files_count = 0; - size_t bytes_count = 0; - std::exception_ptr last_exception; + std::mutex status_mutex; + InternalStatus status; const std::chrono::milliseconds default_sleep_time; std::chrono::milliseconds sleep_time; diff --git a/src/Storages/System/StorageSystemDistributionQueue.cpp b/src/Storages/System/StorageSystemDistributionQueue.cpp index 9c0f8818011..d8879c3655e 100644 --- a/src/Storages/System/StorageSystemDistributionQueue.cpp +++ b/src/Storages/System/StorageSystemDistributionQueue.cpp @@ -98,6 +98,8 @@ NamesAndTypesList StorageSystemDistributionQueue::getNamesAndTypes() { "error_count", std::make_shared() }, { "data_files", std::make_shared() }, { "data_compressed_bytes", std::make_shared() }, + { "broken_data_files", std::make_shared() }, + { "broken_data_compressed_bytes", std::make_shared() }, { "last_exception", std::make_shared() }, }; } @@ -181,6 +183,8 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, Cont res_columns[col_num++]->insert(status.error_count); res_columns[col_num++]->insert(status.files_count); res_columns[col_num++]->insert(status.bytes_count); + res_columns[col_num++]->insert(status.broken_files_count); + res_columns[col_num++]->insert(status.broken_bytes_count); if (status.last_exception) res_columns[col_num++]->insert(getExceptionMessage(status.last_exception, false)); diff --git a/tests/queries/0_stateless/01293_system_distribution_queue.reference b/tests/queries/0_stateless/01293_system_distribution_queue.reference index a2c1e5f2a7b..4a51abdb745 100644 --- a/tests/queries/0_stateless/01293_system_distribution_queue.reference +++ b/tests/queries/0_stateless/01293_system_distribution_queue.reference @@ -1,6 +1,6 @@ INSERT -1 0 1 1 +1 0 1 1 0 0 FLUSH -1 0 0 0 +1 0 0 0 0 0 UNBLOCK -0 0 0 0 +0 0 0 0 0 0 diff --git a/tests/queries/0_stateless/01293_system_distribution_queue.sql b/tests/queries/0_stateless/01293_system_distribution_queue.sql index dc63dece960..b16433029bf 100644 --- a/tests/queries/0_stateless/01293_system_distribution_queue.sql +++ b/tests/queries/0_stateless/01293_system_distribution_queue.sql @@ -10,15 +10,15 @@ select * from system.distribution_queue; select 'INSERT'; system stop distributed sends dist_01293; insert into dist_01293 select * from numbers(10); -select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue where database = currentDatabase(); +select is_blocked, error_count, data_files, data_compressed_bytes>100, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase(); system flush distributed dist_01293; select 'FLUSH'; -select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase(); +select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase(); select 'UNBLOCK'; system start distributed sends dist_01293; -select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase(); +select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase(); drop table null_01293; drop table dist_01293;