Always use absolute file paths in DirectoryMonitor

Otherwise, on batch restore, we can end up with mismatched file paths.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-01-21 10:20:28 +01:00
parent ef1e642e05
commit f0a2efa630
3 changed files with 13 additions and 8 deletions

View File

@ -573,6 +573,12 @@ bool StorageDistributedDirectoryMonitor::hasPendingFiles() const
return fs::exists(current_batch_file_path) || !current_file.empty() || !pending_files.empty();
}
/// Adds `file_path` to `pending_files`, converting it to an absolute path first
/// so that every path stored there has the same (absolute) form.
/// Throws LOGICAL_ERROR if `pending_files.push()` reports failure.
void StorageDistributedDirectoryMonitor::addFile(const std::string & file_path)
{
    const bool scheduled = pending_files.push(fs::absolute(file_path).string());
    if (scheduled)
        return;

    /// The message reports the path exactly as the caller passed it.
    throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot schedule a file '{}'", file_path);
}
void StorageDistributedDirectoryMonitor::initializeFilesFromDisk()
{
/// NOTE: This method does not requires to hold status_mutex, hence, no TSA
@ -591,8 +597,7 @@ void StorageDistributedDirectoryMonitor::initializeFilesFromDisk()
if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse<UInt64>(base_name))
{
const std::string & file_path_str = file_path.string();
if (!pending_files.push(file_path_str))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
addFile(file_path_str);
bytes_count += fs::file_size(file_path);
}
else if (base_name != "tmp" && base_name != "broken")
@ -882,7 +887,7 @@ struct StorageDistributedDirectoryMonitor::Batch
{
UInt64 idx;
in >> idx >> "\n";
files.push_back(fmt::format("{}/{}.bin", parent.path, idx));
files.push_back(fs::absolute(fmt::format("{}/{}.bin", parent.path, idx)).string());
}
recovered = true;
@ -1043,7 +1048,7 @@ std::shared_ptr<ISource> StorageDistributedDirectoryMonitor::createSourceFromFil
return std::make_shared<DirectoryMonitorSource>(file_name);
}
bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file_path, size_t file_size, size_t ms)
bool StorageDistributedDirectoryMonitor::addFileAndSchedule(const std::string & file_path, size_t file_size, size_t ms)
{
/// NOTE: It is better not to throw in this case, since the file is already
/// on disk (see DistributedSink), and it will be processed next time.
@ -1053,8 +1058,7 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file
return false;
}
if (!pending_files.push(file_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
addFile(file_path);
{
std::lock_guard lock(status_mutex);

View File

@ -55,7 +55,7 @@ public:
static std::shared_ptr<ISource> createSourceFromFile(const String & file_name);
/// For scheduling via DistributedSink.
bool addAndSchedule(const std::string & file_path, size_t file_size, size_t ms);
bool addFileAndSchedule(const std::string & file_path, size_t file_size, size_t ms);
struct InternalStatus
{
@ -83,6 +83,7 @@ private:
bool hasPendingFiles() const;
void addFile(const std::string & file_path);
void initializeFilesFromDisk();
void processFiles();
void processFile(const std::string & file_path);

View File

@ -835,7 +835,7 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
const auto & bin_file = bin_files[i];
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false);
directory_monitor.addAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds());
directory_monitor.addFileAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds());
}
}