diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp index 39e91e19014..04fac6c3402 100644 --- a/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/src/Storages/Distributed/DirectoryMonitor.cpp @@ -60,6 +60,7 @@ namespace ErrorCodes extern const int TOO_MANY_PARTITIONS; extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; extern const int ARGUMENT_OUT_OF_BOUND; + extern const int LOGICAL_ERROR; } @@ -365,18 +366,22 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( const std::string & relative_path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, - BackgroundSchedulePool & bg_pool) + BackgroundSchedulePool & bg_pool, + bool initialize_from_disk) : storage(storage_) , pool(std::move(pool_)) , disk(disk_) , relative_path(relative_path_) , path(fs::path(disk->getPath()) / relative_path / "") + , broken_relative_path(fs::path(relative_path) / "broken") + , broken_path(fs::path(path) / "broken" / "") , should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts) , split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure) , dir_fsync(storage.getDistributedSettingsRef().fsync_directories) , min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows) , min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes) , current_batch_file_path(path + "current_batch.txt") + , pending_files(std::numeric_limits::max()) , default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds()) , sleep_time(default_sleep_time) , max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds()) @@ -385,6 +390,11 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( , metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0) , 
metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0) { + fs::create_directory(broken_path); + + if (initialize_from_disk) + initializeFilesFromDisk(); + task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); }); task_handle->activateAndSchedule(); } @@ -392,35 +402,29 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor( StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor() { - if (!quit) + if (!pending_files.isFinished()) { - quit = true; + pending_files.clearAndFinish(); task_handle->deactivate(); } } void StorageDistributedDirectoryMonitor::flushAllData() { - if (quit) + if (pending_files.isFinished()) return; std::lock_guard lock{mutex}; - - const auto & files = getFiles(); - if (!files.empty()) - { - processFiles(files); - - /// Update counters. - getFiles(); - } + if (!hasPendingFiles()) + return; + processFiles(); } void StorageDistributedDirectoryMonitor::shutdownAndDropAllData() { - if (!quit) + if (!pending_files.isFinished()) { - quit = true; + pending_files.clearAndFinish(); task_handle->deactivate(); } @@ -434,19 +438,21 @@ void StorageDistributedDirectoryMonitor::run() std::lock_guard lock{mutex}; bool do_sleep = false; - while (!quit) + while (!pending_files.isFinished()) { do_sleep = true; - const auto & files = getFiles(); - if (files.empty()) + if (!hasPendingFiles()) break; if (!monitor_blocker.isCancelled()) { try { - do_sleep = !processFiles(files); + processFiles(); + /// No errors while processing existing files. + /// Let's see maybe there are more files to process. 
+ do_sleep = false; std::lock_guard status_lock(status_mutex); status.last_exception = std::exception_ptr{}; @@ -470,9 +476,7 @@ void StorageDistributedDirectoryMonitor::run() } } else - { LOG_DEBUG(log, "Skipping send data over distributed table."); - } const auto now = std::chrono::system_clock::now(); if (now - last_decrease_time > decrease_error_count_period) @@ -487,10 +491,7 @@ void StorageDistributedDirectoryMonitor::run() break; } - /// Update counters. - getFiles(); - - if (!quit && do_sleep) + if (!pending_files.isFinished() && do_sleep) task_handle->scheduleAfter(sleep_time.count()); } @@ -568,41 +569,87 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri settings.distributed_replica_error_cap); } - -std::map StorageDistributedDirectoryMonitor::getFiles() +bool StorageDistributedDirectoryMonitor::hasPendingFiles() const { - std::map files; + return fs::exists(current_batch_file_path) || !current_batch_file.empty() || !pending_files.empty(); +} + +void StorageDistributedDirectoryMonitor::initializeFilesFromDisk() +{ + /// NOTE: This method does not require holding status_mutex, hence, no TSA + /// annotations in the header file. 
fs::directory_iterator end; - for (fs::directory_iterator it{path}; it != end; ++it) + + /// Initialize pending files { - const auto & file_path_str = it->path(); - if (!it->is_directory() && startsWith(fs::path(file_path_str).extension(), ".bin")) + size_t bytes_count = 0; + + for (fs::directory_iterator it{path}; it != end; ++it) { - files[parse(fs::path(file_path_str).stem())] = file_path_str; + const auto & file_path = it->path(); + const auto & base_name = file_path.stem().string(); + if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse(base_name)) + { + const std::string & file_path_str = file_path.string(); + if (!pending_files.push(file_path_str)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file"); + bytes_count += fs::file_size(file_path); + } + else if (base_name != "tmp" && base_name != "broken") + { + /// It is OK to log current_batch.txt here too (useful for debugging). + LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), path); + } } + + LOG_TRACE(log, "Files set to {}", pending_files.size()); + LOG_TRACE(log, "Bytes set to {}", bytes_count); + + metric_pending_files.changeTo(pending_files.size()); + status.files_count = pending_files.size(); + status.bytes_count = bytes_count; } - return files; + /// Initialize broken files + { + size_t broken_bytes_count = 0; + size_t broken_files = 0; + + for (fs::directory_iterator it{broken_path}; it != end; ++it) + { + const auto & file_path = it->path(); + if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse(file_path.stem())) + { + /// Count the file itself, not only its bytes; otherwise broken_files_count is always reported as 0. 
+ broken_bytes_count += fs::file_size(file_path); + ++broken_files; + } + else + LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), broken_path); + } + + LOG_TRACE(log, "Broken files set to {}", broken_files); + LOG_TRACE(log, "Broken bytes set to {}", broken_bytes_count); + + metric_broken_files.changeTo(broken_files); + status.broken_files_count = broken_files; + status.broken_bytes_count = 
broken_bytes_count; + } } -bool StorageDistributedDirectoryMonitor::processFiles(const std::map & files) +void StorageDistributedDirectoryMonitor::processFiles() { if (should_batch_inserts) - { - processFilesWithBatching(files); - } + processFilesWithBatching(); else { - for (const auto & file : files) - { - if (quit) - return true; + /// Process unprocessed file. + if (!current_batch_file.empty()) + processFile(current_batch_file); - processFile(file.second); - } + while (pending_files.tryPop(current_batch_file)) + processFile(current_batch_file); } - - return true; } void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path) @@ -649,7 +692,11 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa thread_trace_context->root_span.addAttribute(std::current_exception()); e.addMessage(fmt::format("While sending {}", file_path)); - maybeMarkAsBroken(file_path, e); + if (isFileBrokenErrorCode(e.code(), e.isRemoteException())) + { + markAsBroken(file_path); + current_batch_file.clear(); + } throw; } catch (...) 
@@ -662,6 +709,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path); markAsSend(file_path); + current_batch_file.clear(); LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds()); } @@ -701,23 +749,19 @@ struct StorageDistributedDirectoryMonitor::BatchHeader struct StorageDistributedDirectoryMonitor::Batch { - std::vector file_indices; size_t total_rows = 0; size_t total_bytes = 0; bool recovered = false; StorageDistributedDirectoryMonitor & parent; - const std::map & file_index_to_path; + std::vector files; bool split_batch_on_failure = true; bool fsync = false; bool dir_fsync = false; - Batch( - StorageDistributedDirectoryMonitor & parent_, - const std::map & file_index_to_path_) + explicit Batch(StorageDistributedDirectoryMonitor & parent_) : parent(parent_) - , file_index_to_path(file_index_to_path_) , split_batch_on_failure(parent.split_batch_on_failure) , fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert) , dir_fsync(parent.dir_fsync) @@ -732,7 +776,7 @@ struct StorageDistributedDirectoryMonitor::Batch void send() { - if (file_indices.empty()) + if (files.empty()) return; CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend}; @@ -775,7 +819,7 @@ struct StorageDistributedDirectoryMonitor::Batch } catch (const Exception & e) { - if (split_batch_on_failure && file_indices.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException())) + if (split_batch_on_failure && files.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException())) { tryLogCurrentException(parent.log, "Trying to split batch due to"); sendSeparateFiles(); @@ -795,44 +839,28 @@ struct StorageDistributedDirectoryMonitor::Batch } else { - std::vector files; - for (const auto && file_info : file_index_to_path | boost::adaptors::indexed()) - { - if (file_info.index() > 8) - { - files.push_back("..."); 
- break; - } - - files.push_back(file_info.value().second); - } - e.addMessage(fmt::format("While sending batch, nums: {}, files: {}", file_index_to_path.size(), fmt::join(files, "\n"))); - + e.addMessage(fmt::format("While sending a batch of {} files, files: {}", files.size(), fmt::join(files, "\n"))); throw; } } if (!batch_broken) { - LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", file_indices.size(), watch.elapsedMilliseconds()); + LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", files.size(), watch.elapsedMilliseconds()); auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path); - for (UInt64 file_index : file_indices) - parent.markAsSend(file_index_to_path.at(file_index)); + for (const auto & file : files) + parent.markAsSend(file); } else if (!batch_marked_as_broken) { - LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size()); + LOG_ERROR(parent.log, "Marking a batch of {} files as broken, files: {}", files.size(), fmt::join(files, "\n")); - for (UInt64 file_idx : file_indices) - { - auto file_path = file_index_to_path.find(file_idx); - if (file_path != file_index_to_path.end()) - parent.markAsBroken(file_path->second); - } + for (const auto & file : files) + parent.markAsBroken(file); } - file_indices.clear(); + files.clear(); total_rows = 0; total_bytes = 0; recovered = false; @@ -842,8 +870,11 @@ struct StorageDistributedDirectoryMonitor::Batch void writeText(WriteBuffer & out) { - for (UInt64 file_idx : file_indices) - out << file_idx << '\n'; + for (const auto & file : files) + { + UInt64 file_index = parse(fs::path(file).stem()); + out << file_index << '\n'; + } } void readText(ReadBuffer & in) @@ -852,8 +883,9 @@ struct StorageDistributedDirectoryMonitor::Batch { UInt64 idx; in >> idx >> "\n"; - file_indices.push_back(idx); + files.push_back(fmt::format("{}/{}.bin", parent.path, idx)); } + recovered = true; } @@ -865,14 +897,9 @@ private: 
IConnectionPool::Entry connection; - for (UInt64 file_idx : file_indices) + for (const auto & file : files) { - auto file_path = file_index_to_path.find(file_idx); - if (file_path == file_index_to_path.end()) - throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO, - "Failed to send batch: file with index {} is absent", file_idx); - - ReadBufferFromFile in(file_path->second); + ReadBufferFromFile in(file); const auto & distributed_header = readDistributedHeader(in, parent.log); OpenTelemetry::TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__, @@ -886,7 +913,7 @@ private: compression_expected = connection->getCompression() == Protocol::Compression::Enable; LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).", - file_indices.size(), + files.size(), connection->getDescription(), formatReadableQuantity(total_rows), formatReadableSizeWithBinarySuffix(total_bytes)); @@ -907,19 +934,11 @@ private: { size_t broken_files = 0; - for (UInt64 file_idx : file_indices) + for (const auto & file : files) { - auto file_path = file_index_to_path.find(file_idx); - if (file_path == file_index_to_path.end()) - { - LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx); - ++broken_files; - continue; - } - try { - ReadBufferFromFile in(file_path->second); + ReadBufferFromFile in(file); const auto & distributed_header = readDistributedHeader(in, parent.log); // this function is called in a separated thread, so we set up the trace context from the file @@ -941,9 +960,11 @@ private: } catch (Exception & e) { - e.addMessage(fmt::format("While sending {}", file_path->second)); - parent.maybeMarkAsBroken(file_path->second, e); - ++broken_files; + if (isFileBrokenErrorCode(e.code(), e.isRemoteException())) + { + parent.markAsBroken(file); + ++broken_files; + } } } @@ -1023,13 +1044,18 @@ std::shared_ptr StorageDistributedDirectoryMonitor::createSourceFromFil return std::make_shared(file_name); } -bool 
StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t ms) +bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file_path, size_t file_size, size_t ms) { - if (quit) + /// NOTE: It is better not to throw in this case, since the file is already + /// on disk (see DistributedSink), and it will be processed next time. + if (pending_files.isFinished()) return false; + if (!pending_files.push(file_path)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file"); + { - std::lock_guard status_lock(status_mutex); + std::lock_guard lock(status_mutex); metric_pending_files.add(); status.bytes_count += file_size; ++status.files_count; @@ -1045,33 +1071,25 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g return current_status; } -void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map & files) +void StorageDistributedDirectoryMonitor::processFilesWithBatching() { - std::unordered_set file_indices_to_skip; - + /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch. if (fs::exists(current_batch_file_path)) { - /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch. 
- Batch batch(*this, files); + Batch batch(*this); ReadBufferFromFile in{current_batch_file_path}; batch.readText(in); - file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end()); batch.send(); + + auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path); + fs::remove(current_batch_file_path); } std::unordered_map header_to_batch; - for (const auto & file : files) + std::string file_path; + while (pending_files.tryPop(file_path)) { - if (quit) - return; - - UInt64 file_idx = file.first; - const String & file_path = file.second; - - if (file_indices_to_skip.contains(file_idx)) - continue; - size_t total_rows = 0; size_t total_bytes = 0; Block header; @@ -1110,8 +1128,9 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map } catch (const Exception & e) { - if (maybeMarkAsBroken(file_path, e)) + if (isFileBrokenErrorCode(e.code(), e.isRemoteException())) { + markAsBroken(file_path); tryLogCurrentException(log, "File is marked broken due to"); continue; } @@ -1125,9 +1144,9 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map std::move(distributed_header.client_info), std::move(header) ); - Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second; + Batch & batch = header_to_batch.try_emplace(batch_header, *this).first->second; - batch.file_indices.push_back(file_idx); + batch.files.push_back(file_path); batch.total_rows += total_rows; batch.total_bytes += total_bytes; @@ -1155,16 +1174,10 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path) { - const auto last_path_separator_pos = file_path.rfind('/'); - const auto & base_path = file_path.substr(0, last_path_separator_pos + 1); - const auto & file_name = file_path.substr(last_path_separator_pos + 1); - const String & broken_path = fs::path(base_path) / "broken/"; - const 
String & broken_file_path = fs::path(broken_path) / file_name; - - fs::create_directory(broken_path); + const String & broken_file_path = fs::path(broken_path) / fs::path(file_path).filename(); auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path); - auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, fs::path(relative_path) / "broken/"); + auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, broken_relative_path); { std::lock_guard status_lock(status_mutex); @@ -1198,21 +1211,9 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat fs::remove(file_path); } -bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) -{ - /// Mark file as broken if necessary. - if (isFileBrokenErrorCode(e.code(), e.isRemoteException())) - { - markAsBroken(file_path); - return true; - } - else - return false; -} - std::string StorageDistributedDirectoryMonitor::getLoggerName() const { - return storage.getStorageID().getFullTableName() + ".DirectoryMonitor"; + return storage.getStorageID().getFullTableName() + ".DirectoryMonitor." + disk->getName(); } void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_relative_path) diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h index 7015fca0311..99e949ddcff 100644 --- a/src/Storages/Distributed/DirectoryMonitor.h +++ b/src/Storages/Distributed/DirectoryMonitor.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -38,7 +39,8 @@ public: const std::string & relative_path_, ConnectionPoolPtr pool_, ActionBlocker & monitor_blocker_, - BackgroundSchedulePool & bg_pool); + BackgroundSchedulePool & bg_pool, + bool initialize_from_disk); ~StorageDistributedDirectoryMonitor(); @@ -53,7 +55,7 @@ public: static std::shared_ptr createSourceFromFile(const String & file_name); /// For scheduling via DistributedSink. 
- bool addAndSchedule(size_t file_size, size_t ms); + bool addAndSchedule(const std::string & file_path, size_t file_size, size_t ms); struct InternalStatus { @@ -78,14 +80,15 @@ public: private: void run(); - std::map getFiles(); - bool processFiles(const std::map & files); + bool hasPendingFiles() const; + + void initializeFilesFromDisk(); + void processFiles(); void processFile(const std::string & file_path); - void processFilesWithBatching(const std::map & files); + void processFilesWithBatching(); void markAsBroken(const std::string & file_path); void markAsSend(const std::string & file_path); - bool maybeMarkAsBroken(const std::string & file_path, const Exception & e); std::string getLoggerName() const; @@ -95,25 +98,33 @@ private: DiskPtr disk; std::string relative_path; std::string path; + std::string broken_relative_path; + std::string broken_path; const bool should_batch_inserts = false; const bool split_batch_on_failure = true; const bool dir_fsync = false; const size_t min_batched_block_size_rows = 0; const size_t min_batched_block_size_bytes = 0; - String current_batch_file_path; + + /// This is pending data (due to some error) for should_batch_inserts==true + std::string current_batch_file_path; + /// This is pending data (due to some error) for should_batch_inserts==false + std::string current_batch_file; struct BatchHeader; struct Batch; std::mutex status_mutex; + InternalStatus status; + ConcurrentBoundedQueue pending_files; + const std::chrono::milliseconds default_sleep_time; std::chrono::milliseconds sleep_time; const std::chrono::milliseconds max_sleep_time; std::chrono::time_point last_decrease_time {std::chrono::system_clock::now()}; - std::atomic quit {false}; std::mutex mutex; Poco::Logger * log; ActionBlocker & monitor_blocker; diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp index 38ff06f4744..8cee3e9ee91 100644 --- a/src/Storages/Distributed/DistributedSink.cpp +++ 
b/src/Storages/Distributed/DistributedSink.cpp @@ -724,6 +724,9 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const return guard; }; + std::vector bin_files; + bin_files.reserve(dir_names.size()); + auto it = dir_names.begin(); /// on first iteration write block to a temporary directory for subsequent /// hardlinking to ensure the inode is not freed until we're done @@ -802,8 +805,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const } // Create hardlink here to reuse increment number - const std::string block_file_path(fs::path(path) / file_name); - createHardLink(first_file_tmp_path, block_file_path); + bin_files.push_back(fs::path(path) / file_name); + createHardLink(first_file_tmp_path, bin_files.back()); auto dir_sync_guard = make_directory_sync_guard(*it); } ++it; @@ -814,8 +817,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const const std::string path(fs::path(disk_path) / (data_path + *it)); fs::create_directory(path); - const std::string block_file_path(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin")); - createHardLink(first_file_tmp_path, block_file_path); + bin_files.push_back(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin")); + createHardLink(first_file_tmp_path, bin_files.back()); auto dir_sync_guard = make_directory_sync_guard(*it); } @@ -826,10 +829,13 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const /// Notify auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms; - for (const auto & dir_name : dir_names) + for (size_t i = 0; i < dir_names.size(); ++i) { + const auto & dir_name = dir_names[i]; + const auto & bin_file = bin_files[i]; + auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false); - directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds()); + 
directory_monitor.addAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds()); } } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index febf9773f71..888c9fffa48 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1205,12 +1205,15 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk) const auto & dir_path = it->path(); if (std::filesystem::is_directory(dir_path)) { + /// Created by DistributedSink const auto & tmp_path = dir_path / "tmp"; - - /// "tmp" created by DistributedSink if (std::filesystem::is_directory(tmp_path) && std::filesystem::is_empty(tmp_path)) std::filesystem::remove(tmp_path); + const auto & broken_path = dir_path / "broken"; + if (std::filesystem::is_directory(broken_path) && std::filesystem::is_empty(broken_path)) + std::filesystem::remove(broken_path); + if (std::filesystem::is_empty(dir_path)) { LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string()); @@ -1239,7 +1242,8 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor( *this, disk, relative_data_path + name, data.connection_pool, monitors_blocker, - getContext()->getDistributedSchedulePool()); + getContext()->getDistributedSchedulePool(), + /* initialize_from_disk= */ startup); return data; }; diff --git a/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference index f3be69d3279..b0d8284faa5 100644 --- a/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference +++ b/tests/queries/0_stateless/01791_dist_INSERT_block_structure_mismatch.reference @@ -1,7 +1,7 @@ DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done. 
DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done. - default.dist_01683.DirectoryMonitor: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done - default.dist_01683.DirectoryMonitor: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done + default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done + default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done 1 1 2