Fix DistributedFilesToInsert metric (zeroed when it should not)

CurrentMetrics::Increment add amount for specified metric only for the
lifetime of the object, but this is not the intention, since
DistributedFilesToInsert is a gauge and after #10263 it can exit from
the callback (and enter again later, for example after SYSTEM STOP
DISTRIBUTED SEND it will always exit from it, until SYSTEM START
DISTRIBUTED SEND).

So make Increment member of a class (this will also fix possible issues
with substructing value on DROP TABLE).
This commit is contained in:
Azat Khuzhin 2020-08-27 00:43:00 +03:00
parent d04d652ad4
commit a588947fe2
2 changed files with 19 additions and 24 deletions

View File

@ -94,6 +94,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, log{&Poco::Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
, bg_pool(bg_pool_)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
{
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
@ -114,16 +115,15 @@ void StorageDistributedDirectoryMonitor::flushAllData()
if (quit)
return;
CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0};
std::unique_lock lock{mutex};
const auto & files = getFiles(metric_pending_files);
const auto & files = getFiles();
if (!files.empty())
{
processFiles(files, metric_pending_files);
processFiles(files);
/// Update counters
getFiles(metric_pending_files);
getFiles();
}
}
@ -143,15 +143,12 @@ void StorageDistributedDirectoryMonitor::run()
{
std::unique_lock lock{mutex};
/// This metric will be updated with the number of pending files later.
CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0};
bool do_sleep = false;
while (!quit)
{
do_sleep = true;
const auto & files = getFiles(metric_pending_files);
const auto & files = getFiles();
if (files.empty())
break;
@ -159,7 +156,7 @@ void StorageDistributedDirectoryMonitor::run()
{
try
{
do_sleep = !processFiles(files, metric_pending_files);
do_sleep = !processFiles(files);
std::unique_lock metrics_lock(metrics_mutex);
last_exception = std::exception_ptr{};
@ -196,7 +193,7 @@ void StorageDistributedDirectoryMonitor::run()
}
/// Update counters
getFiles(metric_pending_files);
getFiles();
if (!quit && do_sleep)
task_handle->scheduleAfter(sleep_time.count());
@ -253,7 +250,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles(CurrentMetrics::Increment & metric_pending_files)
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
{
std::map<UInt64, std::string> files;
size_t new_bytes_count = 0;
@ -271,8 +268,6 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles(Curre
}
}
/// Note: the value of this metric will be kept if this function will throw an exception.
/// This is needed, because in case of exception, files still pending.
metric_pending_files.changeTo(files.size());
{
@ -283,11 +278,11 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles(Curre
return files;
}
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files, CurrentMetrics::Increment & metric_pending_files)
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files)
{
if (should_batch_inserts)
{
processFilesWithBatching(files, metric_pending_files);
processFilesWithBatching(files);
}
else
{
@ -296,14 +291,14 @@ bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std
if (quit)
return true;
processFile(file.second, metric_pending_files);
processFile(file.second);
}
}
return true;
}
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path, CurrentMetrics::Increment & metric_pending_files)
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `{}`", file_path);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context->getSettingsRef());
@ -645,9 +640,7 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g
};
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(
const std::map<UInt64, std::string> & files,
CurrentMetrics::Increment & metric_pending_files)
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
{
std::unordered_set<UInt64> file_indices_to_skip;

View File

@ -57,10 +57,10 @@ public:
private:
void run();
std::map<UInt64, std::string> getFiles(CurrentMetrics::Increment & metric_pending_files);
bool processFiles(const std::map<UInt64, std::string> & files, CurrentMetrics::Increment & metric_pending_files);
void processFile(const std::string & file_path, CurrentMetrics::Increment & metric_pending_files);
void processFilesWithBatching(const std::map<UInt64, std::string> & files, CurrentMetrics::Increment & metric_pending_files);
std::map<UInt64, std::string> getFiles();
bool processFiles(const std::map<UInt64, std::string> & files);
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
static bool isFileBrokenErrorCode(int code);
void markAsBroken(const std::string & file_path) const;
@ -98,6 +98,8 @@ private:
BackgroundSchedulePool & bg_pool;
BackgroundSchedulePoolTaskHolder task_handle;
CurrentMetrics::Increment metric_pending_files;
/// Read insert query and insert settings for backward compatible.
static void readHeader(ReadBuffer & in, Settings & insert_settings, std::string & insert_query, ClientInfo & client_info, Poco::Logger * log);