From 781c5564643ce450f9dfc9f0d80d7240f19e10e6 Mon Sep 17 00:00:00 2001
From: robot-clickhouse
Date: Tue, 19 Nov 2024 15:09:52 +0000
Subject: [PATCH] Backport #71946 to 24.9: Fix rows_processed column in system.s3/azure_queue_log broken in 24.6

---
 .../ObjectStorageQueueSource.cpp              |  7 +++----
 .../ObjectStorageQueueSource.h                |  1 -
 .../configs/s3queue_log.xml                   |  4 ++++
 .../integration/test_storage_s3_queue/test.py | 19 ++++++++++++++++++-
 4 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
index 462953aaf27..979ce1eead1 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
@@ -631,7 +631,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
                 /* overwrite_status */true);
         }
 
-        appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */success);
+        appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), /* processed */success);
     }
 
     for (const auto & file_metadata : failed_during_read_files)
@@ -643,7 +643,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
             /* reduce_retry_count */true,
             /* overwrite_status */false);
 
-        appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */false);
+        appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), /* processed */false);
     }
 }
 
@@ -664,7 +664,6 @@ void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
 void ObjectStorageQueueSource::appendLogElement(
     const std::string & filename,
     ObjectStorageQueueMetadata::FileStatus & file_status_,
-    size_t processed_rows,
     bool processed)
 {
     if (!system_queue_log)
@@ -679,7 +678,7 @@ void ObjectStorageQueueSource::appendLogElement(
         .table = storage_id.table_name,
         .uuid = toString(storage_id.uuid),
         .file_name = filename,
-        .rows_processed = processed_rows,
+        .rows_processed = file_status_.processed_rows,
         .status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed,
         .processing_start_time = file_status_.processing_start_time,
         .processing_end_time = file_status_.processing_end_time,
diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h
index c085287e4f3..45707eb823d 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h
@@ -158,7 +158,6 @@ private:
     void appendLogElement(
         const std::string & filename,
         ObjectStorageQueueMetadata::FileStatus & file_status_,
-        size_t processed_rows,
         bool processed);
 };
 
diff --git a/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml b/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml
index 93a04269fa7..6236603f18f 100644
--- a/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml
+++ b/tests/integration/test_storage_s3_queue/configs/s3queue_log.xml
@@ -1,4 +1,8 @@
 <clickhouse>
+    <azure_queue_log>
+        <database>system</database>
+        <table>azure_queue_log</table>
+    </azure_queue_log>
     <s3queue_log>
         <database>system</database>
         <table>s3queue_log</table>
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index a6a110e055e..c5cd18dccbb 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -289,7 +289,9 @@ def generate_random_string(length=6):
 @pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
 def test_delete_after_processing(started_cluster, mode, engine_name):
     node = started_cluster.instances["instance"]
-    table_name = f"delete_after_processing_{mode}_{engine_name}"
+    table_name = (
+        f"delete_after_processing_{mode}_{engine_name}_{generate_random_string()}"
+    )
     dst_table_name = f"{table_name}_dst"
     files_path = f"{table_name}_data"
     files_num = 5
@@ -332,6 +334,21 @@ def test_delete_after_processing(started_cluster, mode, engine_name):
         ).splitlines()
     ] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
 
+    node.query("system flush logs")
+
+    if engine_name == "S3Queue":
+        system_table_name = "s3queue_log"
+    else:
+        system_table_name = "azure_queue_log"
+    assert (
+        int(
+            node.query(
+                f"SELECT sum(rows_processed) FROM system.{system_table_name} WHERE table = '{table_name}'"
+            )
+        )
+        == files_num * row_num
+    )
+
     if engine_name == "S3Queue":
         minio = started_cluster.minio_client
         objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
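
Note: beyond the integration test, the fix can be checked by hand on a patched build. A minimal sketch, assuming an S3Queue table named 'my_queue' with the s3queue_log config above enabled ('my_queue' is a placeholder, not a name from this patch):

    -- 'my_queue' is a placeholder for the queue table's name
    SYSTEM FLUSH LOGS;
    SELECT file_name, rows_processed, status
    FROM system.s3queue_log
    WHERE table = 'my_queue'
    ORDER BY processing_end_time;

With the patch, appendLogElement fills rows_processed from file_status_.processed_rows at commit time rather than from the stale processed_rows_from_file argument, so the per-file counts should match the rows actually read, and their sum over a table should equal files_num * row_num as asserted in the test above.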