diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp index dbbf7e2ee1d..1856490c92e 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp @@ -208,42 +208,46 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs /// Processing state is cached only if processing is being done by current clickhouse server /// (because If another server is doing the processing, /// we cannot know if state changes without checking with zookeeper so there is no point in cache here). - switch (file_status->state) + { - case FileStatus::State::Processing: [[fallthrough]]; - case FileStatus::State::Processed: + std::lock_guard lock(file_status->metadata_lock); + switch (file_status->state) { - return nullptr; - } - case FileStatus::State::Failed: - { - /// If max_loading_retries == 0, file is not retriable. - if (max_loading_retries == 0) + case FileStatus::State::Processing: [[fallthrough]]; + case FileStatus::State::Processed: + { return nullptr; + } + case FileStatus::State::Failed: + { + /// If max_loading_retries == 0, file is not retriable. + if (max_loading_retries == 0) + return nullptr; - /// Otherwise file_status->retries is also cached. - /// In case file_status->retries >= max_loading_retries we can fully rely that it is true - /// and will not attempt processing it. - /// But in case file_status->retries < max_loading_retries we cannot be sure - /// (another server could have done a try after we cached retries value), - /// so check with zookeeper here. - if (file_status->retries >= max_loading_retries) - return nullptr; + /// Otherwise file_status->retries is also cached. + /// In case file_status->retries >= max_loading_retries we can fully rely that it is true + /// and will not attempt processing it. + /// But in case file_status->retries < max_loading_retries we cannot be sure + /// (another server could have done a try after we cached retries value), + /// so check with zookeeper here. + if (file_status->retries >= max_loading_retries) + return nullptr; - break; - } - case FileStatus::State::None: - { - /// The file was not processed by current server and file status was not cached, - /// check metadata in zookeeper. - break; + break; + } + case FileStatus::State::None: + { + /// The file was not processed by current server and file status was not cached, + /// check metadata in zookeeper. + break; + } } } /// Another thread could already be trying to set file as processing. /// So there is no need to attempt the same, better to continue with the next file. - std::unique_lock lock(file_status->processing_lock, std::defer_lock); - if (!lock.try_lock()) + std::unique_lock processing_lock(file_status->processing_lock, std::defer_lock); + if (!processing_lock.try_lock()) { return nullptr; } @@ -272,6 +276,7 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs { case SetFileProcessingResult::Success: { + std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Processing; file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get()); @@ -284,11 +289,13 @@ S3QueueFilesMetadata::ProcessingNodeHolderPtr S3QueueFilesMetadata::trySetFileAs } case SetFileProcessingResult::AlreadyProcessed: { + std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Processed; break; } case SetFileProcessingResult::AlreadyFailed: { + std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Failed; break; } @@ -408,12 +415,16 @@ void S3QueueFilesMetadata::setFileProcessed(ProcessingNodeHolderPtr holder) auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueSetFileProcessedMicroseconds); const auto & path = holder->path; - SCOPE_EXIT({ - auto file_status = local_file_statuses.get(path, /* create */false); + auto file_status = local_file_statuses.get(path, /* create */false); + { + std::lock_guard lock(file_status->metadata_lock); file_status->state = FileStatus::State::Processed; + file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + } + + SCOPE_EXIT({ file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessedMicroseconds, timer.get()); timer.cancel(); - file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); }); switch (mode) @@ -524,9 +535,12 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S const auto & path = holder->path; auto file_status = local_file_statuses.get(path, /* create */false); - file_status->state = FileStatus::State::Failed; - file_status->last_exception = exception_message; - file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + { + std::lock_guard lock(file_status->metadata_lock); + file_status->state = FileStatus::State::Failed; + file_status->last_exception = exception_message; + file_status->processing_end_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + } SCOPE_EXIT({ file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileFailedMicroseconds, timer.get()); @@ -583,6 +597,8 @@ void S3QueueFilesMetadata::setFileFailed(ProcessingNodeHolderPtr holder, const S if (zk_client->tryGet(zookeeper_failed_path / node_name_with_retriable_suffix, res, &stat)) { node_metadata.retries = failed_node_metadata.retries + 1; + + std::lock_guard lock(file_status->metadata_lock); file_status->retries = node_metadata.retries; } @@ -771,6 +787,7 @@ void S3QueueFilesMetadata::cleanupThreadFuncImpl() LOG_TEST(log, "Cleanup is already being executed by another node"); return; } + /// TODO because of this lock we might not update local file statuses on time on one of the nodes. struct Node { diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h index f6d98d13aaf..7eb5c40eb26 100644 --- a/src/Storages/S3Queue/S3QueueFilesMetadata.h +++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h @@ -59,7 +59,7 @@ public: }; State state = State::None; - size_t processed_rows = 0; + std::atomic processed_rows = 0; time_t processing_start_time = 0; time_t processing_end_time = 0; size_t retries = 0; @@ -67,6 +67,7 @@ public: ProfileEvents::Counters profile_counters; std::mutex processing_lock; + std::mutex metadata_lock; }; using FileStatuses = std::unordered_map>; diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp index 09713e77f37..d86fc6fe1ce 100644 --- a/src/Storages/S3Queue/S3QueueSource.cpp +++ b/src/Storages/S3Queue/S3QueueSource.cpp @@ -201,22 +201,26 @@ void StorageS3QueueSource::applyActionAfterProcessing(const String & path) } } -void StorageS3QueueSource::appendLogElement(const std::string & filename, const S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed) +void StorageS3QueueSource::appendLogElement(const std::string & filename, S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed) { if (!s3_queue_log) return; - S3QueueLogElement elem + S3QueueLogElement elem{}; { - .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()), - .file_name = filename, - .rows_processed = processed_rows, - .status = processed ? S3QueueLogElement::S3QueueStatus::Processed : S3QueueLogElement::S3QueueStatus::Failed, - .counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(), - .processing_start_time = file_status_.processing_start_time, - .processing_end_time = file_status_.processing_end_time, - .exception = file_status_.last_exception, - }; + std::lock_guard lock(file_status_.metadata_lock); + elem = S3QueueLogElement + { + .event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()), + .file_name = filename, + .rows_processed = processed_rows, + .status = processed ? S3QueueLogElement::S3QueueStatus::Processed : S3QueueLogElement::S3QueueStatus::Failed, + .counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(), + .processing_start_time = file_status_.processing_start_time, + .processing_end_time = file_status_.processing_end_time, + .exception = file_status_.last_exception, + }; + } s3_queue_log->add(std::move(elem)); } diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index fa21b6cdd59..9bd2cad9c72 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -91,7 +91,7 @@ private: std::shared_ptr file_status; void applyActionAfterProcessing(const String & path); - void appendLogElement(const std::string & filename, const S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed); + void appendLogElement(const std::string & filename, S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed); }; } diff --git a/src/Storages/System/StorageSystemS3Queue.cpp b/src/Storages/System/StorageSystemS3Queue.cpp index ce76469b72a..235f4ff6f8f 100644 --- a/src/Storages/System/StorageSystemS3Queue.cpp +++ b/src/Storages/System/StorageSystemS3Queue.cpp @@ -47,7 +47,10 @@ void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, co size_t i = 0; res_columns[i++]->insert(zookeeper_path); res_columns[i++]->insert(file_name); - res_columns[i++]->insert(file_status->processed_rows); + + std::lock_guard lock(file_status->metadata_lock); + + res_columns[i++]->insert(file_status->processed_rows.load()); res_columns[i++]->insert(magic_enum::enum_name(file_status->state)); if (file_status->processing_start_time)