Merge pull request #16729 from azat/dist-cleanup-empty-dirs-at-start

Remove empty directories for async INSERT at start of Distributed engine
This commit is contained in:
alexey-milovidov 2020-11-06 12:39:31 +03:00 committed by GitHub
commit 0da1127440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 6 deletions

View File

@ -79,7 +79,7 @@ namespace
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_)
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool)
: storage(storage_)
, pool(std::move(pool_))
, path{path_ + '/'}
@ -92,7 +92,6 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, max_sleep_time{storage.global_context->getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, log{&Poco::Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
, bg_pool(bg_pool_)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
{
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });

View File

@ -25,7 +25,7 @@ class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool_);
StorageDistributed & storage_, std::string path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, BackgroundSchedulePool & bg_pool);
~StorageDistributedDirectoryMonitor();
@ -95,7 +95,6 @@ private:
Poco::Logger * log;
ActionBlocker & monitor_blocker;
BackgroundSchedulePool & bg_pool;
BackgroundSchedulePoolTaskHolder task_handle;
CurrentMetrics::Increment metric_pending_files;

View File

@ -670,8 +670,28 @@ void StorageDistributed::createDirectoryMonitors(const std::string & disk)
std::filesystem::directory_iterator begin(path);
std::filesystem::directory_iterator end;
for (auto it = begin; it != end; ++it)
if (std::filesystem::is_directory(*it))
requireDirectoryMonitor(disk, it->path().filename().string());
{
const auto & dir_path = it->path();
if (std::filesystem::is_directory(dir_path))
{
const auto & tmp_path = dir_path / "tmp";
/// "tmp" created by DistributedBlockOutputStream
if (std::filesystem::is_directory(tmp_path) && std::filesystem::is_empty(tmp_path))
std::filesystem::remove(tmp_path);
if (std::filesystem::is_empty(dir_path))
{
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path);
/// Will be created by DistributedBlockOutputStream on demand.
std::filesystem::remove(dir_path);
}
else
{
requireDirectoryMonitor(disk, dir_path.filename().string());
}
}
}
}