From f0a2efa63014ff1349340e28647ab067698cf974 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sat, 21 Jan 2023 10:20:28 +0100 Subject: [PATCH] Always use absolute file paths in DirectoryMonitor Otherwise, on batch restore, the file paths can differ. Signed-off-by: Azat Khuzhin --- src/Storages/Distributed/DirectoryMonitor.cpp | 16 ++++++++++------ src/Storages/Distributed/DirectoryMonitor.h | 3 ++- src/Storages/Distributed/DistributedSink.cpp | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index c04e54b6bba..eead8c8ea42 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -573,6 +573,12 @@ bool StorageDistributedDirectoryMonitor::hasPendingFiles() const return fs::exists(current_batch_file_path) || !current_file.empty() || !pending_files.empty(); } +void StorageDistributedDirectoryMonitor::addFile(const std::string & file_path) +{ + if (!pending_files.push(fs::absolute(file_path).string())) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot schedule a file '{}'", file_path); +} + void StorageDistributedDirectoryMonitor::initializeFilesFromDisk() { /// NOTE: This method does not requires to hold status_mutex, hence, no TSA @@ -591,8 +597,7 @@ void StorageDistributedDirectoryMonitor::initializeFilesFromDisk() if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse(base_name)) { const std::string & file_path_str = file_path.string(); - if (!pending_files.push(file_path_str)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file"); + addFile(file_path_str); bytes_count += fs::file_size(file_path); } else if (base_name != "tmp" && base_name != "broken") @@ -882,7 +887,7 @@ struct StorageDistributedDirectoryMonitor::Batch { UInt64 idx; in >> idx >> "\n"; - files.push_back(fmt::format("{}/{}.bin", parent.path, idx)); + 
files.push_back(fs::absolute(fmt::format("{}/{}.bin", parent.path, idx)).string()); } recovered = true; @@ -1043,7 +1048,7 @@ std::shared_ptr StorageDistributedDirectoryMonitor::createSourceFromFil return std::make_shared(file_name); } -bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file_path, size_t file_size, size_t ms) +bool StorageDistributedDirectoryMonitor::addFileAndSchedule(const std::string & file_path, size_t file_size, size_t ms) { /// NOTE: It is better not to throw in this case, since the file is already /// on disk (see DistributedSink), and it will be processed next time. @@ -1053,8 +1058,7 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file return false; } - if (!pending_files.push(file_path)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file"); + addFile(file_path); { std::lock_guard lock(status_mutex); diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 06843b77a4b..9b1596d45e3 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -55,7 +55,7 @@ public: static std::shared_ptr createSourceFromFile(const String & file_name); /// For scheduling via DistributedSink. 
- bool addAndSchedule(const std::string & file_path, size_t file_size, size_t ms); + bool addFileAndSchedule(const std::string & file_path, size_t file_size, size_t ms); struct InternalStatus { @@ -83,6 +83,7 @@ private: bool hasPendingFiles() const; + void addFile(const std::string & file_path); void initializeFilesFromDisk(); void processFiles(); void processFile(const std::string & file_path); diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index edbb2acc923..c9c235596db 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ b/src/Storages/Distributed/DistributedSink.cpp @@ -835,7 +835,7 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const const auto & bin_file = bin_files[i]; auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false); - directory_monitor.addAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds()); + directory_monitor.addFileAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds()); } }