Fix race for distributed sends from disk

Previously, the directory queue was initialized from disk only on startup.
However, if an INSERT created the queue object before that happened, the
queue was never initialized from disk.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-01-21 20:37:12 +01:00
parent b5434eac3b
commit e10fb142fd
5 changed files with 30 additions and 32 deletions

View File

@ -118,8 +118,7 @@ DistributedAsyncInsertDirectoryQueue::DistributedAsyncInsertDirectoryQueue(
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool,
bool initialize_from_disk)
BackgroundSchedulePool & bg_pool)
: storage(storage_)
, pool(std::move(pool_))
, disk(disk_)
@ -144,8 +143,7 @@ DistributedAsyncInsertDirectoryQueue::DistributedAsyncInsertDirectoryQueue(
{
fs::create_directory(broken_path);
if (initialize_from_disk)
initializeFilesFromDisk();
initializeFilesFromDisk();
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();

View File

@ -54,8 +54,7 @@ public:
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool,
bool initialize_from_disk);
BackgroundSchedulePool & bg_pool);
~DistributedAsyncInsertDirectoryQueue();

View File

@ -724,8 +724,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
return guard;
};
std::vector<std::string> bin_files;
bin_files.reserve(dir_names.size());
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds();
size_t file_size;
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
@ -804,10 +804,16 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
out.sync();
}
file_size = fs::file_size(first_file_tmp_path);
// Create hardlink here to reuse increment number
bin_files.push_back(fs::path(path) / file_name);
createHardLink(first_file_tmp_path, bin_files.back());
auto dir_sync_guard = make_directory_sync_guard(*it);
auto bin_file = (fs::path(path) / file_name).string();
auto & directory_queue = storage.getDirectoryQueue(disk, *it);
{
createHardLink(first_file_tmp_path, bin_file);
auto dir_sync_guard = make_directory_sync_guard(*it);
}
directory_queue.addFileAndSchedule(bin_file, file_size, sleep_ms);
}
++it;
@ -817,26 +823,18 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
const std::string path(fs::path(disk_path) / (data_path + *it));
fs::create_directory(path);
bin_files.push_back(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin"));
createHardLink(first_file_tmp_path, bin_files.back());
auto dir_sync_guard = make_directory_sync_guard(*it);
auto bin_file = (fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin")).string();
auto & directory_monitor = storage.getDirectoryQueue(disk, *it);
{
createHardLink(first_file_tmp_path, bin_file);
auto dir_sync_guard = make_directory_sync_guard(*it);
}
directory_monitor.addFileAndSchedule(bin_file, file_size, sleep_ms);
}
auto file_size = fs::file_size(first_file_tmp_path);
/// remove the temporary file, enabling the OS to reclaim inode after all threads
/// have removed their corresponding files
fs::remove(first_file_tmp_path);
/// Notify
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (size_t i = 0; i < dir_names.size(); ++i)
{
const auto & dir_name = dir_names[i];
const auto & bin_file = bin_files[i];
auto & directory_monitor = storage.getDirectoryQueue(disk, dir_name, /* startup= */ false);
directory_monitor.addFileAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds());
}
}
}

View File

@ -1225,14 +1225,14 @@ void StorageDistributed::initializeDirectoryQueuesForDisk(const DiskPtr & disk)
}
else
{
getDirectoryQueue(disk, dir_path.filename().string(), /* startup= */ true);
getDirectoryQueue(disk, dir_path.filename().string());
}
}
}
}
DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(const DiskPtr & disk, const std::string & name, bool startup)
DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(const DiskPtr & disk, const std::string & name)
{
const std::string & disk_path = disk->getPath();
const std::string key(disk_path + name);
@ -1246,8 +1246,7 @@ DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(con
*this, disk, relative_data_path + name,
node_data.connection_pool,
monitors_blocker,
getContext()->getDistributedSchedulePool(),
/* initialize_from_disk= */ startup);
getContext()->getDistributedSchedulePool());
}
return *node_data.directory_monitor;
}

View File

@ -166,8 +166,12 @@ private:
/// create directory monitors for each existing subdirectory
void initializeDirectoryQueuesForDisk(const DiskPtr & disk);
/// ensure directory queue thread and connection pool created by disk and subdirectory name
DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name, bool startup);
/// Get directory queue thread and connection pool created by disk and subdirectory name
///
/// Used for the INSERT into Distributed in case of insert_distributed_sync==1, from DistributedSink.
DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name);
/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. not until at least one INSERT has been executed)