Merge pull request #43406 from azat/dist/async-insert-stat

Avoid race condition for updating system.distribution_queue values
This commit is contained in:
Alexander Tokmakov 2022-12-05 12:53:12 +03:00 committed by GitHub
commit 52d3e5471b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -572,7 +572,6 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
{
std::map<UInt64, std::string> files;
size_t new_bytes_count = 0;
fs::directory_iterator end;
for (fs::directory_iterator it{path}; it != end; ++it)
@ -581,23 +580,9 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
if (!it->is_directory() && startsWith(fs::path(file_path_str).extension(), ".bin"))
{
files[parse<UInt64>(fs::path(file_path_str).stem())] = file_path_str;
new_bytes_count += fs::file_size(fs::path(file_path_str));
}
}
{
std::lock_guard status_lock(status_mutex);
if (status.files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count);
if (status.bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count);
metric_pending_files.changeTo(files.size());
status.files_count = files.size();
status.bytes_count = new_bytes_count;
}
return files;
}
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files)