diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp index cde41b4afff..efc565295eb 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp @@ -616,7 +616,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio /* overwrite_status */true); } - appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */success); + appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), /* processed */success); } for (const auto & file_metadata : failed_during_read_files) @@ -628,7 +628,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio /* reduce_retry_count */true, /* overwrite_status */false); - appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */false); + appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), /* processed */false); } } @@ -649,7 +649,6 @@ void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path) void ObjectStorageQueueSource::appendLogElement( const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, - size_t processed_rows, bool processed) { if (!system_queue_log) @@ -664,7 +663,7 @@ void ObjectStorageQueueSource::appendLogElement( .table = storage_id.table_name, .uuid = toString(storage_id.uuid), .file_name = filename, - .rows_processed = processed_rows, + .rows_processed = file_status_.processed_rows, .status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed, .processing_start_time = file_status_.processing_start_time, .processing_end_time = file_status_.processing_end_time, diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h index c085287e4f3..45707eb823d 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h @@ -158,7 +158,6 @@ private: void appendLogElement( const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, - size_t processed_rows, bool processed); }; diff --git a/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml b/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml index 93a04269fa7..6236603f18f 100644 --- a/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml +++ b/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml @@ -1,4 +1,8 @@ + + system + azure_queue_log
+
system s3queue_log
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py index 66dad88ccbc..7629c1a4cc3 100644 --- a/tests/integration/test_storage_s3_queue/test.py +++ b/tests/integration/test_storage_s3_queue/test.py @@ -320,6 +320,21 @@ def test_delete_after_processing(started_cluster, mode, engine_name): ).splitlines() ] == sorted(total_values, key=lambda x: (x[0], x[1], x[2])) + node.query("system flush logs") + + if engine_name == "S3Queue": + system_table_name = "s3queue_log" + else: + system_table_name = "azure_queue_log" + assert ( + int( + node.query( + f"SELECT sum(rows_processed) FROM system.{system_table_name} WHERE table = '{table_name}'" + ) + ) + == files_num * row_num + ) + if engine_name == "S3Queue": minio = started_cluster.minio_client objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))