Make system.distribution_queue metrics non racy

This commit is contained in:
Azat Khuzhin 2020-06-03 11:22:48 +03:00
parent 09c3ca9c6c
commit f0050adc51
2 changed files with 53 additions and 11 deletions

View File

@ -157,10 +157,14 @@ void StorageDistributedDirectoryMonitor::run()
try try
{ {
do_sleep = !processFiles(files, metric_pending_files); do_sleep = !processFiles(files, metric_pending_files);
std::unique_lock metrics_lock(metrics_mutex);
last_exception = std::exception_ptr{}; last_exception = std::exception_ptr{};
} }
catch (...) catch (...)
{ {
std::unique_lock metrics_lock(metrics_mutex);
do_sleep = true; do_sleep = true;
++error_count; ++error_count;
sleep_time = std::min( sleep_time = std::min(
@ -178,6 +182,8 @@ void StorageDistributedDirectoryMonitor::run()
const auto now = std::chrono::system_clock::now(); const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period) if (now - last_decrease_time > decrease_error_count_period)
{ {
std::unique_lock metrics_lock(metrics_mutex);
error_count /= 2; error_count /= 2;
last_decrease_time = now; last_decrease_time = now;
} }
@ -262,13 +268,16 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles(Curre
} }
} }
files_count = files.size();
bytes_count = new_bytes_count;
/// Note: the value of this metric will be kept if this function will throw an exception. /// Note: the value of this metric will be kept if this function will throw an exception.
/// This is needed, because in case of exception, files still pending. /// This is needed, because in case of exception, files still pending.
metric_pending_files.changeTo(files.size()); metric_pending_files.changeTo(files.size());
{
std::unique_lock metrics_lock(metrics_mutex);
files_count = files.size();
bytes_count = new_bytes_count;
}
return files; return files;
} }
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files, CurrentMetrics::Increment & metric_pending_files) bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files, CurrentMetrics::Increment & metric_pending_files)
@ -619,6 +628,36 @@ bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms)
return task_handle->scheduleAfter(ms, false); return task_handle->scheduleAfter(ms, false);
} }
std::string StorageDistributedDirectoryMonitor::getPath() const
{
std::unique_lock metrics_lock(metrics_mutex);
return path;
}
std::exception_ptr StorageDistributedDirectoryMonitor::getLastException() const
{
std::unique_lock metrics_lock(metrics_mutex);
return last_exception;
}
size_t StorageDistributedDirectoryMonitor::getErrorCount() const
{
std::unique_lock metrics_lock(metrics_mutex);
return error_count;
}
size_t StorageDistributedDirectoryMonitor::getFilesCount() const
{
std::unique_lock metrics_lock(metrics_mutex);
return files_count;
}
size_t StorageDistributedDirectoryMonitor::getBytesCount() const
{
std::unique_lock metrics_lock(metrics_mutex);
return bytes_count;
}
bool StorageDistributedDirectoryMonitor::isBlocked() const
{
return monitor_blocker.isCancelled();
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching( void StorageDistributedDirectoryMonitor::processFilesWithBatching(
const std::map<UInt64, std::string> & files, const std::map<UInt64, std::string> & files,
CurrentMetrics::Increment & metric_pending_files) CurrentMetrics::Increment & metric_pending_files)
@ -760,7 +799,10 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_path
task_handle->deactivate(); task_handle->deactivate();
{
std::unique_lock metrics_lock(metrics_mutex);
path = new_path; path = new_path;
}
current_batch_file_path = path + "current_batch.txt"; current_batch_file_path = path + "current_batch.txt";
task_handle->activateAndSchedule(); task_handle->activateAndSchedule();

View File

@ -39,13 +39,12 @@ public:
bool scheduleAfter(size_t ms); bool scheduleAfter(size_t ms);
/// system.distribution_queue interface /// system.distribution_queue interface
std::string getPath() const { return path; } std::string getPath() const;
/// Racy but ok std::exception_ptr getLastException() const;
std::exception_ptr getLastException() const { return last_exception; } size_t getErrorCount() const;
size_t getErrorCount() const { return error_count; } size_t getFilesCount() const;
size_t getFilesCount() const { return files_count; } size_t getBytesCount() const;
size_t getBytesCount() const { return bytes_count; } bool isBlocked() const;
size_t isBlocked() const { return monitor_blocker.isCancelled(); }
private: private:
void run(); void run();
@ -73,6 +72,7 @@ private:
struct BatchHeader; struct BatchHeader;
struct Batch; struct Batch;
mutable std::mutex metrics_mutex;
size_t error_count = 0; size_t error_count = 0;
size_t files_count = 0; size_t files_count = 0;
size_t bytes_count = 0; size_t bytes_count = 0;