Backport #65928 to 24.6: Follow up to #65046

This commit is contained in:
robot-clickhouse 2024-07-03 10:06:13 +00:00
parent 4e6cf4c75a
commit 2371077ba6
2 changed files with 12 additions and 8 deletions

View File

@ -471,8 +471,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
LOG_ERROR(log, "Failed to set file {} as failed: {}",
object_info->relative_path, getCurrentExceptionMessage(true));
}
appendLogElement(reader.getObjectInfo()->getPath(), *file_status, processed_rows_from_file, false);
}
LOG_TEST(log, "Query is cancelled");
@ -504,8 +502,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
object_info->relative_path, getCurrentExceptionMessage(true));
}
appendLogElement(path, *file_status, processed_rows_from_file, false);
/// Leave the file half processed. Table is being dropped, so we do not care.
break;
}
@ -550,7 +546,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
failed_during_read_files.push_back(file_metadata);
file_status->onFailed(getCurrentExceptionMessage(true));
appendLogElement(path, *file_status, processed_rows_from_file, false);
if (processed_rows_from_file == 0)
{
@ -569,8 +564,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
throw;
}
appendLogElement(path, *file_status, processed_rows_from_file, true);
file_status->setProcessingEndTime();
file_status.reset();
@ -665,10 +658,14 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
applyActionAfterProcessing(file_metadata->getPath());
}
else
{
file_metadata->setFailed(
exception_message,
/* reduce_retry_count */false,
/* overwrite_status */true);
}
appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */success);
}
for (const auto & file_metadata : failed_during_read_files)
@ -679,6 +676,8 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
file_metadata->getFileStatus()->getException(),
/* reduce_retry_count */true,
/* overwrite_status */false);
appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */false);
}
}

View File

@ -160,7 +160,12 @@ private:
Chunk generateImpl();
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void appendLogElement(
const std::string & filename,
ObjectStorageQueueMetadata::FileStatus & file_status_,
size_t processed_rows,
bool processed);
void lazyInitialize(size_t processor);
};