better reporting of broken parts

This commit is contained in:
Anton Popov 2023-06-22 12:23:51 +00:00
parent b7ef782335
commit ee68e85d31
10 changed files with 72 additions and 41 deletions

View File

@ -66,6 +66,8 @@ public:
virtual const SerializationInfoByName & getSerializationInfos() const = 0;
/// Name of the table the part belongs to; used by readers when building diagnostic messages.
virtual String getTableName() const = 0;
/// Reports the part as broken; the concrete action is defined by the implementation.
/// Readers call this when reading fails with an exception that is not considered retryable.
virtual void reportBroken() = 0;
};

View File

@ -278,4 +278,13 @@ void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
"Expected {}, got {}", requested_columns.size(), num_columns_to_read);
}
/// Builds the diagnostic suffix that readers append (via Exception::addMessage) to exceptions
/// thrown while reading a part. The text names the part path, the owning table, the disk the
/// part is located on (name and type) and the requested read range.
///
/// from_mark        - mark the failed read started from.
/// max_rows_to_read - row limit that was requested for the failed read.
///
/// The disk name and type are included because they were part of the message this helper
/// replaced in MergeTreeReaderWide and are needed to tell local and remote disk failures apart.
String IMergeTreeReader::getMessageForDiagnosticOfBrokenPart(size_t from_mark, size_t max_rows_to_read) const
{
    const auto & part_storage = data_part_info_for_read->getDataPartStorage();
    return fmt::format(
        "(while reading from part {} in table {} located on disk {} of type {}, from mark {} with max_rows_to_read = {})",
        part_storage->getFullPath(),
        data_part_info_for_read->getTableName(),
        part_storage->getDiskName(),
        part_storage->getDiskType(),
        from_mark,
        max_rows_to_read);
}
}

View File

@ -74,6 +74,8 @@ protected:
void checkNumberOfColumns(size_t num_columns_to_read) const;
/// Returns a parenthesized diagnostic suffix identifying the part, its table and the read range
/// (from_mark, max_rows_to_read). Meant to be appended to exceptions thrown while reading.
String getMessageForDiagnosticOfBrokenPart(size_t from_mark, size_t max_rows_to_read) const;
/// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
ValueSizeMap avg_value_size_hints;
/// Stores states for IDataType::deserializeBinaryBulk

View File

@ -56,6 +56,8 @@ public:
/// Looks up the serialization for `column` by its name in the underlying data part.
SerializationPtr getSerialization(const NameAndTypePair & column) const override { return data_part->getSerialization(column.name); }
/// Table name in the form produced by StorageID::getNameForLogs(), taken from the part's storage.
String getTableName() const override { return data_part->storage.getStorageID().getNameForLogs(); }
/// Returns the data part this object wraps.
MergeTreeData::DataPartPtr getDataPart() const { return data_part; }
private:

View File

@ -179,8 +179,6 @@ namespace ErrorCodes
extern const int ZERO_COPY_REPLICATION_ERROR;
extern const int NOT_INITIALIZED;
extern const int SERIALIZATION_ERROR;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
extern const int TOO_MANY_MUTATIONS;
}
@ -1174,25 +1172,6 @@ static void preparePartForRemoval(const MergeTreeMutableDataPartPtr & part)
}
}
/// Tells whether the failure described by `e` is transient (out of memory, network problems,
/// retryable S3 errors), i.e. it says nothing about the part itself being damaged.
static bool isRetryableException(const Exception & e)
{
    const auto error_code = e.code();

    bool retryable = isNotEnoughMemoryErrorCode(error_code)
        || error_code == ErrorCodes::NETWORK_ERROR
        || error_code == ErrorCodes::SOCKET_TIMEOUT;

#if USE_AWS_S3
    if (!retryable)
    {
        const auto * s3_exception = dynamic_cast<const S3Exception *>(&e);
        retryable = s3_exception != nullptr && s3_exception->isRetryableError();
    }
#endif

    /// In fact, there can be other similar situations.
    /// But it is OK, because there is a safety guard against deleting too many parts.
    return retryable;
}
static constexpr size_t loading_parts_initial_backoff_ms = 100;
static constexpr size_t loading_parts_max_backoff_ms = 5000;
static constexpr size_t loading_parts_max_tries = 3;

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
@ -10,7 +11,6 @@ namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int MEMORY_LIMIT_EXCEEDED;
}
@ -112,6 +112,12 @@ void MergeTreeReaderCompact::initialize()
compressed_data_buffer = non_cached_buffer.get();
}
}
catch (const Exception & e)
{
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
@ -207,11 +213,11 @@ size_t MergeTreeReaderCompact::readRows(
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage("(while reading column " + columns_to_read[pos].name + ")");
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
throw;
}
catch (...)
@ -315,6 +321,7 @@ void MergeTreeReaderCompact::readData(
}
void MergeTreeReaderCompact::prefetchBeginOfRange(Priority priority)
try
{
if (!initialized)
{
@ -326,6 +333,17 @@ void MergeTreeReaderCompact::prefetchBeginOfRange(Priority priority)
seekToMark(all_mark_ranges.front().begin, 0);
data_buffer->prefetch(priority);
}
catch (const Exception & e)
{
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
throw;
}
void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index)
{

View File

@ -9,6 +9,7 @@
#include <Interpreters/Context.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
@ -20,11 +21,6 @@ namespace
constexpr auto DATA_FILE_EXTENSION = ".bin";
}
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
MergeTreeReaderWide::MergeTreeReaderWide(
MergeTreeDataPartInfoForReaderPtr data_part_info_,
NamesAndTypesList columns_,
@ -51,6 +47,12 @@ MergeTreeReaderWide::MergeTreeReaderWide(
for (size_t i = 0; i < columns_to_read.size(); ++i)
addStreams(columns_to_read[i], serializations[i], profile_callback_, clock_type_);
}
catch (const Exception & e)
{
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
@ -76,9 +78,9 @@ void MergeTreeReaderWide::prefetchBeginOfRange(Priority priority)
/// of range only once so there is no such problem.
/// 4. continue_reading == false, as we haven't read anything yet.
}
catch (Exception & e)
catch (const Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
@ -184,22 +186,16 @@ size_t MergeTreeReaderWide::readRows(
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
/// Better diagnostics.
const auto & part_storage = data_part_info_for_read->getDataPartStorage();
e.addMessage(
fmt::format(
"(while reading from part {} located on disk {} of type {}, from mark {} with max_rows_to_read = {})",
part_storage->getFullPath(), part_storage->getDiskName(), part_storage->getDiskType(),
toString(from_mark), toString(max_rows_to_read)));
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
throw;
}

View File

@ -422,7 +422,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
/// Don't count the part as broken if the check failed with a retryable error
/// (e.g. not enough memory, a network failure or a retryable S3 error).
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if (isNotEnoughMemoryErrorCode(e.code()))
if (isRetryableException(e))
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);

View File

@ -13,6 +13,7 @@
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/HashingReadBuffer.h>
#include <IO/S3Common.h>
#include <Common/CurrentMetrics.h>
@ -33,6 +34,8 @@ namespace ErrorCodes
extern const int CANNOT_MUNMAP;
extern const int CANNOT_MREMAP;
extern const int UNEXPECTED_FILE_IN_DATA_PART;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
}
@ -47,6 +50,25 @@ bool isNotEnoughMemoryErrorCode(int code)
|| code == ErrorCodes::CANNOT_MREMAP;
}
/// Returns true when `e` describes a transient condition rather than a damaged part:
///  - a "not enough memory" error code (see isNotEnoughMemoryErrorCode);
///  - NETWORK_ERROR or SOCKET_TIMEOUT;
///  - an S3Exception that reports itself as retryable (only when built with USE_AWS_S3).
bool isRetryableException(const Exception & e)
{
    const int code = e.code();

    const bool is_transient_code = isNotEnoughMemoryErrorCode(code)
        || code == ErrorCodes::NETWORK_ERROR
        || code == ErrorCodes::SOCKET_TIMEOUT;
    if (is_transient_code)
        return true;

#if USE_AWS_S3
    if (const auto * s3_exception = dynamic_cast<const S3Exception *>(&e))
        return s3_exception->isRetryableError();
#endif

    /// In fact, there can be other similar situations.
    /// But it is OK, because there is a safety guard against deleting too many parts.
    return false;
}
static IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
@ -302,7 +324,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
}
catch (const Exception & e)
{
if (isNotEnoughMemoryErrorCode(e.code()))
if (isRetryableException(e))
throw;
return drop_cache_and_check();

View File

@ -13,5 +13,6 @@ IMergeTreeDataPart::Checksums checkDataPart(
std::function<bool()> is_cancelled = []{ return false; });
bool isNotEnoughMemoryErrorCode(int code);
/// Returns true if the exception denotes a transient failure (not enough memory, network error,
/// socket timeout or a retryable S3 error). Callers use it to avoid treating a part as broken
/// when the failure was not caused by the part's data.
bool isRetryableException(const Exception & e);
}