Add new metrics BrokenDistributedBytesToInsert/DistributedBytesToInsert

Useful to see at the server status overall.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-05-25 17:23:39 +02:00
parent 0f37be5492
commit 69aec7af9b
3 changed files with 13 additions and 0 deletions

View File

@ -145,6 +145,8 @@
M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool.") \ M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool.") \
M(OutdatedPartsLoadingThreads, "Number of threads in the threadpool for loading Outdated data parts.") \ M(OutdatedPartsLoadingThreads, "Number of threads in the threadpool for loading Outdated data parts.") \
M(OutdatedPartsLoadingThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \ M(OutdatedPartsLoadingThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(DistributedBytesToInsert, "Number of pending bytes to process for asynchronous insertion into Distributed tables. Number of bytes for every shard is summed.") \
M(BrokenDistributedBytesToInsert, "Number of bytes for asynchronous insertion into Distributed tables that has been marked as broken. Number of bytes for every shard is summed.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \ M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. Number of files for every shard is summed.") \ M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \ M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \

View File

@ -35,6 +35,8 @@ namespace CurrentMetrics
extern const Metric DistributedSend; extern const Metric DistributedSend;
extern const Metric DistributedFilesToInsert; extern const Metric DistributedFilesToInsert;
extern const Metric BrokenDistributedFilesToInsert; extern const Metric BrokenDistributedFilesToInsert;
extern const Metric DistributedBytesToInsert;
extern const Metric BrokenDistributedBytesToInsert;
} }
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -138,7 +140,9 @@ DistributedAsyncInsertDirectoryQueue::DistributedAsyncInsertDirectoryQueue(
, max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds()) , max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
, log(&Poco::Logger::get(getLoggerName())) , log(&Poco::Logger::get(getLoggerName()))
, monitor_blocker(monitor_blocker_) , monitor_blocker(monitor_blocker_)
, metric_pending_bytes(CurrentMetrics::DistributedBytesToInsert, 0)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0) , metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
, metric_broken_bytes(CurrentMetrics::BrokenDistributedBytesToInsert, 0)
, metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0) , metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0)
{ {
fs::create_directory(broken_path); fs::create_directory(broken_path);
@ -357,6 +361,7 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
LOG_TRACE(log, "Files set to {}", pending_files.size()); LOG_TRACE(log, "Files set to {}", pending_files.size());
LOG_TRACE(log, "Bytes set to {}", bytes_count); LOG_TRACE(log, "Bytes set to {}", bytes_count);
metric_pending_bytes.changeTo(bytes_count);
metric_pending_files.changeTo(pending_files.size()); metric_pending_files.changeTo(pending_files.size());
status.files_count = pending_files.size(); status.files_count = pending_files.size();
status.bytes_count = bytes_count; status.bytes_count = bytes_count;
@ -380,6 +385,7 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
LOG_TRACE(log, "Broken bytes set to {}", broken_bytes_count); LOG_TRACE(log, "Broken bytes set to {}", broken_bytes_count);
metric_broken_files.changeTo(broken_files); metric_broken_files.changeTo(broken_files);
metric_broken_bytes.changeTo(broken_bytes_count);
status.broken_files_count = broken_files; status.broken_files_count = broken_files;
status.broken_bytes_count = broken_bytes_count; status.broken_bytes_count = broken_bytes_count;
} }
@ -520,6 +526,7 @@ bool DistributedAsyncInsertDirectoryQueue::addFileAndSchedule(const std::string
{ {
std::lock_guard lock(status_mutex); std::lock_guard lock(status_mutex);
metric_pending_files.add(); metric_pending_files.add();
metric_pending_bytes.add(file_size);
status.bytes_count += file_size; status.bytes_count += file_size;
++status.files_count; ++status.files_count;
} }
@ -679,6 +686,7 @@ void DistributedAsyncInsertDirectoryQueue::markAsBroken(const std::string & file
status.broken_bytes_count += file_size; status.broken_bytes_count += file_size;
metric_broken_files.add(); metric_broken_files.add();
metric_broken_bytes.add(file_size);
} }
fs::rename(file_path, broken_file_path); fs::rename(file_path, broken_file_path);
@ -692,6 +700,7 @@ void DistributedAsyncInsertDirectoryQueue::markAsSend(const std::string & file_p
{ {
std::lock_guard status_lock(status_mutex); std::lock_guard status_lock(status_mutex);
metric_pending_files.sub(); metric_pending_files.sub();
metric_pending_bytes.sub(file_size);
--status.files_count; --status.files_count;
status.bytes_count -= file_size; status.bytes_count -= file_size;
} }

View File

@ -149,7 +149,9 @@ private:
BackgroundSchedulePoolTaskHolder task_handle; BackgroundSchedulePoolTaskHolder task_handle;
CurrentMetrics::Increment metric_pending_bytes;
CurrentMetrics::Increment metric_pending_files; CurrentMetrics::Increment metric_pending_files;
CurrentMetrics::Increment metric_broken_bytes;
CurrentMetrics::Increment metric_broken_files; CurrentMetrics::Increment metric_broken_files;
}; };