From 5541461b991c8fe17414c7d663c7aa7ff28136ae Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 1 Jul 2024 16:04:21 +0200 Subject: [PATCH] Fix time of writing to _log table --- .../ObjectStorageQueue/ObjectStorageQueueSource.cpp | 13 ++++++------- .../ObjectStorageQueue/ObjectStorageQueueSource.h | 7 ++++++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp index 2effbe7e7c2..955e49bc2bf 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp @@ -469,8 +469,6 @@ Chunk ObjectStorageQueueSource::generateImpl() LOG_ERROR(log, "Failed to set file {} as failed: {}", object_info->relative_path, getCurrentExceptionMessage(true)); } - - appendLogElement(reader.getObjectInfo()->getPath(), *file_status, processed_rows_from_file, false); } LOG_TEST(log, "Query is cancelled"); @@ -502,8 +500,6 @@ Chunk ObjectStorageQueueSource::generateImpl() object_info->relative_path, getCurrentExceptionMessage(true)); } - appendLogElement(path, *file_status, processed_rows_from_file, false); - /// Leave the file half processed. Table is being dropped, so we do not care. break; } @@ -548,7 +544,6 @@ Chunk ObjectStorageQueueSource::generateImpl() failed_during_read_files.push_back(file_metadata); file_status->onFailed(getCurrentExceptionMessage(true)); - appendLogElement(path, *file_status, processed_rows_from_file, false); if (processed_rows_from_file == 0) { @@ -567,8 +562,6 @@ Chunk ObjectStorageQueueSource::generateImpl() throw; } - appendLogElement(path, *file_status, processed_rows_from_file, true); - file_status->setProcessingEndTime(); file_status.reset(); @@ -663,10 +656,14 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio applyActionAfterProcessing(file_metadata->getPath()); } else + { file_metadata->setFailed( exception_message, /* reduce_retry_count */false, /* overwrite_status */true); + + } + appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */success); } for (const auto & file_metadata : failed_during_read_files) @@ -677,6 +674,8 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio file_metadata->getFileStatus()->getException(), /* reduce_retry_count */true, /* overwrite_status */false); + + appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */false); } } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h index 50428ed5f4b..ccd87e8a269 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h @@ -160,7 +160,12 @@ private: Chunk generateImpl(); void applyActionAfterProcessing(const String & path); - void appendLogElement(const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed); + void appendLogElement( + const std::string & filename, + ObjectStorageQueueMetadata::FileStatus & file_status_, + size_t processed_rows, + bool processed); + void lazyInitialize(size_t processor); };