mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge pull request #65928 from ClickHouse/follow-up-to-65046
Follow up to #65046
This commit is contained in:
commit
3d67e6b127
@ -469,8 +469,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
|
||||
LOG_ERROR(log, "Failed to set file {} as failed: {}",
|
||||
object_info->relative_path, getCurrentExceptionMessage(true));
|
||||
}
|
||||
|
||||
appendLogElement(reader.getObjectInfo()->getPath(), *file_status, processed_rows_from_file, false);
|
||||
}
|
||||
|
||||
LOG_TEST(log, "Query is cancelled");
|
||||
@ -502,8 +500,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
|
||||
object_info->relative_path, getCurrentExceptionMessage(true));
|
||||
}
|
||||
|
||||
appendLogElement(path, *file_status, processed_rows_from_file, false);
|
||||
|
||||
/// Leave the file half processed. Table is being dropped, so we do not care.
|
||||
break;
|
||||
}
|
||||
@ -548,7 +544,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
|
||||
|
||||
failed_during_read_files.push_back(file_metadata);
|
||||
file_status->onFailed(getCurrentExceptionMessage(true));
|
||||
appendLogElement(path, *file_status, processed_rows_from_file, false);
|
||||
|
||||
if (processed_rows_from_file == 0)
|
||||
{
|
||||
@ -567,8 +562,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
|
||||
throw;
|
||||
}
|
||||
|
||||
appendLogElement(path, *file_status, processed_rows_from_file, true);
|
||||
|
||||
file_status->setProcessingEndTime();
|
||||
file_status.reset();
|
||||
|
||||
@ -663,10 +656,14 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
|
||||
applyActionAfterProcessing(file_metadata->getPath());
|
||||
}
|
||||
else
|
||||
{
|
||||
file_metadata->setFailed(
|
||||
exception_message,
|
||||
/* reduce_retry_count */false,
|
||||
/* overwrite_status */true);
|
||||
|
||||
}
|
||||
appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */success);
|
||||
}
|
||||
|
||||
for (const auto & file_metadata : failed_during_read_files)
|
||||
@ -677,6 +674,8 @@ void ObjectStorageQueueSource::commit(bool success, const std::string & exceptio
|
||||
file_metadata->getFileStatus()->getException(),
|
||||
/* reduce_retry_count */true,
|
||||
/* overwrite_status */false);
|
||||
|
||||
appendLogElement(file_metadata->getPath(), *file_metadata->getFileStatus(), processed_rows_from_file, /* processed */false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,7 +160,12 @@ private:
|
||||
|
||||
Chunk generateImpl();
|
||||
void applyActionAfterProcessing(const String & path);
|
||||
void appendLogElement(const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
|
||||
void appendLogElement(
|
||||
const std::string & filename,
|
||||
ObjectStorageQueueMetadata::FileStatus & file_status_,
|
||||
size_t processed_rows,
|
||||
bool processed);
|
||||
|
||||
void lazyInitialize(size_t processor);
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user