Avoid race condition for updating system.distribution_queue values

Previously it was possible to have a race while updating
files_count/bytes_count, since INSERT updates it those counters from one
thread and the same metrics are updated from filesystem in a separate
thread, and even though the access is synchronized with the mutex it
avoids the race only for accessing the variables not the logical race,
since it is possible that getFiles() from a separate thread will
increment counters and later addAndSchedule() will increment them again.

Here you can find an example of this race [1].

  [1]: https://pastila.nl/?00950e00/41a3c7bbb0a7e75bd3f2922c58b02334

Note, that I analyzed logs from production system with lots of async
Distributed INSERT and everything is OK there, even though the logs
contains the following:

    2022.11.20 02:21:15.459483 [ 11528 ] {} <Trace> v21.dist_out.DirectoryMonitor: Files set to 35 (was 34)
    2022.11.20 02:21:15.459515 [ 11528 ] {} <Trace> v21.dist_out.DirectoryMonitor: Bytes set to 4035418 (was 3929008)
    2022.11.20 02:21:15.819488 [ 11528 ] {} <Trace> v21.dist_out.DirectoryMonitor: Files set to 1 (was 2)
    2022.11.20 02:21:15.819502 [ 11528 ] {} <Trace> v21.dist_out.DirectoryMonitor: Bytes set to 190072 (was 296482)

As you may see it first increases the counters and next update
decreases (and 4035418-3929008 == 296482-190072)

Refs: #23885
Reported-by: @tavplubix
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-11-20 14:07:22 +01:00
parent 0b4e643c27
commit 177cbbac4b

View File

@ -572,7 +572,6 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
{
std::map<UInt64, std::string> files;
size_t new_bytes_count = 0;
fs::directory_iterator end;
for (fs::directory_iterator it{path}; it != end; ++it)
@ -581,23 +580,9 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
if (!it->is_directory() && startsWith(fs::path(file_path_str).extension(), ".bin"))
{
files[parse<UInt64>(fs::path(file_path_str).stem())] = file_path_str;
new_bytes_count += fs::file_size(fs::path(file_path_str));
}
}
{
std::lock_guard status_lock(status_mutex);
if (status.files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count);
if (status.bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count);
metric_pending_files.changeTo(files.size());
status.files_count = files.size();
status.bytes_count = new_bytes_count;
}
return files;
}
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files)