Merge pull request #67479 from ClickHouse/backport/24.3/66898

Backport #66898 to 24.3: [CI Fest] Better processing of broken parts and their projections (fixes rare cases of lost forever)
This commit is contained in:
Alexander Tokmakov 2024-09-03 15:08:27 +02:00 committed by GitHub
commit fee3ca8942
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 22 deletions

View File

@ -14,6 +14,9 @@
#include <Storages/MergeTree/ReplicatedFetchList.h> #include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h> #include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/NetException.h> #include <Common/NetException.h>
#include <Disks/IO/createReadBufferFromFileBase.h> #include <Disks/IO/createReadBufferFromFileBase.h>
@ -210,13 +213,17 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
} }
catch (const Exception & e) catch (const Exception & e)
{ {
if (e.code() != ErrorCodes::ABORTED && e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM) if (e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM
&& !isRetryableException(std::current_exception()))
{
report_broken_part(); report_broken_part();
}
throw; throw;
} }
catch (...) catch (...)
{ {
if (!isRetryableException(std::current_exception()))
report_broken_part(); report_broken_part();
throw; throw;
} }

View File

@ -14,16 +14,11 @@
#include <Processors/QueryPlan/FilterStep.h> #include <Processors/QueryPlan/FilterStep.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h> #include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Storages/MergeTree/checkDataPart.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
/// Lightweight (in terms of logic) stream for reading single part from /// Lightweight (in terms of logic) stream for reading single part from
/// MergeTree, used for merges and mutations. /// MergeTree, used for merges and mutations.
/// ///
@ -275,7 +270,7 @@ try
catch (...) catch (...)
{ {
/// Suspicion of the broken part. A part is added to the queue for verification. /// Suspicion of the broken part. A part is added to the queue for verification.
if (getCurrentExceptionCode() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) if (!isRetryableException(std::current_exception()))
storage.reportBrokenPart(data_part); storage.reportBrokenPart(data_part);
throw; throw;
} }

View File

@ -38,11 +38,13 @@ namespace ErrorCodes
extern const int CANNOT_ALLOCATE_MEMORY; extern const int CANNOT_ALLOCATE_MEMORY;
extern const int CANNOT_MUNMAP; extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP; extern const int CANNOT_MREMAP;
extern const int CANNOT_SCHEDULE_TASK;
extern const int UNEXPECTED_FILE_IN_DATA_PART; extern const int UNEXPECTED_FILE_IN_DATA_PART;
extern const int NO_FILE_IN_DATA_PART; extern const int NO_FILE_IN_DATA_PART;
extern const int NETWORK_ERROR; extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT; extern const int SOCKET_TIMEOUT;
extern const int BROKEN_PROJECTION; extern const int BROKEN_PROJECTION;
extern const int ABORTED;
} }
@ -88,11 +90,11 @@ bool isRetryableException(std::exception_ptr exception_ptr)
} }
catch (const Exception & e) catch (const Exception & e)
{ {
if (isNotEnoughMemoryErrorCode(e.code())) return isNotEnoughMemoryErrorCode(e.code())
return true; || e.code() == ErrorCodes::NETWORK_ERROR
|| e.code() == ErrorCodes::SOCKET_TIMEOUT
if (e.code() == ErrorCodes::NETWORK_ERROR || e.code() == ErrorCodes::SOCKET_TIMEOUT) || e.code() == ErrorCodes::CANNOT_SCHEDULE_TASK
return true; || e.code() == ErrorCodes::ABORTED;
} }
catch (const Poco::Net::NetException &) catch (const Poco::Net::NetException &)
{ {
@ -335,17 +337,22 @@ static IMergeTreeDataPart::Checksums checkDataPart(
projections_on_disk.erase(projection_file); projections_on_disk.erase(projection_file);
} }
if (throw_on_broken_projection && !broken_projections_message.empty()) if (throw_on_broken_projection)
{
if (!broken_projections_message.empty())
{ {
throw Exception(ErrorCodes::BROKEN_PROJECTION, "{}", broken_projections_message); throw Exception(ErrorCodes::BROKEN_PROJECTION, "{}", broken_projections_message);
} }
/// This one is actually not broken, just redundant files on disk which
/// MergeTree will never use.
if (require_checksums && !projections_on_disk.empty()) if (require_checksums && !projections_on_disk.empty())
{ {
throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART, throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART,
"Found unexpected projection directories: {}", "Found unexpected projection directories: {}",
fmt::join(projections_on_disk, ",")); fmt::join(projections_on_disk, ","));
} }
}
if (is_cancelled()) if (is_cancelled())
return {}; return {};