diff --git a/dbms/include/DB/Common/ProfileEvents.h b/dbms/include/DB/Common/ProfileEvents.h index 4ebf38f2f27..4f521a42168 100644 --- a/dbms/include/DB/Common/ProfileEvents.h +++ b/dbms/include/DB/Common/ProfileEvents.h @@ -33,6 +33,8 @@ M(ObsoleteReplicatedParts, "Replicated parts rendered obsolete by fetches") \ M(ReplicatedPartMerges, "Replicated part merges") \ M(ReplicatedPartFetchesOfMerged, "Replicated part merges replaced with fetches") \ + M(ReplicatedPartChecks, "Replicated part checks") \ + M(ReplicatedPartChecksFailed, "Replicated part checks failed") \ \ M(END, "") diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index 288eb999dea..64396a5aeef 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -109,9 +109,10 @@ protected: if (!reader) { UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL; - reader.reset(new MergeTreeReader(path, columns, uncompressed_cache, storage, all_mark_ranges)); + reader.reset(new MergeTreeReader(path, owned_data_part->name, columns, uncompressed_cache, storage, all_mark_ranges)); if (prewhere_actions) - pre_reader.reset(new MergeTreeReader(path, pre_columns, uncompressed_cache, storage, all_mark_ranges)); + pre_reader.reset(new MergeTreeReader(path, owned_data_part->name, pre_columns, uncompressed_cache, storage, + all_mark_ranges)); } if (prewhere_actions) @@ -266,9 +267,9 @@ protected: if (remaining_mark_ranges.empty()) { /** Закрываем файлы (ещё до уничтожения объекта). - * Чтобы при создании многих источников, но одновременном чтении только из нескольких, - * буферы не висели в памяти. - */ + * Чтобы при создании многих источников, но одновременном чтении только из нескольких, + * буферы не висели в памяти. + */ reader.reset(); pre_reader.reset(); part_columns_lock.reset(); diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 8d019c9b31d..db340c80ac2 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -118,6 +118,9 @@ struct MergeTreeSettings class MergeTreeData : public ITableDeclaration { public: + /// Функция, которую можно вызвать, если есть подозрение, что данные куска испорчены. + typedef std::function BrokenPartCallback; + /// Описание куска с данными. struct DataPart : public ActiveDataPartSet::Part { @@ -531,6 +534,8 @@ public: Aggregating, }; + static void doNothing(const String & name) {} + /** Подцепить таблицу с соответствующим именем, по соответствующему пути (с / на конце), * (корректность имён и путей не проверяется) * состоящую из указанных столбцов. @@ -550,8 +555,8 @@ public: const String & sign_column_, const MergeTreeSettings & settings_, const String & log_name_, - bool require_part_metadata_ - ); + bool require_part_metadata_, + BrokenPartCallback broken_part_callback_ = &MergeTreeData::doNothing); std::string getModePrefix() const; @@ -651,6 +656,12 @@ public: /// Нужно вызывать под залоченным lockStructureForAlter(). void setColumnsList(const NamesAndTypesList & new_columns) { columns = new NamesAndTypesList(new_columns); } + /// Нужно вызвать, если есть подозрение, что данные куска испорчены. + void reportBrokenPart(const String & name) + { + broken_part_callback(name); + } + ExpressionActionsPtr getPrimaryExpression() const { return primary_expr; } SortDescription getSortDescription() const { return sort_descr; } @@ -679,6 +690,8 @@ private: NamesAndTypesListPtr columns; + BrokenPartCallback broken_part_callback; + String log_name; Logger * log; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index 7a6b936b8e9..35ce03059a7 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -37,12 +37,20 @@ class MergeTreeReader typedef std::map OffsetColumns; public: - MergeTreeReader(const String & path_, /// Путь к куску + MergeTreeReader(const String & path_, const String & part_name_, /// Путь к куску const NamesAndTypesList & columns_, bool use_uncompressed_cache_, MergeTreeData & storage_, const MarkRanges & all_mark_ranges) - : path(path_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_) + : path(path_), part_name(part_name_), columns(columns_), use_uncompressed_cache(use_uncompressed_cache_), storage(storage_) { - for (const NameAndTypePair & column : columns) - addStream(column.name, *column.type, all_mark_ranges); + try + { + for (const NameAndTypePair & column : columns) + addStream(column.name, *column.type, all_mark_ranges); + } + catch (...) + { + storage.reportBrokenPart(part_name); + throw; + } } /** Если столбцов нет в блоке, добавляет их, если есть - добавляет прочитанные значения к ним в конец. @@ -109,10 +117,19 @@ public: } catch (const Exception & e) { + if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING) + storage.reportBrokenPart(part_name); + /// Более хорошая диагностика. throw Exception(e.message() + " (while reading from part " + path + " from mark " + toString(from_mark) + " to " + toString(to_mark) + ")", e.code()); } + catch (...) + { + storage.reportBrokenPart(part_name); + + throw; + } } /// Заполняет столбцы, которых нет в блоке, значениями по умолчанию. @@ -291,6 +308,7 @@ private: typedef std::map > FileStreams; String path; + String part_name; FileStreams streams; NamesAndTypesList columns; bool use_uncompressed_cache; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 9e1b6de2c06..9d78dedd5a8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -27,15 +27,17 @@ MergeTreeData::MergeTreeData( const String & sign_column_, const MergeTreeSettings & settings_, const String & log_name_, - bool require_part_metadata_) + bool require_part_metadata_, + BrokenPartCallback broken_part_callback_) : context(context_), date_column_name(date_column_name_), sampling_expression(sampling_expression_), index_granularity(index_granularity_), mode(mode_), sign_column(sign_column_), settings(settings_), primary_expr_ast(primary_expr_ast_->clone()), require_part_metadata(require_part_metadata_), - full_path(full_path_), columns(columns_), log_name(log_name_), - log(&Logger::get(log_name + " (Data)")) + full_path(full_path_), columns(columns_), + broken_part_callback(broken_part_callback_), + log_name(log_name_), log(&Logger::get(log_name + " (Data)")) { /// создаём директорию, если её нет Poco::File(full_path).createDirectories(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 9786eeed435..239c67bc6c2 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -36,7 +36,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_), replica_name(replica_name_), data( full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_, - index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true), + index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true, + std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)), reader(data), writer(data), merger(data), fetcher(data), log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")), shutdown_event(false), permanent_shutdown_event(false) @@ -894,6 +895,10 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro return false; } } + + /// Если ни у кого нет куска, и в очереди нет слияний с его участием, проверим, есть ли у кого-то покрывающий его. + if (replica.empty()) + enqueuePartForCheck(entry.new_part_name); } catch (...) { @@ -1315,6 +1320,7 @@ void StorageReplicatedMergeTree::partCheckThread() } LOG_WARNING(log, "Checking part " << part_name); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks); auto part = data.getContainingPart(part_name); String part_path = replica_path + "/parts/" + part_name; @@ -1327,6 +1333,7 @@ void StorageReplicatedMergeTree::partCheckThread() { LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. " "Removing from ZooKeeper and queueing a fetch."); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); removePartAndEnqueueFetch(part_name); } @@ -1362,6 +1369,7 @@ void StorageReplicatedMergeTree::partCheckThread() */ LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. " << "There might or might not be a data loss."); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); /// Если ни у кого нет такого куска, удалим его из нашей очереди и добавим его в block_numbers. @@ -1432,7 +1440,8 @@ void StorageReplicatedMergeTree::partCheckThread() { tryLogCurrentException(__PRETTY_FUNCTION__); - LOG_INFO(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch."); + LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch."); + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); removePartAndEnqueueFetch(part_name); @@ -1443,6 +1452,8 @@ void StorageReplicatedMergeTree::partCheckThread() /// Если куска нет в ZooKeeper, удалим его локально. else { + ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); + /// Если этот кусок еще и получен в результате слияния, это уже чересчур странно. if (part->left != part->right) {