Distributed: Calculate counters for async INSERT at INSERT time

Previous patch fixes the inaccuracy, but it's done using iterating over
directory on each request (to system.distribution_queue or to check
bytes_to_throw_insert), and like previous patch alredy stated, it may
have pretty huge overhead (especially when you have lots of distributed
files pending).

This patch remove that recalculation (but it will still be done, and
if there is different, there will be a log message), and replace it with
proper account at INSERT time (and after file has been sent, or marked
as broken).
This commit is contained in:
Azat Khuzhin 2021-01-26 21:45:37 +03:00
parent b43046ba06
commit fcf49a4914
3 changed files with 52 additions and 19 deletions

View File

@ -435,7 +435,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles(bool lock_metrics) const
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles() const
{
std::map<UInt64, std::string> files;
size_t new_bytes_count = 0;
@ -456,9 +456,13 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles(bool
metric_pending_files.changeTo(files.size());
{
std::unique_lock metrics_lock(metrics_mutex, std::defer_lock);
if (lock_metrics)
metrics_lock.lock();
std::unique_lock metrics_lock(metrics_mutex);
if (files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), files_count);
if (bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, bytes_count);
files_count = files.size();
bytes_count = new_bytes_count;
}
@ -511,9 +515,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
}
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
Poco::File{file_path}.remove();
metric_pending_files.sub();
markAsSend(file_path);
LOG_TRACE(log, "Finished processing `{}`", file_path);
}
@ -663,7 +665,7 @@ struct StorageDistributedDirectoryMonitor::Batch
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
for (UInt64 file_index : file_indices)
Poco::File{file_index_to_path.at(file_index)}.remove();
parent.markAsSend(file_index_to_path.at(file_index));
}
else
{
@ -749,10 +751,19 @@ BlockInputStreamPtr StorageDistributedDirectoryMonitor::createStreamFromFile(con
return std::make_shared<DirectoryMonitorBlockInputStream>(file_name);
}
bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms)
bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t ms)
{
if (quit)
return false;
{
std::unique_lock metrics_lock(metrics_mutex);
/// TODO: extend CurrentMetrics::Increment
metric_pending_files.sub(-1);
bytes_count += file_size;
++files_count;
}
return task_handle->scheduleAfter(ms, false);
}
@ -760,9 +771,6 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g
{
std::unique_lock metrics_lock(metrics_mutex);
/// Recalculate counters
getFiles(false /* metrics_lock already acquired */);
return Status{
path,
last_exception,
@ -785,7 +793,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
batch.readText(in);
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
@ -855,7 +862,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
if (batch.isEnoughSize())
{
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
}
@ -863,7 +869,6 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
{
Batch & batch = kv.second;
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
{
@ -889,10 +894,36 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path + "/broken/");
Poco::File{file_path}.renameTo(broken_file_path);
Poco::File file(file_path);
{
/// TODO: guard_lock
std::unique_lock metrics_lock(metrics_mutex);
size_t file_size = file.getSize();
--files_count;
bytes_count -= file_size;
}
file.renameTo(broken_file_path);
LOG_ERROR(log, "Renamed `{}` to `{}`", file_path, broken_file_path);
}
void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_path) const
{
Poco::File file(file_path);
{
std::unique_lock metrics_lock(metrics_mutex);
size_t file_size = file.getSize();
--files_count;
bytes_count -= file_size;
}
metric_pending_files.sub();
file.remove();
}
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) const
{

View File

@ -48,7 +48,7 @@ public:
static BlockInputStreamPtr createStreamFromFile(const String & file_name);
/// For scheduling via DistributedBlockOutputStream
bool scheduleAfter(size_t ms);
bool addAndSchedule(size_t file_size, size_t ms);
/// system.distribution_queue interface
struct Status
@ -65,12 +65,13 @@ public:
private:
void run();
std::map<UInt64, std::string> getFiles(bool lock_metrics = true) const;
std::map<UInt64, std::string> getFiles() const;
bool processFiles(const std::map<UInt64, std::string> & files);
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
void markAsBroken(const std::string & file_path) const;
void markAsSend(const std::string & file_path) const;
bool maybeMarkAsBroken(const std::string & file_path, const Exception & e) const;
std::string getLoggerName() const;

View File

@ -719,6 +719,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
auto dir_sync_guard = make_directory_sync_guard(*it);
}
auto file_size = Poco::File(first_file_tmp_path).getSize();
/// remove the temporary file, enabling the OS to reclaim inode after all threads
/// have removed their corresponding files
Poco::File(first_file_tmp_path).remove();
@ -728,7 +729,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
for (const auto & dir_name : dir_names)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
directory_monitor.scheduleAfter(sleep_ms.totalMilliseconds());
directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
}
}