Report broken parts in compact MergeTree

Amos Bird 2020-08-04 00:10:09 +08:00
parent 1ee0fa7d57
commit 019f6d3a76

src/Storages/MergeTree/MergeTreeReaderCompact.cpp

@@ -11,6 +11,7 @@ namespace ErrorCodes
 {
     extern const int CANNOT_READ_ALL_DATA;
     extern const int ARGUMENT_OUT_OF_BOUND;
+    extern const int MEMORY_LIMIT_EXCEEDED;
 }
@@ -43,66 +44,74 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
         settings.save_marks_in_cache,
         data_part->getColumns().size())
 {
-    size_t columns_num = columns.size();
-
-    column_positions.resize(columns_num);
-    read_only_offsets.resize(columns_num);
-    auto name_and_type = columns.begin();
-    for (size_t i = 0; i < columns_num; ++i, ++name_and_type)
-    {
-        const auto & [name, type] = getColumnFromPart(*name_and_type);
-        auto position = data_part->getColumnPosition(name);
-
-        if (!position && typeid_cast<const DataTypeArray *>(type.get()))
-        {
-            /// If array of Nested column is missing in part,
-            /// we have to read its offsets if they exist.
-            position = findColumnForOffsets(name);
-            read_only_offsets[i] = (position != std::nullopt);
-        }
-
-        column_positions[i] = std::move(position);
-    }
-
-    /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
-    auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
-    if (!buffer_size || settings.max_read_buffer_size < buffer_size)
-        buffer_size = settings.max_read_buffer_size;
-
-    const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
-    if (uncompressed_cache)
-    {
-        auto buffer = std::make_unique<CachedCompressedReadBuffer>(
-            fullPath(data_part->volume->getDisk(), full_data_path),
-            [this, full_data_path, buffer_size]()
-            {
-                return data_part->volume->getDisk()->readFile(
-                    full_data_path,
-                    buffer_size,
-                    0,
-                    settings.min_bytes_to_use_direct_io,
-                    settings.min_bytes_to_use_mmap_io);
-            },
-            uncompressed_cache);
-
-        if (profile_callback_)
-            buffer->setProfileCallback(profile_callback_, clock_type_);
-
-        cached_buffer = std::move(buffer);
-        data_buffer = cached_buffer.get();
-    }
-    else
-    {
-        auto buffer =
-            std::make_unique<CompressedReadBufferFromFile>(
-                data_part->volume->getDisk()->readFile(
-                    full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io));
-
-        if (profile_callback_)
-            buffer->setProfileCallback(profile_callback_, clock_type_);
-
-        non_cached_buffer = std::move(buffer);
-        data_buffer = non_cached_buffer.get();
-    }
+    try
+    {
+        size_t columns_num = columns.size();
+
+        column_positions.resize(columns_num);
+        read_only_offsets.resize(columns_num);
+        auto name_and_type = columns.begin();
+        for (size_t i = 0; i < columns_num; ++i, ++name_and_type)
+        {
+            const auto & [name, type] = getColumnFromPart(*name_and_type);
+            auto position = data_part->getColumnPosition(name);
+
+            if (!position && typeid_cast<const DataTypeArray *>(type.get()))
+            {
+                /// If array of Nested column is missing in part,
+                /// we have to read its offsets if they exist.
+                position = findColumnForOffsets(name);
+                read_only_offsets[i] = (position != std::nullopt);
+            }
+
+            column_positions[i] = std::move(position);
+        }
+
+        /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
+        auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
+        if (!buffer_size || settings.max_read_buffer_size < buffer_size)
+            buffer_size = settings.max_read_buffer_size;
+
+        const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
+        if (uncompressed_cache)
+        {
+            auto buffer = std::make_unique<CachedCompressedReadBuffer>(
+                fullPath(data_part->volume->getDisk(), full_data_path),
+                [this, full_data_path, buffer_size]()
+                {
+                    return data_part->volume->getDisk()->readFile(
+                        full_data_path,
+                        buffer_size,
+                        0,
+                        settings.min_bytes_to_use_direct_io,
+                        settings.min_bytes_to_use_mmap_io);
+                },
+                uncompressed_cache);
+
+            if (profile_callback_)
+                buffer->setProfileCallback(profile_callback_, clock_type_);
+
+            cached_buffer = std::move(buffer);
+            data_buffer = cached_buffer.get();
+        }
+        else
+        {
+            auto buffer =
+                std::make_unique<CompressedReadBufferFromFile>(
+                    data_part->volume->getDisk()->readFile(
+                        full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io));
+
+            if (profile_callback_)
+                buffer->setProfileCallback(profile_callback_, clock_type_);
+
+            non_cached_buffer = std::move(buffer);
+            data_buffer = non_cached_buffer.get();
+        }
+    }
+    catch (...)
+    {
+        storage.reportBrokenPart(data_part->name);
+        throw;
+    }
 }
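The hunk above wraps the whole constructor body in try/catch, so a failure while opening the compact part's single data file (a missing file, a bad mark, a corrupted compressed block) flags the part as broken before the exception propagates. A minimal standalone sketch of that pattern, using hypothetical Storage and PartReader types rather than the actual ClickHouse classes:

// Sketch only: Storage, PartReader, and openDataFile are illustrative
// stand-ins, not the ClickHouse API.
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

struct Storage
{
    void reportBrokenPart(const std::string & part_name)
    {
        std::cerr << "part reported as broken: " << part_name << "\n";
    }
};

struct PartReader
{
    PartReader(Storage & storage_, std::string part_name_)
        : storage(storage_), part_name(std::move(part_name_))
    {
        try
        {
            openDataFile(); /// May throw on a corrupted or missing file.
        }
        catch (...)
        {
            /// Report first, then propagate: the caller still sees the
            /// original exception, but the part is flagged for checking.
            storage.reportBrokenPart(part_name);
            throw;
        }
    }

private:
    void openDataFile()
    {
        throw std::runtime_error("simulated read failure");
    }

    Storage & storage;
    std::string part_name;
};

int main()
{
    Storage storage;
    try
    {
        PartReader reader(storage, "all_1_1_0");
    }
    catch (const std::exception & e)
    {
        std::cout << "caught: " << e.what() << "\n";
    }
}

Reporting before rethrowing keeps the caller's error handling unchanged; the original exception still reaches it.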
@@ -155,10 +164,18 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading,
             }
             catch (Exception & e)
             {
+                if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED)
+                    storage.reportBrokenPart(data_part->name);
+
                 /// Better diagnostics.
                 e.addMessage("(while reading column " + name + ")");
                 throw;
             }
+            catch (...)
+            {
+                storage.reportBrokenPart(data_part->name);
+                throw;
+            }
         }

         ++from_mark;
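The readRows hunk filters by error code before reporting: MEMORY_LIMIT_EXCEEDED means the server ran out of its memory budget mid-read, which says nothing about the integrity of the data on disk, so only other exceptions mark the part as broken. A self-contained sketch of that filter, with an illustrative ErrorCode enum and Exception type standing in for ClickHouse's:

// Sketch only: ErrorCode, Exception, and readColumn are illustrative,
// not ClickHouse's types.
#include <iostream>
#include <stdexcept>
#include <string>

enum class ErrorCode { CANNOT_READ_ALL_DATA, MEMORY_LIMIT_EXCEEDED };

struct Exception : std::runtime_error
{
    Exception(ErrorCode code_, const std::string & msg)
        : std::runtime_error(msg), code(code_) {}
    ErrorCode code;
};

void readColumn(bool out_of_memory)
{
    if (out_of_memory)
        throw Exception(ErrorCode::MEMORY_LIMIT_EXCEEDED, "memory limit exceeded");
    throw Exception(ErrorCode::CANNOT_READ_ALL_DATA, "cannot read all data");
}

void readWithReporting(bool out_of_memory)
{
    try
    {
        readColumn(out_of_memory);
    }
    catch (const Exception & e)
    {
        /// Only genuine read failures mark the part as broken;
        /// a memory limit error is transient and is just rethrown.
        if (e.code != ErrorCode::MEMORY_LIMIT_EXCEEDED)
            std::cerr << "part reported as broken\n";
        throw;
    }
}

int main()
{
    for (bool oom : {false, true})
    {
        try { readWithReporting(oom); }
        catch (const std::exception & e) { std::cout << "caught: " << e.what() << "\n"; }
    }
}

Running this reports a broken part only for the read failure, not for the simulated out-of-memory case, which is exactly the asymmetry the patch introduces.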