From 2371077ba6fba67428272afefda0e5b367e79c6f Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Wed, 3 Jul 2024 10:06:13 +0000 Subject: [PATCH] Backport #65928 to 24.6: Follow up to #65046 --- .../ObjectStorageQueue/ObjectStorageQueueSource.cpp | 13 ++++++------- .../ObjectStorageQueue/ObjectStorageQueueSource.h | 7 ++++++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp index 92c1b08da1a..1939ea0a66f 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp @@ -471,8 +471,6 @@ Chunk ObjectStorageQueueSource::generateImpl() LOG_ERROR(log, "Failed to set file {} as failed: {}", object_info->relative_path, getCurrentExceptionMessage(true)); } - - appendLogElement(reader.getObjectInfo()->getPath(), *file_status, processed_rows_from_file, false); } LOG_TEST(log, "Query is cancelled"); @@ -504,8 +502,6 @@ Chunk ObjectStorageQueueSource::generateImpl() object_info->relative_path, getCurrentExceptionMessage(true)); } - appendLogElement(path, *file_status, processed_rows_from_file, false); - /// Leave the file half processed. Table is being dropped, so we do not care. break; } @@ -550,7 +546,6 @@ Chunk ObjectStorageQueueSource::generateImpl() failed_during_read_files.push_back(file_metadata); file_status->onFailed(getCurrentExceptionMessage(true)); - appendLogElement(path, *file_status, processed_rows_from_file, false); if (processed_rows_from_file == 0) { @@ -569,8 +564,6 @@ Chunk ObjectStorageQueueSource::generateImpl() throw; } - appendLogElement(path, *file_status, processed_rows_from_file, true); - file_status->setProcessingEndTime(); file_status.reset(); @@ -665,10 +658,14 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio applyActionAfterProcessing(file_metadata->getPath()); } else + { file_metadata->setFailed( exception_message, /* reduce_retry_count */false, /* overwrite_status */true); + + } + appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */success); } for (const auto & file_metadata : failed_during_read_files) @@ -679,6 +676,8 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio file_metadata->getFileStatus()->getException(), /* reduce_retry_count */true, /* overwrite_status */false); + + appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */false); } } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h index 50428ed5f4b..ccd87e8a269 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h @@ -160,7 +160,12 @@ private: Chunk generateImpl(); void applyActionAfterProcessing(const String & path); - void appendLogElement(const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed); + void appendLogElement( + const std::string & filename, + ObjectStorageQueueMetadata::FileStatus & file_status_, + size_t processed_rows, + bool processed); + void lazyInitialize(size_t processor); };