From 019f6d3a760c2b0f3cfe834c90194467d64ad4a2 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Tue, 4 Aug 2020 00:10:09 +0800 Subject: [PATCH] Report broken parts in compact MergeTree --- .../MergeTree/MergeTreeReaderCompact.cpp | 123 ++++++++++-------- 1 file changed, 70 insertions(+), 53 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index b1fd67005d8..c01f17eb91f 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -11,6 +11,7 @@ namespace ErrorCodes { extern const int CANNOT_READ_ALL_DATA; extern const int ARGUMENT_OUT_OF_BOUND; + extern const int MEMORY_LIMIT_EXCEEDED; } @@ -43,66 +44,74 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( settings.save_marks_in_cache, data_part->getColumns().size()) { - size_t columns_num = columns.size(); - - column_positions.resize(columns_num); - read_only_offsets.resize(columns_num); - auto name_and_type = columns.begin(); - for (size_t i = 0; i < columns_num; ++i, ++name_and_type) + try { - const auto & [name, type] = getColumnFromPart(*name_and_type); - auto position = data_part->getColumnPosition(name); + size_t columns_num = columns.size(); - if (!position && typeid_cast(type.get())) + column_positions.resize(columns_num); + read_only_offsets.resize(columns_num); + auto name_and_type = columns.begin(); + for (size_t i = 0; i < columns_num; ++i, ++name_and_type) { - /// If array of Nested column is missing in part, - /// we have to read its offsets if they exist. - position = findColumnForOffsets(name); - read_only_offsets[i] = (position != std::nullopt); + const auto & [name, type] = getColumnFromPart(*name_and_type); + auto position = data_part->getColumnPosition(name); + + if (!position && typeid_cast(type.get())) + { + /// If array of Nested column is missing in part, + /// we have to read its offsets if they exist. + position = findColumnForOffsets(name); + read_only_offsets[i] = (position != std::nullopt); + } + + column_positions[i] = std::move(position); } - column_positions[i] = std::move(position); + /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data. + auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges); + if (!buffer_size || settings.max_read_buffer_size < buffer_size) + buffer_size = settings.max_read_buffer_size; + + const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION; + if (uncompressed_cache) + { + auto buffer = std::make_unique( + fullPath(data_part->volume->getDisk(), full_data_path), + [this, full_data_path, buffer_size]() + { + return data_part->volume->getDisk()->readFile( + full_data_path, + buffer_size, + 0, + settings.min_bytes_to_use_direct_io, + settings.min_bytes_to_use_mmap_io); + }, + uncompressed_cache); + + if (profile_callback_) + buffer->setProfileCallback(profile_callback_, clock_type_); + + cached_buffer = std::move(buffer); + data_buffer = cached_buffer.get(); + } + else + { + auto buffer = + std::make_unique( + data_part->volume->getDisk()->readFile( + full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io)); + + if (profile_callback_) + buffer->setProfileCallback(profile_callback_, clock_type_); + + non_cached_buffer = std::move(buffer); + data_buffer = non_cached_buffer.get(); + } } - - /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data. - auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges); - if (!buffer_size || settings.max_read_buffer_size < buffer_size) - buffer_size = settings.max_read_buffer_size; - - const String full_data_path = data_part->getFullRelativePath() + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION; - if (uncompressed_cache) + catch (...) { - auto buffer = std::make_unique( - fullPath(data_part->volume->getDisk(), full_data_path), - [this, full_data_path, buffer_size]() - { - return data_part->volume->getDisk()->readFile( - full_data_path, - buffer_size, - 0, - settings.min_bytes_to_use_direct_io, - settings.min_bytes_to_use_mmap_io); - }, - uncompressed_cache); - - if (profile_callback_) - buffer->setProfileCallback(profile_callback_, clock_type_); - - cached_buffer = std::move(buffer); - data_buffer = cached_buffer.get(); - } - else - { - auto buffer = - std::make_unique( - data_part->volume->getDisk()->readFile( - full_data_path, buffer_size, 0, settings.min_bytes_to_use_direct_io, settings.min_bytes_to_use_mmap_io)); - - if (profile_callback_) - buffer->setProfileCallback(profile_callback_, clock_type_); - - non_cached_buffer = std::move(buffer); - data_buffer = non_cached_buffer.get(); + storage.reportBrokenPart(data_part->name); + throw; } } @@ -155,10 +164,18 @@ size_t MergeTreeReaderCompact::readRows(size_t from_mark, bool continue_reading, } catch (Exception & e) { + if (e.code() != ErrorCodes::MEMORY_LIMIT_EXCEEDED) + storage.reportBrokenPart(data_part->name); + /// Better diagnostics. e.addMessage("(while reading column " + name + ")"); throw; } + catch (...) + { + storage.reportBrokenPart(data_part->name); + throw; + } } ++from_mark;