Backport #71946 to 24.8: Fix rows_processed column in system.s3/azure_queue_log broken in 24.6

This commit is contained in:
robot-clickhouse 2024-11-20 17:06:40 +00:00
parent b1a405c2c4
commit 1fa9c18e82
4 changed files with 22 additions and 5 deletions

View File

@ -616,7 +616,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
/* overwrite_status */true); /* overwrite_status */true);
} }
appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */success); appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), /* processed */success);
} }
for (const auto & file_metadata : failed_during_read_files) for (const auto & file_metadata : failed_during_read_files)
@ -628,7 +628,7 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
/* reduce_retry_count */true, /* reduce_retry_count */true,
/* overwrite_status */false); /* overwrite_status */false);
appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */false); appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), /* processed */false);
} }
} }
@ -649,7 +649,6 @@ void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
void ObjectStorageQueueSource::appendLogElement( void ObjectStorageQueueSource::appendLogElement(
const std::string & filename, const std::string & filename,
ObjectStorageQueueMetadata::FileStatus & file_status_, ObjectStorageQueueMetadata::FileStatus & file_status_,
size_t processed_rows,
bool processed) bool processed)
{ {
if (!system_queue_log) if (!system_queue_log)
@ -664,7 +663,7 @@ void ObjectStorageQueueSource::appendLogElement(
.table = storage_id.table_name, .table = storage_id.table_name,
.uuid = toString(storage_id.uuid), .uuid = toString(storage_id.uuid),
.file_name = filename, .file_name = filename,
.rows_processed = processed_rows, .rows_processed = file_status_.processed_rows,
.status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed, .status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed,
.processing_start_time = file_status_.processing_start_time, .processing_start_time = file_status_.processing_start_time,
.processing_end_time = file_status_.processing_end_time, .processing_end_time = file_status_.processing_end_time,

View File

@ -158,7 +158,6 @@ private:
void appendLogElement( void appendLogElement(
const std::string & filename, const std::string & filename,
ObjectStorageQueueMetadata::FileStatus & file_status_, ObjectStorageQueueMetadata::FileStatus & file_status_,
size_t processed_rows,
bool processed); bool processed);
}; };

View File

@ -1,4 +1,8 @@
<clickhouse> <clickhouse>
<azure_queue_log>
<database>system</database>
<table>azure_queue_log</table>
</azure_queue_log>
<s3queue_log> <s3queue_log>
<database>system</database> <database>system</database>
<table>s3queue_log</table> <table>s3queue_log</table>

View File

@ -320,6 +320,21 @@ def test_delete_after_processing(started_cluster, mode, engine_name):
).splitlines() ).splitlines()
] == sorted(total_values, key=lambda x: (x[0], x[1], x[2])) ] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
node.query("system flush logs")
if engine_name == "S3Queue":
system_table_name = "s3queue_log"
else:
system_table_name = "azure_queue_log"
assert (
int(
node.query(
f"SELECT sum(rows_processed) FROM system.{system_table_name} WHERE table = '{table_name}'"
)
)
== files_num * row_num
)
if engine_name == "S3Queue": if engine_name == "S3Queue":
minio = started_cluster.minio_client minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True)) objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))